From 1111399d117bffe350c8ceb678c7dc07550b09ad Mon Sep 17 00:00:00 2001 From: eeb Date: Wed, 31 Aug 2005 21:34:02 +0000 Subject: [PATCH] * Applied Andreas' tcpnal compiler optimization bugfix patch from HEAD portals (different way of constructing tcp HELLO header to avoid pointer aliasing) to lnet * Applied qswnal build fix to lustre-portals.m4 from HEAD portals to lnet * lnet version of gmnal running @ HP * fixed bad 64bit cast in acceptor.c * fixed lconf to work with newconfig modules under lnet --- lnet/autoconf/lustre-lnet.m4 | 122 ++- lnet/klnds/gmlnd/autoMakefile.am | 2 +- lnet/klnds/gmlnd/gm-reg-phys.patch | 105 +++ lnet/klnds/gmlnd/gmlnd.h | 562 +++++-------- lnet/klnds/gmlnd/gmlnd_api.c | 419 ++++------ lnet/klnds/gmlnd/gmlnd_cb.c | 397 +++------ lnet/klnds/gmlnd/gmlnd_comm.c | 1625 +++++++++--------------------------- lnet/klnds/gmlnd/gmlnd_module.c | 162 ++-- lnet/klnds/gmlnd/gmlnd_utils.c | 1523 +++++++++++++-------------------- lnet/lnet/acceptor.c | 2 +- lnet/ulnds/connection.c | 34 +- lnet/ulnds/socklnd/connection.c | 34 +- 12 files changed, 1791 insertions(+), 3196 deletions(-) create mode 100644 lnet/klnds/gmlnd/gm-reg-phys.patch diff --git a/lnet/autoconf/lustre-lnet.m4 b/lnet/autoconf/lustre-lnet.m4 index 40d5f1c..e5c3f40 100644 --- a/lnet/autoconf/lustre-lnet.m4 +++ b/lnet/autoconf/lustre-lnet.m4 @@ -107,6 +107,16 @@ if test -d $QSNET/drivers/net/qsnet ; then QSWCPPFLAGS="-I$QSNET/include/linux" fi fi + + if test x$QSNET = x$LINUX ; then + LB_LINUX_CONFIG([QSNET],[],[ + LB_LINUX_CONFIG([QSNET_MODULE],[],[ + AC_MSG_WARN([QSNET is not enabled in this kernel; not building qswnal.]) + QSWNAL="" + QSWCPPFLAGS="" + ]) + ]) + fi else AC_MSG_RESULT([no]) QSWNAL="" @@ -121,28 +131,99 @@ AC_SUBST(QSWNAL) # # check if GM support is available # -AC_DEFUN([LN_CONFIG_GM], -[LB_ARG_LIBS_INCLUDES([Myrinet],[gm]) -if test x$gm_includes != x ; then - GMCPPFLAGS="-I$gm_includes" - if test -d "$gm/drivers" ; then - GMCPPFLAGS="$GMCPPFLAGS -I$gm/drivers -I$gm/drivers/linux/gm" - fi -fi -AC_SUBST(GMCPPFLAGS) - -if test x$gm_libs != x ; then - GMLIBS="-L$gm_libs" -fi -AC_SUBST(GMLIBS) +AC_DEFUN([LN_CONFIG_GM],[ +AC_MSG_CHECKING([whether to enable GM support]) +AC_ARG_WITH([gm], + AC_HELP_STRING([--with-gm=path-to-gm-source-tree], + [build gmnal against path]), + [ + case $with_gm in + no) ENABLE_GM=0 + ;; + *) ENABLE_GM=1 + GM_SRC="$with_gm" + ;; + esac + ],[ + ENABLE_GM=0 + ]) +AC_ARG_WITH([gm-install], + AC_HELP_STRING([--with-gm-install=path-to-gm-install-tree], + [say where GM has been installed]), + [ + GM_INSTALL=$with_gm_install + ],[ + GM_INSTALL="/opt/gm" + ]) +if test $ENABLE_GM -eq 0; then + AC_MSG_RESULT([no]) +else + AC_MSG_RESULT([yes]) -ENABLE_GM=0 -if test x$gm != x ; then GMNAL="gmnal" - ENABLE_GM=1 + GMCPPFLAGS="-I$GM_SRC/include -I$GM_SRC/drivers -I$GM_SRC/drivers/linux/gm" + + if test -f $GM_INSTALL/lib/libgm.a -o \ + -f $GM_INSTALL/lib64/libgm.a; then + GMLIBS="-L$GM_INSTALL/lib -L$GM_INSTALL/lib64" + else + AC_MSG_ERROR([Cant find GM libraries under $GM_INSTALL]) + fi + + EXTRA_KCFLAGS_save="$EXTRA_KCFLAGS" + EXTRA_KCFLAGS="$GMCPPFLAGS -DGM_KERNEL $EXTRA_KCFLAGS" + + AC_MSG_CHECKING([that code using GM compiles with given path]) + LB_LINUX_TRY_COMPILE([ + #define GM_STRONG_TYPES 1 + #ifdef VERSION + #undef VERSION + #endif + #include "gm.h" + #include "gm_internal.h" + ],[ + struct gm_port *port = NULL; + gm_recv_event_t *rxevent = gm_blocking_receive_no_spin(port); + return 0; + ],[ + AC_MSG_RESULT([yes]) + ],[ + AC_MSG_RESULT([no]) + AC_MSG_ERROR([Bad --with-gm path]) + 
]) + + AC_MSG_CHECKING([that GM has gm_register_memory_ex_phys()]) + LB_LINUX_TRY_COMPILE([ + #define GM_STRONG_TYPES 1 + #ifdef VERSION + #undef VERSION + #endif + #include "gm.h" + #include "gm_internal.h" + ],[ + gm_status_t gmrc; + struct gm_port *port = NULL; + gm_u64_t phys = 0; + gm_up_t pvma = 0; + + gmrc = gm_register_memory_ex_phys(port, phys, 100, pvma); + return 0; + ],[ + AC_MSG_RESULT([yes]) + ],[ + AC_MSG_RESULT([no. +Please patch the GM sources as follows... + cd $GM_SRC + patch -p0 < $PWD/lnet/knals/gmnal/gm-reg-phys.patch +...then rebuild and re-install them]) + AC_MSG_ERROR([Can't build GM without gm_register_memory_ex_phys()]) + ]) + + EXTRA_KCFLAGS="$EXTRA_KCFLAGS_save" fi +AC_SUBST(GMCPPFLAGS) +AC_SUBST(GMLIBS) AC_SUBST(GMNAL) -AC_SUBST(ENABLE_GM) ]) # @@ -228,11 +309,6 @@ AC_SUBST(OPENIBNAL) # # check for infinicon infiniband support # -# -# LN_CONFIG_IIB -# -# check for infinicon infiniband support -# AC_DEFUN([LN_CONFIG_IIB],[ AC_MSG_CHECKING([whether to enable Infinicon support]) # set default diff --git a/lnet/klnds/gmlnd/autoMakefile.am b/lnet/klnds/gmlnd/autoMakefile.am index d8b9edb..8c3b7c0 100644 --- a/lnet/klnds/gmlnd/autoMakefile.am +++ b/lnet/klnds/gmlnd/autoMakefile.am @@ -11,5 +11,5 @@ endif endif endif -MOSTLYCLEANFILES = *.o *.ko *.mod.c +MOSTLYCLEANFILES := @MOSTLYCLEANFILES@ DIST_SOURCES = $(kgmnal-objs:%.o=%.c) gmnal.h diff --git a/lnet/klnds/gmlnd/gm-reg-phys.patch b/lnet/klnds/gmlnd/gm-reg-phys.patch new file mode 100644 index 0000000..0847a13 --- /dev/null +++ b/lnet/klnds/gmlnd/gm-reg-phys.patch @@ -0,0 +1,105 @@ +Index: libgm/gm_register.c +=================================================================== +RCS file: /repository/gm/libgm/gm_register.c,v +retrieving revision 1.9.16.3 +diff -u -r1.9.16.3 gm_register.c +--- libgm/gm_register.c 9 Aug 2005 14:37:02 -0000 1.9.16.3 ++++ libgm/gm_register.c 25 Aug 2005 21:35:58 -0000 +@@ -77,20 +77,14 @@ + + */ + +-GM_ENTRY_POINT +-gm_status_t +-gm_register_memory_ex (gm_port_t *p, void *_ptr, gm_size_t length, void *_pvma) ++static gm_status_t ++_gm_register_memory (gm_port_t *p, int is_physical, gm_u64_t ptr, gm_size_t length, gm_up_t pvma) + { + gm_status_t status; +- gm_up_t ptr; +- gm_up_t pvma; + + GM_CALLED_WITH_ARGS (("%p,%p,"GM_U64_TMPL",%p", + p, _ptr, GM_U64_ARG (length), _pvma)); + +- ptr = GM_PTR_TO_UP (_ptr); +- pvma = GM_PTR_TO_UP (_pvma); +- + #if !GM_KERNEL && !GM_CAN_REGISTER_MEMORY + GM_PARAMETER_MAY_BE_UNUSED (p); + GM_PARAMETER_MAY_BE_UNUSED (ptr); +@@ -160,7 +154,7 @@ + status = gm_add_mapping_to_page_table (ps, + ptr + offset, + pvma + offset, +- GM_INVALID_DMA_PAGE); ++ is_physical ? 
ptr + offset : GM_INVALID_DMA_PAGE); + if (status != GM_SUCCESS) + { + status = GM_INVALID_PARAMETER; +@@ -317,13 +311,31 @@ + + */ + ++#if GM_KERNEL && (GM_CPU_x86 || GM_CPU_x86_64 || GM_CPU_ia64) ++/* only architecture where pci bus addr == physical address can use ++ such a simple scheme */ ++GM_ENTRY_POINT gm_status_t ++gm_register_memory_ex_phys (struct gm_port *p, ++ gm_u64_t phys, gm_size_t length, ++ gm_up_t pvma) ++{ ++ return _gm_register_memory(p, 1, phys, length, (gm_size_t)pvma); ++} ++#endif ++ ++GM_ENTRY_POINT gm_status_t ++gm_register_memory_ex (gm_port_t *p, void *ptr, gm_size_t length, void *pvma) ++{ ++ return _gm_register_memory(p, 0, (gm_size_t)ptr, length, (gm_size_t)pvma); ++} ++ + GM_ENTRY_POINT gm_status_t + gm_register_memory (gm_port_t *p, void *ptr, gm_size_t length) + { + gm_status_t status; + + GM_CALLED_WITH_ARGS (("%p,%p,"GM_U64_TMPL, p, ptr, GM_U64_ARG (length))); +- status = gm_register_memory_ex (p, ptr, length, ptr); ++ status = _gm_register_memory(p, 0, (gm_size_t)ptr, length, (gm_size_t)ptr); + GM_RETURN_STATUS (status); + } + +Index: include/gm.h +=================================================================== +RCS file: /repository/gm/include/gm.h,v +retrieving revision 1.25.10.11 +diff -u -r1.25.10.11 gm.h +--- include/gm.h 14 Mar 2005 21:42:41 -0000 1.25.10.11 ++++ include/gm.h 25 Aug 2005 21:35:58 -0000 +@@ -2676,6 +2676,10 @@ + GM_ENTRY_POINT gm_status_t gm_register_memory_ex (struct gm_port *p, + void *ptr, gm_size_t length, + void *pvma); ++ ++GM_ENTRY_POINT gm_status_t gm_register_memory_ex_phys (struct gm_port *p, ++ gm_u64_t phys, gm_size_t length, ++ gm_up_t pvma); + #endif /* GM_API_VERSION >= GM_API_VERSION_2_0_6 */ + + #if GM_API_VERSION >= GM_API_VERSION_2_1_0 +Index: libgm/gm_reference_api.c +=================================================================== +RCS file: /repository/gm/libgm/gm_reference_api.c,v +retrieving revision 1.3.14.1 +diff -u -r1.3.14.1 gm_reference_api.c +--- libgm/gm_reference_api.c 23 Apr 2004 20:27:29 -0000 1.3.14.1 ++++ libgm/gm_reference_api.c 25 Aug 2005 22:39:20 -0000 +@@ -154,6 +154,7 @@ + GM_REF (gm_register_buffer); + GM_REF (gm_register_memory); + GM_REF (gm_register_memory_ex); ++GM_REF (gm_register_memory_ex_phys); + GM_REF (gm_resume_sending); + GM_REF (gm_send); + GM_REF (gm_send_to_peer); diff --git a/lnet/klnds/gmlnd/gmlnd.h b/lnet/klnds/gmlnd/gmlnd.h index d23eeaf..a4b007b 100644 --- a/lnet/klnds/gmlnd/gmlnd.h +++ b/lnet/klnds/gmlnd/gmlnd.h @@ -21,9 +21,9 @@ /* - * Portals GM kernel NAL header file - * This file makes all declaration and prototypes - * for the API side and CB side of the NAL + * Portals GM kernel NAL header file + * This file makes all declaration and prototypes + * for the API side and CB side of the NAL */ #ifndef __INCLUDE_GMNAL_H__ #define __INCLUDE_GMNAL_H__ @@ -47,7 +47,13 @@ #include "linux/string.h" #include "linux/stat.h" #include "linux/errno.h" +#include "linux/version.h" +#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0) +#include "linux/buffer_head.h" +#include "linux/fs.h" +#else #include "linux/locks.h" +#endif #include "linux/unistd.h" #include "linux/init.h" #include "linux/sem.h" @@ -56,14 +62,16 @@ #define DEBUG_SUBSYSTEM S_NAL -#include "portals/nal.h" -#include "portals/api.h" -#include "portals/errno.h" #include "libcfs/kp30.h" -#include "portals/p30.h" +#include "lnet/p30.h" +#include "lnet/lib-p30.h" -#include "portals/nal.h" -#include "portals/lib-p30.h" +/* undefine these before including the GM headers which clash */ +#undef PACKAGE_BUGREPORT 
+#undef PACKAGE_NAME +#undef PACKAGE_STRING +#undef PACKAGE_TARNAME +#undef PACKAGE_VERSION #define GM_STRONG_TYPES 1 #ifdef VERSION @@ -72,345 +80,207 @@ #include "gm.h" #include "gm_internal.h" +/* Default Tunable Values */ +#define GMNAL_PORT 4 /* which port to use */ +#define GMNAL_NTX 32 /* # tx descs */ +#define GMNAL_NTX_NBLK 256 /* # reserved tx descs */ +#define GMNAL_NRX_SMALL 128 /* # small receives to post */ +#define GMNAL_NRX_LARGE 64 /* # large receives to post */ +#define GMNAL_NLARGE_TX_BUFS 32 /* # large tx buffers */ + +/* Fixed tunables */ +#define GMNAL_RESCHED 100 /* # busy loops to force scheduler to yield */ +#define GMNAL_NETADDR_BASE 0x10000000 /* where we start in network VM */ +#define GMNAL_LARGE_PRIORITY GM_LOW_PRIORITY /* large message GM priority */ +#define GMNAL_SMALL_PRIORITY GM_LOW_PRIORITY /* small message GM priority */ + +/* Wire protocol */ +typedef struct { + ptl_hdr_t gmim_hdr; /* portals header */ + char gmim_payload[0]; /* payload */ +} gmnal_immediate_msg_t; + +typedef struct { + /* First 2 fields fixed FOR ALL TIME */ + __u32 gmm_magic; /* I'm a GM message */ + __u16 gmm_version; /* this is my version number */ + + __u16 gmm_type; /* msg type */ + __u64 gmm_srcnid; /* sender's NID */ + __u64 gmm_dstnid; /* destination's NID */ + union { + gmnal_immediate_msg_t immediate; + } gmm_u; +} WIRE_ATTR gmnal_msg_t; + +#define GMNAL_MSG_MAGIC 0x6d797269 /* 'myri'! */ +#define GMNAL_MSG_VERSION 1 +#define GMNAL_MSG_IMMEDIATE 1 + +typedef struct netbuf { + __u64 nb_netaddr; /* network VM address */ + struct page *nb_pages[1]; /* the pages (at least 1) */ +} gmnal_netbuf_t; + +#define GMNAL_NETBUF_MSG(nb) ((gmnal_msg_t *)page_address((nb)->nb_pages[0])) +#define GMNAL_NETBUF_LOCAL_NETADDR(nb) ((void *)((unsigned long)(nb)->nb_netaddr)) + +typedef struct gmnal_txbuf { + struct list_head txb_list; /* queue on gmni_idle_ltxbs */ + struct gmnal_txbuf *txb_next; /* stash on gmni_ltxs */ + gmnal_netbuf_t txb_buf; /* space */ +} gmnal_txbuf_t; + +typedef struct gmnal_tx { + struct list_head tx_list; /* queue */ + int tx_isnblk:1; /* reserved for non-blocking? */ + int tx_credit:1; /* consumed a credit? */ + int tx_large_iskiov:1; /* large is in kiovs? */ + struct gmnal_ni *tx_gmni; /* owning NI */ + lnet_nid_t tx_nid; /* destination NID */ + int tx_gmlid; /* destination GM local ID */ + ptl_msg_t *tx_ptlmsg; /* ptlmsg to finalize on completion */ + + gmnal_netbuf_t tx_buf; /* small tx buffer */ + gmnal_txbuf_t *tx_ltxb; /* large buffer (to free on completion) */ + int tx_msgnob; /* message size (so far) */ + + int tx_large_nob; /* # bytes large buffer payload */ + int tx_large_offset; /* offset within frags */ + int tx_large_niov; /* # VM frags */ + union { + struct iovec *iov; /* mapped frags */ + lnet_kiov_t *kiov; /* page frags */ + } tx_large_frags; + struct gmnal_tx *tx_next; /* stash on gmni_txs */ +} gmnal_tx_t; + +typedef struct gmnal_rx { + struct list_head rx_list; /* enqueue on gmni_rxq for handling */ + int rx_islarge:1; /* large receive buffer? */ + unsigned int rx_recv_nob; /* bytes received */ + __u16 rx_recv_gmid; /* sender */ + __u8 rx_recv_port; /* sender's port */ + __u8 rx_recv_type; /* ?? 
*/ + struct gmnal_rx *rx_next; /* stash on gmni_rxs */ + gmnal_netbuf_t rx_buf; /* the buffer */ +} gmnal_rx_t; + +typedef struct gmnal_ni { + ptl_ni_t *gmni_ni; /* generic NI */ + struct gm_port *gmni_port; /* GM port */ + spinlock_t gmni_gm_lock; /* serialise GM calls */ + int gmni_large_pages; /* # pages in a large message buffer */ + int gmni_large_msgsize; /* nob in large message buffers */ + int gmni_large_gmsize; /* large message GM bucket */ + int gmni_small_msgsize; /* nob in small message buffers */ + int gmni_small_gmsize; /* small message GM bucket */ + __u64 gmni_netaddr_base; /* base of mapped network VM */ + int gmni_netaddr_size; /* # bytes of mapped network VM */ + + gmnal_tx_t *gmni_txs; /* all txs */ + gmnal_rx_t *gmni_rxs; /* all rx descs */ + gmnal_txbuf_t *gmni_ltxbs; /* all large tx bufs */ + + atomic_t gmni_nthreads; /* total # threads */ + gm_alarm_t gmni_alarm; /* alarm to wake caretaker */ + int gmni_shutdown; /* tell all threads to exit */ + + struct list_head gmni_idle_txs; /* idle tx's */ + struct list_head gmni_nblk_idle_txs; /* reserved for non-blocking callers */ + wait_queue_head_t gmni_idle_tx_wait; /* block here for idle tx */ + int gmni_tx_credits; /* # transmits still possible */ + struct list_head gmni_idle_ltxbs; /* idle large tx buffers */ + struct list_head gmni_buf_txq; /* tx's waiting for buffers */ + struct list_head gmni_cred_txq; /* tx's waiting for credits */ + spinlock_t gmni_tx_lock; /* serialise */ + + struct gm_hash *gmni_rx_hash; /* buffer->rx lookup */ + struct semaphore gmni_rx_mutex; /* serialise blocking on GM */ +} gmnal_ni_t; + +typedef struct { + int *gm_port; + int *gm_ntx; + int *gm_ntx_nblk; + int *gm_nlarge_tx_bufs; + int *gm_nrx_small; + int *gm_nrx_large; + +#if CONFIG_SYSCTL && !CFS_SYSFS_MODULE_PARM + struct ctl_table_header *gm_sysctl; /* sysctl interface */ +#endif +} gmnal_tunables_t; -/* - * Defines for the API NAL - */ - -/* - * Small message size is configurable - * insmod can set small_msg_size - * which is used to populate nal_data.small_msg_size - */ -#define GMNAL_SMALL_MESSAGE 1078 -#define GMNAL_LARGE_MESSAGE_INIT 1079 -#define GMNAL_LARGE_MESSAGE_ACK 1080 -#define GMNAL_LARGE_MESSAGE_FINI 1081 - -extern int gmnal_small_msg_size; -extern int num_rx_threads; -extern int num_stxds; -extern int gm_port_id; -#define GMNAL_SMALL_MSG_SIZE(a) a->small_msg_size -#define GMNAL_IS_SMALL_MESSAGE(n,a,b,c) gmnal_is_small_msg(n, a, b, c) -#define GMNAL_MAGIC 0x1234abcd -/* - * The gm_port to use for gmnal - */ -#define GMNAL_GM_PORT_ID gm_port_id - - -/* - * Small Transmit Descriptor - * A structre to keep track of a small transmit operation - * This structure has a one-to-one relationship with a small - * transmit buffer (both create by gmnal_stxd_alloc). - * There are two free list of stxd. One for use by clients of the NAL - * and the other by the NAL rxthreads when doing sends. - * This helps prevent deadlock caused by stxd starvation. 
- */ -typedef struct _gmnal_stxd_t { - void *buffer; - int buffer_size; - gm_size_t gm_size; - int msg_size; - int gm_target_node; - int gm_priority; - int type; - struct _gmnal_data_t *nal_data; - ptl_msg_t *cookie; - int niov; - struct iovec iov[PTL_MD_MAX_IOV]; - struct _gmnal_stxd_t *next; - int rxt; - int kniov; - struct iovec *iovec_dup; -} gmnal_stxd_t; - -/* - * keeps a transmit token for large transmit (gm_get) - * and a pointer to rxd that is used as context for large receive - */ -typedef struct _gmnal_ltxd_t { - struct _gmnal_ltxd_t *next; - struct _gmnal_srxd_t *srxd; -} gmnal_ltxd_t; - - -/* - * as for gmnal_stxd_t - * a hash table in nal_data find srxds from - * the rx buffer address. hash table populated at init time - */ -typedef struct _gmnal_srxd_t { - void *buffer; - int size; - gm_size_t gmsize; - unsigned int gm_source_node; - gmnal_stxd_t *source_stxd; - int type; - int nsiov; - int nriov; - struct iovec *riov; - int ncallbacks; - spinlock_t callback_lock; - int callback_status; - ptl_msg_t *cookie; - struct _gmnal_srxd_t *next; - struct _gmnal_data_t *nal_data; -} gmnal_srxd_t; - -/* - * Header which lmgnal puts at the start of each message - * watch alignment for ia32/64 interaction - */ -typedef struct _gmnal_msghdr { - int magic; - int type; - unsigned int sender_node_id; - int niov; - gm_remote_ptr_t stxd_remote_ptr; /* 64 bits */ - } gmnal_msghdr_t; -#define GMNAL_MSGHDR_SIZE sizeof(gmnal_msghdr_t) - -/* - * the caretaker thread (ct_thread) gets receive events - * (and other events) from the myrinet device via the GM2 API. - * caretaker thread populates one work entry for each receive event, - * puts it on a Q in nal_data and wakes a receive thread to - * process the receive. - * Processing a portals receive can involve a transmit operation. 
- * Because of this the caretaker thread cannot process receives - * as it may get deadlocked when supply of transmit descriptors - * is exhausted (as caretaker thread is responsible for replacing - * transmit descriptors on the free list) - */ -typedef struct _gmnal_rxtwe { - void *buffer; - unsigned snode; - unsigned sport; - unsigned type; - unsigned length; - struct _gmnal_rxtwe *next; -} gmnal_rxtwe_t; - -/* - * 1 receive thread started on each CPU - */ -#define NRXTHREADS 10 /* max number of receiver threads */ - -typedef struct _gmnal_data_t { - int refcnt; - spinlock_t cb_lock; - spinlock_t stxd_lock; - struct semaphore stxd_token; - gmnal_stxd_t *stxd; - spinlock_t rxt_stxd_lock; - struct semaphore rxt_stxd_token; - gmnal_stxd_t *rxt_stxd; - spinlock_t ltxd_lock; - struct semaphore ltxd_token; - gmnal_ltxd_t *ltxd; - spinlock_t srxd_lock; - struct semaphore srxd_token; - gmnal_srxd_t *srxd; - struct gm_hash *srxd_hash; - ptl_ni_t *ni; - struct gm_port *gm_port; - unsigned int gm_local_nid; - unsigned int gm_global_nid; - spinlock_t gm_lock; - long rxthread_pid[NRXTHREADS]; - int rxthread_stop_flag; - spinlock_t rxthread_flag_lock; - long rxthread_flag; - long ctthread_pid; - int ctthread_flag; - gm_alarm_t ctthread_alarm; - int small_msg_size; - int small_msg_gmsize; - gmnal_rxtwe_t *rxtwe_head; - gmnal_rxtwe_t *rxtwe_tail; - spinlock_t rxtwe_lock; - struct semaphore rxtwe_wait; - struct ctl_table_header *sysctl; -} gmnal_data_t; - -/* - * Flags to start/stop and check status of threads - * each rxthread sets 1 bit (any bit) of the flag on startup - * and clears 1 bit when exiting - */ -#define GMNAL_THREAD_RESET 0 -#define GMNAL_THREAD_STOP 666 -#define GMNAL_CTTHREAD_STARTED 333 -#define GMNAL_RXTHREADS_STARTED ( (1<stxd_lock); -#define GMNAL_TXD_LOCK(a) spin_lock(&a->stxd_lock); -#define GMNAL_TXD_UNLOCK(a) spin_unlock(&a->stxd_lock); -#define GMNAL_TXD_TOKEN_INIT(a, n) sema_init(&a->stxd_token, n); -#define GMNAL_TXD_GETTOKEN(a) down(&a->stxd_token); -#define GMNAL_TXD_TRYGETTOKEN(a) down_trylock(&a->stxd_token) -#define GMNAL_TXD_RETURNTOKEN(a) up(&a->stxd_token); - -#define GMNAL_RXT_TXD_LOCK_INIT(a) spin_lock_init(&a->rxt_stxd_lock); -#define GMNAL_RXT_TXD_LOCK(a) spin_lock(&a->rxt_stxd_lock); -#define GMNAL_RXT_TXD_UNLOCK(a) spin_unlock(&a->rxt_stxd_lock); -#define GMNAL_RXT_TXD_TOKEN_INIT(a, n) sema_init(&a->rxt_stxd_token, n); -#define GMNAL_RXT_TXD_GETTOKEN(a) down(&a->rxt_stxd_token); -#define GMNAL_RXT_TXD_TRYGETTOKEN(a) down_trylock(&a->rxt_stxd_token) -#define GMNAL_RXT_TXD_RETURNTOKEN(a) up(&a->rxt_stxd_token); - -#define GMNAL_LTXD_LOCK_INIT(a) spin_lock_init(&a->ltxd_lock); -#define GMNAL_LTXD_LOCK(a) spin_lock(&a->ltxd_lock); -#define GMNAL_LTXD_UNLOCK(a) spin_unlock(&a->ltxd_lock); -#define GMNAL_LTXD_TOKEN_INIT(a, n) sema_init(&a->ltxd_token, n); -#define GMNAL_LTXD_GETTOKEN(a) down(&a->ltxd_token); -#define GMNAL_LTXD_TRYGETTOKEN(a) down_trylock(&a->ltxd_token) -#define GMNAL_LTXD_RETURNTOKEN(a) up(&a->ltxd_token); - -#define GMNAL_RXD_LOCK_INIT(a) spin_lock_init(&a->srxd_lock); -#define GMNAL_RXD_LOCK(a) spin_lock(&a->srxd_lock); -#define GMNAL_RXD_UNLOCK(a) spin_unlock(&a->srxd_lock); -#define GMNAL_RXD_TOKEN_INIT(a, n) sema_init(&a->srxd_token, n); -#define GMNAL_RXD_GETTOKEN(a) down(&a->srxd_token); -#define GMNAL_RXD_TRYGETTOKEN(a) down_trylock(&a->srxd_token) -#define GMNAL_RXD_RETURNTOKEN(a) up(&a->srxd_token); - -#define GMNAL_GM_LOCK_INIT(a) spin_lock_init(&a->gm_lock); -#define GMNAL_GM_LOCK(a) spin_lock(&a->gm_lock); -#define GMNAL_GM_UNLOCK(a) 
spin_unlock(&a->gm_lock); -#define GMNAL_CB_LOCK_INIT(a) spin_lock_init(&a->cb_lock); - - -/* - * Memory Allocator - */ - -/* - * CB NAL - */ - -int gmnal_cb_send(ptl_ni_t *, void *, ptl_msg_t *, ptl_hdr_t *, - int, lnet_nid_t, lnet_pid_t, unsigned int, struct iovec *, size_t, size_t); - -int gmnal_cb_send_pages(ptl_ni_t *, void *, ptl_msg_t *, ptl_hdr_t *, - int, lnet_nid_t, lnet_pid_t, unsigned int, lnet_kiov_t *, size_t, size_t); - -int gmnal_cb_recv(ptl_ni_t *, void *, ptl_msg_t *, - unsigned int, struct iovec *, size_t, size_t, size_t); - -int gmnal_cb_recv_pages(ptl_ni_t *, void *, ptl_msg_t *, - unsigned int, lnet_kiov_t *, size_t, size_t, size_t); - +/* gmnal_api.c */ int gmnal_init(void); - -void gmnal_fini(void); - -/* - * Small and Large Transmit and Receive Descriptor Functions - */ -int gmnal_alloc_txd(gmnal_data_t *); -void gmnal_free_txd(gmnal_data_t *); -gmnal_stxd_t* gmnal_get_stxd(gmnal_data_t *, int); -void gmnal_return_stxd(gmnal_data_t *, gmnal_stxd_t *); -gmnal_ltxd_t* gmnal_get_ltxd(gmnal_data_t *); -void gmnal_return_ltxd(gmnal_data_t *, gmnal_ltxd_t *); - -int gmnal_alloc_srxd(gmnal_data_t *); -void gmnal_free_srxd(gmnal_data_t *); -gmnal_srxd_t* gmnal_get_srxd(gmnal_data_t *, int); -void gmnal_return_srxd(gmnal_data_t *, gmnal_srxd_t *); - -/* - * general utility functions - */ -gmnal_srxd_t *gmnal_rxbuffer_to_srxd(gmnal_data_t *, void*); -void gmnal_stop_rxthread(gmnal_data_t *); -void gmnal_stop_ctthread(gmnal_data_t *); -void gmnal_drop_sends_callback(gm_port_t *, void *, gm_status_t); -void gmnal_resume_sending_callback(gm_port_t *, void *, gm_status_t); -char *gmnal_gm_error(gm_status_t); -char *gmnal_rxevent(gm_recv_event_t*); -int gmnal_is_small_msg(gmnal_data_t*, int, struct iovec*, int); -void gmnal_yield(int); -int gmnal_start_kernel_threads(gmnal_data_t *); - - -/* - * Communication functions - */ - -/* - * Receive threads - */ -int gmnal_ct_thread(void *); /* caretaker thread */ -int gmnal_rx_thread(void *); /* receive thread */ -int gmnal_pre_receive(gmnal_data_t*, gmnal_rxtwe_t*, int); -int gmnal_rx_bad(gmnal_data_t *, gmnal_rxtwe_t *, gmnal_srxd_t*); -int gmnal_rx_requeue_buffer(gmnal_data_t *, gmnal_srxd_t *); -int gmnal_add_rxtwe(gmnal_data_t *, gm_recv_t *); -gmnal_rxtwe_t * gmnal_get_rxtwe(gmnal_data_t *); -void gmnal_remove_rxtwe(gmnal_data_t *); - - -/* - * Small messages - */ -int gmnal_small_rx(ptl_ni_t *, void *, ptl_msg_t *); -int gmnal_small_tx(ptl_ni_t *, void *, ptl_msg_t *, ptl_hdr_t *, - int, lnet_nid_t, lnet_pid_t, - gmnal_stxd_t*, int); -void gmnal_small_tx_callback(gm_port_t *, void *, gm_status_t); - - - -/* - * Large messages - */ -int gmnal_large_rx(ptl_ni_t *, void *, ptl_msg_t *, unsigned int, - struct iovec *, size_t, size_t, size_t); - -int gmnal_large_tx(ptl_ni_t *, void *, ptl_msg_t *, ptl_hdr_t *, - int, lnet_nid_t, lnet_pid_t, unsigned int, - struct iovec*, size_t, int); - -void gmnal_large_tx_callback(gm_port_t *, void *, gm_status_t); - -int gmnal_remote_get(gmnal_srxd_t *, int, struct iovec*, int, - struct iovec*); - -void gmnal_remote_get_callback(gm_port_t *, void *, gm_status_t); - -int gmnal_copyiov(int, gmnal_srxd_t *, int, struct iovec*, int, - struct iovec*); - -void gmnal_large_tx_ack(gmnal_data_t *, gmnal_srxd_t *); -void gmnal_large_tx_ack_callback(gm_port_t *, void *, gm_status_t); -void gmnal_large_tx_ack_received(gmnal_data_t *, gmnal_srxd_t *); +void gmnal_fini(void); +int gmnal_ctl(ptl_ni_t *ni, unsigned int cmd, void *arg); +int gmnal_startup(ptl_ni_t *ni); +void gmnal_shutdown(ptl_ni_t *ni); + +/* 
gmnal_cb.c */ +int gmnal_recv(ptl_ni_t *ni, void *private, ptl_msg_t *ptlmsg, + unsigned int niov, struct iovec *iov, + size_t offset, size_t mlen, size_t rlen); +int gmnal_recv_pages(ptl_ni_t *ni, void *private, ptl_msg_t *ptlmsg, + unsigned int nkiov, lnet_kiov_t *kiov, + size_t offset, size_t mlen, size_t rlen); +int gmnal_send(ptl_ni_t *ni, void *private, ptl_msg_t *ptlmsg, + ptl_hdr_t *hdr, int type, + lnet_process_id_t tgt, int routing, + unsigned int niov, struct iovec *iov, + size_t offset, size_t len); +int gmnal_send_pages(ptl_ni_t *ni, void *private, ptl_msg_t *ptlmsg, + ptl_hdr_t *hdr, int type, + lnet_process_id_t tgt, int routing, + unsigned int nkiov, lnet_kiov_t *kiov, + size_t offset, size_t len); + +/* gmnal_util.c */ +void gmnal_free_ltxbufs(gmnal_ni_t *gmni); +int gmnal_alloc_ltxbufs(gmnal_ni_t *gmni); +void gmnal_free_txs(gmnal_ni_t *gmni); +int gmnal_alloc_txs(gmnal_ni_t *gmni); +void gmnal_free_rxs(gmnal_ni_t *gmni); +int gmnal_alloc_rxs(gmnal_ni_t *gmni); +char *gmnal_gmstatus2str(gm_status_t status); +char *gmnal_rxevent2str(gm_recv_event_t *ev); +void gmnal_yield(int delay); + +void gmnal_copy_tofrom_netbuf(int niov, struct iovec *iov, lnet_kiov_t *kiov, int offset, + int nb_pages, gmnal_netbuf_t *nb, int nb_offset, + int nob, int from_nb); + +static inline void +gmnal_copy_from_netbuf(int niov, struct iovec *iov, lnet_kiov_t *kiov, int offset, + int nb_pages, gmnal_netbuf_t *nb, int nb_offset, int nob) +{ + gmnal_copy_tofrom_netbuf(niov, iov, kiov, offset, + nb_pages, nb, nb_offset, nob, 1); +} + +static inline void +gmnal_copy_to_netbuf(int nb_pages, gmnal_netbuf_t *nb, int nb_offset, + int niov, struct iovec *iov, lnet_kiov_t *kiov, int offset, + int nob) +{ + gmnal_copy_tofrom_netbuf(niov, iov, kiov, offset, + nb_pages, nb, nb_offset, nob, 0); +} + +/* gmnal_comm.c */ +void gmnal_post_rx(gmnal_ni_t *gmni, gmnal_rx_t *rx); +gmnal_tx_t *gmnal_get_tx(gmnal_ni_t *gmni, int may_block); +void gmnal_tx_done(gmnal_tx_t *tx, int rc); +void gmnal_pack_msg(gmnal_ni_t *gmni, gmnal_msg_t *msg, + lnet_nid_t dstnid, int type); +void gmnal_stop_threads(gmnal_ni_t *gmni); +int gmnal_start_threads(gmnal_ni_t *gmni); +void gmnal_check_txqueues_locked (gmnal_ni_t *gmni); + +/* Module Parameters */ +extern gmnal_tunables_t gmnal_tunables; #endif /*__INCLUDE_GMNAL_H__*/ diff --git a/lnet/klnds/gmlnd/gmlnd_api.c b/lnet/klnds/gmlnd/gmlnd_api.c index 2279efcb..a2621c8 100644 --- a/lnet/klnds/gmlnd/gmlnd_api.c +++ b/lnet/klnds/gmlnd/gmlnd_api.c @@ -25,316 +25,239 @@ #include "gmnal.h" +ptl_nal_t gmnal_nal = +{ + .nal_type = GMNAL, + .nal_startup = gmnal_startup, + .nal_shutdown = gmnal_shutdown, + .nal_ctl = gmnal_ctl, + .nal_send = gmnal_send, + .nal_send_pages = gmnal_send_pages, + .nal_recv = gmnal_recv, + .nal_recv_pages = gmnal_recv_pages, +}; +gmnal_ni_t *the_gmni = NULL; -gmnal_data_t *global_nal_data = NULL; -#define GLOBAL_NID_STR_LEN 16 -char global_nid_str[GLOBAL_NID_STR_LEN] = {0}; +int +gmnal_ctl(ptl_ni_t *ni, unsigned int cmd, void *arg) +{ + struct portal_ioctl_data *data = arg; + + switch (cmd) { + case IOC_PORTAL_REGISTER_MYNID: + if (data->ioc_nid == ni->ni_nid) + return 0; + + LASSERT (PTL_NIDNET(data->ioc_nid) == PTL_NIDNET(ni->ni_nid)); + + CERROR("obsolete IOC_PORTAL_REGISTER_MYNID for %s(%s)\n", + libcfs_nid2str(data->ioc_nid), + libcfs_nid2str(ni->ni_nid)); + return 0; + + default: + return (-EINVAL); + } +} -extern int gmnal_ctl(ptl_ni_t *ni, unsigned int cmd, void *arg); +int +gmnal_set_local_nid (gmnal_ni_t *gmni) +{ + ptl_ni_t *ni = gmni->gmni_ni; + __u32 
local_gmid; + __u32 global_gmid; + gm_status_t gm_status; -/* - * Write the global nid /proc/sys/gmnal/globalnid - */ -#define GMNAL_SYSCTL 201 -#define GMNAL_SYSCTL_GLOBALNID 1 - -static ctl_table gmnal_sysctl_table[] = { - {GMNAL_SYSCTL_GLOBALNID, "globalnid", - global_nid_str, GLOBAL_NID_STR_LEN, - 0444, NULL, &proc_dostring}, - { 0 } -}; + /* Called before anything initialised: no need to lock */ + gm_status = gm_get_node_id(gmni->gmni_port, &local_gmid); + if (gm_status != GM_SUCCESS) + return 0; + CDEBUG(D_NET, "Local node id is [%u]\n", local_gmid); + + gm_status = gm_node_id_to_global_id(gmni->gmni_port, + local_gmid, + &global_gmid); + if (gm_status != GM_SUCCESS) + return 0; + + CDEBUG(D_NET, "Global node id is [%u]\n", global_gmid); -static ctl_table gmnalnal_top_sysctl_table[] = { - {GMNAL_SYSCTL, "gmnal", NULL, 0, 0555, gmnal_sysctl_table}, - { 0 } -}; + ni->ni_nid = PTL_MKNID(PTL_NIDNET(ni->ni_nid), global_gmid); + return 1; +} -/* - * gmnal_shutdown - * Close down this interface and free any resources associated with it - * nal_t nal our nal to shutdown - */ void gmnal_shutdown(ptl_ni_t *ni) { - gmnal_data_t *nal_data; + gmnal_ni_t *gmni = ni->ni_data; + + CDEBUG(D_TRACE, "gmnal_api_shutdown: gmni [%p]\n", gmni); - LASSERT(ni->ni_data == global_nal_data); + LASSERT (gmni == the_gmni); - nal_data = (gmnal_data_t *)ni->ni_data; - LASSERT(nal_data == global_nal_data); - CDEBUG(D_TRACE, "gmnal_shutdown: nal_data [%p]\n", nal_data); + /* stop processing messages */ + gmnal_stop_threads(gmni); - /* XXX for shutdown "under fire" we probably need to set a shutdown - * flag so when portals calls us we fail immediately and dont queue any - * more work but our threads can still call into portals OK. THEN - * shutdown our threads, THEN ptl_fini() */ + /* stop all network callbacks */ + gm_close(gmni->gmni_port); + gmni->gmni_port = NULL; - gmnal_stop_rxthread(nal_data); - gmnal_stop_ctthread(nal_data); - gmnal_free_txd(nal_data); - gmnal_free_srxd(nal_data); - GMNAL_GM_LOCK(nal_data); - gm_close(nal_data->gm_port); gm_finalize(); - GMNAL_GM_UNLOCK(nal_data); - if (nal_data->sysctl) - unregister_sysctl_table (nal_data->sysctl); - /* Don't free 'nal'; it's a static struct */ - PORTAL_FREE(nal_data, sizeof(gmnal_data_t)); - global_nal_data = NULL; + gmnal_free_ltxbufs(gmni); + gmnal_free_txs(gmni); + gmnal_free_rxs(gmni); - PORTAL_MODULE_UNUSE; -} + PORTAL_FREE(gmni, sizeof(*gmni)); + the_gmni = NULL; +} int gmnal_startup(ptl_ni_t *ni) { - gmnal_data_t *nal_data = NULL; - gmnal_srxd_t *srxd = NULL; - gm_status_t gm_status; - unsigned int local_nid = 0, global_nid = 0; - - CDEBUG(D_TRACE, "startup\n"); + gmnal_ni_t *gmni = NULL; + gmnal_rx_t *rx = NULL; + gm_status_t gm_status; + int rc; - LASSERT(ni->ni_nal == &gmnal_nal); - - if (global_nal_data != NULL) { - /* Already got 1 instance */ - CERROR("Can't support > 1 instance of this NAL\n"); - return -EPERM; + LASSERT (ni->ni_nal == &gmnal_nal); + + if (the_gmni != NULL) { + CERROR("Only 1 instance supported\n"); + return -EINVAL; } - if (ni->ni_interfaces[0] != NULL) { - CERROR("Explicit interface config not supported\n"); - return -EPERM; + PORTAL_ALLOC(gmni, sizeof(*gmni)); + if (gmni == NULL) { + CERROR("can't allocate gmni\n"); + return -ENOMEM; } + + ni->ni_data = gmni; + + memset(gmni, 0, sizeof(*gmni)); + gmni->gmni_ni = ni; + spin_lock_init(&gmni->gmni_tx_lock); + spin_lock_init(&gmni->gmni_gm_lock); + init_waitqueue_head(&gmni->gmni_idle_tx_wait); + INIT_LIST_HEAD(&gmni->gmni_idle_txs); + INIT_LIST_HEAD(&gmni->gmni_nblk_idle_txs); 
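+        /* NB: gmni_nblk_idle_txs (above) is the tx freelist reserved for
+         * callers that may not block (gmnal_get_tx() with may_block == 0),
+         * i.e. ACK/REPLY sends issued from the receive path, so receives
+         * can't be starved of tx descriptors by blocking senders */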
+ INIT_LIST_HEAD(&gmni->gmni_idle_ltxbs); + INIT_LIST_HEAD(&gmni->gmni_buf_txq); + INIT_LIST_HEAD(&gmni->gmni_cred_txq); + sema_init(&gmni->gmni_rx_mutex, 1); - PORTAL_ALLOC(nal_data, sizeof(gmnal_data_t)); - if (!nal_data) { - CDEBUG(D_ERROR, "can't get memory\n"); - return(-ENOMEM); - } - memset(nal_data, 0, sizeof(gmnal_data_t)); /* - * set the small message buffer size + * initialise the interface, */ + CDEBUG(D_NET, "Calling gm_init\n"); + if (gm_init() != GM_SUCCESS) { + CERROR("call to gm_init failed\n"); + goto failed_0; + } - CDEBUG(D_INFO, "Allocd and reset nal_data[%p]\n", nal_data); - CDEBUG(D_INFO, "small_msg_size is [%d]\n", nal_data->small_msg_size); + CDEBUG(D_NET, "Calling gm_open with port [%d], version [%d]\n", + *gmnal_tunables.gm_port, GM_API_VERSION); - /* - * String them all together - */ - ni->ni_data = nal_data; - nal_data->ni = ni; + gm_status = gm_open(&gmni->gmni_port, 0, *gmnal_tunables.gm_port, + "gmnal", GM_API_VERSION); - GMNAL_GM_LOCK_INIT(nal_data); + if (gm_status != GM_SUCCESS) { + CERROR("Can't open GM port %d: %d (%s)\n", + *gmnal_tunables.gm_port, gm_status, + gmnal_gmstatus2str(gm_status)); + goto failed_1; + } + CDEBUG(D_NET,"gm_open succeeded port[%p]\n",gmni->gmni_port); - /* - * initialise the interface, - */ - CDEBUG(D_INFO, "Calling gm_init\n"); - if (gm_init() != GM_SUCCESS) { - CDEBUG(D_ERROR, "call to gm_init failed\n"); - PORTAL_FREE(nal_data, sizeof(gmnal_data_t)); - return(-ENETDOWN); - } + if (!gmnal_set_local_nid(gmni)) + goto failed_2; + CDEBUG(D_NET, "portals_nid is %s\n", libcfs_nid2str(ni->ni_nid)); - CDEBUG(D_NET, "Calling gm_open with port [%d], " - "name [%s], version [%d]\n", GMNAL_GM_PORT_ID, - "gmnal", GM_API_VERSION); - - GMNAL_GM_LOCK(nal_data); - gm_status = gm_open(&nal_data->gm_port, 0, GMNAL_GM_PORT_ID, "gmnal", - GM_API_VERSION); - GMNAL_GM_UNLOCK(nal_data); - - CDEBUG(D_INFO, "gm_open returned [%d]\n", gm_status); - if (gm_status == GM_SUCCESS) { - CDEBUG(D_INFO, "gm_open succeeded port[%p]\n", - nal_data->gm_port); - } else { - switch(gm_status) { - case(GM_INVALID_PARAMETER): - CDEBUG(D_ERROR, "gm_open Failure. Invalid Parameter\n"); - break; - case(GM_BUSY): - CDEBUG(D_ERROR, "gm_open Failure. GM Busy\n"); - break; - case(GM_NO_SUCH_DEVICE): - CDEBUG(D_ERROR, "gm_open Failure. No such device\n"); - break; - case(GM_INCOMPATIBLE_LIB_AND_DRIVER): - CDEBUG(D_ERROR, "gm_open Failure. Incompatile lib " - "and driver\n"); - break; - case(GM_OUT_OF_MEMORY): - CDEBUG(D_ERROR, "gm_open Failure. Out of Memory\n"); - break; - default: - CDEBUG(D_ERROR, "gm_open Failure. 
Unknow error " - "code [%d]\n", gm_status); - break; - } - GMNAL_GM_LOCK(nal_data); - gm_finalize(); - GMNAL_GM_UNLOCK(nal_data); - PORTAL_FREE(nal_data, sizeof(gmnal_data_t)); - return(-ENETDOWN); - } + gmni->gmni_large_msgsize = + offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[PTL_MTU]); + gmni->gmni_large_gmsize = + gm_min_size_for_length(gmni->gmni_large_msgsize); + gmni->gmni_large_pages = + (gmni->gmni_large_msgsize + PAGE_SIZE - 1)/PAGE_SIZE; + + gmni->gmni_small_msgsize = MIN(GM_MTU, PAGE_SIZE); + gmni->gmni_small_gmsize = + gm_min_size_for_length(gmni->gmni_small_msgsize); - nal_data->small_msg_size = gmnal_small_msg_size; - nal_data->small_msg_gmsize = - gm_min_size_for_length(gmnal_small_msg_size); - - if (gmnal_alloc_srxd(nal_data) != GMNAL_STATUS_OK) { - CDEBUG(D_ERROR, "Failed to allocate small rx descriptors\n"); - gmnal_free_txd(nal_data); - GMNAL_GM_LOCK(nal_data); - gm_close(nal_data->gm_port); - gm_finalize(); - GMNAL_GM_UNLOCK(nal_data); - PORTAL_FREE(nal_data, sizeof(gmnal_data_t)); - return(-ENOMEM); - } + gmni->gmni_netaddr_base = GMNAL_NETADDR_BASE; + gmni->gmni_netaddr_size = 0; + CWARN("Msg size %08x/%08x [%d/%d]\n", + gmni->gmni_large_msgsize, gmni->gmni_small_msgsize, + gmni->gmni_large_gmsize, gmni->gmni_small_gmsize); - /* - * Hang out a bunch of small receive buffers - * In fact hang them all out - */ - while((srxd = gmnal_get_srxd(nal_data, 0))) { - CDEBUG(D_NET, "giving [%p] to gm_provide_recvive_buffer\n", - srxd->buffer); - GMNAL_GM_LOCK(nal_data); - gm_provide_receive_buffer_with_tag(nal_data->gm_port, - srxd->buffer, srxd->gmsize, - GM_LOW_PRIORITY, 0); - GMNAL_GM_UNLOCK(nal_data); + if (gmnal_alloc_rxs(gmni) != 0) { + CERROR("Failed to allocate rx descriptors\n"); + goto failed_2; } - - /* - * Allocate pools of small tx buffers and descriptors - */ - if (gmnal_alloc_txd(nal_data) != GMNAL_STATUS_OK) { - CDEBUG(D_ERROR, "Failed to allocate small tx descriptors\n"); - GMNAL_GM_LOCK(nal_data); - gm_close(nal_data->gm_port); - gm_finalize(); - GMNAL_GM_UNLOCK(nal_data); - PORTAL_FREE(nal_data, sizeof(gmnal_data_t)); - return(-ENOMEM); + + if (gmnal_alloc_txs(gmni) != 0) { + CERROR("Failed to allocate tx descriptors\n"); + goto failed_2; } - gmnal_start_kernel_threads(nal_data); + if (gmnal_alloc_ltxbufs(gmni) != 0) { + CERROR("Failed to allocate large tx buffers\n"); + goto failed_2; + } - while (nal_data->rxthread_flag != GMNAL_RXTHREADS_STARTED) { - gmnal_yield(1); - CDEBUG(D_INFO, "Waiting for receive thread signs of life\n"); - } + rc = gmnal_start_threads(gmni); + if (rc != 0) { + CERROR("Can't start threads: %d\n", rc); + goto failed_2; + } - CDEBUG(D_INFO, "receive thread seems to have started\n"); - - - CDEBUG(D_NET, "Getting node id\n"); - GMNAL_GM_LOCK(nal_data); - gm_status = gm_get_node_id(nal_data->gm_port, &local_nid); - GMNAL_GM_UNLOCK(nal_data); - if (gm_status != GM_SUCCESS) { - gmnal_stop_rxthread(nal_data); - gmnal_stop_ctthread(nal_data); - CDEBUG(D_ERROR, "can't determine node id\n"); - gmnal_free_txd(nal_data); - gmnal_free_srxd(nal_data); - GMNAL_GM_LOCK(nal_data); - gm_close(nal_data->gm_port); - gm_finalize(); - GMNAL_GM_UNLOCK(nal_data); - PORTAL_FREE(nal_data, sizeof(gmnal_data_t)); - return(-ENETDOWN); - } + /* Start listening */ + for (rx = gmni->gmni_rxs; rx != NULL; rx = rx->rx_next) + gmnal_post_rx(gmni, rx); - nal_data->gm_local_nid = local_nid; - CDEBUG(D_INFO, "Local node id is [%u]\n", local_nid); - - GMNAL_GM_LOCK(nal_data); - gm_status = gm_node_id_to_global_id(nal_data->gm_port, local_nid, - &global_nid); - 
GMNAL_GM_UNLOCK(nal_data); - if (gm_status != GM_SUCCESS) { - CDEBUG(D_ERROR, "failed to obtain global id\n"); - gmnal_stop_rxthread(nal_data); - gmnal_stop_ctthread(nal_data); - gmnal_free_txd(nal_data); - gmnal_free_srxd(nal_data); - GMNAL_GM_LOCK(nal_data); - gm_close(nal_data->gm_port); - gm_finalize(); - GMNAL_GM_UNLOCK(nal_data); - PORTAL_FREE(nal_data, sizeof(gmnal_data_t)); - return(-ENETDOWN); - } - CDEBUG(D_INFO, "Global node id is [%u]\n", global_nid); - nal_data->gm_global_nid = global_nid; - snprintf(global_nid_str, GLOBAL_NID_STR_LEN, "%u", global_nid); + the_gmni = gmni; -/* - pid = gm_getpid(); -*/ - ni->ni_nid = global_nid; + CDEBUG(D_NET, "gmnal_init finished\n"); + return 0; - CDEBUG(D_INFO, "portals_pid is [%u]\n", ni->ni_pid); - CDEBUG(D_INFO, "portals_nid is ["LPU64"]\n", ni->ni_nid); - - /* might be better to initialise this at module load rather than in - * NAL startup */ - nal_data->sysctl = NULL; - nal_data->sysctl = register_sysctl_table (gmnalnal_top_sysctl_table, 0); + failed_2: + gm_close(gmni->gmni_port); + gmni->gmni_port = NULL; - CDEBUG(D_INFO, "finished\n"); + failed_1: + gm_finalize(); - global_nal_data = nal_data; + failed_0: + /* safe to free descriptors after network has been shut down */ + gmnal_free_ltxbufs(gmni); + gmnal_free_txs(gmni); + gmnal_free_rxs(gmni); - PORTAL_MODULE_USE; - return(0); -} + PORTAL_FREE(gmni, sizeof(*gmni)); -ptl_nal_t the_gm_nal = { - .nal_type = GMNAL, - .nal_startup = gmnal_startup, - .nal_shutdown = gmnal_shutdown, - .nal_cmd = gmnal_ctl, - .nal_send = gmnal_cb_send, - .nal_send_pages = gmnal_cb_send_pages, - .nal_recv = gmnal_cb_recv, - .nal_recv_pages = gmnal_cb_recv_pages, -}; + return -EIO; +} /* * Called when module loaded */ int gmnal_init(void) { - ptl_register_nal(&the_gm_nal); - return (0); + ptl_register_nal(&gmnal_nal); + return 0; } - /* * Called when module removed */ void gmnal_fini() { - CDEBUG(D_TRACE, "gmnal_fini\n"); - - ptl_unregister_nal(&the_gm_nal); - LASSERT(global_nal_data == NULL); + ptl_unregister_nal(&gmnal_nal); } diff --git a/lnet/klnds/gmlnd/gmlnd_cb.c b/lnet/klnds/gmlnd/gmlnd_cb.c index 06c862b..4dfa721 100644 --- a/lnet/klnds/gmlnd/gmlnd_cb.c +++ b/lnet/klnds/gmlnd/gmlnd_cb.c @@ -27,296 +27,153 @@ #include "gmnal.h" -int gmnal_cb_recv(ptl_ni_t *ni, void *private, ptl_msg_t *cookie, - unsigned int niov, struct iovec *iov, size_t offset, - size_t mlen, size_t rlen) +int +gmnal_recvmsg(ptl_ni_t *ni, void *private, ptl_msg_t *ptlmsg, + unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov, + size_t offset, size_t mlen) { - void *buffer = NULL; - gmnal_srxd_t *srxd = (gmnal_srxd_t*)private; - int status = 0; - - CDEBUG(D_TRACE, "gmnal_cb_recv ni [%p], private[%p], cookie[%p], " - "niov[%d], iov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n", - ni, private, cookie, niov, iov, offset, mlen, rlen); - - switch(srxd->type) { - case(GMNAL_SMALL_MESSAGE): - CDEBUG(D_INFO, "gmnal_cb_recv got small message\n"); - /* HP SFS 1380: Proactively change receives to avoid a receive - * side occurrence of filling pkmap_count[]. - */ - buffer = srxd->buffer; - buffer += sizeof(gmnal_msghdr_t); - buffer += sizeof(ptl_hdr_t); + gmnal_ni_t *gmni = ni->ni_data; + gmnal_rx_t *rx = (gmnal_rx_t*)private; + gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf); + int npages = rx->rx_islarge ? 
gmni->gmni_large_pages : 1; + int payload_offset = offsetof(gmnal_msg_t, + gmm_u.immediate.gmim_payload[0]); + int nob = payload_offset + mlen; + + LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE); + LASSERT (iov == NULL || kiov == NULL); + + if (rx->rx_recv_nob < nob) { + CERROR("Short message from nid %s: got %d, need %d\n", + libcfs_nid2str(msg->gmm_srcnid), rx->rx_recv_nob, nob); + return -EIO; + } - while(niov--) { - if (offset >= iov->iov_len) { - offset -= iov->iov_len; - } else if (offset > 0) { - CDEBUG(D_INFO, "processing [%p] base [%p] " - "len %d, offset %d, len ["LPSZ"]\n", iov, - iov->iov_base + offset, iov->iov_len, - offset, iov->iov_len - offset); - gm_bcopy(buffer, iov->iov_base + offset, - iov->iov_len - offset); - buffer += iov->iov_len - offset; - offset = 0; - } else { - CDEBUG(D_INFO, "processing [%p] len ["LPSZ"]\n", - iov, iov->iov_len); - gm_bcopy(buffer, iov->iov_base, iov->iov_len); - buffer += iov->iov_len; - } - iov++; - } - status = gmnal_small_rx(ni, private, cookie); - break; - case(GMNAL_LARGE_MESSAGE_INIT): - CDEBUG(D_INFO, "gmnal_cb_recv got large message init\n"); - status = gmnal_large_rx(ni, private, cookie, niov, - iov, offset, mlen, rlen); - } + gmnal_copy_from_netbuf(niov, iov, kiov, offset, + npages, &rx->rx_buf, payload_offset, + mlen); - CDEBUG(D_INFO, "gmnal_cb_recv gmnal_return status [%d]\n", status); - return(status); + ptl_finalize(ni, private, ptlmsg, 0); + return 0; } -int gmnal_cb_recv_pages(ptl_ni_t *ni, void *private, - ptl_msg_t *cookie, unsigned int kniov, - lnet_kiov_t *kiov, size_t offset, size_t mlen, - size_t rlen) +int +gmnal_recv(ptl_ni_t *ni, void *private, ptl_msg_t *ptlmsg, + unsigned int niov, struct iovec *iov, + size_t offset, size_t mlen, size_t rlen) { - gmnal_srxd_t *srxd = (gmnal_srxd_t*)private; - int status = 0; - char *ptr = NULL; - void *buffer = NULL; - - - CDEBUG(D_TRACE, "gmnal_cb_recv_pages ni [%p],private[%p], " - "cookie[%p], kniov[%d], kiov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n", - ni, private, cookie, kniov, kiov, offset, mlen, rlen); - - if (srxd->type == GMNAL_SMALL_MESSAGE) { - buffer = srxd->buffer; - buffer += sizeof(gmnal_msghdr_t); - buffer += sizeof(ptl_hdr_t); - - /* - * map each page and create an iovec for it - */ - while (kniov--) { - /* HP SFS 1380: Proactively change receives to avoid a - * receive side occurrence of filling pkmap_count[]. 
- */ - CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", - kniov, kiov); - - if (offset >= kiov->kiov_len) { - offset -= kiov->kiov_len; - } else { - CDEBUG(D_INFO, "kniov page [%p] len [%d] " - "offset[%d]\n", kiov->kiov_page, - kiov->kiov_len, kiov->kiov_offset); - CDEBUG(D_INFO, "Calling kmap[%p]", kiov->kiov_page); - ptr = ((char *)kmap(kiov->kiov_page)) + - kiov->kiov_offset; - - if (offset > 0) { - CDEBUG(D_INFO, "processing [%p] base " - "[%p] len %d, offset %d, len [" - LPSZ"]\n", ptr, ptr + offset, - kiov->kiov_len, offset, - kiov->kiov_len - offset); - gm_bcopy(buffer, ptr + offset, - kiov->kiov_len - offset); - buffer += kiov->kiov_len - offset; - offset = 0; - } else { - CDEBUG(D_INFO, "processing [%p] len [" - LPSZ"]\n", ptr, kiov->kiov_len); - gm_bcopy(buffer, ptr, kiov->kiov_len); - buffer += kiov->kiov_len; - } - kunmap(kiov->kiov_page); - CDEBUG(D_INFO, "Stored in [%p]\n", ptr); - } - kiov++; - } - CDEBUG(D_INFO, "calling gmnal_small_rx\n"); - status = gmnal_small_rx(ni, private, cookie); - } - - CDEBUG(D_INFO, "gmnal_return status [%d]\n", status); - return(status); + return gmnal_recvmsg(ni, private, ptlmsg, + niov, iov, NULL, offset, mlen); } - -int gmnal_cb_send(ptl_ni_t *ni, void *private, ptl_msg_t *cookie, - ptl_hdr_t *hdr, int type, lnet_process_id_t target, - int routing, unsigned int niov, struct iovec *iov, - size_t offset, size_t len) +int +gmnal_recv_pages(ptl_ni_t *ni, void *private, + ptl_msg_t *ptlmsg, + unsigned int nkiov, lnet_kiov_t *kiov, + size_t offset, size_t mlen, size_t rlen) { - - gmnal_data_t *nal_data; - void *buffer = NULL; - gmnal_stxd_t *stxd = NULL; - - - CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] len["LPSZ - "] target %s\n", niov, offset, len, libcfs_id2str(target)); - nal_data = ni->ni_data; - CDEBUG(D_INFO, "nal_data [%p]\n", nal_data); - LASSERT (nal_data != NULL); - - if (routing) { - CERROR ("Can't route\n"); - return -EIO; - } - - if (GMNAL_IS_SMALL_MESSAGE(nal_data, niov, iov, len)) { - CDEBUG(D_INFO, "This is a small message send\n"); - /* - * HP SFS 1380: With the change to gmnal_small_tx, need to get - * the stxd and do relevant setup here - */ - stxd = gmnal_get_stxd(nal_data, 1); - CDEBUG(D_INFO, "stxd [%p]\n", stxd); - /* Set the offset of the data to copy into the buffer */ - buffer = stxd->buffer +sizeof(gmnal_msghdr_t)+sizeof(ptl_hdr_t); - while(niov--) { - if (offset >= iov->iov_len) { - offset -= iov->iov_len; - } else if (offset > 0) { - CDEBUG(D_INFO, "processing iov [%p] base [%p] " - "len ["LPSZ"] to [%p]\n", - iov, iov->iov_base + offset, - iov->iov_len - offset, buffer); - gm_bcopy(iov->iov_base + offset, buffer, - iov->iov_len - offset); - buffer+= iov->iov_len - offset; - offset = 0; - } else { - CDEBUG(D_INFO, "processing iov [%p] len ["LPSZ - "] to [%p]\n", iov, iov->iov_len,buffer); - gm_bcopy(iov->iov_base, buffer, iov->iov_len); - buffer+= iov->iov_len; - } - iov++; - } - gmnal_small_tx(ni, private, cookie, hdr, type, target.nid, target.pid, - stxd, len); - } else { - CDEBUG(D_ERROR, "Large message send is not supported\n"); - ptl_finalize(ni, private, cookie, -EIO); - return(-EIO); - gmnal_large_tx(ni, private, cookie, hdr, type, target.nid, target.pid, - niov, iov, offset, len); - } - return(0); + return gmnal_recvmsg(ni, private, ptlmsg, + nkiov, NULL, kiov, offset, mlen); } -int gmnal_cb_send_pages(ptl_ni_t *ni, void *private, - ptl_msg_t *cookie, ptl_hdr_t *hdr, int type, - lnet_process_id_t target, int routing, - unsigned int kniov, lnet_kiov_t *kiov, - size_t offset, size_t len) +int 
+gmnal_sendmsg(ptl_ni_t *ni, void *private, ptl_msg_t *ptlmsg, + ptl_hdr_t *hdr, int type, lnet_process_id_t pid, + unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov, + size_t offset, size_t len) { - - gmnal_data_t *nal_data; - char *ptr; - void *buffer = NULL; - gmnal_stxd_t *stxd = NULL; - int status = 0; - - CDEBUG(D_TRACE, "gmnal_cb_send_pages target %s niov[%d] offset[" - LPSZ"] len["LPSZ"]\n", libcfs_id2str(target), kniov, offset, len); - nal_data = ni->ni_data; - CDEBUG(D_INFO, "nal_data [%p]\n", nal_data); - LASSERT (nal_data != NULL); - - if (routing) { - CERROR ("Can't route\n"); + gmnal_ni_t *gmni = ni->ni_data; + gm_status_t gmrc; + gmnal_tx_t *tx; + + LASSERT (iov == NULL || kiov == NULL); + + /* I may not block for a tx if I'm responding to an incoming message */ + tx = gmnal_get_tx(gmni, + !(type == PTL_MSG_ACK || type == PTL_MSG_REPLY)); + if (tx == NULL) { + if (!gmni->gmni_shutdown) + CERROR ("Can't get tx for msg type %d for %s\n", + type, libcfs_nid2str(pid.nid)); return -EIO; } - /* HP SFS 1380: Need to do the gm_bcopy after the kmap so we can kunmap - * more aggressively. This is the fix for a livelock situation under - * load on ia32 that occurs when there are no more available entries in - * the pkmap_count array. Just fill the buffer and let gmnal_small_tx - * put the headers in after we pass it the stxd pointer. - */ - stxd = gmnal_get_stxd(nal_data, 1); - CDEBUG(D_INFO, "stxd [%p]\n", stxd); - /* Set the offset of the data to copy into the buffer */ - buffer = stxd->buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t); + tx->tx_nid = pid.nid; - if (GMNAL_IS_SMALL_MESSAGE(nal_data, 0, NULL, len)) { - CDEBUG(D_INFO, "This is a small message send\n"); + gmrc = gm_global_id_to_node_id(gmni->gmni_port, PTL_NIDADDR(pid.nid), + &tx->tx_gmlid); + if (gmrc != GM_SUCCESS) { + CERROR("Can't map Nid %s to a GM local ID: %d\n", + libcfs_nid2str(pid.nid), gmrc); + /* NB tx_ptlmsg not set => doesn't finalize */ + gmnal_tx_done(tx, -EIO); + return -EIO; + } - while(kniov--) { - CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", kniov, kiov); - if (offset >= kiov->kiov_len) { - offset -= kiov->kiov_len; - } else { - CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n", - kiov->kiov_page, kiov->kiov_len, - kiov->kiov_offset); + gmnal_pack_msg(gmni, GMNAL_NETBUF_MSG(&tx->tx_buf), + pid.nid, GMNAL_MSG_IMMEDIATE); + GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_hdr = *hdr; + tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]); + + if (tx->tx_msgnob + len <= gmni->gmni_small_msgsize) { + /* whole message fits in tx_buf */ + char *buffer = &(GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_payload[0]); + + if (iov != NULL) + ptl_copy_iov2buf(buffer, niov, iov, offset, len); + else + ptl_copy_kiov2buf(buffer, niov, kiov, offset, len); + + tx->tx_msgnob += len; + tx->tx_large_nob = 0; + + /* We've copied everything... 
*/ + LASSERT(tx->tx_ptlmsg == NULL); + ptl_finalize(ni, NULL, ptlmsg, 0); + } else { + /* stash payload pts to copy later */ + tx->tx_large_nob = len; + tx->tx_large_iskiov = (kiov != NULL); + tx->tx_large_niov = niov; + if (tx->tx_large_iskiov) + tx->tx_large_frags.kiov = kiov; + else + tx->tx_large_frags.iov = iov; + + /* finalize later */ + tx->tx_ptlmsg = ptlmsg; + } + + spin_lock(&gmni->gmni_tx_lock); - ptr = ((char *)kmap(kiov->kiov_page)) + - kiov->kiov_offset; + list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq); + gmnal_check_txqueues_locked(gmni); - if (offset > 0) { - CDEBUG(D_INFO, "processing [%p] base " - "[%p] len ["LPSZ"] to [%p]\n", - ptr, ptr + offset, - kiov->kiov_len - offset, buffer); - gm_bcopy(ptr + offset, buffer, - kiov->kiov_len - offset); - buffer+= kiov->kiov_len - offset; - offset = 0; - } else { - CDEBUG(D_INFO, "processing kmapped [%p]" - " len ["LPSZ"] to [%p]\n", - ptr, kiov->kiov_len, buffer); - gm_bcopy(ptr, buffer, kiov->kiov_len); + spin_unlock(&gmni->gmni_tx_lock); - buffer += kiov->kiov_len; - } - kunmap(kiov->kiov_page); - } - kiov++; - } - status = gmnal_small_tx(ni, private, cookie, hdr, type, target.nid, - target.pid, stxd, len); - } else { - int i = 0; - struct iovec *iovec = NULL, *iovec_dup = NULL; - lnet_kiov_t *kiov_dup = kiov; + return 0; +} - PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec)); - iovec_dup = iovec; - CDEBUG(D_ERROR, "Large message send it is not supported yet\n"); - PORTAL_FREE(iovec, kniov*sizeof(struct iovec)); - return(-EIO); - for (i=0; ikiov_page, kiov->kiov_len, - kiov->kiov_offset); +int +gmnal_send(ptl_ni_t *ni, void *private, ptl_msg_t *ptlmsg, + ptl_hdr_t *hdr, int type, + lnet_process_id_t pid, int routing, + unsigned int niov, struct iovec *iov, + size_t offset, size_t len) +{ + return gmnal_sendmsg(ni, private, ptlmsg, + hdr, type, pid, + niov, iov, NULL, offset, len); +} - iovec->iov_base = kmap(kiov->kiov_page) - + kiov->kiov_offset; - iovec->iov_len = kiov->kiov_len; - iovec++; - kiov++; - } - gmnal_large_tx(ni, private, cookie, hdr, type, target.nid, - target.pid, kniov, iovec, offset, len); - for (i=0; ikiov_page); - kiov_dup++; - } - PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec)); - } - return(status); +int +gmnal_send_pages(ptl_ni_t *ni, void *private, ptl_msg_t *ptlmsg, + ptl_hdr_t *hdr, int type, + lnet_process_id_t pid, int routing, + unsigned int nkiov, lnet_kiov_t *kiov, + size_t offset, size_t len) +{ + return gmnal_sendmsg(ni, private, ptlmsg, + hdr, type, pid, + nkiov, NULL, kiov, offset, len); } diff --git a/lnet/klnds/gmlnd/gmlnd_comm.c b/lnet/klnds/gmlnd/gmlnd_comm.c index 8a09d5a..48f3566 100644 --- a/lnet/klnds/gmlnd/gmlnd_comm.c +++ b/lnet/klnds/gmlnd/gmlnd_comm.c @@ -25,1315 +25,484 @@ #include "gmnal.h" -/* - * The caretaker thread - * This is main thread of execution for the NAL side - * This guy waits in gm_blocking_recvive and gets - * woken up when the myrinet adaptor gets an interrupt. - * Hands off receive operations to the receive thread - * This thread Looks after gm_callbacks etc inline. - */ -int -gmnal_ct_thread(void *arg) -{ - gmnal_data_t *nal_data; - gm_recv_event_t *rxevent = NULL; - gm_recv_t *recv = NULL; - - if (!arg) { - CDEBUG(D_TRACE, "NO nal_data. 
Exiting\n"); - return(-1); - } - - nal_data = (gmnal_data_t*)arg; - CDEBUG(D_TRACE, "nal_data is [%p]\n", arg); - - sprintf(current->comm, "gmnal_ct"); - - daemonize(); - - nal_data->ctthread_flag = GMNAL_CTTHREAD_STARTED; - - GMNAL_GM_LOCK(nal_data); - while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) { - CDEBUG(D_NET, "waiting\n"); - rxevent = gm_blocking_receive_no_spin(nal_data->gm_port); - if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) { - CDEBUG(D_INFO, "time to exit\n"); - break; - } - CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent)); - switch (GM_RECV_EVENT_TYPE(rxevent)) { - - case(GM_RECV_EVENT): - CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n"); - recv = (gm_recv_t*)&rxevent->recv; - GMNAL_GM_UNLOCK(nal_data); - gmnal_add_rxtwe(nal_data, recv); - GMNAL_GM_LOCK(nal_data); - CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n"); - break; - case(_GM_SLEEP_EVENT): - /* - * Blocking receive above just returns - * immediatly with _GM_SLEEP_EVENT - * Don't know what this is - */ - CDEBUG(D_NET, "Sleeping in gm_unknown\n"); - GMNAL_GM_UNLOCK(nal_data); - gm_unknown(nal_data->gm_port, rxevent); - GMNAL_GM_LOCK(nal_data); - CDEBUG(D_INFO, "Awake from gm_unknown\n"); - break; - - default: - /* - * Don't know what this is - * gm_unknown will make sense of it - * Should be able to do something with - * FAST_RECV_EVENTS here. - */ - CDEBUG(D_NET, "Passing event to gm_unknown\n"); - GMNAL_GM_UNLOCK(nal_data); - gm_unknown(nal_data->gm_port, rxevent); - GMNAL_GM_LOCK(nal_data); - CDEBUG(D_INFO, "Processed unknown event\n"); - } - } - GMNAL_GM_UNLOCK(nal_data); - nal_data->ctthread_flag = GMNAL_THREAD_RESET; - CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data); - return(GMNAL_STATUS_OK); -} - - -/* - * process a receive event - */ -int gmnal_rx_thread(void *arg) +void +gmnal_pack_msg(gmnal_ni_t *gmni, gmnal_msg_t *msg, + lnet_nid_t dstnid, int type) { - gmnal_data_t *nal_data; - void *buffer; - gmnal_rxtwe_t *we = NULL; - int rank; - - if (!arg) { - CDEBUG(D_TRACE, "NO nal_data. 
Exiting\n"); - return(-1); - } - - nal_data = (gmnal_data_t*)arg; - CDEBUG(D_TRACE, "nal_data is [%p]\n", arg); - - for (rank=0; rankrxthread_pid[rank] == current->pid) - break; - - sprintf(current->comm, "gmnal_rx_%d", rank); - - daemonize(); - /* - * set 1 bit for each thread started - * doesn't matter which bit - */ - spin_lock(&nal_data->rxthread_flag_lock); - if (nal_data->rxthread_flag) - nal_data->rxthread_flag=nal_data->rxthread_flag*2 + 1; - else - nal_data->rxthread_flag = 1; - CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag); - spin_unlock(&nal_data->rxthread_flag_lock); - - while(nal_data->rxthread_stop_flag != GMNAL_THREAD_STOP) { - CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n"); - we = gmnal_get_rxtwe(nal_data); - if (!we) { - CDEBUG(D_INFO, "Receive thread time to exit\n"); - break; - } - - buffer = we->buffer; - switch(((gmnal_msghdr_t*)buffer)->type) { - case(GMNAL_SMALL_MESSAGE): - gmnal_pre_receive(nal_data, we, - GMNAL_SMALL_MESSAGE); - break; - case(GMNAL_LARGE_MESSAGE_INIT): - gmnal_pre_receive(nal_data, we, - GMNAL_LARGE_MESSAGE_INIT); - break; - case(GMNAL_LARGE_MESSAGE_ACK): - gmnal_pre_receive(nal_data, we, - GMNAL_LARGE_MESSAGE_ACK); - break; - default: - CDEBUG(D_ERROR, "Unsupported message type\n"); - gmnal_rx_bad(nal_data, we, NULL); - } - PORTAL_FREE(we, sizeof(gmnal_rxtwe_t)); - } - - spin_lock(&nal_data->rxthread_flag_lock); - nal_data->rxthread_flag/=2; - CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag); - spin_unlock(&nal_data->rxthread_flag_lock); - CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data); - return(GMNAL_STATUS_OK); + /* CAVEAT EMPTOR! this only sets the common message fields. */ + msg->gmm_magic = GMNAL_MSG_MAGIC; + msg->gmm_version = GMNAL_MSG_VERSION; + msg->gmm_type = type; + msg->gmm_srcnid = gmni->gmni_ni->ni_nid; + msg->gmm_dstnid = dstnid; } - - -/* - * Start processing a small message receive - * Get here from gmnal_receive_thread - * Hand off to ptl_parse, which calls cb_recv - * which hands back to gmnal_small_receive - * Deal with all endian stuff here. 
- */ int -gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type) +gmnal_unpack_msg(gmnal_ni_t *gmni, gmnal_rx_t *rx) { - gmnal_srxd_t *srxd = NULL; - void *buffer = NULL; - unsigned int snode, sport, type, length; - gmnal_msghdr_t *gmnal_msghdr; - ptl_hdr_t *portals_hdr; - int rc; - - CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n", - nal_data, we, gmnal_type); - - buffer = we->buffer; - snode = we->snode; - sport = we->sport; - type = we->type; - buffer = we->buffer; - length = we->length; - - gmnal_msghdr = (gmnal_msghdr_t*)buffer; - portals_hdr = (ptl_hdr_t*)(buffer+GMNAL_MSGHDR_SIZE); - - CDEBUG(D_INFO, "rx_event:: Sender node [%d], Sender Port [%d], " - "type [%d], length [%d], buffer [%p]\n", - snode, sport, type, length, buffer); - CDEBUG(D_INFO, "gmnal_msghdr:: Sender node [%u], magic [%d], " - "gmnal_type [%d]\n", gmnal_msghdr->sender_node_id, - gmnal_msghdr->magic, gmnal_msghdr->type); - CDEBUG(D_INFO, "portals_hdr:: Sender node ["LPD64"], " - "dest_node ["LPD64"]\n", portals_hdr->src_nid, - portals_hdr->dest_nid); - - - /* - * Get a receive descriptor for this message - */ - srxd = gmnal_rxbuffer_to_srxd(nal_data, buffer); - CDEBUG(D_INFO, "Back from gmnal_rxbuffer_to_srxd\n"); - if (!srxd) { - CDEBUG(D_ERROR, "Failed to get receive descriptor\n"); - /* I think passing a NULL srxd to ptl_parse will crash - * gmnal_recv() */ - LBUG(); - ptl_parse(nal_data->ni, portals_hdr, srxd); - return(GMNAL_STATUS_FAIL); - } - - /* - * no need to bother portals with this - */ - if (gmnal_type == GMNAL_LARGE_MESSAGE_ACK) { - gmnal_large_tx_ack_received(nal_data, srxd); - return(GMNAL_STATUS_OK); - } - - srxd->nal_data = nal_data; - srxd->type = gmnal_type; - srxd->nsiov = gmnal_msghdr->niov; - srxd->gm_source_node = gmnal_msghdr->sender_node_id; - - CDEBUG(D_PORTALS, "Calling ptl_parse buffer is [%p]\n", - buffer+GMNAL_MSGHDR_SIZE); - /* - * control passes to portals, which calls nal_recv - * nal_recv is responsible for returning the buffer - * for future receive - */ - rc = ptl_parse(nal_data->ni, portals_hdr, srxd); - - if (rc != 0) { - /* I just received garbage; take appropriate action... */ - LBUG(); + gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf); + const int hdr_size = offsetof(gmnal_msg_t, gmm_u); + int buffnob = rx->rx_islarge ? gmni->gmni_large_msgsize : + gmni->gmni_small_msgsize; + int flip; + + /* GM may not overflow our buffer */ + LASSERT (rx->rx_recv_nob <= buffnob); + + /* 6 bytes are enough to have received magic + version */ + if (rx->rx_recv_nob < 6) { + CERROR("Short message from gmid %u: %d\n", + rx->rx_recv_gmid, rx->rx_recv_nob); + return -EPROTO; } - return(GMNAL_STATUS_OK); -} - - + if (msg->gmm_magic == GMNAL_MSG_MAGIC) { + flip = 0; + } else if (msg->gmm_magic == __swab32(GMNAL_MSG_MAGIC)) { + flip = 1; + } else { + CERROR("Bad magic from gmid %u: %08x\n", + rx->rx_recv_gmid, msg->gmm_magic); + return -EPROTO; + } -/* - * After a receive has been processed, - * hang out the receive buffer again. - * This implicitly returns a receive token. - */ -int -gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd) -{ - CDEBUG(D_TRACE, "gmnal_rx_requeue_buffer\n"); + if (msg->gmm_version != + (flip ? 
__swab16(GMNAL_MSG_VERSION) : GMNAL_MSG_VERSION)) { + CERROR("Bad version from gmid %u: %d\n", + rx->rx_recv_gmid, msg->gmm_version); + return -EPROTO; + } - CDEBUG(D_NET, "requeueing srxd[%p] nal_data[%p]\n", srxd, nal_data); + if (rx->rx_recv_nob < hdr_size) { + CERROR("Short message from %u: %d\n", + rx->rx_recv_gmid, rx->rx_recv_nob); + return -EPROTO; + } - GMNAL_GM_LOCK(nal_data); - gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, - srxd->gmsize, GM_LOW_PRIORITY, 0 ); - GMNAL_GM_UNLOCK(nal_data); + if (flip) { + /* leave magic unflipped as a clue to peer endianness */ + __swab16s(&msg->gmm_version); + __swab16s(&msg->gmm_type); + __swab64s(&msg->gmm_srcnid); + __swab64s(&msg->gmm_dstnid); + } + + if (msg->gmm_srcnid == LNET_NID_ANY) { + CERROR("Bad src nid from %u: %s\n", + rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_srcnid)); + return -EPROTO; + } - return(GMNAL_STATUS_OK); + if (msg->gmm_dstnid != gmni->gmni_ni->ni_nid) { + CERROR("Bad dst nid from %u: %s\n", + rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_dstnid)); + return -EPROTO; + } + + switch (msg->gmm_type) { + default: + CERROR("Unknown message type from %u: %x\n", + rx->rx_recv_gmid, msg->gmm_type); + return -EPROTO; + + case GMNAL_MSG_IMMEDIATE: + if (rx->rx_recv_nob < offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0])) { + CERROR("Short IMMEDIATE from %u: %d("LPSZ")\n", + rx->rx_recv_gmid, rx->rx_recv_nob, + offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0])); + return -EPROTO; + } + break; + } + return 0; } - -/* - * Handle a bad message - * A bad message is one we don't expect or can't interpret - */ -int -gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd) +gmnal_tx_t * +gmnal_get_tx(gmnal_ni_t *gmni, int may_block) { - CDEBUG(D_TRACE, "Can't handle message\n"); - - if (!srxd) - srxd = gmnal_rxbuffer_to_srxd(nal_data, - we->buffer); - if (srxd) { - gmnal_rx_requeue_buffer(nal_data, srxd); - } else { - CDEBUG(D_ERROR, "Can't find a descriptor for this buffer\n"); - /* - * get rid of it ? - */ - return(GMNAL_STATUS_FAIL); - } - - return(GMNAL_STATUS_OK); + gmnal_tx_t *tx = NULL; + + spin_lock(&gmni->gmni_tx_lock); + + while (!gmni->gmni_shutdown) { + + if (!list_empty(&gmni->gmni_idle_txs)) { + tx = list_entry(gmni->gmni_idle_txs.next, + gmnal_tx_t, tx_list); + break; + } + + if (!may_block) { + if (!list_empty(&gmni->gmni_nblk_idle_txs)) + tx = list_entry(gmni->gmni_nblk_idle_txs.next, + gmnal_tx_t, tx_list); + break; + } + + spin_unlock(&gmni->gmni_tx_lock); + wait_event(gmni->gmni_idle_tx_wait, + gmni->gmni_shutdown || + !list_empty(&gmni->gmni_idle_txs)); + spin_lock(&gmni->gmni_tx_lock); + } + + if (tx != NULL) { + LASSERT (tx->tx_ptlmsg == NULL); + LASSERT (tx->tx_ltxb == NULL); + LASSERT (!tx->tx_credit); + + list_del(&tx->tx_list); + } + + spin_unlock(&gmni->gmni_tx_lock); + + return tx; } - - -/* - * Process a small message receive. 
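/* The unpack path above accepts gmm_magic in either byte order: a swabbed
 * magic just means the peer is opposite-endian, so the remaining header
 * fields get swabbed and the magic itself is left flipped as a clue. A
 * standalone userspace sketch of that check; the constants here are
 * illustrative stand-ins, not the real GMNAL_MSG_* values. */
#include <stdint.h>
#include <stdio.h>

#define MSG_MAGIC 0x6d676d6dU                   /* hypothetical magic */

static uint32_t swab32(uint32_t v)
{
        return ((v & 0x000000ffU) << 24) | ((v & 0x0000ff00U) << 8) |
               ((v & 0x00ff0000U) >> 8)  | ((v & 0xff000000U) >> 24);
}

/* returns -1 (protocol error) or 0 with *flip set like gmnal_unpack_msg() */
static int check_magic(uint32_t magic, int *flip)
{
        if (magic == MSG_MAGIC)
                *flip = 0;
        else if (magic == swab32(MSG_MAGIC))
                *flip = 1;              /* peer endianness differs */
        else
                return -1;
        return 0;
}

int main(void)
{
        int flip;

        if (check_magic(swab32(MSG_MAGIC), &flip) == 0)
                printf("accepted, flip=%d\n", flip);    /* flip=1 */
        return 0;
}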
- * Get here from gmnal_receive_thread, gmnal_pre_receive - * ptl_parse, cb_recv - * Put data from prewired receive buffer into users buffer(s) - * Hang out the receive buffer again for another receive - * Call ptl_finalize - */ -int -gmnal_small_rx(ptl_ni_t *ni, void *private, ptl_msg_t *cookie) +void +gmnal_tx_done(gmnal_tx_t *tx, int rc) { - gmnal_srxd_t *srxd = NULL; - gmnal_data_t *nal_data = (gmnal_data_t*)ni->ni_data; - + gmnal_ni_t *gmni = tx->tx_gmni; + int wake_sched = 0; + int wake_idle = 0; + + LASSERT(tx->tx_ptlmsg == NULL); - if (!private) { - CDEBUG(D_ERROR, "gmnal_small_rx no context\n"); - ptl_finalize(ni, private, cookie, -EIO); - return(-EIO); - } + spin_lock(&gmni->gmni_tx_lock); + + if (tx->tx_ltxb != NULL) { + wake_sched = 1; + list_add_tail(&tx->tx_ltxb->txb_list, &gmni->gmni_idle_ltxbs); + tx->tx_ltxb = NULL; + } + + if (tx->tx_credit) { + wake_sched = 1; + gmni->gmni_tx_credits++; + tx->tx_credit = 0; + } + + if (tx->tx_isnblk) { + list_add_tail(&tx->tx_list, &gmni->gmni_nblk_idle_txs); + } else { + list_add_tail(&tx->tx_list, &gmni->gmni_idle_txs); + wake_idle = 1; + } - srxd = (gmnal_srxd_t*)private; - - /* - * let portals know receive is complete - */ - CDEBUG(D_PORTALS, "calling ptl_finalize\n"); - ptl_finalize(ni, private, cookie, 0); - /* - * return buffer so it can be used again - */ - CDEBUG(D_NET, "calling gm_provide_receive_buffer\n"); - GMNAL_GM_LOCK(nal_data); - gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, - srxd->gmsize, GM_LOW_PRIORITY, 0); - GMNAL_GM_UNLOCK(nal_data); - - return(0); -} + if (wake_sched) + gmnal_check_txqueues_locked(gmni); + spin_unlock(&gmni->gmni_tx_lock); -/* - * Start a small transmit. - * Use the given send token (and wired transmit buffer). - * Copy headers to wired buffer and initiate gm_send from the wired buffer. - * The callback function informs when the send is complete. - */ -int -gmnal_small_tx(ptl_ni_t *ni, void *private, ptl_msg_t *cookie, - ptl_hdr_t *hdr, int type, lnet_nid_t global_nid, lnet_pid_t pid, - gmnal_stxd_t *stxd, int size) -{ - gmnal_data_t *nal_data = (gmnal_data_t*)ni->ni_data; - void *buffer = NULL; - gmnal_msghdr_t *msghdr = NULL; - int tot_size = 0; - unsigned int local_nid; - gm_status_t gm_status = GM_SUCCESS; - - CDEBUG(D_TRACE, "gmnal_small_tx ni [%p] private [%p] cookie [%p] " - "hdr [%p] type [%d] global_nid ["LPU64"] pid [%d] stxd [%p] " - "size [%d]\n", ni, private, cookie, hdr, type, - global_nid, pid, stxd, size); - - CDEBUG(D_INFO, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n", - hdr->dest_nid, hdr->src_nid); - - CDEBUG(D_INFO, "nal_data [%p]\n", nal_data); - LASSERT(nal_data != NULL); - - GMNAL_GM_LOCK(nal_data); - gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, - &local_nid); - GMNAL_GM_UNLOCK(nal_data); - if (gm_status != GM_SUCCESS) { - CDEBUG(D_ERROR, "Failed to obtain local id\n"); - return(-ENETDOWN); - } - CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid); - - stxd->type = GMNAL_SMALL_MESSAGE; - stxd->cookie = cookie; - - /* - * Copy gmnal_msg_hdr and portals header to the transmit buffer - * Then send the message, as the data has previously been copied in - * (HP SFS 1380). 
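/* gmnal_get_tx() above draws from two idle lists: callers that may block
 * wait for the general list to refill, while callers that must not sleep
 * fall back to a reserved "nblk" list instead. A minimal sketch of that
 * policy with the lock, wait queue and shutdown check elided; the types
 * here are hypothetical, not the patch's gmnal_tx_t. */
#include <stddef.h>

struct tx { struct tx *next; };

struct tx_pools {
        struct tx *idle;        /* general pool: blocking callers wait on it */
        struct tx *nblk_idle;   /* reserve for callers that cannot sleep */
};

static struct tx *get_tx(struct tx_pools *p, int may_block)
{
        struct tx *tx = NULL;

        if (p->idle != NULL) {
                tx = p->idle;
                p->idle = tx->next;
        } else if (!may_block && p->nblk_idle != NULL) {
                tx = p->nblk_idle;
                p->nblk_idle = tx->next;
        }
        /* NULL: a blocking caller would sleep until gmnal_tx_done()
         * refills 'idle'; a non-blocking caller has really run out */
        return tx;
}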
- */ - buffer = stxd->buffer; - msghdr = (gmnal_msghdr_t*)buffer; - - msghdr->magic = GMNAL_MAGIC; - msghdr->type = GMNAL_SMALL_MESSAGE; - msghdr->sender_node_id = nal_data->gm_global_nid; - CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer); - - buffer += sizeof(gmnal_msghdr_t); - - CDEBUG(D_INFO, "processing portals hdr at [%p]\n", buffer); - gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t)); - - buffer += sizeof(ptl_hdr_t); - - CDEBUG(D_INFO, "sending\n"); - tot_size = size+sizeof(ptl_hdr_t)+sizeof(gmnal_msghdr_t); - stxd->msg_size = tot_size; - - - CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] " - "gmsize [%lu] msize [%d] global_nid ["LPU64"] local_nid[%d] " - "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size, - stxd->msg_size, global_nid, local_nid, stxd); - - GMNAL_GM_LOCK(nal_data); - stxd->gm_priority = GM_LOW_PRIORITY; - stxd->gm_target_node = local_nid; - gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, - stxd->gm_size, stxd->msg_size, - GM_LOW_PRIORITY, local_nid, - gmnal_small_tx_callback, (void*)stxd); - GMNAL_GM_UNLOCK(nal_data); - CDEBUG(D_INFO, "done\n"); - - return(0); + if (wake_idle) + wake_up(&gmni->gmni_idle_tx_wait); } - -/* - * A callback to indicate the small transmit operation is compete - * Check for erros and try to deal with them. - * Call ptl_finalise to inform the client application that the send - * is complete and the memory can be reused. - * Return the stxd when finished with it (returns a send token) - */ void -gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status) +gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, + gm_status_t status) { - gmnal_stxd_t *stxd = (gmnal_stxd_t*)context; - ptl_msg_t *cookie = stxd->cookie; - gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data; - ptl_ni_t *ni = nal_data->ni; - unsigned gnid = 0; - gm_status_t gm_status = 0; - - if (!stxd) { - CDEBUG(D_TRACE, "send completion event for unknown stxd\n"); - return; - } - if (status != GM_SUCCESS) { - GMNAL_GM_LOCK(nal_data); - gm_status = gm_node_id_to_global_id(nal_data->gm_port, - stxd->gm_target_node,&gnid); - GMNAL_GM_UNLOCK(nal_data); - if (gm_status != GM_SUCCESS) { - CDEBUG(D_INFO, "gm_node_id_to_global_id failed[%d]\n", - gm_status); - gnid = 0; - } - CDEBUG(D_ERROR, "Result of send stxd [%p] is [%s] to [%u]\n", - stxd, gmnal_gm_error(status), gnid); - } + gmnal_tx_t *tx = (gmnal_tx_t*)context; - switch(status) { - case(GM_SUCCESS): - break; - - - - case(GM_SEND_DROPPED): - /* - * do a resend on the dropped ones - */ - CDEBUG(D_ERROR, "send stxd [%p] was dropped " - "resending\n", context); - GMNAL_GM_LOCK(nal_data); - gm_send_to_peer_with_callback(nal_data->gm_port, - stxd->buffer, - stxd->gm_size, - stxd->msg_size, - stxd->gm_priority, - stxd->gm_target_node, - gmnal_small_tx_callback, - context); - GMNAL_GM_UNLOCK(nal_data); - - return; - case(GM_TIMED_OUT): - case(GM_SEND_TIMED_OUT): - /* - * drop these ones - */ - CDEBUG(D_INFO, "calling gm_drop_sends\n"); - GMNAL_GM_LOCK(nal_data); - gm_drop_sends(nal_data->gm_port, stxd->gm_priority, - stxd->gm_target_node, GMNAL_GM_PORT_ID, - gmnal_drop_sends_callback, context); - GMNAL_GM_UNLOCK(nal_data); + CDEBUG(D_NET, "status for tx [%p] is [%d][%s], nid %s\n", + tx, status, gmnal_gmstatus2str(status), + libcfs_nid2str(tx->tx_nid)); - return; + gmnal_tx_done(tx, -EIO); +} +void +gmnal_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status) +{ + gmnal_tx_t *tx = (gmnal_tx_t*)context; + gmnal_ni_t *gmni = tx->tx_gmni; - /* - * abort on 
these ? - */ - case(GM_TRY_AGAIN): - case(GM_INTERRUPTED): - case(GM_FAILURE): - case(GM_INPUT_BUFFER_TOO_SMALL): - case(GM_OUTPUT_BUFFER_TOO_SMALL): - case(GM_BUSY): - case(GM_MEMORY_FAULT): - case(GM_INVALID_PARAMETER): - case(GM_OUT_OF_MEMORY): - case(GM_INVALID_COMMAND): - case(GM_PERMISSION_DENIED): - case(GM_INTERNAL_ERROR): - case(GM_UNATTACHED): - case(GM_UNSUPPORTED_DEVICE): - case(GM_SEND_REJECTED): - case(GM_SEND_TARGET_PORT_CLOSED): - case(GM_SEND_TARGET_NODE_UNREACHABLE): - case(GM_SEND_PORT_CLOSED): - case(GM_NODE_ID_NOT_YET_SET): - case(GM_STILL_SHUTTING_DOWN): - case(GM_CLONE_BUSY): - case(GM_NO_SUCH_DEVICE): - case(GM_ABORTED): - case(GM_INCOMPATIBLE_LIB_AND_DRIVER): - case(GM_UNTRANSLATED_SYSTEM_ERROR): - case(GM_ACCESS_DENIED): - case(GM_NO_DRIVER_SUPPORT): - case(GM_PTE_REF_CNT_OVERFLOW): - case(GM_NOT_SUPPORTED_IN_KERNEL): - case(GM_NOT_SUPPORTED_ON_ARCH): - case(GM_NO_MATCH): - case(GM_USER_ERROR): - case(GM_DATA_CORRUPTED): - case(GM_HARDWARE_FAULT): - case(GM_SEND_ORPHANED): - case(GM_MINOR_OVERFLOW): - case(GM_PAGE_TABLE_FULL): - case(GM_UC_ERROR): - case(GM_INVALID_PORT_NUMBER): - case(GM_DEV_NOT_FOUND): - case(GM_FIRMWARE_NOT_RUNNING): - case(GM_YP_NO_MATCH): - default: - gm_resume_sending(nal_data->gm_port, stxd->gm_priority, - stxd->gm_target_node, GMNAL_GM_PORT_ID, - gmnal_resume_sending_callback, context); + switch(status) { + case GM_SUCCESS: + gmnal_tx_done(tx, 0); return; - } - - /* - * TO DO - * If this is a large message init, - * we're not finished with the data yet, - * so can't call ptl_finalise. - * However, we're also holding on to a - * stxd here (to keep track of the source - * iovec only). Should use another structure - * to keep track of iovec and return stxd to - * free list earlier. - */ - if (stxd->type == GMNAL_LARGE_MESSAGE_INIT) { - CDEBUG(D_INFO, "large transmit done\n"); + case GM_SEND_DROPPED: + CERROR("Dropped tx %p to %s\n", tx, libcfs_nid2str(tx->tx_nid)); + /* Another tx failed and called gm_drop_sends() which made this + * one complete immediately */ + gmnal_tx_done(tx, -EIO); + return; + + default: + /* Some error; NB don't complete tx yet; we need its credit for + * gm_drop_sends() */ + CERROR("tx %p error %d(%s), nid %s\n", tx, + status, gmnal_gmstatus2str(status), + libcfs_nid2str(tx->tx_nid)); + + spin_lock(&gmni->gmni_gm_lock); + gm_drop_sends(gmni->gmni_port, + tx->tx_ltxb != NULL ? 
+ GMNAL_LARGE_PRIORITY : GMNAL_SMALL_PRIORITY, + tx->tx_gmlid, *gmnal_tunables.gm_port, + gmnal_drop_sends_callback, tx); + spin_unlock(&gmni->gmni_gm_lock); return; } - gmnal_return_stxd(nal_data, stxd); - ptl_finalize(ni, stxd, cookie, 0); - return; -} -/* - * After an error on the port - * call this to allow future sends to complete - */ -void gmnal_resume_sending_callback(struct gm_port *gm_port, void *context, - gm_status_t status) -{ - gmnal_data_t *nal_data; - gmnal_stxd_t *stxd = (gmnal_stxd_t*)context; - CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context); - gmnal_return_stxd(stxd->nal_data, stxd); - return; + /* not reached */ + LBUG(); } - -void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, - gm_status_t status) +void +gmnal_check_txqueues_locked (gmnal_ni_t *gmni) { - gmnal_stxd_t *stxd = (gmnal_stxd_t*)context; - gmnal_data_t *nal_data = stxd->nal_data; - - CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context); - if (status == GM_SUCCESS) { - GMNAL_GM_LOCK(nal_data); - gm_send_to_peer_with_callback(gm_port, stxd->buffer, - stxd->gm_size, stxd->msg_size, - stxd->gm_priority, - stxd->gm_target_node, - gmnal_small_tx_callback, - context); - GMNAL_GM_UNLOCK(nal_data); - } else { - CDEBUG(D_ERROR, "send_to_peer status for stxd [%p] is " - "[%d][%s]\n", stxd, status, gmnal_gm_error(status)); - } + gmnal_tx_t *tx; + gmnal_txbuf_t *ltxb; + int gmsize; + int pri; + void *netaddr; + + tx = list_empty(&gmni->gmni_buf_txq) ? NULL : + list_entry(gmni->gmni_buf_txq.next, gmnal_tx_t, tx_list); + if (tx != NULL && + (tx->tx_large_nob == 0 || + !list_empty(&gmni->gmni_idle_ltxbs))) { - return; -} + /* consume tx */ + list_del(&tx->tx_list); + + LASSERT (tx->tx_ltxb == NULL); + if (tx->tx_large_nob != 0) { + ltxb = list_entry(gmni->gmni_idle_ltxbs.next, + gmnal_txbuf_t, txb_list); -/* - * Begine a large transmit. - * Do a gm_register of the memory pointed to by the iovec - * and send details to the receiver. The receiver does a gm_get - * to pull the data and sends and ack when finished. Upon receipt of - * this ack, deregister the memory. Only 1 send token is required here. - */ -int -gmnal_large_tx(ptl_ni_t *ni, void *private, ptl_msg_t *cookie, - ptl_hdr_t *hdr, int type, lnet_nid_t global_nid, lnet_pid_t pid, - unsigned int niov, struct iovec *iov, size_t offset, int size) -{ + /* consume large buffer */ + list_del(&ltxb->txb_list); - gmnal_data_t *nal_data; - gmnal_stxd_t *stxd = NULL; - void *buffer = NULL; - gmnal_msghdr_t *msghdr = NULL; - unsigned int local_nid; - int mlen = 0; /* the size of the init message data */ - struct iovec *iov_dup = NULL; - gm_status_t gm_status; - int niov_dup; - - - CDEBUG(D_TRACE, "gmnal_large_tx ni [%p] private [%p], cookie [%p] " - "hdr [%p], type [%d] global_nid ["LPU64"], pid [%d], niov [%d], " - "iov [%p], size [%d]\n", ni, private, cookie, hdr, type, - global_nid, pid, niov, iov, size); - - LASSERT (ni != NULL); - nal_data = (gmnal_data_t*)ni->ni_data; - - /* - * Get stxd and buffer. Put local address of data in buffer, - * send local addresses to target, - * wait for the target node to suck the data over. 
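/* The send completion logic above never completes a failed tx directly:
 * its credit is kept so gm_drop_sends() can flush queued sends to that
 * peer first, and only gmnal_drop_sends_callback() finishes it with -EIO.
 * Sends dropped because of *another* tx's failure complete immediately.
 * A compressed model of those transitions (state names are mine, not the
 * patch's): */
#include <assert.h>

enum tx_state { TX_SENDING, TX_DROPPING, TX_DONE };

static enum tx_state on_send_callback(enum tx_state s, int ok, int dropped)
{
        assert(s == TX_SENDING);
        if (ok || dropped)
                return TX_DONE;         /* gmnal_tx_done(tx, ok ? 0 : -EIO) */
        return TX_DROPPING;             /* keep credit; gm_drop_sends() */
}

static enum tx_state on_drop_sends_callback(enum tx_state s)
{
        assert(s == TX_DROPPING);
        return TX_DONE;                 /* gmnal_tx_done(tx, -EIO) */
}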
- * The stxd is used to ren - */ - stxd = gmnal_get_stxd(nal_data, 1); - CDEBUG(D_INFO, "stxd [%p]\n", stxd); - - stxd->type = GMNAL_LARGE_MESSAGE_INIT; - stxd->cookie = cookie; - - /* - * Copy gmnal_msg_hdr and portals header to the transmit buffer - * Then copy the iov in - */ - buffer = stxd->buffer; - msghdr = (gmnal_msghdr_t*)buffer; - - CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer); - - msghdr->magic = GMNAL_MAGIC; - msghdr->type = GMNAL_LARGE_MESSAGE_INIT; - msghdr->sender_node_id = nal_data->gm_global_nid; - msghdr->stxd_remote_ptr = (gm_remote_ptr_t)stxd; - msghdr->niov = niov ; - buffer += sizeof(gmnal_msghdr_t); - mlen = sizeof(gmnal_msghdr_t); - CDEBUG(D_INFO, "mlen is [%d]\n", mlen); - - - CDEBUG(D_INFO, "processing portals hdr at [%p]\n", buffer); - - gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t)); - buffer += sizeof(ptl_hdr_t); - mlen += sizeof(ptl_hdr_t); - CDEBUG(D_INFO, "mlen is [%d]\n", mlen); - - while (offset >= iov->iov_len) { - offset -= iov->iov_len; - niov--; - iov++; - } - - LASSERT(offset >= 0); - /* - * Store the iovs in the stxd for we can get - * them later if we need them - */ - stxd->iov[0].iov_base = iov->iov_base + offset; - stxd->iov[0].iov_len = iov->iov_len - offset; - CDEBUG(D_NET, "Copying iov [%p] to [%p], niov=%d\n", iov, stxd->iov, niov); - if (niov > 1) - gm_bcopy(&iov[1], &stxd->iov[1], (niov-1)*sizeof(struct iovec)); - stxd->niov = niov; - - /* - * copy the iov to the buffer so target knows - * where to get the data from - */ - CDEBUG(D_INFO, "processing iov to [%p]\n", buffer); - gm_bcopy(stxd->iov, buffer, stxd->niov*sizeof(struct iovec)); - mlen += stxd->niov*(sizeof(struct iovec)); - CDEBUG(D_INFO, "mlen is [%d]\n", mlen); - - /* - * register the memory so the NIC can get hold of the data - * This is a slow process. it'd be good to overlap it - * with something else. - */ - iov = stxd->iov; - iov_dup = iov; - niov_dup = niov; - while(niov--) { - CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n", - iov->iov_base, iov->iov_len); - GMNAL_GM_LOCK(nal_data); - gm_status = gm_register_memory(nal_data->gm_port, - iov->iov_base, iov->iov_len); - if (gm_status != GM_SUCCESS) { - GMNAL_GM_UNLOCK(nal_data); - CDEBUG(D_ERROR, "gm_register_memory returns [%d][%s] " - "for memory [%p] len ["LPSZ"]\n", - gm_status, gmnal_gm_error(gm_status), - iov->iov_base, iov->iov_len); - GMNAL_GM_LOCK(nal_data); - while (iov_dup != iov) { - gm_deregister_memory(nal_data->gm_port, - iov_dup->iov_base, - iov_dup->iov_len); - iov_dup++; - } - GMNAL_GM_UNLOCK(nal_data); - gmnal_return_stxd(nal_data, stxd); - return(-EIO); - } - - GMNAL_GM_UNLOCK(nal_data); - iov++; - } + spin_unlock(&gmni->gmni_tx_lock); - /* - * Send the init message to the target - */ - CDEBUG(D_INFO, "sending mlen [%d]\n", mlen); - GMNAL_GM_LOCK(nal_data); - gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, - &local_nid); - if (gm_status != GM_SUCCESS) { - GMNAL_GM_UNLOCK(nal_data); - CDEBUG(D_ERROR, "Failed to obtain local id\n"); - gmnal_return_stxd(nal_data, stxd); - /* TO DO deregister memory on failure */ - return(GMNAL_STATUS_FAIL); - } - CDEBUG(D_INFO, "Local Node_id is [%d]\n", local_nid); - gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, - stxd->gm_size, mlen, GM_LOW_PRIORITY, - local_nid, gmnal_large_tx_callback, - (void*)stxd); - GMNAL_GM_UNLOCK(nal_data); - - CDEBUG(D_INFO, "done\n"); - - return(0); -} + /* Unlocking here allows sends to get re-ordered, + * but we want to allow other CPUs to progress... 
*/ -/* - * Callback function indicates that send of buffer with - * large message iovec has completed (or failed). - */ -void -gmnal_large_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status) -{ - gmnal_small_tx_callback(gm_port, context, status); + tx->tx_ltxb = ltxb; -} + /* marshall message in tx_ltxb... + * 1. Copy what was marshalled so far (in tx_buf) */ + memcpy(GMNAL_NETBUF_MSG(&ltxb->txb_buf), + GMNAL_NETBUF_MSG(&tx->tx_buf), tx->tx_msgnob); + /* 2. Copy the payload */ + gmnal_copy_to_netbuf( + gmni->gmni_large_pages, + &ltxb->txb_buf, + tx->tx_msgnob, + tx->tx_large_niov, + tx->tx_large_iskiov ? NULL : tx->tx_large_frags.iov, + tx->tx_large_iskiov ? tx->tx_large_frags.kiov : NULL, + tx->tx_large_offset, + tx->tx_large_nob); + tx->tx_msgnob += tx->tx_large_nob; -/* - * Have received a buffer that contains an iovec of the sender. - * Do a gm_register_memory of the receivers buffer and then do a get - * data from the sender. - */ -int -gmnal_large_rx(ptl_ni_t *ni, void *private, ptl_msg_t *cookie, - unsigned int nriov, struct iovec *riov, size_t offset, - size_t mlen, size_t rlen) -{ - gmnal_data_t *nal_data = ni->ni_data; - gmnal_srxd_t *srxd = (gmnal_srxd_t*)private; - void *buffer = NULL; - struct iovec *riov_dup; - int nriov_dup; - gmnal_msghdr_t *msghdr = NULL; - gm_status_t gm_status; - - CDEBUG(D_TRACE, "gmnal_large_rx :: ni[%p], private[%p], " - "cookie[%p], niov[%d], iov[%p], mlen["LPSZ"], rlen["LPSZ"]\n", - ni, private, cookie, nriov, riov, mlen, rlen); - - if (!srxd) { - CDEBUG(D_ERROR, "gmnal_large_rx no context\n"); - ptl_finalize(ni, private, cookie, -EIO); - return(-EIO); - } + /* We've copied everything... */ + ptl_finalize(gmni->gmni_ni, NULL, tx->tx_ptlmsg, 0); + tx->tx_ptlmsg = NULL; - buffer = srxd->buffer; - msghdr = (gmnal_msghdr_t*)buffer; - buffer += sizeof(gmnal_msghdr_t); - buffer += sizeof(ptl_hdr_t); - - /* - * Store the senders stxd address in the srxd for this message - * The gmnal_large_message_ack needs it to notify the sender - * the pull of data is complete - */ - srxd->source_stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr; - - /* - * Register the receivers memory - * get the data, - * tell the sender that we got the data - * then tell the receiver we got the data - * TO DO - * If the iovecs match, could interleave - * gm_registers and gm_gets for each element - */ - while (offset >= riov->iov_len) { - offset -= riov->iov_len; - riov++; - nriov--; - } - LASSERT (nriov >= 0); - LASSERT (offset >= 0); - /* - * do this so the final gm_get callback can deregister the memory - */ - PORTAL_ALLOC(srxd->riov, nriov*(sizeof(struct iovec))); - - srxd->riov[0].iov_base = riov->iov_base + offset; - srxd->riov[0].iov_len = riov->iov_len - offset; - if (nriov > 1) - gm_bcopy(&riov[1], &srxd->riov[1], (nriov-1)*(sizeof(struct iovec))); - srxd->nriov = nriov; - - riov = srxd->riov; - nriov_dup = nriov; - riov_dup = riov; - while(nriov--) { - CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n", - riov->iov_base, riov->iov_len); - GMNAL_GM_LOCK(nal_data); - gm_status = gm_register_memory(nal_data->gm_port, - riov->iov_base, riov->iov_len); - if (gm_status != GM_SUCCESS) { - GMNAL_GM_UNLOCK(nal_data); - CDEBUG(D_ERROR, "gm_register_memory returns [%d][%s] " - "for memory [%p] len ["LPSZ"]\n", - gm_status, gmnal_gm_error(gm_status), - riov->iov_base, riov->iov_len); - GMNAL_GM_LOCK(nal_data); - while (riov_dup != riov) { - gm_deregister_memory(nal_data->gm_port, - riov_dup->iov_base, - riov_dup->iov_len); - riov_dup++; - } - GMNAL_GM_LOCK(nal_data); - /* 
- * give back srxd and buffer. Send NACK to sender - */ - PORTAL_FREE(srxd->riov, nriov_dup*(sizeof(struct iovec))); - return(-EIO); - } - GMNAL_GM_UNLOCK(nal_data); - riov++; - } + spin_lock(&gmni->gmni_tx_lock); + } - /* - * now do gm_get to get the data - */ - srxd->cookie = cookie; - if (gmnal_remote_get(srxd, srxd->nsiov, (struct iovec*)buffer, - nriov_dup, riov_dup) != GMNAL_STATUS_OK) { - CDEBUG(D_ERROR, "can't get the data"); - } + LASSERT (tx->tx_ptlmsg == NULL); - CDEBUG(D_INFO, "lgmanl_large_rx done\n"); + list_add_tail(&tx->tx_list, &gmni->gmni_cred_txq); + } - return(0); -} + if (!list_empty(&gmni->gmni_cred_txq) && + gmni->gmni_tx_credits != 0) { + tx = list_entry(gmni->gmni_cred_txq.next, gmnal_tx_t, tx_list); -/* - * Perform a number of remote gets as part of receiving - * a large message. - * The final one to complete (i.e. the last callback to get called) - * tidies up. - * gm_get requires a send token. - */ -int -gmnal_remote_get(gmnal_srxd_t *srxd, int nsiov, struct iovec *siov, - int nriov, struct iovec *riov) -{ + /* consume tx and 1 credit */ + list_del(&tx->tx_list); + gmni->gmni_tx_credits--; - int ncalls = 0; + spin_unlock(&gmni->gmni_tx_lock); - CDEBUG(D_TRACE, "gmnal_remote_get srxd[%p], nriov[%d], riov[%p], " - "nsiov[%d], siov[%p]\n", srxd, nriov, riov, nsiov, siov); + /* Unlocking here allows sends to get re-ordered, but we want + * to allow other CPUs to progress... */ + LASSERT(!tx->tx_credit); + tx->tx_credit = 1; - ncalls = gmnal_copyiov(0, srxd, nsiov, siov, nriov, riov); - if (ncalls < 0) { - CDEBUG(D_ERROR, "there's something wrong with the iovecs\n"); - return(GMNAL_STATUS_FAIL); - } - CDEBUG(D_INFO, "gmnal_remote_get ncalls [%d]\n", ncalls); - spin_lock_init(&srxd->callback_lock); - srxd->ncallbacks = ncalls; - srxd->callback_status = 0; - - ncalls = gmnal_copyiov(1, srxd, nsiov, siov, nriov, riov); - if (ncalls < 0) { - CDEBUG(D_ERROR, "there's something wrong with the iovecs\n"); - return(GMNAL_STATUS_FAIL); - } + if (tx->tx_msgnob <= gmni->gmni_small_msgsize) { + LASSERT (tx->tx_ltxb == NULL); + netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_buf); + gmsize = gmni->gmni_small_gmsize; + pri = GMNAL_SMALL_PRIORITY; + } else { + LASSERT (tx->tx_ltxb != NULL); + netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_ltxb->txb_buf); + gmsize = gmni->gmni_large_gmsize; + pri = GMNAL_LARGE_PRIORITY; + } - return(GMNAL_STATUS_OK); + spin_lock(&gmni->gmni_gm_lock); -} + gm_send_to_peer_with_callback(gmni->gmni_port, + netaddr, gmsize, + tx->tx_msgnob, + pri, + tx->tx_gmlid, + gmnal_tx_callback, + (void*)tx); - -/* - * pull data from source node (source iovec) to a local iovec. - * The iovecs may not match which adds the complications below. - * Count the number of gm_gets that will be required so the callbacks - * can determine who is the last one. 
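/* gmnal_check_txqueues_locked() above is a two-stage pipeline: a tx waits
 * on buf_txq until a large tx buffer is free (if its payload needs one),
 * then on cred_txq until a GM send credit is free. A counter-only model,
 * treating every tx as needing a large buffer and draining greedily
 * (the real code also handles small txs, which skip stage 1): */
#include <stdio.h>

static int buf_txq, cred_txq, idle_ltxbs, tx_credits;

static void check_txqueues(void)
{
        while (buf_txq > 0 && idle_ltxbs > 0) { /* stage 1: get a buffer */
                buf_txq--;
                idle_ltxbs--;   /* payload is marshalled into the ltxb */
                cred_txq++;
        }
        while (cred_txq > 0 && tx_credits > 0) { /* stage 2: get a credit */
                cred_txq--;
                tx_credits--;   /* returned by gmnal_tx_done() */
        }
}

int main(void)
{
        buf_txq = 3; idle_ltxbs = 1; tx_credits = 2;
        check_txqueues();
        printf("buf=%d cred=%d credits=%d\n", buf_txq, cred_txq, tx_credits);
        return 0;               /* buf=2 cred=0 credits=1 */
}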
- */ -int -gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov, - struct iovec *siov, int nriov, struct iovec *riov) -{ - - int ncalls = 0; - int slen = siov->iov_len, rlen = riov->iov_len; - char *sbuf = siov->iov_base, *rbuf = riov->iov_base; - unsigned long sbuf_long; - gm_remote_ptr_t remote_ptr = 0; - unsigned int source_node; - gmnal_ltxd_t *ltxd = NULL; - gmnal_data_t *nal_data = srxd->nal_data; - - CDEBUG(D_TRACE, "copy[%d] nal_data[%p]\n", do_copy, nal_data); - if (do_copy) { - if (!nal_data) { - CDEBUG(D_ERROR, "Bad args No nal_data\n"); - return(GMNAL_STATUS_FAIL); - } - GMNAL_GM_LOCK(nal_data); - if (gm_global_id_to_node_id(nal_data->gm_port, - srxd->gm_source_node, - &source_node) != GM_SUCCESS) { - - CDEBUG(D_ERROR, "cannot resolve global_id [%u] " - "to local node_id\n", srxd->gm_source_node); - GMNAL_GM_UNLOCK(nal_data); - return(GMNAL_STATUS_FAIL); - } - GMNAL_GM_UNLOCK(nal_data); - /* - * We need a send token to use gm_get - * getting an stxd gets us a send token. - * the stxd is used as the context to the - * callback function (so stxd can be returned). - * Set pointer in stxd to srxd so callback count in srxd - * can be decremented to find last callback to complete - */ - CDEBUG(D_INFO, "gmnal_copyiov source node is G[%u]L[%d]\n", - srxd->gm_source_node, source_node); - } - - do { - CDEBUG(D_INFO, "sbuf[%p] slen[%d] rbuf[%p], rlen[%d]\n", - sbuf, slen, rbuf, rlen); - if (slen > rlen) { - ncalls++; - if (do_copy) { - CDEBUG(D_INFO, "slen>rlen\n"); - ltxd = gmnal_get_ltxd(nal_data); - ltxd->srxd = srxd; - GMNAL_GM_LOCK(nal_data); - /* - * funny business to get rid - * of compiler warning - */ - sbuf_long = (unsigned long) sbuf; - remote_ptr = (gm_remote_ptr_t)sbuf_long; - gm_get(nal_data->gm_port, remote_ptr, rbuf, - rlen, GM_LOW_PRIORITY, source_node, - GMNAL_GM_PORT_ID, - gmnal_remote_get_callback, ltxd); - GMNAL_GM_UNLOCK(nal_data); - } - /* - * at the end of 1 iov element - */ - sbuf+=rlen; - slen-=rlen; - riov++; - nriov--; - rbuf = riov->iov_base; - rlen = riov->iov_len; - } else if (rlen > slen) { - ncalls++; - if (do_copy) { - CDEBUG(D_INFO, "slen<rlen\n"); - ltxd = gmnal_get_ltxd(nal_data); - ltxd->srxd = srxd; - GMNAL_GM_LOCK(nal_data); - sbuf_long = (unsigned long) sbuf; - remote_ptr = (gm_remote_ptr_t)sbuf_long; - gm_get(nal_data->gm_port, remote_ptr, rbuf, - slen, GM_LOW_PRIORITY, source_node, - GMNAL_GM_PORT_ID, - gmnal_remote_get_callback, ltxd); - GMNAL_GM_UNLOCK(nal_data); - } - /* - * at end of siov element - */ - rbuf+=slen; - rlen-=slen; - siov++; - sbuf = siov->iov_base; - slen = siov->iov_len; - } else { - ncalls++; - if (do_copy) { - CDEBUG(D_INFO, "rlen=slen\n"); - ltxd = gmnal_get_ltxd(nal_data); - ltxd->srxd = srxd; - GMNAL_GM_LOCK(nal_data); - sbuf_long = (unsigned long) sbuf; - remote_ptr = (gm_remote_ptr_t)sbuf_long; - gm_get(nal_data->gm_port, remote_ptr, rbuf, - rlen, GM_LOW_PRIORITY, source_node, - GMNAL_GM_PORT_ID, - gmnal_remote_get_callback, ltxd); - GMNAL_GM_UNLOCK(nal_data); - } - /* - * at end of siov and riov element - */ - siov++; - sbuf = siov->iov_base; - slen = siov->iov_len; - riov++; - nriov--; - rbuf = riov->iov_base; - rlen = riov->iov_len; - } - - } while (nriov); - return(ncalls); } - -/* - * The callback function that is invoked after each gm_get call completes. - * Multiple callbacks may be invoked for 1 transaction, only the final - * callback has work to do. 
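/* The removed gmnal_copyiov() above makes one gm_get per overlap of a
 * source fragment with a destination fragment: each call moves
 * min(remaining src, remaining dst) bytes. A standalone version of just
 * the counting pass (do_copy == 0), assuming both fragment lists describe
 * the same total number of bytes: */
#include <stddef.h>
#include <stdio.h>

static int count_calls(const size_t *s, int ns, const size_t *r, int nr)
{
        size_t slen = *s, rlen = *r;
        int ncalls = 0;

        while (ns > 0 && nr > 0) {
                ncalls++;                       /* one gm_get issued here */
                if (slen > rlen) {
                        slen -= rlen;
                        r++; nr--; if (nr) rlen = *r;
                } else if (rlen > slen) {
                        rlen -= slen;
                        s++; ns--; if (ns) slen = *s;
                } else {
                        s++; ns--; if (ns) slen = *s;
                        r++; nr--; if (nr) rlen = *r;
                }
        }
        return ncalls;
}

int main(void)
{
        size_t src[] = { 4096, 4096 };  /* sender's two fragments */
        size_t dst[] = { 2048, 6144 };  /* receiver's two fragments */

        printf("%d\n", count_calls(src, 2, dst, 2));    /* prints 3 */
        return 0;
}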
- */ void -gmnal_remote_get_callback(gm_port_t *gm_port, void *context, - gm_status_t status) +gmnal_post_rx(gmnal_ni_t *gmni, gmnal_rx_t *rx) { + int gmsize = rx->rx_islarge ? gmni->gmni_large_gmsize : + gmni->gmni_small_gmsize; + int pri = rx->rx_islarge ? GMNAL_LARGE_PRIORITY : + GMNAL_SMALL_PRIORITY; + void *buffer = GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf); + + CDEBUG(D_NET, "posting rx %p buf %p\n", rx, buffer); + + spin_lock(&gmni->gmni_gm_lock); + gm_provide_receive_buffer_with_tag(gmni->gmni_port, + buffer, gmsize, pri, 0); + spin_unlock(&gmni->gmni_gm_lock); +} - gmnal_ltxd_t *ltxd = (gmnal_ltxd_t*)context; - gmnal_srxd_t *srxd = ltxd->srxd; - ptl_ni_t *ni = srxd->nal_data->ni; - int lastone; - struct iovec *riov; - int nriov; - gmnal_data_t *nal_data; - - CDEBUG(D_TRACE, "called for context [%p]\n", context); - - if (status != GM_SUCCESS) { - CDEBUG(D_ERROR, "reports error [%d][%s]\n", status, - gmnal_gm_error(status)); +int +gmnal_rx_thread(void *arg) +{ + gmnal_ni_t *gmni = arg; + gm_recv_event_t *rxevent = NULL; + gm_recv_t *recv = NULL; + gmnal_rx_t *rx; + + kportal_daemonize("gmnal_rxd"); + + down(&gmni->gmni_rx_mutex); + + while (!gmni->gmni_shutdown) { + + spin_lock(&gmni->gmni_gm_lock); + rxevent = gm_blocking_receive_no_spin(gmni->gmni_port); + spin_unlock(&gmni->gmni_gm_lock); + + switch (GM_RECV_EVENT_TYPE(rxevent)) { + default: + gm_unknown(gmni->gmni_port, rxevent); + continue; + + case GM_FAST_RECV_EVENT: + case GM_FAST_PEER_RECV_EVENT: + case GM_PEER_RECV_EVENT: + case GM_FAST_HIGH_RECV_EVENT: + case GM_FAST_HIGH_PEER_RECV_EVENT: + case GM_HIGH_PEER_RECV_EVENT: + case GM_RECV_EVENT: + case GM_HIGH_RECV_EVENT: + break; + } + + recv = &rxevent->recv; + rx = gm_hash_find(gmni->gmni_rx_hash, + gm_ntohp(recv->buffer)); + LASSERT (rx != NULL); + + rx->rx_recv_nob = gm_ntoh_u32(recv->length); + rx->rx_recv_gmid = gm_ntoh_u16(recv->sender_node_id); + rx->rx_recv_port = gm_ntoh_u8(recv->sender_port_id); + rx->rx_recv_type = gm_ntoh_u8(recv->type); + + switch (GM_RECV_EVENT_TYPE(rxevent)) { + case GM_FAST_RECV_EVENT: + case GM_FAST_PEER_RECV_EVENT: + case GM_FAST_HIGH_RECV_EVENT: + case GM_FAST_HIGH_PEER_RECV_EVENT: + LASSERT (rx->rx_recv_nob <= PAGE_SIZE); + + memcpy(GMNAL_NETBUF_MSG(&rx->rx_buf), + gm_ntohp(recv->message), rx->rx_recv_nob); + break; + } + + up(&gmni->gmni_rx_mutex); + + CDEBUG (D_NET, "rx %p: buf %p(%p) nob %d\n", rx, + GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf), + gm_ntohp(recv->buffer), rx->rx_recv_nob); + + /* We're connectionless: simply drop packets with + * errors */ + if (gmnal_unpack_msg(gmni, rx) == 0) { + LASSERT (GMNAL_NETBUF_MSG(&rx->rx_buf)->gmm_type == GMNAL_MSG_IMMEDIATE); + (void)ptl_parse(gmni->gmni_ni, + &(GMNAL_NETBUF_MSG(&rx->rx_buf)->gmm_u.immediate.gmim_hdr), + rx); + } + + gmnal_post_rx(gmni, rx); + + down(&gmni->gmni_rx_mutex); } - spin_lock(&srxd->callback_lock); - srxd->ncallbacks--; - srxd->callback_status |= status; - lastone = srxd->ncallbacks?0:1; - spin_unlock(&srxd->callback_lock); - nal_data = srxd->nal_data; + up(&gmni->gmni_rx_mutex); - /* - * everyone returns a send token - */ - gmnal_return_ltxd(nal_data, ltxd); - - if (!lastone) { - CDEBUG(D_ERROR, "NOT final callback context[%p]\n", srxd); - return; - } - - /* - * Let our client application proceed - */ - CDEBUG(D_ERROR, "final callback context[%p]\n", srxd); - ptl_finalize(ni, srxd, srxd->cookie, 0); - - /* - * send an ack to the sender to let him know we got the data - */ - gmnal_large_tx_ack(nal_data, srxd); - - /* - * Unregister the memory that was used - * This is a 
very slow business (slower then register) - */ - nriov = srxd->nriov; - riov = srxd->riov; - GMNAL_GM_LOCK(nal_data); - while (nriov--) { - CDEBUG(D_ERROR, "deregister memory [%p]\n", riov->iov_base); - if (gm_deregister_memory(srxd->nal_data->gm_port, - riov->iov_base, riov->iov_len)) { - CDEBUG(D_ERROR, "failed to deregister memory [%p]\n", - riov->iov_base); - } - riov++; - } - GMNAL_GM_UNLOCK(nal_data); - PORTAL_FREE(srxd->riov, sizeof(struct iovec)*nriov); - - /* - * repost the receive buffer (return receive token) - */ - GMNAL_GM_LOCK(nal_data); - gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, - srxd->gmsize, GM_LOW_PRIORITY, 0); - GMNAL_GM_UNLOCK(nal_data); - - return; + CDEBUG(D_NET, "exiting\n"); + atomic_dec(&gmni->gmni_nthreads); + return 0; } - -/* - * Called on target node. - * After pulling data from a source node - * send an ack message to indicate the large transmit is complete. - */ -void -gmnal_large_tx_ack(gmnal_data_t *nal_data, gmnal_srxd_t *srxd) +void +gmnal_stop_threads(gmnal_ni_t *gmni) { + int count = 2; - gmnal_stxd_t *stxd; - gmnal_msghdr_t *msghdr; - void *buffer = NULL; - unsigned int local_nid; - gm_status_t gm_status = GM_SUCCESS; - - CDEBUG(D_TRACE, "srxd[%p] target_node [%u]\n", srxd, - srxd->gm_source_node); - - GMNAL_GM_LOCK(nal_data); - gm_status = gm_global_id_to_node_id(nal_data->gm_port, - srxd->gm_source_node, &local_nid); - GMNAL_GM_UNLOCK(nal_data); - if (gm_status != GM_SUCCESS) { - CDEBUG(D_ERROR, "Failed to obtain local id\n"); - return; + gmni->gmni_shutdown = 1; + mb(); + + /* wake rxthread owning gmni_rx_mutex with an alarm. */ + spin_lock(&gmni->gmni_gm_lock); + gm_set_alarm(gmni->gmni_port, &gmni->gmni_alarm, 0, NULL, NULL); + spin_unlock(&gmni->gmni_gm_lock); + + while (atomic_read(&gmni->gmni_nthreads) != 0) { + count++; + if ((count & (count - 1)) == 0) + CWARN("Waiting for %d threads to stop\n", + atomic_read(&gmni->gmni_nthreads)); + gmnal_yield(1); } - CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid); - - stxd = gmnal_get_stxd(nal_data, 1); - CDEBUG(D_TRACE, "gmnal_large_tx_ack got stxd[%p]\n", stxd); - - stxd->nal_data = nal_data; - stxd->type = GMNAL_LARGE_MESSAGE_ACK; - - /* - * Copy gmnal_msg_hdr and portals header to the transmit buffer - * Then copy the data in - */ - buffer = stxd->buffer; - msghdr = (gmnal_msghdr_t*)buffer; - - /* - * Add in the address of the original stxd from the sender node - * so it knows which thread to notify. - */ - msghdr->magic = GMNAL_MAGIC; - msghdr->type = GMNAL_LARGE_MESSAGE_ACK; - msghdr->sender_node_id = nal_data->gm_global_nid; - msghdr->stxd_remote_ptr = (gm_remote_ptr_t)srxd->source_stxd; - CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer); - - CDEBUG(D_INFO, "sending\n"); - stxd->msg_size= sizeof(gmnal_msghdr_t); - - - CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] " - "gmsize [%lu] msize [%d] global_nid [%u] local_nid[%d] " - "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size, - stxd->msg_size, srxd->gm_source_node, local_nid, stxd); - GMNAL_GM_LOCK(nal_data); - stxd->gm_priority = GM_LOW_PRIORITY; - stxd->gm_target_node = local_nid; - gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, - stxd->gm_size, stxd->msg_size, - GM_LOW_PRIORITY, local_nid, - gmnal_large_tx_ack_callback, - (void*)stxd); - - GMNAL_GM_UNLOCK(nal_data); - CDEBUG(D_INFO, "gmnal_large_tx_ack :: done\n"); - - return; } - -/* - * A callback to indicate the small transmit operation is compete - * Check for errors and try to deal with them. 
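/* gmnal_remote_get_callback() above (removed) is a counted-completion
 * pattern: ncalls gm_gets are issued, every callback decrements the count
 * under a lock, and only the one that reaches zero finalizes the receive,
 * acks the sender and reposts the buffer. The same idea in userspace,
 * with a pthread mutex standing in for the srxd spinlock: */
#include <pthread.h>

struct multi_get {
        pthread_mutex_t lock;
        int             outstanding;    /* preset to ncalls */
        int             status;         /* OR of all gm_get statuses */
};

/* returns 1 only to the single "last" callback */
static int get_done(struct multi_get *mg, int status)
{
        int last;

        pthread_mutex_lock(&mg->lock);
        mg->status |= status;
        last = (--mg->outstanding == 0);
        pthread_mutex_unlock(&mg->lock);
        return last;
}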
- * Call ptl_finalise to inform the client application that the - * send is complete and the memory can be reused. - * Return the stxd when finished with it (returns a send token) - */ -void -gmnal_large_tx_ack_callback(gm_port_t *gm_port, void *context, - gm_status_t status) +int +gmnal_start_threads(gmnal_ni_t *gmni) { - gmnal_stxd_t *stxd = (gmnal_stxd_t*)context; - gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data; + int i; + int pid; - if (!stxd) { - CDEBUG(D_ERROR, "send completion event for unknown stxd\n"); - return; - } - CDEBUG(D_TRACE, "send completion event for stxd [%p] status is [%d]\n", - stxd, status); - gmnal_return_stxd(stxd->nal_data, stxd); + LASSERT (!gmni->gmni_shutdown); + LASSERT (atomic_read(&gmni->gmni_nthreads) == 0); - GMNAL_GM_UNLOCK(nal_data); - return; -} - -/* - * Indicates the large transmit operation is compete. - * Called on transmit side (means data has been pulled by receiver - * or failed). - * Call ptl_finalise to inform the client application that the send - * is complete, deregister the memory and return the stxd. - * Finally, report the rx buffer that the ack message was delivered in. - */ -void -gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd) -{ - ptl_ni_t *ni = nal_data->ni; - gmnal_stxd_t *stxd = NULL; - gmnal_msghdr_t *msghdr = NULL; - void *buffer = NULL; - struct iovec *iov; - - - CDEBUG(D_TRACE, "gmnal_large_tx_ack_received buffer [%p]\n", buffer); - - buffer = srxd->buffer; - msghdr = (gmnal_msghdr_t*)buffer; - stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr; - - CDEBUG(D_INFO, "gmnal_large_tx_ack_received stxd [%p]\n", stxd); - - ptl_finalize(ni, stxd, stxd->cookie, 0); - - /* - * extract the iovec from the stxd, deregister the memory. - * free the space used to store the iovec - */ - iov = stxd->iov; - while(stxd->niov--) { - CDEBUG(D_INFO, "deregister memory [%p] size ["LPSZ"]\n", - iov->iov_base, iov->iov_len); - GMNAL_GM_LOCK(nal_data); - gm_deregister_memory(nal_data->gm_port, iov->iov_base, - iov->iov_len); - GMNAL_GM_UNLOCK(nal_data); - iov++; - } + gm_initialize_alarm(&gmni->gmni_alarm); - /* - * return the send token - * TO DO It is bad to hold onto the send token so long? - */ - gmnal_return_stxd(nal_data, stxd); + for (i = 0; i < num_online_cpus(); i++) { + pid = kernel_thread(gmnal_rx_thread, (void*)gmni, 0); + if (pid < 0) { + CERROR("rx thread failed to start: %d\n", pid); + gmnal_stop_threads(gmni); + return pid; + } - /* - * requeue the receive buffer - */ - gmnal_rx_requeue_buffer(nal_data, srxd); - + atomic_inc(&gmni->gmni_nthreads); + } - return; + return 0; } diff --git a/lnet/klnds/gmlnd/gmlnd_module.c b/lnet/klnds/gmlnd/gmlnd_module.c index 3ee8f2c..29f1a07 100644 --- a/lnet/klnds/gmlnd/gmlnd_module.c +++ b/lnet/klnds/gmlnd/gmlnd_module.c @@ -22,76 +22,61 @@ #include "gmnal.h" -int gmnal_small_msg_size = sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t) + PTL_MTU; -/* - * -1 indicates default value. 
- * This is 1 thread per cpu - * See start_kernel_threads - */ -int num_rx_threads = -1; -int num_stxds = 5; -int gm_port_id = 4; - -int -gmnal_ctl(ptl_ni_t *ni, unsigned int cmd, void *arg) -{ - struct portal_ioctl_data *data = arg; - gmnal_data_t *nal_data = NULL; - char *name = NULL; - int nid = -2; - int gnid; - gm_status_t gm_status; - - - CDEBUG(D_TRACE, "gmnal_cmd [%d] ni_data [%p]\n", cmd, ni->ni_data); - nal_data = (gmnal_data_t*)ni->ni_data; - switch(cmd) { - case IOC_PORTAL_GET_GMID: - - PORTAL_ALLOC(name, data->ioc_plen1); - if (name == NULL) - return -ENOMEM; - - if (copy_from_user(name, data->ioc_pbuf1, data->ioc_plen1)) { - PORTAL_FREE(name, data->ioc_plen1); - return -EFAULT; - } - - GMNAL_GM_LOCK(nal_data); - //nid = gm_host_name_to_node_id(nal_data->gm_port, name); - gm_status = gm_host_name_to_node_id_ex (nal_data->gm_port, 0, name, &nid); - GMNAL_GM_UNLOCK(nal_data); - if (gm_status != GM_SUCCESS) { - CDEBUG(D_INFO, "gm_host_name_to_node_id_ex(...host %s) failed[%d]\n", - name, gm_status); - PORTAL_FREE(name, data->ioc_plen1); - return -ENOENT; - } - - CDEBUG(D_INFO, "Local node %s id is [%d]\n", name, nid); - PORTAL_FREE(name, data->ioc_plen1); - - GMNAL_GM_LOCK(nal_data); - gm_status = gm_node_id_to_global_id(nal_data->gm_port, - nid, &gnid); - GMNAL_GM_UNLOCK(nal_data); - if (gm_status != GM_SUCCESS) { - CDEBUG(D_INFO, "gm_node_id_to_global_id failed[%d]\n", - gm_status); - return -ENOENT; - } - CDEBUG(D_INFO, "Global node is is [%u][%x]\n", gnid, gnid); - - /* gnid returned to userspace in ioc_nid!!! */ - data->ioc_nid = gnid; - return 0; - - default: - CDEBUG(D_INFO, "gmnal_cmd UNKNOWN[%d]\n", cmd); - return -EINVAL; - } -} - +static int port = GMNAL_PORT; +CFS_MODULE_PARM(port, "i", int, 0444, + "GM port to use for communications"); + +static int ntx = GMNAL_NTX; +CFS_MODULE_PARM(ntx, "i", int, 0444, + "# 'normal' tx descriptors"); + +static int ntx_nblk = GMNAL_NTX_NBLK; +CFS_MODULE_PARM(ntx_nblk, "i", int, 0444, + "# 'reserved' tx descriptors"); + +static int nlarge_tx_bufs = GMNAL_NLARGE_TX_BUFS; +CFS_MODULE_PARM(nlarge_tx_bufs, "i", int, 0444, + "# large tx message buffers"); + +static int nrx_small = GMNAL_NRX_SMALL; +CFS_MODULE_PARM(nrx_small, "i", int, 0444, + "# small rx message buffers"); + +static int nrx_large = GMNAL_NRX_LARGE; +CFS_MODULE_PARM(nrx_large, "i", int, 0444, + "# large rx message buffers"); + +gmnal_tunables_t gmnal_tunables = { + .gm_port = &port, + .gm_ntx = &ntx, + .gm_ntx_nblk = &ntx_nblk, + .gm_nlarge_tx_bufs = &nlarge_tx_bufs, + .gm_nrx_small = &nrx_small, + .gm_nrx_large = &nrx_large, +}; + +#if CONFIG_SYSCTL && !CFS_SYSFS_MODULE_PARM +static ctl_table gmnal_ctl_table[] = { + {1, "port", &port, + sizeof (int), 0444, NULL, &proc_dointvec}, + {2, "ntx", &ntx, + sizeof (int), 0444, NULL, &proc_dointvec}, + {3, "ntx_nblk", &ntx_nblk, + sizeof (int), 0444, NULL, &proc_dointvec}, + {4, "nlarge_tx_bufs", &nlarge_tx_bufs, + sizeof (int), 0444, NULL, &proc_dointvec}, + {5, "nrx_small", &nrx_small, + sizeof (int), 0444, NULL, &proc_dointvec}, + {6, "nrx_large", &nrx_large, + sizeof (int), 0444, NULL, &proc_dointvec}, + {0} +}; + +static ctl_table gmnal_top_ctl_table[] = { + {207, "gmnal", NULL, 0, 0555, gmnal_ctl_table}, + {0} +}; +#endif static int __init gmnal_load(void) @@ -99,43 +84,40 @@ gmnal_load(void) int status; CDEBUG(D_TRACE, "This is the gmnal module initialisation routine\n"); - - CDEBUG(D_INFO, "Calling gmnal_init\n"); +#if CONFIG_SYSCTL && !CFS_SYSFS_MODULE_PARM + gmnal_tunables.gm_sysctl = + 
register_sysctl_table(gmnal_top_ctl_table, 0); + + if (gmnal_tunables.gm_sysctl == NULL) + CWARN("Can't setup /proc tunables\n"); +#endif + CDEBUG(D_NET, "Calling gmnal_init\n"); status = gmnal_init(); if (status == 0) { - CDEBUG(D_INFO, "Portals GMNAL initialised ok\n"); + CDEBUG(D_NET, "Portals GMNAL initialised ok\n"); } else { - CDEBUG(D_INFO, "Portals GMNAL Failed to initialise\n"); + CDEBUG(D_NET, "Portals GMNAL Failed to initialise\n"); return(-ENODEV); - } - CDEBUG(D_INFO, "This is the end of the gmnal init routine"); - + CDEBUG(D_NET, "This is the end of the gmnal init routine"); return(0); } - static void __exit gmnal_unload(void) { gmnal_fini(); - return; +#if CONFIG_SYSCTL && !CFS_SYSFS_MODULE_PARM + if (gmnal_tunables.gm_sysctl != NULL) + unregister_sysctl_table(gmnal_tunables.gm_sysctl); +#endif } - module_init(gmnal_load); - module_exit(gmnal_unload); -MODULE_PARM(gmnal_small_msg_size, "i"); -MODULE_PARM(num_rx_threads, "i"); -MODULE_PARM(num_stxds, "i"); -MODULE_PARM(gm_port_id, "i"); - -MODULE_AUTHOR("Morgan Doyle"); - -MODULE_DESCRIPTION("A Portals kernel NAL for Myrinet GM."); - +MODULE_AUTHOR("Cluster File Systems, Inc. "); +MODULE_DESCRIPTION("Kernel GM NAL v1.01"); MODULE_LICENSE("GPL"); diff --git a/lnet/klnds/gmlnd/gmlnd_utils.c b/lnet/klnds/gmlnd/gmlnd_utils.c index 508a48c..70a7a54 100644 --- a/lnet/klnds/gmlnd/gmlnd_utils.c +++ b/lnet/klnds/gmlnd/gmlnd_utils.c @@ -18,853 +18,654 @@ * along with Lustre; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -/* - * All utilities required by lgmanl - */ #include "gmnal.h" -/* - * Am I one of the gmnal rxthreads ? - */ -int -gmnal_is_rxthread(gmnal_data_t *nal_data) +void +gmnal_free_netbuf_pages (gmnal_netbuf_t *nb, int npages) { - int i; - for (i=0; i<num_rx_threads; i++) { - if (nal_data->rxthread_pid[i] == current->pid) - return(1); - } - return(0); + int i; + + for (i = 0; i < npages; i++) + __free_page(nb->nb_pages[i]); } - -/* - * Allocate tx descriptors/tokens (large and small) - * allocate a number of small tx buffers and register with GM - * so they are wired and set up for DMA. This is a costly operation. - * Also allocate a corrosponding descriptor to keep track of - * the buffer. - * Put all small descriptors on singly linked list to be available to send - * function. - * Allocate the rest of the available tx tokens for large messages. These will be - * used to do gm_gets in gmnal_copyiov - */ int -gmnal_alloc_txd(gmnal_data_t *nal_data) +gmnal_alloc_netbuf_pages (gmnal_ni_t *gmni, gmnal_netbuf_t *nb, int npages) { - int ntx= 0, nstx= 0, nrxt_stx= 0, - nltx= 0, i = 0; - gmnal_stxd_t *txd = NULL; - gmnal_ltxd_t *ltxd = NULL; - void *txbuffer = NULL; - - CDEBUG(D_TRACE, "gmnal_alloc_small tx\n"); - - GMNAL_GM_LOCK(nal_data); - /* - * total number of transmit tokens - */ - ntx = gm_num_send_tokens(nal_data->gm_port); - GMNAL_GM_UNLOCK(nal_data); - CDEBUG(D_INFO, "total number of send tokens available is [%d]\n", ntx); - - /* - * allocate a number for small sends - * num_stxds from gmnal_module.c - */ - nstx = num_stxds; - /* - * give that number plus 1 to the receive threads - */ - nrxt_stx = nstx + 1; - - /* - * give the rest for gm_gets - */ - nltx = ntx - (nrxt_stx + nstx); - if (nltx < 1) { - CDEBUG(D_ERROR, "No tokens available for large messages\n"); - return(GMNAL_STATUS_FAIL); - } - - - /* - * A semaphore is initialised with the - * number of transmit tokens available. - * To get a stxd, acquire the token semaphore. 
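/* Worked example of the send-token split described above, using the
 * num_stxds default (5) from the removed gmlnd_module.c and assuming
 * gm_num_send_tokens() reported 29 (a made-up figure): */
#include <stdio.h>

int main(void)
{
        int ntx = 29;                   /* assumed gm_num_send_tokens() */
        int nstx = 5;                   /* num_stxds: small sends */
        int nrxt_stx = nstx + 1;        /* reserved for the rx threads */
        int nltx = ntx - (nrxt_stx + nstx);     /* remainder: gm_gets */

        printf("small=%d rxt=%d large=%d\n", nstx, nrxt_stx, nltx);
        return 0;                       /* small=5 rxt=6 large=18 */
}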
- * this decrements the available token count - * (if no tokens you block here, someone returning a - * stxd will release the semaphore and wake you) - * When token is obtained acquire the spinlock - * to manipulate the list - */ - GMNAL_TXD_TOKEN_INIT(nal_data, nstx); - GMNAL_TXD_LOCK_INIT(nal_data); - GMNAL_RXT_TXD_TOKEN_INIT(nal_data, nrxt_stx); - GMNAL_RXT_TXD_LOCK_INIT(nal_data); - GMNAL_LTXD_TOKEN_INIT(nal_data, nltx); - GMNAL_LTXD_LOCK_INIT(nal_data); - - for (i=0; i<=nstx; i++) { - PORTAL_ALLOC(txd, sizeof(gmnal_stxd_t)); - if (!txd) { - CDEBUG(D_ERROR, "Failed to malloc txd [%d]\n", i); - return(GMNAL_STATUS_NOMEM); - } - GMNAL_GM_LOCK(nal_data); - txbuffer = gm_dma_malloc(nal_data->gm_port, - GMNAL_SMALL_MSG_SIZE(nal_data)); - GMNAL_GM_UNLOCK(nal_data); - if (!txbuffer) { - CDEBUG(D_ERROR, "Failed to gm_dma_malloc txbuffer [%d]," - " size [%d]\n", i, - GMNAL_SMALL_MSG_SIZE(nal_data)); - PORTAL_FREE(txd, sizeof(gmnal_stxd_t)); - return(GMNAL_STATUS_FAIL); - } - txd->buffer = txbuffer; - txd->buffer_size = GMNAL_SMALL_MSG_SIZE(nal_data); - txd->gm_size = gm_min_size_for_length(txd->buffer_size); - txd->nal_data = (struct _gmnal_data_t*)nal_data; - txd->rxt = 0; - - txd->next = nal_data->stxd; - nal_data->stxd = txd; - CDEBUG(D_INFO, "Registered txd [%p] with buffer [%p], " - "size [%d]\n", txd, txd->buffer, txd->buffer_size); - } - - for (i=0; i<=nrxt_stx; i++) { - PORTAL_ALLOC(txd, sizeof(gmnal_stxd_t)); - if (!txd) { - CDEBUG(D_ERROR, "Failed to malloc txd [%d]\n", i); - return(GMNAL_STATUS_NOMEM); - } - GMNAL_GM_LOCK(nal_data); - txbuffer = gm_dma_malloc(nal_data->gm_port, - GMNAL_SMALL_MSG_SIZE(nal_data)); - GMNAL_GM_UNLOCK(nal_data); - if (!txbuffer) { - CDEBUG(D_ERROR, "Failed to gm_dma_malloc txbuffer [%d]," - " size [%d]\n", i, - GMNAL_SMALL_MSG_SIZE(nal_data)); - PORTAL_FREE(txd, sizeof(gmnal_stxd_t)); - return(GMNAL_STATUS_FAIL); - } - txd->buffer = txbuffer; - txd->buffer_size = GMNAL_SMALL_MSG_SIZE(nal_data); - txd->gm_size = gm_min_size_for_length(txd->buffer_size); - txd->nal_data = (struct _gmnal_data_t*)nal_data; - txd->rxt = 1; - - txd->next = nal_data->rxt_stxd; - nal_data->rxt_stxd = txd; - CDEBUG(D_INFO, "Registered txd [%p] with buffer [%p], " - "size [%d]\n", txd, txd->buffer, txd->buffer_size); - } - - /* - * string together large tokens - */ - for (i=0; i<=nltx ; i++) { - PORTAL_ALLOC(ltxd, sizeof(gmnal_ltxd_t)); - ltxd->next = nal_data->ltxd; - nal_data->ltxd = ltxd; - } - return(GMNAL_STATUS_OK); + int i; + gm_status_t gmrc; + + LASSERT (npages > 0); + + for (i = 0; i < npages; i++) { + + nb->nb_pages[i] = alloc_page(GFP_KERNEL); + + if (nb->nb_pages[i] == NULL) { + CERROR("Can't allocate page\n"); + gmnal_free_netbuf_pages(nb, i); + return -ENOMEM; + } + + CDEBUG(D_NET,"[%3d] page %p, phys "LPX64", @ "LPX64"\n", + i, nb->nb_pages[i], (__u64)page_to_phys(nb->nb_pages[i]), + gmni->gmni_netaddr_base); + + gmrc = gm_register_memory_ex_phys(gmni->gmni_port, + page_to_phys(nb->nb_pages[i]), + PAGE_SIZE, + gmni->gmni_netaddr_base); + CDEBUG(D_NET,"[%3d] page %p: %d\n", i, nb->nb_pages[i], gmrc); + + if (gmrc != GM_SUCCESS) { + CERROR("Can't map page: %d(%s)\n", gmrc, + gmnal_gmstatus2str(gmrc)); + gmnal_free_netbuf_pages(nb, i+1); + return -ENOMEM; + } + + if (i == 0) + nb->nb_netaddr = gmni->gmni_netaddr_base; + + gmni->gmni_netaddr_base += PAGE_SIZE; + } + + return 0; } -/* Free the list of wired and gm_registered small tx buffers and - * the tx descriptors that go along with them. 
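/* gmnal_alloc_netbuf_pages() above maps each (possibly scattered)
 * physical page at the next slot of a running GM network address, so
 * every netbuf ends up contiguous in GM's address space. A sketch with
 * register_page() standing in for gm_register_memory_ex_phys(); the
 * addresses are illustrative: */
#include <inttypes.h>
#include <stdio.h>

#define PAGE_SZ 4096

static uint64_t netaddr_base = 0x100000;        /* illustrative base */

static uint64_t register_page(uint64_t phys)
{
        uint64_t netaddr = netaddr_base;

        (void)phys;     /* real code registers phys at netaddr here */
        netaddr_base += PAGE_SZ;
        return netaddr;
}

int main(void)
{
        uint64_t phys[] = { 0x9000000, 0x3000, 0x7777000 };
        int i;

        for (i = 0; i < 3; i++)
                printf("page %d: phys %#" PRIx64 " -> net %#" PRIx64 "\n",
                       i, phys[i], register_page(phys[i]));
        return 0;       /* nets 0x100000, 0x101000, 0x102000: contiguous */
}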
- */ void -gmnal_free_txd(gmnal_data_t *nal_data) +gmnal_free_ltxbuf (gmnal_ni_t *gmni, gmnal_txbuf_t *txb) { - gmnal_stxd_t *txd = nal_data->stxd, *_txd = NULL; - gmnal_ltxd_t *ltxd = NULL, *_ltxd = NULL; + int npages = gmni->gmni_large_pages; - CDEBUG(D_TRACE, "gmnal_free_small tx\n"); + LASSERT (gmni->gmni_port == NULL); + /* No unmapping; the port has been closed */ - while(txd) { - CDEBUG(D_INFO, "Freeing txd [%p] with buffer [%p], " - "size [%d]\n", txd, txd->buffer, txd->buffer_size); - _txd = txd; - txd = txd->next; - GMNAL_GM_LOCK(nal_data); - gm_dma_free(nal_data->gm_port, _txd->buffer); - GMNAL_GM_UNLOCK(nal_data); - PORTAL_FREE(_txd, sizeof(gmnal_stxd_t)); - } - txd = nal_data->rxt_stxd; - while(txd) { - CDEBUG(D_INFO, "Freeing txd [%p] with buffer [%p], " - "size [%d]\n", txd, txd->buffer, txd->buffer_size); - _txd = txd; - txd = txd->next; - GMNAL_GM_LOCK(nal_data); - gm_dma_free(nal_data->gm_port, _txd->buffer); - GMNAL_GM_UNLOCK(nal_data); - PORTAL_FREE(_txd, sizeof(gmnal_stxd_t)); - } - ltxd = nal_data->ltxd; - while(txd) { - _ltxd = ltxd; - ltxd = ltxd->next; - PORTAL_FREE(_ltxd, sizeof(gmnal_ltxd_t)); - } - - return; + gmnal_free_netbuf_pages(&txb->txb_buf, gmni->gmni_large_pages); + PORTAL_FREE(txb, offsetof(gmnal_txbuf_t, txb_buf.nb_pages[npages])); } - -/* - * Get a txd from the list - * This get us a wired and gm_registered small tx buffer. - * This implicitly gets us a send token also. - */ -gmnal_stxd_t * -gmnal_get_stxd(gmnal_data_t *nal_data, int block) +int +gmnal_alloc_ltxbuf (gmnal_ni_t *gmni) { + int npages = gmni->gmni_large_pages; + int sz = offsetof(gmnal_txbuf_t, txb_buf.nb_pages[npages]); + gmnal_txbuf_t *txb; + int rc; + + PORTAL_ALLOC(txb, sz); + if (txb == NULL) { + CERROR("Can't allocate large txbuffer\n"); + return -ENOMEM; + } - gmnal_stxd_t *txd = NULL; - pid_t pid = current->pid; + rc = gmnal_alloc_netbuf_pages(gmni, &txb->txb_buf, npages); + if (rc != 0) { + PORTAL_FREE(txb, sz); + return rc; + } + list_add_tail(&txb->txb_list, &gmni->gmni_idle_ltxbs); - CDEBUG(D_TRACE, "gmnal_get_stxd nal_data [%p] block[%d] pid [%d]\n", - nal_data, block, pid); + txb->txb_next = gmni->gmni_ltxbs; + gmni->gmni_ltxbs = txb; - if (gmnal_is_rxthread(nal_data)) { - CDEBUG(D_INFO, "RXTHREAD Attempting to get token\n"); - GMNAL_RXT_TXD_GETTOKEN(nal_data); - GMNAL_RXT_TXD_LOCK(nal_data); - txd = nal_data->rxt_stxd; - nal_data->rxt_stxd = txd->next; - GMNAL_RXT_TXD_UNLOCK(nal_data); - CDEBUG(D_INFO, "RXTHREAD got [%p], head is [%p]\n", - txd, nal_data->rxt_stxd); - txd->kniov = 0; - txd->rxt = 1; - } else { - if (block) { - CDEBUG(D_INFO, "Attempting to get token\n"); - GMNAL_TXD_GETTOKEN(nal_data); - CDEBUG(D_PORTALS, "Got token\n"); - } else { - if (GMNAL_TXD_TRYGETTOKEN(nal_data)) { - CDEBUG(D_ERROR, "can't get token\n"); - return(NULL); - } - } - GMNAL_TXD_LOCK(nal_data); - txd = nal_data->stxd; - nal_data->stxd = txd->next; - GMNAL_TXD_UNLOCK(nal_data); - CDEBUG(D_INFO, "got [%p], head is [%p]\n", txd, - nal_data->stxd); - txd->kniov = 0; - } /* general txd get */ - return(txd); + return 0; } -/* - * Return a txd to the list - */ void -gmnal_return_stxd(gmnal_data_t *nal_data, gmnal_stxd_t *txd) +gmnal_free_tx (gmnal_tx_t *tx) { - CDEBUG(D_TRACE, "nal_data [%p], txd[%p] rxt[%d]\n", nal_data, - txd, txd->rxt); + LASSERT (tx->tx_gmni->gmni_port == NULL); - /* - * this transmit descriptor is - * for the rxthread - */ - if (txd->rxt) { - GMNAL_RXT_TXD_LOCK(nal_data); - txd->next = nal_data->rxt_stxd; - nal_data->rxt_stxd = txd; - GMNAL_RXT_TXD_UNLOCK(nal_data); - 
GMNAL_RXT_TXD_RETURNTOKEN(nal_data); - CDEBUG(D_INFO, "Returned stxd to rxthread list\n"); - } else { - GMNAL_TXD_LOCK(nal_data); - txd->next = nal_data->stxd; - nal_data->stxd = txd; - GMNAL_TXD_UNLOCK(nal_data); - GMNAL_TXD_RETURNTOKEN(nal_data); - CDEBUG(D_INFO, "Returned stxd to general list\n"); - } - return; + gmnal_free_netbuf_pages(&tx->tx_buf, 1); + PORTAL_FREE(tx, sizeof(*tx)); } - -/* - * Get a large transmit descriptor from the free list - * This implicitly gets us a transmit token . - * always wait for one. - */ -gmnal_ltxd_t * -gmnal_get_ltxd(gmnal_data_t *nal_data) +int +gmnal_alloc_tx (gmnal_ni_t *gmni, int nblk) { + gmnal_tx_t *tx; + int rc; + + PORTAL_ALLOC(tx, sizeof(*tx)); + if (tx == NULL) { + CERROR("Failed to allocate tx\n"); + return -ENOMEM; + } + + memset(tx, 0, sizeof(*tx)); - gmnal_ltxd_t *ltxd = NULL; - - CDEBUG(D_TRACE, "nal_data [%p]\n", nal_data); + rc = gmnal_alloc_netbuf_pages(gmni, &tx->tx_buf, 1); + if (rc != 0) { + PORTAL_FREE(tx, sizeof(*tx)); + return -ENOMEM; + } - GMNAL_LTXD_GETTOKEN(nal_data); - GMNAL_LTXD_LOCK(nal_data); - ltxd = nal_data->ltxd; - nal_data->ltxd = ltxd->next; - GMNAL_LTXD_UNLOCK(nal_data); - CDEBUG(D_INFO, "got [%p], head is [%p]\n", ltxd, nal_data->ltxd); - return(ltxd); + tx->tx_gmni = gmni; + tx->tx_isnblk = nblk; + + if (tx->tx_isnblk) + list_add_tail(&tx->tx_list, &gmni->gmni_nblk_idle_txs); + else + list_add_tail(&tx->tx_list, &gmni->gmni_idle_txs); + + tx->tx_next = gmni->gmni_txs; + gmni->gmni_txs = tx; + + return 0; } -/* - * Return an ltxd to the list - */ void -gmnal_return_ltxd(gmnal_data_t *nal_data, gmnal_ltxd_t *ltxd) +gmnal_free_rx(gmnal_ni_t *gmni, gmnal_rx_t *rx) { - CDEBUG(D_TRACE, "nal_data [%p], ltxd[%p]\n", nal_data, ltxd); + int npages = rx->rx_islarge ? gmni->gmni_large_pages : 1; + + LASSERT (gmni->gmni_port == NULL); - GMNAL_LTXD_LOCK(nal_data); - ltxd->next = nal_data->ltxd; - nal_data->ltxd = ltxd; - GMNAL_LTXD_UNLOCK(nal_data); - GMNAL_LTXD_RETURNTOKEN(nal_data); - return; + gmnal_free_netbuf_pages(&rx->rx_buf, npages); + PORTAL_FREE(rx, offsetof(gmnal_rx_t, rx_buf.nb_pages[npages])); } -/* - * allocate a number of small rx buffers and register with GM - * so they are wired and set up for DMA. This is a costly operation. - * Also allocate a corrosponding descriptor to keep track of - * the buffer. - * Put all descriptors on singly linked list to be available to - * receive thread. 
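/* gmnal_alloc_tx() above puts each descriptor on two structures at once:
 * tx_list joins whatever queue it currently lives on (idle, buf_txq,
 * cred_txq), while tx_next chains every descriptor ever allocated so
 * shutdown can free them wherever they sit. A minimal equivalent of the
 * teardown chain (hypothetical types): */
#include <stdlib.h>

struct tx {
        struct tx *tx_next;     /* allocation chain, used only at teardown */
        /* ... queue linkage and payload fields elided ... */
};

static struct tx *all_txs;      /* like gmni->gmni_txs */

static struct tx *alloc_tx(void)
{
        struct tx *tx = calloc(1, sizeof(*tx));

        if (tx != NULL) {
                tx->tx_next = all_txs; /* threaded on once, never removed */
                all_txs = tx;
        }
        return tx;
}

static void free_all_txs(void)
{
        struct tx *tx;

        while ((tx = all_txs) != NULL) {
                all_txs = tx->tx_next;
                free(tx);
        }
}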
- */ + int -gmnal_alloc_srxd(gmnal_data_t *nal_data) +gmnal_alloc_rx (gmnal_ni_t *gmni, int islarge) { - int nrx = 0, nsrx = 0, i = 0; - gmnal_srxd_t *rxd = NULL; - void *rxbuffer = NULL; - - CDEBUG(D_TRACE, "gmnal_alloc_small rx\n"); - - GMNAL_GM_LOCK(nal_data); - nrx = gm_num_receive_tokens(nal_data->gm_port); - GMNAL_GM_UNLOCK(nal_data); - CDEBUG(D_INFO, "total number of receive tokens available is [%d]\n", - nrx); - - nsrx = nrx/2; - nsrx = 12; - /* - * make the number of rxds twice our total - * number of stxds plus 1 - */ - nsrx = num_stxds*2 + 2; - - CDEBUG(D_INFO, "Allocated [%d] receive tokens to small messages\n", - nsrx); - - - GMNAL_GM_LOCK(nal_data); - nal_data->srxd_hash = gm_create_hash(gm_hash_compare_ptrs, - gm_hash_hash_ptr, 0, 0, nsrx, 0); - GMNAL_GM_UNLOCK(nal_data); - if (!nal_data->srxd_hash) { - CDEBUG(D_ERROR, "Failed to create hash table\n"); - return(GMNAL_STATUS_NOMEM); - } - - GMNAL_RXD_TOKEN_INIT(nal_data, nsrx); - GMNAL_RXD_LOCK_INIT(nal_data); - - for (i=0; i<=nsrx; i++) { - PORTAL_ALLOC(rxd, sizeof(gmnal_srxd_t)); - if (!rxd) { - CDEBUG(D_ERROR, "Failed to malloc rxd [%d]\n", i); - return(GMNAL_STATUS_NOMEM); - } -#if 0 - PORTAL_ALLOC(rxbuffer, GMNAL_SMALL_MSG_SIZE(nal_data)); - if (!rxbuffer) { - CDEBUG(D_ERROR, "Failed to malloc rxbuffer [%d], " - "size [%d]\n", i, - GMNAL_SMALL_MSG_SIZE(nal_data)); - PORTAL_FREE(rxd, sizeof(gmnal_srxd_t)); - return(GMNAL_STATUS_FAIL); - } - CDEBUG(D_NET, "Calling gm_register_memory with port [%p] " - "rxbuffer [%p], size [%d]\n", nal_data->gm_port, - rxbuffer, GMNAL_SMALL_MSG_SIZE(nal_data)); - GMNAL_GM_LOCK(nal_data); - gm_status = gm_register_memory(nal_data->gm_port, rxbuffer, - GMNAL_SMALL_MSG_SIZE(nal_data)); - GMNAL_GM_UNLOCK(nal_data); - if (gm_status != GM_SUCCESS) { - CDEBUG(D_ERROR, "gm_register_memory failed buffer [%p]," - " index [%d]\n", rxbuffer, i); - switch(gm_status) { - case(GM_FAILURE): - CDEBUG(D_ERROR, "GM_FAILURE\n"); - break; - case(GM_PERMISSION_DENIED): - CDEBUG(D_ERROR, "PERMISSION_DENIED\n"); - break; - case(GM_INVALID_PARAMETER): - CDEBUG(D_ERROR, "INVALID_PARAMETER\n"); - break; - default: - CDEBUG(D_ERROR, "Unknown error[%d]\n", - gm_status); - break; - - } - return(GMNAL_STATUS_FAIL); - } -#else - GMNAL_GM_LOCK(nal_data); - rxbuffer = gm_dma_malloc(nal_data->gm_port, - GMNAL_SMALL_MSG_SIZE(nal_data)); - GMNAL_GM_UNLOCK(nal_data); - if (!rxbuffer) { - CDEBUG(D_ERROR, "Failed to gm_dma_malloc rxbuffer [%d]," - " size [%d]\n", i, - GMNAL_SMALL_MSG_SIZE(nal_data)); - PORTAL_FREE(rxd, sizeof(gmnal_srxd_t)); - return(GMNAL_STATUS_FAIL); - } -#endif - - rxd->buffer = rxbuffer; - rxd->size = GMNAL_SMALL_MSG_SIZE(nal_data); - rxd->gmsize = gm_min_size_for_length(rxd->size); - - if (gm_hash_insert(nal_data->srxd_hash, - (void*)rxbuffer, (void*)rxd)) { - - CDEBUG(D_ERROR, "failed to create hash entry rxd[%p] " - "for rxbuffer[%p]\n", rxd, rxbuffer); - return(GMNAL_STATUS_FAIL); - } - - rxd->next = nal_data->srxd; - nal_data->srxd = rxd; - CDEBUG(D_INFO, "Registered rxd [%p] with buffer [%p], " - "size [%d]\n", rxd, rxd->buffer, rxd->size); - } + int npages = islarge ? 
gmni->gmni_large_pages : 1; + int sz = offsetof(gmnal_rx_t, rx_buf.nb_pages[npages]); + int rc; + gmnal_rx_t *rx; + gm_status_t gmrc; + + PORTAL_ALLOC(rx, sz); + if (rx == NULL) { + CERROR("Failed to allocate rx\n"); + return -ENOMEM; + } + + memset(rx, 0, sizeof(*rx)); - return(GMNAL_STATUS_OK); + rc = gmnal_alloc_netbuf_pages(gmni, &rx->rx_buf, npages); + if (rc != 0) { + PORTAL_FREE(rx, sz); + return rc; + } + + rx->rx_islarge = islarge; + rx->rx_next = gmni->gmni_rxs; + gmni->gmni_rxs = rx; + + gmrc = gm_hash_insert(gmni->gmni_rx_hash, + GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf), rx); + if (gmrc != GM_SUCCESS) { + CERROR("Couldn't add rx to hash table: %d\n", gmrc); + return -ENOMEM; + } + + return 0; } - - -/* Free the list of wired and gm_registered small rx buffers and the - * rx descriptors that go along with them. - */ void -gmnal_free_srxd(gmnal_data_t *nal_data) +gmnal_copy_tofrom_netbuf(int niov, struct iovec *iov, lnet_kiov_t *kiov, int offset, + int nb_pages, gmnal_netbuf_t *nb, int nb_offset, + int nob, int from_nb) { - gmnal_srxd_t *rxd = nal_data->srxd, *_rxd = NULL; - - CDEBUG(D_TRACE, "gmnal_free_small rx\n"); - - while(rxd) { - CDEBUG(D_INFO, "Freeing rxd [%p] buffer [%p], size [%d]\n", - rxd, rxd->buffer, rxd->size); - _rxd = rxd; - rxd = rxd->next; - -#if 0 - GMNAL_GM_LOCK(nal_data); - gm_deregister_memory(nal_data->gm_port, _rxd->buffer, - _rxd->size); - GMNAL_GM_UNLOCK(nal_data); - PORTAL_FREE(_rxd->buffer, GMNAL_SMALL_RXBUFFER_SIZE); -#else - GMNAL_GM_LOCK(nal_data); - gm_dma_free(nal_data->gm_port, _rxd->buffer); - GMNAL_GM_UNLOCK(nal_data); -#endif - PORTAL_FREE(_rxd, sizeof(gmnal_srxd_t)); - } - return; + int nb_page; + int nb_nob; + char *nb_ptr; + int iov_nob; + char *iov_ptr; + + if (nob == 0) + return; + + LASSERT (nob > 0); + LASSERT (niov > 0); + LASSERT ((iov == NULL) != (kiov == NULL)); + + /* skip 'offset' bytes */ + if (kiov != NULL) { + while (offset >= kiov->kiov_len) { + offset -= kiov->kiov_len; + kiov++; + niov--; + LASSERT (niov > 0); + } + iov_ptr = ((char *)kmap(kiov->kiov_page)) + + kiov->kiov_offset + offset; + iov_nob = kiov->kiov_len - offset; + } else { + while (offset >= iov->iov_len) { + offset -= iov->iov_len; + iov++; + niov--; + LASSERT (niov > 0); + } + iov_ptr = iov->iov_base + offset; + iov_nob = iov->iov_len - offset; + } + + LASSERT (nb_pages > 0); + LASSERT (nb_offset < PAGE_SIZE); + + nb_page = 0; + nb_ptr = page_address(nb->nb_pages[0]) + nb_offset; + nb_nob = PAGE_SIZE - nb_offset; + + for (;;) { + int this_nob = nob; + + if (this_nob > nb_nob) + this_nob = nb_nob; + if (this_nob > iov_nob) + this_nob = iov_nob; + + if (from_nb) + memcpy(iov_ptr, nb_ptr, this_nob); + else + memcpy(nb_ptr, iov_ptr, this_nob); + + nob -= this_nob; + if (nob == 0) + break; + + nb_nob -= this_nob; + if (nb_nob != 0) { + nb_ptr += this_nob; + } else { + nb_page++; + LASSERT (nb_page < nb_pages); + nb_ptr = page_address(nb->nb_pages[nb_page]); + nb_nob = PAGE_SIZE; + } + + iov_nob -= this_nob; + if (iov_nob != 0) { + iov_ptr += this_nob; + } else if (kiov != NULL) { + kunmap(kiov->kiov_page); + kiov++; + niov--; + LASSERT (niov > 0); + iov_ptr = ((char *)kmap(kiov->kiov_page)) + + kiov->kiov_offset; + iov_nob = kiov->kiov_len; + } else { + iov++; + niov--; + LASSERT (niov > 0); + iov_ptr = iov->iov_base; + iov_nob = iov->iov_len; + } + } + + if (kiov != NULL) + kunmap(kiov->kiov_page); } - -/* - * Get a rxd from the free list - * This get us a wired and gm_registered small rx buffer. - * This implicitly gets us a receive token also. 
- */ -gmnal_srxd_t * -gmnal_get_srxd(gmnal_data_t *nal_data, int block) +void +gmnal_free_ltxbufs (gmnal_ni_t *gmni) { + gmnal_txbuf_t *txb; + + while ((txb = gmni->gmni_ltxbs) != NULL) { + gmni->gmni_ltxbs = txb->txb_next; + gmnal_free_ltxbuf(gmni, txb); + } +} - gmnal_srxd_t *rxd = NULL; - CDEBUG(D_TRACE, "nal_data [%p] block [%d]\n", nal_data, block); +int +gmnal_alloc_ltxbufs (gmnal_ni_t *gmni) +{ + int nlarge_tx_bufs = *gmnal_tunables.gm_nlarge_tx_bufs; + int i; + int rc; + + for (i = 0; i < nlarge_tx_bufs; i++) { + rc = gmnal_alloc_ltxbuf(gmni); + + if (rc != 0) + return rc; + } - if (block) { - GMNAL_RXD_GETTOKEN(nal_data); - } else { - if (GMNAL_RXD_TRYGETTOKEN(nal_data)) { - CDEBUG(D_INFO, "gmnal_get_srxd Can't get token\n"); - return(NULL); - } - } - GMNAL_RXD_LOCK(nal_data); - rxd = nal_data->srxd; - if (rxd) - nal_data->srxd = rxd->next; - GMNAL_RXD_UNLOCK(nal_data); - CDEBUG(D_INFO, "got [%p], head is [%p]\n", rxd, nal_data->srxd); - return(rxd); + return 0; } -/* - * Return an rxd to the list - */ void -gmnal_return_srxd(gmnal_data_t *nal_data, gmnal_srxd_t *rxd) +gmnal_free_txs(gmnal_ni_t *gmni) { - CDEBUG(D_TRACE, "nal_data [%p], rxd[%p]\n", nal_data, rxd); + gmnal_tx_t *tx; - GMNAL_RXD_LOCK(nal_data); - rxd->next = nal_data->srxd; - nal_data->srxd = rxd; - GMNAL_RXD_UNLOCK(nal_data); - GMNAL_RXD_RETURNTOKEN(nal_data); - return; + while ((tx = gmni->gmni_txs) != NULL) { + gmni->gmni_txs = tx->tx_next; + gmnal_free_tx (tx); + } } -/* - * Given a pointer to a srxd find - * the relevant descriptor for it - * This is done by searching a hash - * list that is created when the srxd's - * are created - */ -gmnal_srxd_t * -gmnal_rxbuffer_to_srxd(gmnal_data_t *nal_data, void *rxbuffer) +int +gmnal_alloc_txs(gmnal_ni_t *gmni) { - gmnal_srxd_t *srxd = NULL; - CDEBUG(D_TRACE, "nal_data [%p], rxbuffer [%p]\n", nal_data, rxbuffer); - srxd = gm_hash_find(nal_data->srxd_hash, rxbuffer); - CDEBUG(D_INFO, "srxd is [%p]\n", srxd); - return(srxd); -} + int ntxcred = gm_num_send_tokens(gmni->gmni_port); + int ntx = *gmnal_tunables.gm_ntx; + int ntx_nblk = *gmnal_tunables.gm_ntx_nblk; + int i; + int rc; + + CWARN("ntxcred: %d\n", ntxcred); + gmni->gmni_tx_credits = ntxcred; + + for (i = 0; i < ntx_nblk + ntx; i++) { + rc = gmnal_alloc_tx(gmni, i < ntx_nblk); + if (rc != 0) + return rc; + } + return 0; +} void -gmnal_stop_rxthread(gmnal_data_t *nal_data) +gmnal_free_rxs(gmnal_ni_t *gmni) { - int delay = 30; - + gmnal_rx_t *rx; + while ((rx = gmni->gmni_rxs) != NULL) { + gmni->gmni_rxs = rx->rx_next; - CDEBUG(D_TRACE, "Attempting to stop rxthread nal_data [%p]\n", - nal_data); - - nal_data->rxthread_stop_flag = GMNAL_THREAD_STOP; - - gmnal_remove_rxtwe(nal_data); - /* - * kick the thread - */ - up(&nal_data->rxtwe_wait); - - while(nal_data->rxthread_flag != GMNAL_THREAD_RESET && delay--) { - CDEBUG(D_INFO, "gmnal_stop_rxthread sleeping\n"); - gmnal_yield(1); - up(&nal_data->rxtwe_wait); - } + gmnal_free_rx(gmni, rx); + } - if (nal_data->rxthread_flag != GMNAL_THREAD_RESET) { - CDEBUG(D_ERROR, "I don't know how to wake the thread\n"); - } else { - CDEBUG(D_INFO, "rx thread seems to have stopped\n"); - } + LASSERT (gmni->gmni_port == NULL); +#if 0 + /* GM releases all resources allocated to a port when it closes */ + if (gmni->gmni_rx_hash != NULL) + gm_destroy_hash(gmni->gmni_rx_hash); +#endif } -void -gmnal_stop_ctthread(gmnal_data_t *nal_data) +int +gmnal_alloc_rxs (gmnal_ni_t *gmni) { - int delay = 15; - - - - CDEBUG(D_TRACE, "Attempting to stop ctthread nal_data [%p]\n", - nal_data); - - 
nal_data->ctthread_flag = GMNAL_THREAD_STOP; - GMNAL_GM_LOCK(nal_data); - gm_set_alarm(nal_data->gm_port, &nal_data->ctthread_alarm, 10, - NULL, NULL); - GMNAL_GM_UNLOCK(nal_data); - - while(nal_data->ctthread_flag == GMNAL_THREAD_STOP && delay--) { - CDEBUG(D_INFO, "gmnal_stop_ctthread sleeping\n"); - gmnal_yield(1); - } - - if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) { - CDEBUG(D_ERROR, "I DON'T KNOW HOW TO WAKE THE THREAD\n"); - } else { - CDEBUG(D_INFO, "CT THREAD SEEMS TO HAVE STOPPED\n"); + int nrxcred = gm_num_receive_tokens(gmni->gmni_port); + int nrx_small = *gmnal_tunables.gm_nrx_small; + int nrx_large = *gmnal_tunables.gm_nrx_large; + int nrx = nrx_large + nrx_small; + int rc; + int i; + + CWARN("nrxcred: %d(%dL+%dS)\n", nrxcred, nrx_large, nrx_small); + + if (nrx > nrxcred) { + int nlarge = (nrx_large * nrxcred)/nrx; + int nsmall = nrxcred - nlarge; + + CWARN("Only %d rx credits: " + "reducing large %d->%d, small %d->%d\n", nrxcred, + nrx_large, nlarge, nrx_small, nsmall); + + *gmnal_tunables.gm_nrx_large = nrx_large = nlarge; + *gmnal_tunables.gm_nrx_small = nrx_small = nsmall; + nrx = nlarge + nsmall; + } + + gmni->gmni_rx_hash = gm_create_hash(gm_hash_compare_ptrs, + gm_hash_hash_ptr, 0, 0, nrx, 0); + if (gmni->gmni_rx_hash == NULL) { + CERROR("Failed to create hash table\n"); + return -ENOMEM; } -} + for (i = 0; i < nrx; i++ ) { + rc = gmnal_alloc_rx(gmni, i < nrx_large); + if (rc != 0) + return rc; + } + return 0; +} char * -gmnal_gm_error(gm_status_t status) +gmnal_gmstatus2str(gm_status_t status) { return(gm_strerror(status)); switch(status) { - case(GM_SUCCESS): - return("SUCCESS"); - case(GM_FAILURE): - return("FAILURE"); - case(GM_INPUT_BUFFER_TOO_SMALL): - return("INPUT_BUFFER_TOO_SMALL"); - case(GM_OUTPUT_BUFFER_TOO_SMALL): - return("OUTPUT_BUFFER_TOO_SMALL"); - case(GM_TRY_AGAIN ): - return("TRY_AGAIN"); - case(GM_BUSY): - return("BUSY"); - case(GM_MEMORY_FAULT): - return("MEMORY_FAULT"); - case(GM_INTERRUPTED): - return("INTERRUPTED"); - case(GM_INVALID_PARAMETER): - return("INVALID_PARAMETER"); - case(GM_OUT_OF_MEMORY): - return("OUT_OF_MEMORY"); - case(GM_INVALID_COMMAND): - return("INVALID_COMMAND"); - case(GM_PERMISSION_DENIED): - return("PERMISSION_DENIED"); - case(GM_INTERNAL_ERROR): - return("INTERNAL_ERROR"); - case(GM_UNATTACHED): - return("UNATTACHED"); - case(GM_UNSUPPORTED_DEVICE): - return("UNSUPPORTED_DEVICE"); - case(GM_SEND_TIMED_OUT): - return("GM_SEND_TIMEDOUT"); - case(GM_SEND_REJECTED): - return("GM_SEND_REJECTED"); - case(GM_SEND_TARGET_PORT_CLOSED): - return("GM_SEND_TARGET_PORT_CLOSED"); - case(GM_SEND_TARGET_NODE_UNREACHABLE): - return("GM_SEND_TARGET_NODE_UNREACHABLE"); - case(GM_SEND_DROPPED): - return("GM_SEND_DROPPED"); - case(GM_SEND_PORT_CLOSED): - return("GM_SEND_PORT_CLOSED"); - case(GM_NODE_ID_NOT_YET_SET): - return("GM_NODE_ID_NOT_YET_SET"); - case(GM_STILL_SHUTTING_DOWN): - return("GM_STILL_SHUTTING_DOWN"); - case(GM_CLONE_BUSY): - return("GM_CLONE_BUSY"); - case(GM_NO_SUCH_DEVICE): - return("GM_NO_SUCH_DEVICE"); - case(GM_ABORTED): - return("GM_ABORTED"); - case(GM_INCOMPATIBLE_LIB_AND_DRIVER): - return("GM_INCOMPATIBLE_LIB_AND_DRIVER"); - case(GM_UNTRANSLATED_SYSTEM_ERROR): - return("GM_UNTRANSLATED_SYSTEM_ERROR"); - case(GM_ACCESS_DENIED): - return("GM_ACCESS_DENIED"); - - -/* - * These ones are in the docs but aren't in the header file - case(GM_DEV_NOT_FOUND): - return("GM_DEV_NOT_FOUND"); - case(GM_INVALID_PORT_NUMBER): - return("GM_INVALID_PORT_NUMBER"); - case(GM_UC_ERROR): - return("GM_US_ERROR"); - 
case(GM_PAGE_TABLE_FULL):
-		return("GM_PAGE_TABLE_FULL");
-	case(GM_MINOR_OVERFLOW):
-		return("GM_MINOR_OVERFLOW");
-	case(GM_SEND_ORPHANED):
-		return("GM_SEND_ORPHANED");
-	case(GM_HARDWARE_FAULT):
-		return("GM_HARDWARE_FAULT");
-	case(GM_DATA_CORRUPTED):
-		return("GM_DATA_CORRUPTED");
-	case(GM_TIMED_OUT):
-		return("GM_TIMED_OUT");
-	case(GM_USER_ERROR):
-		return("GM_USER_ERROR");
-	case(GM_NO_MATCH):
-		return("GM_NOMATCH");
-	case(GM_NOT_SUPPORTED_IN_KERNEL):
-		return("GM_NOT_SUPPORTED_IN_KERNEL");
-	case(GM_NOT_SUPPORTED_ON_ARCH):
-		return("GM_NOT_SUPPORTED_ON_ARCH");
-	case(GM_PTE_REF_CNT_OVERFLOW):
-		return("GM_PTR_REF_CNT_OVERFLOW");
-	case(GM_NO_DRIVER_SUPPORT):
-		return("GM_NO_DRIVER_SUPPORT");
-	case(GM_FIRMWARE_NOT_RUNNING):
-		return("GM_FIRMWARE_NOT_RUNNING");
+	case(GM_SUCCESS):
+		return("SUCCESS");
+	case(GM_FAILURE):
+		return("FAILURE");
+	case(GM_INPUT_BUFFER_TOO_SMALL):
+		return("INPUT_BUFFER_TOO_SMALL");
+	case(GM_OUTPUT_BUFFER_TOO_SMALL):
+		return("OUTPUT_BUFFER_TOO_SMALL");
+	case(GM_TRY_AGAIN):
+		return("TRY_AGAIN");
+	case(GM_BUSY):
+		return("BUSY");
+	case(GM_MEMORY_FAULT):
+		return("MEMORY_FAULT");
+	case(GM_INTERRUPTED):
+		return("INTERRUPTED");
+	case(GM_INVALID_PARAMETER):
+		return("INVALID_PARAMETER");
+	case(GM_OUT_OF_MEMORY):
+		return("OUT_OF_MEMORY");
+	case(GM_INVALID_COMMAND):
+		return("INVALID_COMMAND");
+	case(GM_PERMISSION_DENIED):
+		return("PERMISSION_DENIED");
+	case(GM_INTERNAL_ERROR):
+		return("INTERNAL_ERROR");
+	case(GM_UNATTACHED):
+		return("UNATTACHED");
+	case(GM_UNSUPPORTED_DEVICE):
+		return("UNSUPPORTED_DEVICE");
+	case(GM_SEND_TIMED_OUT):
+		return("GM_SEND_TIMED_OUT");
+	case(GM_SEND_REJECTED):
+		return("GM_SEND_REJECTED");
+	case(GM_SEND_TARGET_PORT_CLOSED):
+		return("GM_SEND_TARGET_PORT_CLOSED");
+	case(GM_SEND_TARGET_NODE_UNREACHABLE):
+		return("GM_SEND_TARGET_NODE_UNREACHABLE");
+	case(GM_SEND_DROPPED):
+		return("GM_SEND_DROPPED");
+	case(GM_SEND_PORT_CLOSED):
+		return("GM_SEND_PORT_CLOSED");
+	case(GM_NODE_ID_NOT_YET_SET):
+		return("GM_NODE_ID_NOT_YET_SET");
+	case(GM_STILL_SHUTTING_DOWN):
+		return("GM_STILL_SHUTTING_DOWN");
+	case(GM_CLONE_BUSY):
+		return("GM_CLONE_BUSY");
+	case(GM_NO_SUCH_DEVICE):
+		return("GM_NO_SUCH_DEVICE");
+	case(GM_ABORTED):
+		return("GM_ABORTED");
+	case(GM_INCOMPATIBLE_LIB_AND_DRIVER):
+		return("GM_INCOMPATIBLE_LIB_AND_DRIVER");
+	case(GM_UNTRANSLATED_SYSTEM_ERROR):
+		return("GM_UNTRANSLATED_SYSTEM_ERROR");
+	case(GM_ACCESS_DENIED):
+		return("GM_ACCESS_DENIED");
+
+
+	/*
+	 * These ones are in the docs but aren't in the header file
+	 case(GM_DEV_NOT_FOUND):
+		 return("GM_DEV_NOT_FOUND");
+	 case(GM_INVALID_PORT_NUMBER):
+		 return("GM_INVALID_PORT_NUMBER");
+	 case(GM_UC_ERROR):
+		 return("GM_UC_ERROR");
+	 case(GM_PAGE_TABLE_FULL):
+		 return("GM_PAGE_TABLE_FULL");
+	 case(GM_MINOR_OVERFLOW):
+		 return("GM_MINOR_OVERFLOW");
+	 case(GM_SEND_ORPHANED):
+		 return("GM_SEND_ORPHANED");
+	 case(GM_HARDWARE_FAULT):
+		 return("GM_HARDWARE_FAULT");
+	 case(GM_DATA_CORRUPTED):
+		 return("GM_DATA_CORRUPTED");
+	 case(GM_TIMED_OUT):
+		 return("GM_TIMED_OUT");
+	 case(GM_USER_ERROR):
+		 return("GM_USER_ERROR");
+	 case(GM_NO_MATCH):
+		 return("GM_NO_MATCH");
+	 case(GM_NOT_SUPPORTED_IN_KERNEL):
+		 return("GM_NOT_SUPPORTED_IN_KERNEL");
+	 case(GM_NOT_SUPPORTED_ON_ARCH):
+		 return("GM_NOT_SUPPORTED_ON_ARCH");
+	 case(GM_PTE_REF_CNT_OVERFLOW):
+		 return("GM_PTE_REF_CNT_OVERFLOW");
+	 case(GM_NO_DRIVER_SUPPORT):
+		 return("GM_NO_DRIVER_SUPPORT");
+	 case(GM_FIRMWARE_NOT_RUNNING):
+		 return("GM_FIRMWARE_NOT_RUNNING");
+	 * These ones are in the docs but aren't 
in the header file + */ - * These ones are in the docs but aren't in the header file - */ - default: - return("UNKNOWN GM ERROR CODE"); + default: + return("UNKNOWN GM ERROR CODE"); } } char * -gmnal_rxevent(gm_recv_event_t *ev) +gmnal_rxevent2str(gm_recv_event_t *ev) { short event; event = GM_RECV_EVENT_TYPE(ev); switch(event) { - case(GM_NO_RECV_EVENT): - return("GM_NO_RECV_EVENT"); - case(GM_SENDS_FAILED_EVENT): - return("GM_SEND_FAILED_EVENT"); - case(GM_ALARM_EVENT): - return("GM_ALARM_EVENT"); - case(GM_SENT_EVENT): - return("GM_SENT_EVENT"); - case(_GM_SLEEP_EVENT): - return("_GM_SLEEP_EVENT"); - case(GM_RAW_RECV_EVENT): - return("GM_RAW_RECV_EVENT"); - case(GM_BAD_SEND_DETECTED_EVENT): - return("GM_BAD_SEND_DETECTED_EVENT"); - case(GM_SEND_TOKEN_VIOLATION_EVENT): - return("GM_SEND_TOKEN_VIOLATION_EVENT"); - case(GM_RECV_TOKEN_VIOLATION_EVENT): - return("GM_RECV_TOKEN_VIOLATION_EVENT"); - case(GM_BAD_RECV_TOKEN_EVENT): - return("GM_BAD_RECV_TOKEN_EVENT"); - case(GM_ALARM_VIOLATION_EVENT): - return("GM_ALARM_VIOLATION_EVENT"); - case(GM_RECV_EVENT): - return("GM_RECV_EVENT"); - case(GM_HIGH_RECV_EVENT): - return("GM_HIGH_RECV_EVENT"); - case(GM_PEER_RECV_EVENT): - return("GM_PEER_RECV_EVENT"); - case(GM_HIGH_PEER_RECV_EVENT): - return("GM_HIGH_PEER_RECV_EVENT"); - case(GM_FAST_RECV_EVENT): - return("GM_FAST_RECV_EVENT"); - case(GM_FAST_HIGH_RECV_EVENT): - return("GM_FAST_HIGH_RECV_EVENT"); - case(GM_FAST_PEER_RECV_EVENT): - return("GM_FAST_PEER_RECV_EVENT"); - case(GM_FAST_HIGH_PEER_RECV_EVENT): - return("GM_FAST_HIGH_PEER_RECV_EVENT"); - case(GM_REJECTED_SEND_EVENT): - return("GM_REJECTED_SEND_EVENT"); - case(GM_ORPHANED_SEND_EVENT): - return("GM_ORPHANED_SEND_EVENT"); - case(GM_BAD_RESEND_DETECTED_EVENT): - return("GM_BAD_RESEND_DETETED_EVENT"); - case(GM_DROPPED_SEND_EVENT): - return("GM_DROPPED_SEND_EVENT"); - case(GM_BAD_SEND_VMA_EVENT): - return("GM_BAD_SEND_VMA_EVENT"); - case(GM_BAD_RECV_VMA_EVENT): - return("GM_BAD_RECV_VMA_EVENT"); - case(_GM_FLUSHED_ALARM_EVENT): - return("GM_FLUSHED_ALARM_EVENT"); - case(GM_SENT_TOKENS_EVENT): - return("GM_SENT_TOKENS_EVENTS"); - case(GM_IGNORE_RECV_EVENT): - return("GM_IGNORE_RECV_EVENT"); - case(GM_ETHERNET_RECV_EVENT): - return("GM_ETHERNET_RECV_EVENT"); - case(GM_NEW_NO_RECV_EVENT): - return("GM_NEW_NO_RECV_EVENT"); - case(GM_NEW_SENDS_FAILED_EVENT): - return("GM_NEW_SENDS_FAILED_EVENT"); - case(GM_NEW_ALARM_EVENT): - return("GM_NEW_ALARM_EVENT"); - case(GM_NEW_SENT_EVENT): - return("GM_NEW_SENT_EVENT"); - case(_GM_NEW_SLEEP_EVENT): - return("GM_NEW_SLEEP_EVENT"); - case(GM_NEW_RAW_RECV_EVENT): - return("GM_NEW_RAW_RECV_EVENT"); - case(GM_NEW_BAD_SEND_DETECTED_EVENT): - return("GM_NEW_BAD_SEND_DETECTED_EVENT"); - case(GM_NEW_SEND_TOKEN_VIOLATION_EVENT): - return("GM_NEW_SEND_TOKEN_VIOLATION_EVENT"); - case(GM_NEW_RECV_TOKEN_VIOLATION_EVENT): - return("GM_NEW_RECV_TOKEN_VIOLATION_EVENT"); - case(GM_NEW_BAD_RECV_TOKEN_EVENT): - return("GM_NEW_BAD_RECV_TOKEN_EVENT"); - case(GM_NEW_ALARM_VIOLATION_EVENT): - return("GM_NEW_ALARM_VIOLATION_EVENT"); - case(GM_NEW_RECV_EVENT): - return("GM_NEW_RECV_EVENT"); - case(GM_NEW_HIGH_RECV_EVENT): - return("GM_NEW_HIGH_RECV_EVENT"); - case(GM_NEW_PEER_RECV_EVENT): - return("GM_NEW_PEER_RECV_EVENT"); - case(GM_NEW_HIGH_PEER_RECV_EVENT): - return("GM_NEW_HIGH_PEER_RECV_EVENT"); - case(GM_NEW_FAST_RECV_EVENT): - return("GM_NEW_FAST_RECV_EVENT"); - case(GM_NEW_FAST_HIGH_RECV_EVENT): - return("GM_NEW_FAST_HIGH_RECV_EVENT"); - case(GM_NEW_FAST_PEER_RECV_EVENT): - return("GM_NEW_FAST_PEER_RECV_EVENT"); - 
case(GM_NEW_FAST_HIGH_PEER_RECV_EVENT):
-		return("GM_NEW_FAST_HIGH_PEER_RECV_EVENT");
-	case(GM_NEW_REJECTED_SEND_EVENT):
-		return("GM_NEW_REJECTED_SEND_EVENT");
-	case(GM_NEW_ORPHANED_SEND_EVENT):
-		return("GM_NEW_ORPHANED_SEND_EVENT");
-	case(_GM_NEW_PUT_NOTIFICATION_EVENT):
-		return("_GM_NEW_PUT_NOTIFICATION_EVENT");
-	case(GM_NEW_FREE_SEND_TOKEN_EVENT):
-		return("GM_NEW_FREE_SEND_TOKEN_EVENT");
-	case(GM_NEW_FREE_HIGH_SEND_TOKEN_EVENT):
-		return("GM_NEW_FREE_HIGH_SEND_TOKEN_EVENT");
-	case(GM_NEW_BAD_RESEND_DETECTED_EVENT):
-		return("GM_NEW_BAD_RESEND_DETECTED_EVENT");
-	case(GM_NEW_DROPPED_SEND_EVENT):
-		return("GM_NEW_DROPPED_SEND_EVENT");
-	case(GM_NEW_BAD_SEND_VMA_EVENT):
-		return("GM_NEW_BAD_SEND_VMA_EVENT");
-	case(GM_NEW_BAD_RECV_VMA_EVENT):
-		return("GM_NEW_BAD_RECV_VMA_EVENT");
-	case(_GM_NEW_FLUSHED_ALARM_EVENT):
-		return("GM_NEW_FLUSHED_ALARM_EVENT");
-	case(GM_NEW_SENT_TOKENS_EVENT):
-		return("GM_NEW_SENT_TOKENS_EVENT");
-	case(GM_NEW_IGNORE_RECV_EVENT):
-		return("GM_NEW_IGNORE_RECV_EVENT");
-	case(GM_NEW_ETHERNET_RECV_EVENT):
-		return("GM_NEW_ETHERNET_RECV_EVENT");
-	default:
-		return("Unknown Recv event");
-#if 0
-	case(/* _GM_PUT_NOTIFICATION_EVENT */
-	case(/* GM_FREE_SEND_TOKEN_EVENT */
-	case(/* GM_FREE_HIGH_SEND_TOKEN_EVENT */
-#endif
-	}
+	case(GM_NO_RECV_EVENT):
+		return("GM_NO_RECV_EVENT");
+	case(GM_SENDS_FAILED_EVENT):
+		return("GM_SENDS_FAILED_EVENT");
+	case(GM_ALARM_EVENT):
+		return("GM_ALARM_EVENT");
+	case(GM_SENT_EVENT):
+		return("GM_SENT_EVENT");
+	case(_GM_SLEEP_EVENT):
+		return("_GM_SLEEP_EVENT");
+	case(GM_RAW_RECV_EVENT):
+		return("GM_RAW_RECV_EVENT");
+	case(GM_BAD_SEND_DETECTED_EVENT):
+		return("GM_BAD_SEND_DETECTED_EVENT");
+	case(GM_SEND_TOKEN_VIOLATION_EVENT):
+		return("GM_SEND_TOKEN_VIOLATION_EVENT");
+	case(GM_RECV_TOKEN_VIOLATION_EVENT):
+		return("GM_RECV_TOKEN_VIOLATION_EVENT");
+	case(GM_BAD_RECV_TOKEN_EVENT):
+		return("GM_BAD_RECV_TOKEN_EVENT");
+	case(GM_ALARM_VIOLATION_EVENT):
+		return("GM_ALARM_VIOLATION_EVENT");
+	case(GM_RECV_EVENT):
+		return("GM_RECV_EVENT");
+	case(GM_HIGH_RECV_EVENT):
+		return("GM_HIGH_RECV_EVENT");
+	case(GM_PEER_RECV_EVENT):
+		return("GM_PEER_RECV_EVENT");
+	case(GM_HIGH_PEER_RECV_EVENT):
+		return("GM_HIGH_PEER_RECV_EVENT");
+	case(GM_FAST_RECV_EVENT):
+		return("GM_FAST_RECV_EVENT");
+	case(GM_FAST_HIGH_RECV_EVENT):
+		return("GM_FAST_HIGH_RECV_EVENT");
+	case(GM_FAST_PEER_RECV_EVENT):
+		return("GM_FAST_PEER_RECV_EVENT");
+	case(GM_FAST_HIGH_PEER_RECV_EVENT):
+		return("GM_FAST_HIGH_PEER_RECV_EVENT");
+	case(GM_REJECTED_SEND_EVENT):
+		return("GM_REJECTED_SEND_EVENT");
+	case(GM_ORPHANED_SEND_EVENT):
+		return("GM_ORPHANED_SEND_EVENT");
+	case(GM_BAD_RESEND_DETECTED_EVENT):
+		return("GM_BAD_RESEND_DETECTED_EVENT");
+	case(GM_DROPPED_SEND_EVENT):
+		return("GM_DROPPED_SEND_EVENT");
+	case(GM_BAD_SEND_VMA_EVENT):
+		return("GM_BAD_SEND_VMA_EVENT");
+	case(GM_BAD_RECV_VMA_EVENT):
+		return("GM_BAD_RECV_VMA_EVENT");
+	case(_GM_FLUSHED_ALARM_EVENT):
+		return("GM_FLUSHED_ALARM_EVENT");
+	case(GM_SENT_TOKENS_EVENT):
+		return("GM_SENT_TOKENS_EVENT");
+	case(GM_IGNORE_RECV_EVENT):
+		return("GM_IGNORE_RECV_EVENT");
+	case(GM_ETHERNET_RECV_EVENT):
+		return("GM_ETHERNET_RECV_EVENT");
+	case(GM_NEW_NO_RECV_EVENT):
+		return("GM_NEW_NO_RECV_EVENT");
+	case(GM_NEW_SENDS_FAILED_EVENT):
+		return("GM_NEW_SENDS_FAILED_EVENT");
+	case(GM_NEW_ALARM_EVENT):
+		return("GM_NEW_ALARM_EVENT");
+	case(GM_NEW_SENT_EVENT):
+		return("GM_NEW_SENT_EVENT");
+	case(_GM_NEW_SLEEP_EVENT):
+		return("GM_NEW_SLEEP_EVENT");
+	case(GM_NEW_RAW_RECV_EVENT):
+		
return("GM_NEW_RAW_RECV_EVENT"); + case(GM_NEW_BAD_SEND_DETECTED_EVENT): + return("GM_NEW_BAD_SEND_DETECTED_EVENT"); + case(GM_NEW_SEND_TOKEN_VIOLATION_EVENT): + return("GM_NEW_SEND_TOKEN_VIOLATION_EVENT"); + case(GM_NEW_RECV_TOKEN_VIOLATION_EVENT): + return("GM_NEW_RECV_TOKEN_VIOLATION_EVENT"); + case(GM_NEW_BAD_RECV_TOKEN_EVENT): + return("GM_NEW_BAD_RECV_TOKEN_EVENT"); + case(GM_NEW_ALARM_VIOLATION_EVENT): + return("GM_NEW_ALARM_VIOLATION_EVENT"); + case(GM_NEW_RECV_EVENT): + return("GM_NEW_RECV_EVENT"); + case(GM_NEW_HIGH_RECV_EVENT): + return("GM_NEW_HIGH_RECV_EVENT"); + case(GM_NEW_PEER_RECV_EVENT): + return("GM_NEW_PEER_RECV_EVENT"); + case(GM_NEW_HIGH_PEER_RECV_EVENT): + return("GM_NEW_HIGH_PEER_RECV_EVENT"); + case(GM_NEW_FAST_RECV_EVENT): + return("GM_NEW_FAST_RECV_EVENT"); + case(GM_NEW_FAST_HIGH_RECV_EVENT): + return("GM_NEW_FAST_HIGH_RECV_EVENT"); + case(GM_NEW_FAST_PEER_RECV_EVENT): + return("GM_NEW_FAST_PEER_RECV_EVENT"); + case(GM_NEW_FAST_HIGH_PEER_RECV_EVENT): + return("GM_NEW_FAST_HIGH_PEER_RECV_EVENT"); + case(GM_NEW_REJECTED_SEND_EVENT): + return("GM_NEW_REJECTED_SEND_EVENT"); + case(GM_NEW_ORPHANED_SEND_EVENT): + return("GM_NEW_ORPHANED_SEND_EVENT"); + case(_GM_NEW_PUT_NOTIFICATION_EVENT): + return("_GM_NEW_PUT_NOTIFICATION_EVENT"); + case(GM_NEW_FREE_SEND_TOKEN_EVENT): + return("GM_NEW_FREE_SEND_TOKEN_EVENT"); + case(GM_NEW_FREE_HIGH_SEND_TOKEN_EVENT): + return("GM_NEW_FREE_HIGH_SEND_TOKEN_EVENT"); + case(GM_NEW_BAD_RESEND_DETECTED_EVENT): + return("GM_NEW_BAD_RESEND_DETECTED_EVENT"); + case(GM_NEW_DROPPED_SEND_EVENT): + return("GM_NEW_DROPPED_SEND_EVENT"); + case(GM_NEW_BAD_SEND_VMA_EVENT): + return("GM_NEW_BAD_SEND_VMA_EVENT"); + case(GM_NEW_BAD_RECV_VMA_EVENT): + return("GM_NEW_BAD_RECV_VMA_EVENT"); + case(_GM_NEW_FLUSHED_ALARM_EVENT): + return("GM_NEW_FLUSHED_ALARM_EVENT"); + case(GM_NEW_SENT_TOKENS_EVENT): + return("GM_NEW_SENT_TOKENS_EVENT"); + case(GM_NEW_IGNORE_RECV_EVENT): + return("GM_NEW_IGNORE_RECV_EVENT"); + case(GM_NEW_ETHERNET_RECV_EVENT): + return("GM_NEW_ETHERNET_RECV_EVENT"); + default: + return("Unknown Recv event"); + /* _GM_PUT_NOTIFICATION_EVENT */ + /* GM_FREE_SEND_TOKEN_EVENT */ + /* GM_FREE_HIGH_SEND_TOKEN_EVENT */ + } } @@ -874,203 +675,3 @@ gmnal_yield(int delay) set_current_state(TASK_INTERRUPTIBLE); schedule_timeout(delay); } - -int -gmnal_is_small_msg(gmnal_data_t *nal_data, int niov, struct iovec *iov, - int len) -{ - - CDEBUG(D_TRACE, "len [%d] limit[%d]\n", len, - GMNAL_SMALL_MSG_SIZE(nal_data)); - - if ((len + sizeof(ptl_hdr_t) + sizeof(gmnal_msghdr_t)) - < GMNAL_SMALL_MSG_SIZE(nal_data)) { - - CDEBUG(D_INFO, "Yep, small message\n"); - return(1); - } else { - CDEBUG(D_ERROR, "No, not small message\n"); - /* - * could be made up of lots of little ones ! - */ - return(0); - } - -} - -/* - * extract info from the receive event. - * Have to do this before the next call to gm_receive - * Deal with all endian stuff here. 
- * Then stick work entry on list where rxthreads - * can get it to complete the receive - */ -int -gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_t *recv) -{ - gmnal_rxtwe_t *we = NULL; - - CDEBUG(D_NET, "adding entry to list\n"); - - PORTAL_ALLOC(we, sizeof(gmnal_rxtwe_t)); - if (!we) { - CDEBUG(D_ERROR, "failed to malloc\n"); - return(GMNAL_STATUS_FAIL); - } - we->buffer = gm_ntohp(recv->buffer); - we->snode = (int)gm_ntoh_u16(recv->sender_node_id); - we->sport = (int)gm_ntoh_u8(recv->sender_port_id); - we->type = (int)gm_ntoh_u8(recv->type); - we->length = (int)gm_ntohl(recv->length); - - spin_lock(&nal_data->rxtwe_lock); - if (nal_data->rxtwe_tail) { - nal_data->rxtwe_tail->next = we; - } else { - nal_data->rxtwe_head = we; - nal_data->rxtwe_tail = we; - } - nal_data->rxtwe_tail = we; - spin_unlock(&nal_data->rxtwe_lock); - - up(&nal_data->rxtwe_wait); - return(GMNAL_STATUS_OK); -} - -void -gmnal_remove_rxtwe(gmnal_data_t *nal_data) -{ - gmnal_rxtwe_t *_we, *we = nal_data->rxtwe_head; - - CDEBUG(D_NET, "removing all work list entries\n"); - - spin_lock(&nal_data->rxtwe_lock); - CDEBUG(D_NET, "Got lock\n"); - while (we) { - _we = we; - we = we->next; - PORTAL_FREE(_we, sizeof(gmnal_rxtwe_t)); - } - spin_unlock(&nal_data->rxtwe_lock); - nal_data->rxtwe_head = NULL; - nal_data->rxtwe_tail = NULL; -} - -gmnal_rxtwe_t * -gmnal_get_rxtwe(gmnal_data_t *nal_data) -{ - gmnal_rxtwe_t *we = NULL; - - CDEBUG(D_NET, "Getting entry to list\n"); - - do { - while(down_interruptible(&nal_data->rxtwe_wait) != 0) - /* do nothing */; - if (nal_data->rxthread_stop_flag == GMNAL_THREAD_STOP) { - /* - * time to stop - * TO DO some one free the work entries - */ - return(NULL); - } - spin_lock(&nal_data->rxtwe_lock); - if (nal_data->rxtwe_head) { - CDEBUG(D_INFO, "Got a work entry\n"); - we = nal_data->rxtwe_head; - nal_data->rxtwe_head = we->next; - if (!nal_data->rxtwe_head) - nal_data->rxtwe_tail = NULL; - } else { - CDEBUG(D_WARNING, "woken but no work\n"); - } - spin_unlock(&nal_data->rxtwe_lock); - } while (!we); - - CDEBUG(D_INFO, "Returning we[%p]\n", we); - return(we); -} - - -/* - * Start the caretaker thread and a number of receiver threads - * The caretaker thread gets events from the gm library. - * It passes receive events to the receiver threads via a work list. - * It processes other events itself in gm_unknown. These will be - * callback events or sleeps. - */ -int -gmnal_start_kernel_threads(gmnal_data_t *nal_data) -{ - - int threads = 0; - /* - * the alarm is used to wake the caretaker thread from - * gm_unknown call (sleeping) to exit it. 
- */
-	CDEBUG(D_NET, "Initializing caretaker thread alarm and flag\n");
-	gm_initialize_alarm(&nal_data->ctthread_alarm);
-	nal_data->ctthread_flag = GMNAL_THREAD_RESET;
-
-
-	CDEBUG(D_INFO, "Starting caretaker thread\n");
-	nal_data->ctthread_pid =
-		kernel_thread(gmnal_ct_thread, (void*)nal_data, 0);
-	if (nal_data->ctthread_pid <= 0) {
-		CDEBUG(D_ERROR, "Caretaker thread failed to start\n");
-		return(GMNAL_STATUS_FAIL);
-	}
-
-	while (nal_data->rxthread_flag != GMNAL_THREAD_RESET) {
-		gmnal_yield(1);
-		CDEBUG(D_INFO, "Waiting for caretaker thread signs of life\n");
-	}
-
-	CDEBUG(D_INFO, "caretaker thread has started\n");
-
-
-	/*
-	 * Now start a number of receiver threads
-	 * these treads get work to do from the caretaker (ct) thread
-	 */
-	nal_data->rxthread_flag = GMNAL_THREAD_RESET;
-	nal_data->rxthread_stop_flag = GMNAL_THREAD_RESET;
-
-	for (threads=0; threads<NRXTHREADS; threads++)
-		nal_data->rxthread_pid[threads] = -1;
-	spin_lock_init(&nal_data->rxtwe_lock);
-	spin_lock_init(&nal_data->rxthread_flag_lock);
-	sema_init(&nal_data->rxtwe_wait, 0);
-	nal_data->rxtwe_head = NULL;
-	nal_data->rxtwe_tail = NULL;
-	/*
-	 * If the default number of receive threades isn't
-	 * modified at load time, then start one thread per cpu
-	 */
-	if (num_rx_threads == -1)
-		num_rx_threads = smp_num_cpus;
-	CDEBUG(D_INFO, "Starting [%d] receive threads\n", num_rx_threads);
-	for (threads=0; threads<num_rx_threads; threads++) {
-		nal_data->rxthread_pid[threads] =
-			kernel_thread(gmnal_rx_thread, (void*)nal_data, 0);
-		if (nal_data->rxthread_pid[threads] <= 0) {
-			CDEBUG(D_ERROR, "Receive thread failed to start\n");
-			gmnal_stop_rxthread(nal_data);
-			gmnal_stop_ctthread(nal_data);
-			return(GMNAL_STATUS_FAIL);
-		}
-	}
-
-	for (;;) {
-		spin_lock(&nal_data->rxthread_flag_lock);
-		if (nal_data->rxthread_flag == GMNAL_RXTHREADS_STARTED) {
-			spin_unlock(&nal_data->rxthread_flag_lock);
-			break;
-		}
-		spin_unlock(&nal_data->rxthread_flag_lock);
-		gmnal_yield(1);
-	}
-
-	CDEBUG(D_INFO, "receive threads seem to have started\n");
-
-	return(GMNAL_STATUS_OK);
-}
diff --git a/lnet/lnet/acceptor.c b/lnet/lnet/acceptor.c
index 89a6763..2940c5a 100644
--- a/lnet/lnet/acceptor.c
+++ b/lnet/lnet/acceptor.c
@@ -310,7 +310,7 @@ ptl_acceptor(void *arg)
 	__u32      peer_ip;
 	int        peer_port;
 	ptl_ni_t  *blind_ni;
-	int        secure = (int)arg;
+	int        secure = (int)((unsigned long)arg);
 
 	LASSERT (ptl_acceptor_state.pta_sock == NULL);
 
diff --git a/lnet/ulnds/connection.c b/lnet/ulnds/connection.c
index cabfe21..df115d9 100644
--- a/lnet/ulnds/connection.c
+++ b/lnet/ulnds/connection.c
@@ -245,7 +245,7 @@ tcpnal_hello (int sockfd, lnet_nid_t nid)
 	int nob;
 	ptl_acceptor_connreq_t cr;
 	ptl_hdr_t hdr;
-	ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
+	ptl_magicversion_t hmv;
 
 	gettimeofday(&tv, NULL);
 	incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
@@ -255,13 +255,18 @@ tcpnal_hello (int sockfd, lnet_nid_t nid)
 	cr.acr_version = PTL_PROTO_ACCEPTOR_VERSION;
 	cr.acr_nid = nid;
 
-	CLASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
+	/* Initialise hmv separately and then copy it into hdr; when this was
+	 * done in-place through a pointer aliased onto hdr.dest_nid, the
+	 * compiler could "optimize" the stores away (pointer aliasing). 
*/
+	hmv.magic         = cpu_to_le32(PTL_PROTO_TCP_MAGIC);
+	hmv.version_major = cpu_to_le32(PTL_PROTO_TCP_VERSION_MAJOR);
+	hmv.version_minor = cpu_to_le32(PTL_PROTO_TCP_VERSION_MINOR);
 
 	memset (&hdr, 0, sizeof (hdr));
-	hmv->magic         = cpu_to_le32(PTL_PROTO_TCP_MAGIC);
-	hmv->version_major = cpu_to_le32(PTL_PROTO_TCP_VERSION_MAJOR);
-	hmv->version_minor = cpu_to_le32(PTL_PROTO_TCP_VERSION_MINOR);
-
+
+	CLASSERT (sizeof (hmv) == sizeof (hdr.dest_nid));
+	memcpy(&hdr.dest_nid, &hmv, sizeof(hmv));
+
 	/* hdr.src_nid/src_pid are ignored at dest */
 	hdr.type = cpu_to_le32(PTL_MSG_HELLO);
@@ -279,23 +284,23 @@ tcpnal_hello (int sockfd, lnet_nid_t nid)
 	if (rc != 0)
 		return -1;
 
-	rc = tcpnal_read(nid, sockfd, hmv, sizeof(*hmv));
+	rc = tcpnal_read(nid, sockfd, &hmv, sizeof(hmv));
 	if (rc != 0)
 		return -1;
 
-	if (hmv->magic != le32_to_cpu(PTL_PROTO_TCP_MAGIC)) {
+	if (hmv.magic != le32_to_cpu(PTL_PROTO_TCP_MAGIC)) {
 		CERROR ("Bad magic %#08x (%#08x expected) from %s\n",
-			cpu_to_le32(hmv->magic), PTL_PROTO_TCP_MAGIC,
+			cpu_to_le32(hmv.magic), PTL_PROTO_TCP_MAGIC,
 			libcfs_nid2str(nid));
 		return -1;
 	}
 
-	if (hmv->version_major != cpu_to_le16 (PTL_PROTO_TCP_VERSION_MAJOR) ||
-	    hmv->version_minor != cpu_to_le16 (PTL_PROTO_TCP_VERSION_MINOR)) {
+	if (hmv.version_major != cpu_to_le16 (PTL_PROTO_TCP_VERSION_MAJOR) ||
+	    hmv.version_minor != cpu_to_le16 (PTL_PROTO_TCP_VERSION_MINOR)) {
 		CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
 			" from %s\n",
-			le16_to_cpu (hmv->version_major),
-			le16_to_cpu (hmv->version_minor),
+			le16_to_cpu (hmv.version_major),
+			le16_to_cpu (hmv.version_minor),
 			PTL_PROTO_TCP_VERSION_MAJOR,
 			PTL_PROTO_TCP_VERSION_MINOR,
 			libcfs_nid2str(nid));
@@ -308,7 +313,8 @@ tcpnal_hello (int sockfd, lnet_nid_t nid)
 
 	/* version 1 sends magic/version as the dest_nid of a 'hello' header,
 	 * so read the rest of it in now... */
-	rc = tcpnal_read(nid, sockfd, hmv + 1, sizeof(hdr) - sizeof(*hmv));
+	rc = tcpnal_read(nid, sockfd, ((char *)&hdr) + sizeof (hmv),
+			 sizeof(hdr) - sizeof(hmv));
 	if (rc != 0)
 		return -1;
 
diff --git a/lnet/ulnds/socklnd/connection.c b/lnet/ulnds/socklnd/connection.c
index cabfe21..df115d9 100644
--- a/lnet/ulnds/socklnd/connection.c
+++ b/lnet/ulnds/socklnd/connection.c
@@ -245,7 +245,7 @@ tcpnal_hello (int sockfd, lnet_nid_t nid)
 	int nob;
 	ptl_acceptor_connreq_t cr;
 	ptl_hdr_t hdr;
-	ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
+	ptl_magicversion_t hmv;
 
 	gettimeofday(&tv, NULL);
 	incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
@@ -255,13 +255,18 @@ tcpnal_hello (int sockfd, lnet_nid_t nid)
 	cr.acr_version = PTL_PROTO_ACCEPTOR_VERSION;
 	cr.acr_nid = nid;
 
-	CLASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
+	/* Initialise hmv separately and then copy it into hdr; when this was
+	 * done in-place through a pointer aliased onto hdr.dest_nid, the
+	 * compiler could "optimize" the stores away (pointer aliasing). 
*/ + hmv.magic = cpu_to_le32(PTL_PROTO_TCP_MAGIC); + hmv.version_major = cpu_to_le32(PTL_PROTO_TCP_VERSION_MAJOR); + hmv.version_minor = cpu_to_le32(PTL_PROTO_TCP_VERSION_MINOR); memset (&hdr, 0, sizeof (hdr)); - hmv->magic = cpu_to_le32(PTL_PROTO_TCP_MAGIC); - hmv->version_major = cpu_to_le32(PTL_PROTO_TCP_VERSION_MAJOR); - hmv->version_minor = cpu_to_le32(PTL_PROTO_TCP_VERSION_MINOR); - + + CLASSERT (sizeof (hmv) == sizeof (hdr.dest_nid)); + memcpy(&hdr.dest_nid, &hmv, sizeof(hmv)); + /* hdr.src_nid/src_pid are ignored at dest */ hdr.type = cpu_to_le32(PTL_MSG_HELLO); @@ -279,23 +284,23 @@ tcpnal_hello (int sockfd, lnet_nid_t nid) if (rc != 0) return -1; - rc = tcpnal_read(nid, sockfd, hmv, sizeof(*hmv)); + rc = tcpnal_read(nid, sockfd, &hmv, sizeof(hmv)); if (rc != 0) return -1; - if (hmv->magic != le32_to_cpu(PTL_PROTO_TCP_MAGIC)) { + if (hmv.magic != le32_to_cpu(PTL_PROTO_TCP_MAGIC)) { CERROR ("Bad magic %#08x (%#08x expected) from %s\n", - cpu_to_le32(hmv->magic), PTL_PROTO_TCP_MAGIC, + cpu_to_le32(hmv.magic), PTL_PROTO_TCP_MAGIC, libcfs_nid2str(nid)); return -1; } - if (hmv->version_major != cpu_to_le16 (PTL_PROTO_TCP_VERSION_MAJOR) || - hmv->version_minor != cpu_to_le16 (PTL_PROTO_TCP_VERSION_MINOR)) { + if (hmv.version_major != cpu_to_le16 (PTL_PROTO_TCP_VERSION_MAJOR) || + hmv.version_minor != cpu_to_le16 (PTL_PROTO_TCP_VERSION_MINOR)) { CERROR ("Incompatible protocol version %d.%d (%d.%d expected)" " from %s\n", - le16_to_cpu (hmv->version_major), - le16_to_cpu (hmv->version_minor), + le16_to_cpu (hmv.version_major), + le16_to_cpu (hmv.version_minor), PTL_PROTO_TCP_VERSION_MAJOR, PTL_PROTO_TCP_VERSION_MINOR, libcfs_nid2str(nid)); @@ -308,7 +313,8 @@ tcpnal_hello (int sockfd, lnet_nid_t nid) /* version 1 sends magic/version as the dest_nid of a 'hello' header, * so read the rest of it in now... */ - rc = tcpnal_read(nid, sockfd, hmv + 1, sizeof(hdr) - sizeof(*hmv)); + rc = tcpnal_read(nid, sockfd, ((char *)&hdr) + sizeof (hmv), + sizeof(hdr) - sizeof(hmv)); if (rc != 0) return -1; -- 1.8.3.1
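
A note on the descriptor sizing idiom used throughout the gmlnd_utils.c hunks
above: the tx/rx descriptors end in a variable-length page array, so both
allocation and free use offsetof(type, array[npages]) rather than sizeof(type).
A minimal userspace sketch of the same idiom follows; all names in it are
illustrative stand-ins, not the gmnal ones.

#include <stddef.h>
#include <stdlib.h>

struct netbuf_desc {
        int   nd_npages;        /* number of entries in nd_pages[] */
        void *nd_pages[];       /* variable-length tail array */
};

static struct netbuf_desc *nd_alloc(int npages)
{
        /* one allocation covers the header plus npages pointers, just as
         * PORTAL_ALLOC(rx, offsetof(gmnal_rx_t, rx_buf.nb_pages[npages]))
         * does in the patch */
        struct netbuf_desc *nd =
                malloc(offsetof(struct netbuf_desc, nd_pages[npages]));

        if (nd != NULL)
                nd->nd_npages = npages;
        return nd;
}

static void nd_free(struct netbuf_desc *nd)
{
        /* libc free() needs no size; the kernel's PORTAL_FREE is passed
         * the same offsetof() expression used at allocation time */
        free(nd);
}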
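
gmnal_alloc_rxs() above scales the configured large and small rx buffer counts
down proportionally when together they exceed the port's receive tokens
(gm_num_receive_tokens()).  A standalone sketch of that arithmetic, with a
worked example; the function name here is illustrative only.

/* Worked example: nrxcred = 100, nrx_large = 40, nrx_small = 80, so
 * nrx = 120 > 100; then nlarge = (40 * 100) / 120 = 33 and
 * nsmall = 100 - 33 = 67, keeping nlarge + nsmall == nrxcred. */
static void rebalance_rx_credits(int nrxcred, int *nrx_large, int *nrx_small)
{
        int nrx = *nrx_large + *nrx_small;

        if (nrx > nrxcred) {
                /* scale the large pool by nrxcred/nrx ... */
                int nlarge = (*nrx_large * nrxcred) / nrx;

                *nrx_large = nlarge;
                /* ... and let the small pool absorb the rounding loss */
                *nrx_small = nrxcred - nlarge;
        }
}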
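
The acceptor.c hunk is the changelog's "bad 64bit cast" fix.  The acceptor
smuggles a small integer flag through the void * thread argument; on a 64-bit
build, a direct (int) cast of a pointer is a pointer-to-smaller-integer
conversion that the compiler warns about or rejects, so the value is taken
through unsigned long (pointer-sized on Linux) first.  A minimal sketch of the
idiom; the helper name is illustrative only.

static int void_ptr_to_int(void *arg)
{
        /* unsigned long is the same width as a pointer on Linux, so the
         * pointer-to-integer conversion loses nothing; the following
         * truncation to int is then explicit and intentional for values
         * known to fit */
        return (int)((unsigned long)arg);
}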
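
The tcpnal_hello() hunks implement the changelog's "different way of
constructing tcp HELLO header to avoid pointer aliasing".  Below is a
self-contained sketch of the hazard being avoided; the struct names, field
types, and magic value are stand-ins, not the LNet definitions.

#include <string.h>

struct magicversion {                  /* stand-in for ptl_magicversion_t */
        unsigned int   magic;
        unsigned short version_major;
        unsigned short version_minor;
};

struct hello_hdr {                     /* stand-in for ptl_hdr_t */
        unsigned long long dest_nid;   /* same size as struct magicversion */
        unsigned int       type;
};

unsigned long long hello_init_bad(struct hello_hdr *h)
{
        /* hazard: hmv aliases h->dest_nid through an incompatible type */
        struct magicversion *hmv = (struct magicversion *)&h->dest_nid;

        memset(h, 0, sizeof(*h));
        hmv->magic         = 0x12345678;    /* illustrative value */
        hmv->version_major = 1;
        hmv->version_minor = 0;

        /* a strict-aliasing compiler may assume the stores through hmv
         * cannot touch h->dest_nid and still see it as 0 here */
        return h->dest_nid;
}

void hello_init_good(struct hello_hdr *h)
{
        /* the patch's approach: build the words in a separate local ... */
        struct magicversion hmv;

        hmv.magic         = 0x12345678;     /* illustrative value */
        hmv.version_major = 1;
        hmv.version_minor = 0;

        memset(h, 0, sizeof(*h));
        /* ... then memcpy() them in, which the compiler must treat as a
         * real dependency between hmv and h */
        memcpy(&h->dest_nid, &hmv, sizeof(hmv));
}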