From e76a10c16bcf19d91dc90cadd9928b9bcb2215d1 Mon Sep 17 00:00:00 2001
From: eeb
Date: Fri, 19 Dec 2003 13:58:28 +0000
Subject: [PATCH] * PtlMDUnlink() can no longer return PTL_MD_INUSE, since it
 commits the MD for destruction.  If no network I/O is current at the time, a
 PTL_EVENT_UNLINK event is created.

* The 'unlinked_me' field of an event has been replaced by a simple flag
  'unlinked' that is set if the event signals the destruction of the MD.

* Events have a new 'status' field.  This is PTL_OK on successful completion,
  and any other portals errno on completion with failure.  CWARN() messages
  in these callbacks log abnormal completion.

* All event callbacks changed to handle the UNLINK event, completion status
  and unlinked flag (see the callback sketch below).

* All abnormal completions changed to work with PtlMDUnlink and the new
  callbacks.

* Removed bd_complete from ptlrpc_bulk_desc and added bd_success.
  Communications have completed when bd_network_rw gets cleared.  If
  bd_success is set, then bd_nob_transferred tells you how much data was
  sent/received (see the completion-check sketch below).

* Changed MDS and OST bulk completion to deal with failed bulk transfers.
  The PtlBD server just LASSERTs things went OK, so we can be reminded to
  implement better error handling there too.

* ptlrpc_wake_client_req() inline helper.

* Changed the lib/NAL interface as follows....

  . cb_callback() is optional and defaults to calling the event queue's
    callback if it is left NULL.

  . cb_read(), cb_write(), cb_map(), cb_map_pages() return PTL_OK on success
    and another portals errno on failure.

  . cb_send(), cb_send_pages(), cb_recv(), cb_recv_pages() return PTL_OK if
    and only if they can commit to calling lib_finalize() when the relevant
    message completes (possibly with error).

  . cb_send(), cb_send_pages(), cb_recv(), cb_recv_pages() may not modify
    the iovec/ptl_kiov_t they are passed, and must do I/O on the subsection
    of this scatter/gather buffer starting at 'offset' for 'mlen' bytes
    (see the NAL receive sketch below).  This greatly simplifies portals lib
    level descriptor management at minimal expense to the NAL.

  . portals lib now exports lib_extract_iov(), lib_extract_kiov(), and the
    other iov helpers take an additional 'offset' parameter, to simplify
    offset buffer coding in the NAL.

  . lib_parse() is void (i.e. returns no value).

  . lib_finalize() takes an additional ptl_err_t completion status.

  ...note that NALs other than qswnal and socknal need to have these changes
  implemented properly and tested.

* Swapped some loose fprintf()s for CERROR().

* Dropped PORTAL_SLAB_ALLOC(); portals just uses PORTAL_ALLOC() now.  Since
  there are no slabs now, I also changed #ifdef PTL_USE_SLAB_CACHE to
  #ifndef PTL_USE_LIB_FREELIST.

* Changed lib_msg_alloc() so it is _never_ called with the statelock held,
  just like all the other allocators.

* Changed dynamic MD allocation to size the MD by the number of fragments.

* Dropped a bunch of dross, plus the iovs from lib_msg_t so they become
  tiny again.
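For illustration only, a minimal sketch of an event queue callback under the
new scheme.  The names my_eq_callback and my_cleanup are hypothetical, and
hanging per-MD state off mem_desc.user_ptr assumes the usual ptl_md_t
user_ptr field:

static void
my_eq_callback (ptl_event_t *ev)
{
        /* new 'status' field: PTL_OK on success, any other portals
         * errno on completion with failure */
        if (ev->status != PTL_OK)
                CWARN ("event type %d failed with status %d\n",
                       ev->type, ev->status);

        /* the new PTL_EVENT_UNLINK type is created by PtlMDUnlink()
         * when no network I/O is current at the time */

        /* 'unlinked' replaces the old 'unlinked_me' handle: a simple
         * flag set if this event signals the destruction of the MD,
         * so MD resources can be released exactly once, here */
        if (ev->unlinked)
                my_cleanup (ev->mem_desc.user_ptr);
}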
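And a sketch of the offset handling the new cb_recv() contract asks of a
NAL, modelled on the fragment-skipping loops added to qswnal below.
my_nal_recv, my_rx_t and rx_buffer are hypothetical, and it assumes the
payload has already arrived in a contiguous buffer (as in the qswnal case):

static ptl_err_t
my_nal_recv (nal_cb_t *nal, void *private, lib_msg_t *cookie,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t mlen, size_t rlen)
{
        my_rx_t *rx  = (my_rx_t *)private;   /* hypothetical rx descriptor */
        char    *src = rx->rx_buffer;        /* payload already in memory */
        size_t   nob = mlen;
        size_t   frag;

        /* skip complete fragments before 'offset'; only local copies of
         * the iov pointer/count advance -- the iov itself is read-only */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT (niov > 0);
        }

        while (nob > 0) {
                LASSERT (niov > 0);
                frag = MIN (iov->iov_len - offset, nob);

                memcpy ((char *)iov->iov_base + offset, src, frag);
                src += frag;
                nob -= frag;
                offset = 0;
                iov++;
                niov--;
        }

        /* payload bytes after 'mlen' up to 'rlen' are simply discarded */

        /* returning PTL_OK commits us to calling lib_finalize() when the
         * message completes; here it completes immediately and cleanly */
        lib_finalize (nal, private, cookie, PTL_OK);
        return (PTL_OK);
}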
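Finally, a sketch of how a bulk caller checks completion with the new
ptlrpc_bulk_desc fields.  The struct is not part of this diff, so the field
types (an int bd_nob_transferred) and the helper name my_check_bulk are
assumptions; the caller is assumed to have already observed bd_network_rw
cleared:

static int
my_check_bulk (struct ptlrpc_bulk_desc *desc, int nob_expected)
{
        /* communications have completed once bd_network_rw is cleared */
        LASSERT (!desc->bd_network_rw);

        if (!desc->bd_success)
                return (-EIO);          /* transfer failed outright */

        /* on success, bd_nob_transferred says how much data moved */
        if (desc->bd_nob_transferred != nob_expected) {
                CERROR ("short bulk transfer: %d of %d bytes\n",
                        desc->bd_nob_transferred, nob_expected);
                return (-EIO);
        }

        return (0);
}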
--- lnet/include/linux/kp30.h | 37 -- lnet/include/lnet/errno.h | 3 +- lnet/include/lnet/lib-lnet.h | 140 +++-- lnet/include/lnet/lib-nal.h | 67 ++- lnet/include/lnet/lib-p30.h | 140 +++-- lnet/include/lnet/lib-types.h | 16 +- lnet/include/lnet/lnet.h | 1 - lnet/include/lnet/p30.h | 1 - lnet/include/lnet/types.h | 27 +- lnet/klnds/gmlnd/gmlnd.h | 4 +- lnet/klnds/gmlnd/gmlnd_cb.c | 13 - lnet/klnds/iblnd/ibnal_cb.c | 21 +- lnet/klnds/qswlnd/qswlnd_cb.c | 221 +++++--- lnet/klnds/scimaclnd/scimacnal_cb.c | 34 +- lnet/klnds/socklnd/socklnd.c | 15 +- lnet/klnds/socklnd/socklnd_cb.c | 88 ++- lnet/klnds/toelnd/toenal_cb.c | 53 +- lnet/libcfs/module.c | 2 + lnet/lnet/api-eq.c | 6 - lnet/lnet/api-errno.c | 1 - lnet/lnet/api-ni.c | 2 +- lnet/lnet/api-wrap.c | 4 +- lnet/lnet/lib-init.c | 112 +--- lnet/lnet/lib-md.c | 83 ++- lnet/lnet/lib-move.c | 788 +++++++++++++------------- lnet/lnet/lib-msg.c | 172 +++--- lnet/ulnds/proclib.c | 24 +- lnet/ulnds/socklnd/proclib.c | 24 +- lnet/ulnds/socklnd/tcplnd.c | 93 +-- lnet/ulnds/tcplnd.c | 93 +-- lustre/portals/include/linux/kp30.h | 37 -- lustre/portals/include/portals/errno.h | 3 +- lustre/portals/include/portals/lib-nal.h | 67 ++- lustre/portals/include/portals/lib-p30.h | 140 +++-- lustre/portals/include/portals/lib-types.h | 16 +- lustre/portals/include/portals/p30.h | 1 - lustre/portals/include/portals/types.h | 27 +- lustre/portals/knals/gmnal/gmnal.h | 4 +- lustre/portals/knals/gmnal/gmnal_cb.c | 13 - lustre/portals/knals/ibnal/ibnal_cb.c | 21 +- lustre/portals/knals/qswnal/qswnal_cb.c | 221 +++++--- lustre/portals/knals/scimacnal/scimacnal_cb.c | 34 +- lustre/portals/knals/socknal/socknal.c | 15 +- lustre/portals/knals/socknal/socknal_cb.c | 88 ++- lustre/portals/knals/toenal/toenal_cb.c | 53 +- lustre/portals/libcfs/module.c | 2 + lustre/portals/portals/api-eq.c | 6 - lustre/portals/portals/api-errno.c | 1 - lustre/portals/portals/api-ni.c | 2 +- lustre/portals/portals/api-wrap.c | 4 +- lustre/portals/portals/lib-init.c | 112 +--- lustre/portals/portals/lib-md.c | 83 ++- lustre/portals/portals/lib-move.c | 788 +++++++++++++------------- lustre/portals/portals/lib-msg.c | 172 +++--- lustre/portals/unals/proclib.c | 24 +- lustre/portals/unals/tcpnal.c | 93 +-- 56 files changed, 2150 insertions(+), 2162 deletions(-) diff --git a/lnet/include/linux/kp30.h b/lnet/include/linux/kp30.h index 3d60631..7c3d824 100644 --- a/lnet/include/linux/kp30.h +++ b/lnet/include/linux/kp30.h @@ -326,43 +326,6 @@ do { \ s, (ptr), atomic_read(&portal_kmemory)); \ } while (0) -#ifndef SLAB_MEMALLOC -#define SLAB_MEMALLOC 0 -#endif - -#define PORTAL_SLAB_ALLOC(ptr, slab, size) \ -do { \ - LASSERT(!in_interrupt()); \ - (ptr) = kmem_cache_alloc((slab), (SLAB_KERNEL | SLAB_MEMALLOC)); \ - if ((ptr) == NULL) { \ - CERROR("PORTALS: out of memory at %s:%d (tried to alloc" \ - " '" #ptr "' from slab '" #slab "')\n", __FILE__, \ - __LINE__); \ - CERROR("PORTALS: %d total bytes allocated by portals\n", \ - atomic_read(&portal_kmemory)); \ - } else { \ - portal_kmem_inc((ptr), (size)); \ - memset((ptr), 0, (size)); \ - } \ - CDEBUG(D_MALLOC, "kmalloced '" #ptr "': %d at %p (tot %d).\n", \ - (int)(size), (ptr), atomic_read(&portal_kmemory)); \ -} while (0) - -#define PORTAL_SLAB_FREE(ptr, slab, size) \ -do { \ - int s = (size); \ - if ((ptr) == NULL) { \ - CERROR("PORTALS: free NULL '" #ptr "' (%d bytes) at " \ - "%s:%d\n", s, __FILE__, __LINE__); \ - break; \ - } \ - memset((ptr), 0x5a, s); \ - kmem_cache_free((slab), ptr); \ - portal_kmem_dec((ptr), s); \ - CDEBUG(D_MALLOC, "kfreed 
'" #ptr "': %d at %p (tot %d).\n", \ - s, (ptr), atomic_read (&portal_kmemory)); \ -} while (0) - /* ------------------------------------------------------------------- */ #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) diff --git a/lnet/include/lnet/errno.h b/lnet/include/lnet/errno.h index 817936a..08f084a 100644 --- a/lnet/include/lnet/errno.h +++ b/lnet/include/lnet/errno.h @@ -50,9 +50,8 @@ typedef enum { PTL_IOV_TOO_SMALL = 31, PTL_EQ_INUSE = 32, - PTL_MD_INUSE = 33, - PTL_MAX_ERRNO = 33 + PTL_MAX_ERRNO = 32 } ptl_err_t; /* If you change these, you must update the string table in api-errno.c */ diff --git a/lnet/include/lnet/lib-lnet.h b/lnet/include/lnet/lib-lnet.h index c402828..10a7bc7 100644 --- a/lnet/include/lnet/lib-lnet.h +++ b/lnet/include/lnet/lib-lnet.h @@ -19,7 +19,6 @@ #include #include #include -#include #include #include #include @@ -42,7 +41,7 @@ do { \ nal->cb_sti(nal, flagsp); \ } -#ifndef PTL_USE_SLAB_CACHE +#ifdef PTL_USE_LIB_FREELIST #define MAX_MES 2048 #define MAX_MDS 2048 @@ -98,7 +97,7 @@ lib_eq_free (nal_cb_t *nal, lib_eq_t *eq) } static inline lib_md_t * -lib_md_alloc (nal_cb_t *nal) +lib_md_alloc (nal_cb_t *nal, ptl_md_t *umd) { /* NEVER called with statelock held */ unsigned long flags; @@ -142,8 +141,20 @@ lib_me_free (nal_cb_t *nal, lib_me_t *me) static inline lib_msg_t * lib_msg_alloc (nal_cb_t *nal) { - /* ALWAYS called with statelock held */ - return ((lib_msg_t *)lib_freelist_alloc (&nal->ni.ni_free_msgs)); + /* NEVER called with statelock held */ + unsigned long flags; + lib_msg_t *msg; + + state_lock (nal, &flags); + msg = (lib_msg_t *)lib_freelist_alloc (&nal->ni.ni_free_msgs); + state_unlock (nal, &flags); + + if (msg != NULL) { + /* NULL pointers, clear flags etc */ + memset (msg, 0, sizeof (*msg)); + msg->ack_wmd = PTL_WIRE_HANDLE_NONE; + } + return(msg); } static inline void @@ -155,26 +166,13 @@ lib_msg_free (nal_cb_t *nal, lib_msg_t *msg) #else -extern kmem_cache_t *ptl_md_slab; -extern kmem_cache_t *ptl_msg_slab; -extern kmem_cache_t *ptl_me_slab; -extern kmem_cache_t *ptl_eq_slab; -extern atomic_t md_in_use_count; -extern atomic_t msg_in_use_count; -extern atomic_t me_in_use_count; -extern atomic_t eq_in_use_count; - static inline lib_eq_t * lib_eq_alloc (nal_cb_t *nal) { /* NEVER called with statelock held */ lib_eq_t *eq; - PORTAL_SLAB_ALLOC(eq, ptl_eq_slab, sizeof(*eq)); - - if (eq == NULL) - return (NULL); - atomic_inc (&eq_in_use_count); + PORTAL_ALLOC(eq, sizeof(*eq)); return (eq); } @@ -182,21 +180,34 @@ static inline void lib_eq_free (nal_cb_t *nal, lib_eq_t *eq) { /* ALWAYS called with statelock held */ - atomic_dec (&eq_in_use_count); - PORTAL_SLAB_FREE(eq, ptl_eq_slab, sizeof(*eq)); + PORTAL_FREE(eq, sizeof(*eq)); } static inline lib_md_t * -lib_md_alloc (nal_cb_t *nal) +lib_md_alloc (nal_cb_t *nal, ptl_md_t *umd) { /* NEVER called with statelock held */ lib_md_t *md; - PORTAL_SLAB_ALLOC(md, ptl_md_slab, sizeof(*md)); - - if (md == NULL) - return (NULL); - - atomic_inc (&md_in_use_count); + int size; + int niov; + + if ((umd->options & PTL_MD_KIOV) != 0) { + niov = umd->niov; + size = offsetof(lib_md_t, md_iov.kiov[niov]); + } else { + niov = ((umd->options & PTL_MD_IOV) != 0) ? 
+ umd->niov : 1; + size = offsetof(lib_md_t, md_iov.iov[niov]); + } + + PORTAL_ALLOC(md, size); + + if (md != NULL) { + /* Set here in case of early free */ + md->options = umd->options; + md->md_niov = niov; + } + return (md); } @@ -204,8 +215,14 @@ static inline void lib_md_free (nal_cb_t *nal, lib_md_t *md) { /* ALWAYS called with statelock held */ - atomic_dec (&md_in_use_count); - PORTAL_SLAB_FREE(md, ptl_md_slab, sizeof(*md)); + int size; + + if ((md->options & PTL_MD_KIOV) != 0) + size = offsetof(lib_md_t, md_iov.kiov[md->md_niov]); + else + size = offsetof(lib_md_t, md_iov.iov[md->md_niov]); + + PORTAL_FREE(md, size); } static inline lib_me_t * @@ -213,12 +230,8 @@ lib_me_alloc (nal_cb_t *nal) { /* NEVER called with statelock held */ lib_me_t *me; - PORTAL_SLAB_ALLOC(me, ptl_me_slab, sizeof(*me)); - - if (me == NULL) - return (NULL); - atomic_inc (&me_in_use_count); + PORTAL_ALLOC(me, sizeof(*me)); return (me); } @@ -226,21 +239,21 @@ static inline void lib_me_free(nal_cb_t *nal, lib_me_t *me) { /* ALWAYS called with statelock held */ - atomic_dec (&me_in_use_count); - PORTAL_SLAB_FREE(me, ptl_me_slab, sizeof(*me)); + PORTAL_FREE(me, sizeof(*me)); } static inline lib_msg_t * lib_msg_alloc(nal_cb_t *nal) { - /* ALWAYS called with statelock held */ + /* NEVER called with statelock held */ lib_msg_t *msg; - PORTAL_SLAB_ALLOC(msg, ptl_msg_slab, sizeof(*msg)); - if (msg == NULL) - return (NULL); - - atomic_inc (&msg_in_use_count); + PORTAL_ALLOC(msg, sizeof(*msg)); + if (msg != NULL) { + /* NULL pointers, clear flags etc */ + memset (msg, 0, sizeof (*msg)); + msg->ack_wmd = PTL_WIRE_HANDLE_NONE; + } return (msg); } @@ -248,8 +261,7 @@ static inline void lib_msg_free(nal_cb_t *nal, lib_msg_t *msg) { /* ALWAYS called with statelock held */ - atomic_dec (&msg_in_use_count); - PORTAL_SLAB_FREE(msg, ptl_msg_slab, sizeof(*msg)); + PORTAL_FREE(msg, sizeof(*msg)); } #endif @@ -348,26 +360,40 @@ extern char *dispatch_name(int index); * Call backs will be made to write events, send acks or * replies and so on. 
*/ -extern int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private); -extern int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t * msg); +extern void lib_enq_event_locked (nal_cb_t *nal, void *private, + lib_eq_t *eq, ptl_event_t *ev); +extern void lib_finalize (nal_cb_t *nal, void *private, lib_msg_t *msg, int status); +extern void lib_parse (nal_cb_t *nal, ptl_hdr_t *hdr, void *private); extern lib_msg_t *lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, lib_md_t *getmd); -extern void print_hdr(nal_cb_t * nal, ptl_hdr_t * hdr); +extern void print_hdr (nal_cb_t * nal, ptl_hdr_t * hdr); + extern ptl_size_t lib_iov_nob (int niov, struct iovec *iov); -extern void lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, ptl_size_t len); -extern void lib_copy_buf2iov (int niov, struct iovec *iov, char *dest, ptl_size_t len); +extern void lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, + ptl_size_t offset, ptl_size_t len); +extern void lib_copy_buf2iov (int niov, struct iovec *iov, ptl_size_t offset, + char *src, ptl_size_t len); +extern int lib_extract_iov (int dst_niov, struct iovec *dst, + int src_niov, struct iovec *src, + ptl_size_t offset, ptl_size_t len); extern ptl_size_t lib_kiov_nob (int niov, ptl_kiov_t *iov); -extern void lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *iov, ptl_size_t len); -extern void lib_copy_buf2kiov (int niov, ptl_kiov_t *iov, char *src, ptl_size_t len); +extern void lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, + ptl_size_t offset, ptl_size_t len); +extern void lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, ptl_size_t offset, + char *src, ptl_size_t len); +extern int lib_extract_kiov (int dst_niov, ptl_kiov_t *dst, + int src_niov, ptl_kiov_t *src, + ptl_size_t offset, ptl_size_t len); + extern void lib_assert_wire_constants (void); -extern void lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md, - ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen); -extern int lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg, - ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, - lib_md_t *md, ptl_size_t offset, ptl_size_t len); +extern ptl_err_t lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md, + ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen); +extern ptl_err_t lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg, + ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, + lib_md_t *md, ptl_size_t offset, ptl_size_t len); extern void lib_md_deconstruct(nal_cb_t * nal, lib_md_t * md_in, ptl_md_t * md_out); diff --git a/lnet/include/lnet/lib-nal.h b/lnet/include/lnet/lib-nal.h index 4052c0c..0bf557e 100644 --- a/lnet/include/lnet/lib-nal.h +++ b/lnet/include/lnet/lib-nal.h @@ -18,47 +18,60 @@ struct nal_cb_t { lib_ni_t ni; void *nal_data; /* - * send: Sends a preformatted header and user data to a - * specified remote process. - * Can overwrite iov. + * send: Sends a preformatted header and payload data to a + * specified remote process. The payload is scattered over 'niov' + * fragments described by iov, starting at 'offset' for 'mlen' + * bytes. + * NB the NAL may NOT overwrite iov. 
+ * PTL_OK on success => NAL has committed to send and will call + * lib_finalize on completion */ - int (*cb_send) (nal_cb_t * nal, void *private, lib_msg_t * cookie, - ptl_hdr_t * hdr, int type, ptl_nid_t nid, ptl_pid_t pid, - unsigned int niov, struct iovec *iov, size_t mlen); + ptl_err_t (*cb_send) (nal_cb_t * nal, void *private, lib_msg_t * cookie, + ptl_hdr_t * hdr, int type, ptl_nid_t nid, ptl_pid_t pid, + unsigned int niov, struct iovec *iov, + size_t offset, size_t mlen); /* as send, but with a set of page fragments (NULL if not supported) */ - int (*cb_send_pages) (nal_cb_t * nal, void *private, lib_msg_t * cookie, - ptl_hdr_t * hdr, int type, ptl_nid_t nid, ptl_pid_t pid, - unsigned int niov, ptl_kiov_t *iov, size_t mlen); + ptl_err_t (*cb_send_pages) (nal_cb_t * nal, void *private, lib_msg_t * cookie, + ptl_hdr_t * hdr, int type, ptl_nid_t nid, ptl_pid_t pid, + unsigned int niov, ptl_kiov_t *iov, + size_t offset, size_t mlen); /* - * recv: Receives an incoming message from a remote process - * Type of iov depends on options. Can overwrite iov. + * recv: Receives an incoming message from a remote process. The + * payload is to be received into the scattered buffer of 'niov' + * fragments described by iov, starting at 'offset' for 'mlen' + * bytes. Payload bytes after 'mlen' up to 'rlen' are to be + * discarded. + * NB the NAL may NOT overwrite iov. + * PTL_OK on success => NAL has committed to receive and will call + * lib_finalize on completion */ - int (*cb_recv) (nal_cb_t * nal, void *private, lib_msg_t * cookie, - unsigned int niov, struct iovec *iov, size_t mlen, - size_t rlen); + ptl_err_t (*cb_recv) (nal_cb_t * nal, void *private, lib_msg_t * cookie, + unsigned int niov, struct iovec *iov, + size_t offset, size_t mlen, size_t rlen); /* as recv, but with a set of page fragments (NULL if not supported) */ - int (*cb_recv_pages) (nal_cb_t * nal, void *private, lib_msg_t * cookie, - unsigned int niov, ptl_kiov_t *iov, size_t mlen, - size_t rlen); + ptl_err_t (*cb_recv_pages) (nal_cb_t * nal, void *private, lib_msg_t * cookie, + unsigned int niov, ptl_kiov_t *iov, + size_t offset, size_t mlen, size_t rlen); /* * read: Reads a block of data from a specified user address */ - int (*cb_read) (nal_cb_t * nal, void *private, void *dst_addr, - user_ptr src_addr, size_t len); + ptl_err_t (*cb_read) (nal_cb_t * nal, void *private, void *dst_addr, + user_ptr src_addr, size_t len); /* * write: Writes a block of data into a specified user address */ - int (*cb_write) (nal_cb_t * nal, void *private, user_ptr dsr_addr, - void *src_addr, size_t len); + ptl_err_t (*cb_write) (nal_cb_t * nal, void *private, user_ptr dsr_addr, + void *src_addr, size_t len); /* * callback: Calls an event callback + * NULL => lib calls eq's callback (if any) directly. */ - int (*cb_callback) (nal_cb_t * nal, void *private, lib_eq_t *eq, - ptl_event_t *ev); + void (*cb_callback) (nal_cb_t * nal, void *private, lib_eq_t *eq, + ptl_event_t *ev); /* * malloc: Acquire a block of memory in a system independent @@ -74,14 +87,14 @@ struct nal_cb_t { * type of *iov depends on options. * Set to NULL if not required. 
*/ - int (*cb_map) (nal_cb_t * nal, unsigned int niov, struct iovec *iov, - void **addrkey); + ptl_err_t (*cb_map) (nal_cb_t * nal, unsigned int niov, struct iovec *iov, + void **addrkey); void (*cb_unmap) (nal_cb_t * nal, unsigned int niov, struct iovec *iov, void **addrkey); /* as (un)map, but with a set of page fragments */ - int (*cb_map_pages) (nal_cb_t * nal, unsigned int niov, ptl_kiov_t *iov, - void **addrkey); + ptl_err_t (*cb_map_pages) (nal_cb_t * nal, unsigned int niov, ptl_kiov_t *iov, + void **addrkey); void (*cb_unmap_pages) (nal_cb_t * nal, unsigned int niov, ptl_kiov_t *iov, void **addrkey); diff --git a/lnet/include/lnet/lib-p30.h b/lnet/include/lnet/lib-p30.h index c402828..10a7bc7 100644 --- a/lnet/include/lnet/lib-p30.h +++ b/lnet/include/lnet/lib-p30.h @@ -19,7 +19,6 @@ #include #include #include -#include #include #include #include @@ -42,7 +41,7 @@ do { \ nal->cb_sti(nal, flagsp); \ } -#ifndef PTL_USE_SLAB_CACHE +#ifdef PTL_USE_LIB_FREELIST #define MAX_MES 2048 #define MAX_MDS 2048 @@ -98,7 +97,7 @@ lib_eq_free (nal_cb_t *nal, lib_eq_t *eq) } static inline lib_md_t * -lib_md_alloc (nal_cb_t *nal) +lib_md_alloc (nal_cb_t *nal, ptl_md_t *umd) { /* NEVER called with statelock held */ unsigned long flags; @@ -142,8 +141,20 @@ lib_me_free (nal_cb_t *nal, lib_me_t *me) static inline lib_msg_t * lib_msg_alloc (nal_cb_t *nal) { - /* ALWAYS called with statelock held */ - return ((lib_msg_t *)lib_freelist_alloc (&nal->ni.ni_free_msgs)); + /* NEVER called with statelock held */ + unsigned long flags; + lib_msg_t *msg; + + state_lock (nal, &flags); + msg = (lib_msg_t *)lib_freelist_alloc (&nal->ni.ni_free_msgs); + state_unlock (nal, &flags); + + if (msg != NULL) { + /* NULL pointers, clear flags etc */ + memset (msg, 0, sizeof (*msg)); + msg->ack_wmd = PTL_WIRE_HANDLE_NONE; + } + return(msg); } static inline void @@ -155,26 +166,13 @@ lib_msg_free (nal_cb_t *nal, lib_msg_t *msg) #else -extern kmem_cache_t *ptl_md_slab; -extern kmem_cache_t *ptl_msg_slab; -extern kmem_cache_t *ptl_me_slab; -extern kmem_cache_t *ptl_eq_slab; -extern atomic_t md_in_use_count; -extern atomic_t msg_in_use_count; -extern atomic_t me_in_use_count; -extern atomic_t eq_in_use_count; - static inline lib_eq_t * lib_eq_alloc (nal_cb_t *nal) { /* NEVER called with statelock held */ lib_eq_t *eq; - PORTAL_SLAB_ALLOC(eq, ptl_eq_slab, sizeof(*eq)); - - if (eq == NULL) - return (NULL); - atomic_inc (&eq_in_use_count); + PORTAL_ALLOC(eq, sizeof(*eq)); return (eq); } @@ -182,21 +180,34 @@ static inline void lib_eq_free (nal_cb_t *nal, lib_eq_t *eq) { /* ALWAYS called with statelock held */ - atomic_dec (&eq_in_use_count); - PORTAL_SLAB_FREE(eq, ptl_eq_slab, sizeof(*eq)); + PORTAL_FREE(eq, sizeof(*eq)); } static inline lib_md_t * -lib_md_alloc (nal_cb_t *nal) +lib_md_alloc (nal_cb_t *nal, ptl_md_t *umd) { /* NEVER called with statelock held */ lib_md_t *md; - PORTAL_SLAB_ALLOC(md, ptl_md_slab, sizeof(*md)); - - if (md == NULL) - return (NULL); - - atomic_inc (&md_in_use_count); + int size; + int niov; + + if ((umd->options & PTL_MD_KIOV) != 0) { + niov = umd->niov; + size = offsetof(lib_md_t, md_iov.kiov[niov]); + } else { + niov = ((umd->options & PTL_MD_IOV) != 0) ? 
+ umd->niov : 1; + size = offsetof(lib_md_t, md_iov.iov[niov]); + } + + PORTAL_ALLOC(md, size); + + if (md != NULL) { + /* Set here in case of early free */ + md->options = umd->options; + md->md_niov = niov; + } + return (md); } @@ -204,8 +215,14 @@ static inline void lib_md_free (nal_cb_t *nal, lib_md_t *md) { /* ALWAYS called with statelock held */ - atomic_dec (&md_in_use_count); - PORTAL_SLAB_FREE(md, ptl_md_slab, sizeof(*md)); + int size; + + if ((md->options & PTL_MD_KIOV) != 0) + size = offsetof(lib_md_t, md_iov.kiov[md->md_niov]); + else + size = offsetof(lib_md_t, md_iov.iov[md->md_niov]); + + PORTAL_FREE(md, size); } static inline lib_me_t * @@ -213,12 +230,8 @@ lib_me_alloc (nal_cb_t *nal) { /* NEVER called with statelock held */ lib_me_t *me; - PORTAL_SLAB_ALLOC(me, ptl_me_slab, sizeof(*me)); - - if (me == NULL) - return (NULL); - atomic_inc (&me_in_use_count); + PORTAL_ALLOC(me, sizeof(*me)); return (me); } @@ -226,21 +239,21 @@ static inline void lib_me_free(nal_cb_t *nal, lib_me_t *me) { /* ALWAYS called with statelock held */ - atomic_dec (&me_in_use_count); - PORTAL_SLAB_FREE(me, ptl_me_slab, sizeof(*me)); + PORTAL_FREE(me, sizeof(*me)); } static inline lib_msg_t * lib_msg_alloc(nal_cb_t *nal) { - /* ALWAYS called with statelock held */ + /* NEVER called with statelock held */ lib_msg_t *msg; - PORTAL_SLAB_ALLOC(msg, ptl_msg_slab, sizeof(*msg)); - if (msg == NULL) - return (NULL); - - atomic_inc (&msg_in_use_count); + PORTAL_ALLOC(msg, sizeof(*msg)); + if (msg != NULL) { + /* NULL pointers, clear flags etc */ + memset (msg, 0, sizeof (*msg)); + msg->ack_wmd = PTL_WIRE_HANDLE_NONE; + } return (msg); } @@ -248,8 +261,7 @@ static inline void lib_msg_free(nal_cb_t *nal, lib_msg_t *msg) { /* ALWAYS called with statelock held */ - atomic_dec (&msg_in_use_count); - PORTAL_SLAB_FREE(msg, ptl_msg_slab, sizeof(*msg)); + PORTAL_FREE(msg, sizeof(*msg)); } #endif @@ -348,26 +360,40 @@ extern char *dispatch_name(int index); * Call backs will be made to write events, send acks or * replies and so on. 
*/ -extern int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private); -extern int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t * msg); +extern void lib_enq_event_locked (nal_cb_t *nal, void *private, + lib_eq_t *eq, ptl_event_t *ev); +extern void lib_finalize (nal_cb_t *nal, void *private, lib_msg_t *msg, int status); +extern void lib_parse (nal_cb_t *nal, ptl_hdr_t *hdr, void *private); extern lib_msg_t *lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, lib_md_t *getmd); -extern void print_hdr(nal_cb_t * nal, ptl_hdr_t * hdr); +extern void print_hdr (nal_cb_t * nal, ptl_hdr_t * hdr); + extern ptl_size_t lib_iov_nob (int niov, struct iovec *iov); -extern void lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, ptl_size_t len); -extern void lib_copy_buf2iov (int niov, struct iovec *iov, char *dest, ptl_size_t len); +extern void lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, + ptl_size_t offset, ptl_size_t len); +extern void lib_copy_buf2iov (int niov, struct iovec *iov, ptl_size_t offset, + char *src, ptl_size_t len); +extern int lib_extract_iov (int dst_niov, struct iovec *dst, + int src_niov, struct iovec *src, + ptl_size_t offset, ptl_size_t len); extern ptl_size_t lib_kiov_nob (int niov, ptl_kiov_t *iov); -extern void lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *iov, ptl_size_t len); -extern void lib_copy_buf2kiov (int niov, ptl_kiov_t *iov, char *src, ptl_size_t len); +extern void lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, + ptl_size_t offset, ptl_size_t len); +extern void lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, ptl_size_t offset, + char *src, ptl_size_t len); +extern int lib_extract_kiov (int dst_niov, ptl_kiov_t *dst, + int src_niov, ptl_kiov_t *src, + ptl_size_t offset, ptl_size_t len); + extern void lib_assert_wire_constants (void); -extern void lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md, - ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen); -extern int lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg, - ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, - lib_md_t *md, ptl_size_t offset, ptl_size_t len); +extern ptl_err_t lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md, + ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen); +extern ptl_err_t lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg, + ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, + lib_md_t *md, ptl_size_t offset, ptl_size_t len); extern void lib_md_deconstruct(nal_cb_t * nal, lib_md_t * md_in, ptl_md_t * md_out); diff --git a/lnet/include/lnet/lib-types.h b/lnet/include/lnet/lib-types.h index 30e56af..904204b 100644 --- a/lnet/include/lnet/lib-types.h +++ b/lnet/include/lnet/lib-types.h @@ -12,11 +12,11 @@ #include #ifdef __KERNEL__ -# define PTL_USE_SLAB_CACHE # include # include # include #else +# define PTL_USE_LIB_FREELIST # include #endif @@ -139,16 +139,9 @@ typedef struct { struct lib_msg_t { struct list_head msg_list; - int send_ack; lib_md_t *md; - ptl_nid_t nid; - ptl_pid_t pid; - ptl_event_t ev; ptl_handle_wire_t ack_wmd; - union { - struct iovec iov[PTL_MD_MAX_IOV]; - ptl_kiov_t kiov[PTL_MD_MAX_IOV]; - } msg_iov; + ptl_event_t ev; }; struct lib_ptl_t { @@ -212,9 +205,8 @@ struct lib_md_t { }; #define PTL_MD_FLAG_UNLINK (1 << 0) -#define PTL_MD_FLAG_AUTO_UNLINKED (1 << 1) -#ifndef PTL_USE_SLAB_CACHE +#ifdef PTL_USE_LIB_FREELIST typedef struct { void *fl_objs; /* single contiguous array of objects */ @@ -262,7 +254,7 @@ typedef struct { struct list_head ni_test_peers; 
-#ifndef PTL_USE_SLAB_CACHE +#ifdef PTL_USE_LIB_FREELIST lib_freelist_t ni_free_mes; lib_freelist_t ni_free_msgs; lib_freelist_t ni_free_mds; diff --git a/lnet/include/lnet/lnet.h b/lnet/include/lnet/lnet.h index a4ea39b..8b1495e 100644 --- a/lnet/include/lnet/lnet.h +++ b/lnet/include/lnet/lnet.h @@ -21,7 +21,6 @@ #include #include #include -#include #include extern int __p30_initialized; /* for libraries & test codes */ diff --git a/lnet/include/lnet/p30.h b/lnet/include/lnet/p30.h index a4ea39b..8b1495e 100644 --- a/lnet/include/lnet/p30.h +++ b/lnet/include/lnet/p30.h @@ -21,7 +21,6 @@ #include #include #include -#include #include extern int __p30_initialized; /* for libraries & test codes */ diff --git a/lnet/include/lnet/types.h b/lnet/include/lnet/types.h index e4ccebf..7ffe797 100644 --- a/lnet/include/lnet/types.h +++ b/lnet/include/lnet/types.h @@ -17,6 +17,8 @@ typedef u_int64_t __u64; # define do_gettimeofday(tv) gettimeofday(tv, NULL) #endif +#include + typedef __u64 ptl_nid_t; typedef __u32 ptl_pid_t; typedef __u32 ptl_pt_index_t; @@ -97,7 +99,8 @@ typedef enum { PTL_EVENT_PUT, PTL_EVENT_REPLY, PTL_EVENT_ACK, - PTL_EVENT_SENT + PTL_EVENT_SENT, + PTL_EVENT_UNLINK, } ptl_event_kind_t; #define PTL_SEQ_BASETYPE long @@ -112,15 +115,19 @@ typedef unsigned PTL_SEQ_BASETYPE ptl_seq_t; #pragma pack(push, 4) #endif typedef struct { - ptl_event_kind_t type; - ptl_process_id_t initiator; - ptl_pt_index_t portal; - ptl_match_bits_t match_bits; - ptl_size_t rlength, mlength, offset; - ptl_handle_me_t unlinked_me; - ptl_md_t mem_desc; - ptl_hdr_data_t hdr_data; - struct timeval arrival_time; + ptl_event_kind_t type; + ptl_err_t status; + int unlinked; + ptl_process_id_t initiator; + ptl_pt_index_t portal; + ptl_match_bits_t match_bits; + ptl_size_t rlength; + ptl_size_t mlength; + ptl_size_t offset; + ptl_md_t mem_desc; + ptl_hdr_data_t hdr_data; + struct timeval arrival_time; + volatile ptl_seq_t sequence; } ptl_event_t; #ifdef __CYGWIN__ diff --git a/lnet/klnds/gmlnd/gmlnd.h b/lnet/klnds/gmlnd/gmlnd.h index 53757ab..cdde5b7 100644 --- a/lnet/klnds/gmlnd/gmlnd.h +++ b/lnet/klnds/gmlnd/gmlnd.h @@ -353,8 +353,6 @@ int gmnal_cb_read(nal_cb_t *, void *private, void *, user_ptr, size_t); int gmnal_cb_write(nal_cb_t *, void *private, user_ptr, void *, size_t); -int gmnal_cb_callback(nal_cb_t *, void *, lib_eq_t *, ptl_event_t *); - void *gmnal_cb_malloc(nal_cb_t *, size_t); void gmnal_cb_free(nal_cb_t *, void *, size_t); @@ -384,7 +382,7 @@ void gmnal_fini(void); a->cb_recv_pages = gmnal_cb_recv_pages; \ a->cb_read = gmnal_cb_read; \ a->cb_write = gmnal_cb_write; \ - a->cb_callback = gmnal_cb_callback; \ + a->cb_callback = NULL; \ a->cb_malloc = gmnal_cb_malloc; \ a->cb_free = gmnal_cb_free; \ a->cb_map = NULL; \ diff --git a/lnet/klnds/gmlnd/gmlnd_cb.c b/lnet/klnds/gmlnd/gmlnd_cb.c index 6ae91db..e055242 100644 --- a/lnet/klnds/gmlnd/gmlnd_cb.c +++ b/lnet/klnds/gmlnd/gmlnd_cb.c @@ -126,7 +126,6 @@ int gmnal_cb_send(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, niov, iov, len); } else { CDEBUG(D_ERROR, "Large message send it is not supported\n"); - lib_finalize(nal_cb, private, cookie); return(PTL_FAIL); gmnal_large_tx(nal_cb, private, cookie, hdr, type, nid, pid, niov, iov, len); @@ -200,18 +199,6 @@ int gmnal_cb_write(nal_cb_t *nal_cb, void *private, user_ptr dst, return(PTL_OK); } -int gmnal_cb_callback(nal_cb_t *nal_cb, void *private, lib_eq_t *eq, - ptl_event_t *ev) -{ - - if (eq->event_callback != NULL) { - CDEBUG(D_INFO, "found callback\n"); - eq->event_callback(ev); - } - - 
return(PTL_OK); -} - void *gmnal_cb_malloc(nal_cb_t *nal_cb, size_t len) { void *ptr = NULL; diff --git a/lnet/klnds/iblnd/ibnal_cb.c b/lnet/klnds/iblnd/ibnal_cb.c index 2c07cc4..0688062 100644 --- a/lnet/klnds/iblnd/ibnal_cb.c +++ b/lnet/klnds/iblnd/ibnal_cb.c @@ -306,7 +306,7 @@ kibnal_send(nal_cb_t *nal, if(buf_length > MAX_MSG_SIZE) { CERROR("kibnal_send:request exceeds Transmit data size (%d).\n", MAX_MSG_SIZE); - rc = -1; + rc = PTL_FAIL; return rc; } else { @@ -363,7 +363,7 @@ kibnal_send(nal_cb_t *nal, PROF_FINISH(kibnal_send); // time stapm of send operation - rc = 1; + rc = PTL_OK; return rc; } @@ -386,7 +386,7 @@ int kibnal_send_pages(nal_cb_t * nal, ptl_kiov_t *iov, size_t mlen) { - int rc = 1; + int rc = PTL_FAIL; CDEBUG(D_NET, "kibnal_send_pages\n"); @@ -420,7 +420,7 @@ void kibnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) // // do you need this // -int kibnal_callback(nal_cb_t * nal, +void kibnal_callback(nal_cb_t * nal, void *private, lib_eq_t *eq, ptl_event_t *ev) @@ -507,7 +507,7 @@ kibnal_recv_pages(nal_cb_t * nal, { CDEBUG(D_NET, "recv_pages not implemented\n"); - return PTL_OK; + return PTL_FAIL; } @@ -526,11 +526,12 @@ kibnal_recv(nal_cb_t *nal, CDEBUG(D_NET,"kibnal_recv: mlen=%d, rlen=%d\n", mlen, rlen); /* What was actually received must be >= what sender claims to - * have sent. This is an LASSERT, since lib-move doesn't - * check cb return code yet. */ - LASSERT (krx->krx_len >= sizeof (ptl_hdr_t) + rlen); + * have sent. */ LASSERT (mlen <= rlen); + if (krx->krx_len < sizeof (ptl_hdr_t) + rlen) + return (PTL_FAIL); + PROF_START(kibnal_recv); if(mlen != 0) { @@ -542,12 +543,12 @@ kibnal_recv(nal_cb_t *nal, PROF_START(lib_finalize); - lib_finalize(nal, private, cookie); + lib_finalize(nal, private, cookie, PTL_OK); PROF_FINISH(lib_finalize); PROF_FINISH(kibnal_recv); - return rlen; + return PTL_OK; } // diff --git a/lnet/klnds/qswlnd/qswlnd_cb.c b/lnet/klnds/qswlnd/qswlnd_cb.c index 43926c9..39a4ea4 100644 --- a/lnet/klnds/qswlnd/qswlnd_cb.c +++ b/lnet/klnds/qswlnd/qswlnd_cb.c @@ -30,7 +30,7 @@ * LIB functions follow * */ -static int +static ptl_err_t kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr, size_t len) { @@ -38,10 +38,10 @@ kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr, nal->ni.nid, len, src_addr, dst_addr ); memcpy( dst_addr, src_addr, len ); - return (0); + return (PTL_OK); } -static int +static ptl_err_t kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr, size_t len) { @@ -49,7 +49,7 @@ kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr, nal->ni.nid, len, src_addr, dst_addr ); memcpy( dst_addr, src_addr, len ); - return (0); + return (PTL_OK); } static void * @@ -145,7 +145,7 @@ kqswnal_unmap_tx (kqswnal_tx_t *ktx) } int -kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) +kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov) { int nfrags = ktx->ktx_nfrag; int nmapped = ktx->ktx_nmappedpages; @@ -157,9 +157,17 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) LASSERT (nfrags <= EP_MAXFRAG); LASSERT (niov > 0); LASSERT (nob > 0); - + + /* skip complete frags before 'offset' */ + while (offset >= kiov->kiov_len) { + offset -= kiov->kiov_len; + kiov++; + niov--; + LASSERT (niov > 0); + } + do { - int fraglen = kiov->kiov_len; + int fraglen = kiov->kiov_len - offset; /* nob exactly spans the iovs */ LASSERT (fraglen <= nob); @@ -182,7 +190,7 @@ 
kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) /* XXX this is really crap, but we'll have to kmap until * EKC has a page (rather than vaddr) mapping interface */ - ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset; + ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset; CDEBUG(D_NET, "%p[%d] loading %p for %d, page %d, %d total\n", @@ -212,6 +220,7 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) kiov++; niov--; nob -= fraglen; + offset = 0; /* iov must not run out before end of data */ LASSERT (nob == 0 || niov > 0); @@ -226,7 +235,8 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) } int -kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) +kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob, + int niov, struct iovec *iov) { int nfrags = ktx->ktx_nfrag; int nmapped = ktx->ktx_nmappedpages; @@ -238,8 +248,16 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) LASSERT (niov > 0); LASSERT (nob > 0); + /* skip complete frags before offset */ + while (offset >= iov->iov_len) { + offset -= iov->iov_len; + iov++; + niov--; + LASSERT (niov > 0); + } + do { - int fraglen = iov->iov_len; + int fraglen = iov->iov_len - offset; long npages = kqswnal_pages_spanned (iov->iov_base, fraglen); /* nob exactly spans the iovs */ @@ -260,12 +278,12 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) CDEBUG(D_NET, "%p[%d] loading %p for %d, pages %d for %ld, %d total\n", - ktx, nfrags, iov->iov_base, fraglen, basepage, npages, - nmapped); + ktx, nfrags, iov->iov_base + offset, fraglen, + basepage, npages, nmapped); elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState, kqswnal_data.kqn_eptxdmahandle, - iov->iov_base, fraglen, + iov->iov_base + offset, fraglen, basepage, &ktx->ktx_frags.iov[nfrags].Base); /* keep in loop for failure case */ ktx->ktx_nmappedpages = nmapped; @@ -284,6 +302,7 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) iov++; niov--; nob -= fraglen; + offset = 0; /* iov must not run out before end of data */ LASSERT (nob == 0 || niov > 0); @@ -410,7 +429,7 @@ void kqswnal_tx_done (kqswnal_tx_t *ktx, int error) { lib_msg_t *msg; - lib_msg_t *repmsg; + lib_msg_t *repmsg = NULL; switch (ktx->ktx_state) { case KTX_FORWARDING: /* router asked me to forward this packet */ @@ -420,22 +439,30 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error) case KTX_SENDING: /* packet sourced locally */ lib_finalize (&kqswnal_lib, ktx->ktx_args[0], - (lib_msg_t *)ktx->ktx_args[1]); + (lib_msg_t *)ktx->ktx_args[1], + (error == 0) ? PTL_OK : + (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL); break; case KTX_GETTING: /* Peer has DMA-ed direct? */ LASSERT (KQSW_OPTIMIZE_GETS); msg = (lib_msg_t *)ktx->ktx_args[1]; - repmsg = NULL; - if (error == 0) + if (error == 0) { repmsg = lib_fake_reply_msg (&kqswnal_lib, ktx->ktx_nid, msg->md); + if (repmsg == NULL) + error = -ENOMEM; + } - lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg); - - if (repmsg != NULL) - lib_finalize (&kqswnal_lib, NULL, repmsg); + if (error == 0) { + lib_finalize (&kqswnal_lib, ktx->ktx_args[0], + msg, PTL_OK); + lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK); + } else { + lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg, + (error == -ENOMEM) ? 
PTL_NOSPACE : PTL_FAIL); + } break; default: @@ -461,7 +488,7 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status) ktx->ktx_nid, status); kqswnal_notify_peer_down(ktx); - status = -EIO; + status = -EHOSTDOWN; } else if (ktx->ktx_state == KTX_GETTING) { /* RPC completed OK; what did our peer put in the status @@ -651,7 +678,8 @@ kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv, int kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, - struct iovec *iov, ptl_kiov_t *kiov, int nob) + struct iovec *iov, ptl_kiov_t *kiov, + int offset, int nob) { kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0]; char *buffer = (char *)page_address(krx->krx_pages[0]); @@ -682,9 +710,9 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, /* Ghastly hack part 1, uses the existing procedures to map the source data... */ ktx->ktx_nfrag = 0; if (kiov != NULL) - rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov); + rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov); else - rc = kqswnal_map_tx_iov (ktx, nob, nfrag, iov); + rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov); if (rc != 0) { CERROR ("Can't map source data: %d\n", rc); @@ -720,7 +748,7 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, return (-ECONNABORTED); } -static int +static ptl_err_t kqswnal_sendmsg (nal_cb_t *nal, void *private, lib_msg_t *libmsg, @@ -731,6 +759,7 @@ kqswnal_sendmsg (nal_cb_t *nal, unsigned int payload_niov, struct iovec *payload_iov, ptl_kiov_t *payload_kiov, + size_t payload_offset, size_t payload_nob) { kqswnal_tx_t *ktx; @@ -739,6 +768,7 @@ kqswnal_sendmsg (nal_cb_t *nal, #if KQSW_CHECKSUM int i; kqsw_csum_t csum; + int sumoff; int sumnob; #endif @@ -802,9 +832,9 @@ kqswnal_sendmsg (nal_cb_t *nal, } /* peer expects RPC completion with GET data */ - rc = kqswnal_dma_reply (ktx, - payload_niov, payload_iov, - payload_kiov, payload_nob); + rc = kqswnal_dma_reply (ktx, payload_niov, + payload_iov, payload_kiov, + payload_offset, payload_nob); if (rc == 0) return (0); @@ -820,22 +850,39 @@ kqswnal_sendmsg (nal_cb_t *nal, #if KQSW_CHECKSUM csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr)); memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum)); - for (csum = 0, i = 0, sumnob = payload_nob; sumnob > 0; i++) { + for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) { + LASSERT(i < niov); if (payload_kiov != NULL) { ptl_kiov_t *kiov = &payload_kiov[i]; - char *addr = ((char *)kmap (kiov->kiov_page)) + - kiov->kiov_offset; - - csum = kqsw_csum (csum, addr, MIN (sumnob, kiov->kiov_len)); - sumnob -= kiov->kiov_len; + + if (sumoff >= kiov->kiov_len) { + sumoff -= kiov->kiov_len; + } else { + char *addr = ((char *)kmap (kiov->kiov_page)) + + kiov->kiov_offset + sumoff; + int fragnob = kiov->kiov_len - sumoff; + + csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob)); + sumnob -= fragnob; + sumoff = 0; + kunmap(kiov->kiov_page); + } } else { struct iovec *iov = &payload_iov[i]; - csum = kqsw_csum (csum, iov->iov_base, MIN (sumnob, kiov->iov_len)); - sumnob -= iov->iov_len; + if (sumoff > iov->iov_len) { + sumoff -= iov->iov_len; + } else { + char *addr = iov->iov_base + sumoff; + int fragnob = iov->iov_len - sumoff; + + csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob)); + sumnob -= fragnob; + sumoff = 0; + } } } - memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum)); + memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum)); #endif /* Set up first frag from pre-mapped buffer (it's at least the @@ -866,10 +913,10 @@ kqswnal_sendmsg (nal_cb_t *nal, * this (and 
eat my hat :) */ if ((libmsg->md->options & PTL_MD_KIOV) != 0) - rc = kqswnal_map_tx_kiov (ktx, md->length, + rc = kqswnal_map_tx_kiov (ktx, 0, md->length, md->md_niov, md->md_iov.kiov); else - rc = kqswnal_map_tx_iov (ktx, md->length, + rc = kqswnal_map_tx_iov (ktx, 0, md->length, md->md_niov, md->md_iov.iov); if (rc < 0) { @@ -894,18 +941,20 @@ kqswnal_sendmsg (nal_cb_t *nal, /* copy payload to ktx_buffer, immediately after hdr */ if (payload_kiov != NULL) lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE, - payload_niov, payload_kiov, payload_nob); + payload_niov, payload_kiov, + payload_offset, payload_nob); else lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE, - payload_niov, payload_iov, payload_nob); + payload_niov, payload_iov, + payload_offset, payload_nob); /* first frag includes payload */ ktx->ktx_frags.iov[0].Len += payload_nob; } else { if (payload_kiov != NULL) - rc = kqswnal_map_tx_kiov (ktx, payload_nob, + rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob, payload_niov, payload_kiov); else - rc = kqswnal_map_tx_iov (ktx, payload_nob, + rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob, payload_niov, payload_iov); if (rc != 0) { kqswnal_put_idle_tx (ktx); @@ -930,7 +979,7 @@ kqswnal_sendmsg (nal_cb_t *nal, return (PTL_OK); } -static int +static ptl_err_t kqswnal_send (nal_cb_t *nal, void *private, lib_msg_t *libmsg, @@ -940,13 +989,15 @@ kqswnal_send (nal_cb_t *nal, ptl_pid_t pid, unsigned int payload_niov, struct iovec *payload_iov, + size_t payload_offset, size_t payload_nob) { return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid, - payload_niov, payload_iov, NULL, payload_nob)); + payload_niov, payload_iov, NULL, + payload_offset, payload_nob)); } -static int +static ptl_err_t kqswnal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *libmsg, @@ -956,10 +1007,12 @@ kqswnal_send_pages (nal_cb_t *nal, ptl_pid_t pid, unsigned int payload_niov, ptl_kiov_t *payload_kiov, + size_t payload_offset, size_t payload_nob) { return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid, - payload_niov, NULL, payload_kiov, payload_nob)); + payload_niov, NULL, payload_kiov, + payload_offset, payload_nob)); } int kqswnal_fwd_copy_contig = 0; @@ -1009,7 +1062,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) nob <= KQSW_TX_BUFFER_SIZE) { /* send from ktx's pre-allocated/mapped contiguous buffer? */ - lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob); + lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, 0, nob); ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer; /* already mapped */ ktx->ktx_frags.iov[0].Len = nob; ktx->ktx_nfrag = 1; @@ -1019,7 +1072,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) { /* zero copy */ ktx->ktx_nfrag = 0; /* no frags mapped yet */ - rc = kqswnal_map_tx_iov (ktx, nob, niov, iov); + rc = kqswnal_map_tx_iov (ktx, 0, nob, niov, iov); if (rc != 0) goto failed; @@ -1079,7 +1132,8 @@ kqswnal_reply_complete (EP_RXD *rxd) krx->krx_rpc_completed = 1; kqswnal_requeue_rx (krx); - lib_finalize (&kqswnal_lib, NULL, msg); + lib_finalize (&kqswnal_lib, NULL, msg, + (status == EP_SUCCESS) ? 
PTL_OK : PTL_FAIL); kqswnal_put_idle_tx (ktx); } @@ -1288,13 +1342,14 @@ kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr) } #endif -static int +static ptl_err_t kqswnal_recvmsg (nal_cb_t *nal, void *private, lib_msg_t *libmsg, unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov, + size_t offset, size_t mlen, size_t rlen) { @@ -1325,10 +1380,13 @@ kqswnal_recvmsg (nal_cb_t *nal, #endif CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen); - /* What was actually received must be >= payload. - * This is an LASSERT, as lib_finalize() doesn't have a completion status. */ - LASSERT (krx->krx_nob >= KQSW_HDR_SIZE + mlen); + /* What was actually received must be >= payload. */ LASSERT (mlen <= rlen); + if (krx->krx_nob < KQSW_HDR_SIZE + mlen) { + CERROR("Bad message size: have %d, need %d + %d\n", + krx->krx_nob, KQSW_HDR_SIZE, mlen); + return (PTL_FAIL); + } /* It must be OK to kmap() if required */ LASSERT (kiov == NULL || !in_interrupt ()); @@ -1343,20 +1401,37 @@ kqswnal_recvmsg (nal_cb_t *nal, page_nob = PAGE_SIZE - KQSW_HDR_SIZE; LASSERT (niov > 0); + if (kiov != NULL) { - iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset; - iov_nob = kiov->kiov_len; + /* skip complete frags */ + while (offset >= kiov->kiov_len) { + offset -= kiov->kiov_len; + kiov++; + niov--; + LASSERT (niov > 0); + } + iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset; + iov_nob = kiov->kiov_len - offset; } else { - iov_ptr = iov->iov_base; - iov_nob = iov->iov_len; + /* skip complete frags */ + while (offset >= iov->iov_len) { + offset -= iov->iov_len; + iov++; + niov--; + LASSERT (niov > 0); + } + iov_ptr = iov->iov_base + offset; + iov_nob = iov->iov_len - offset; } - + for (;;) { - /* We expect the iov to exactly match mlen */ - LASSERT (iov_nob <= mlen); - - frag = MIN (page_nob, iov_nob); + frag = mlen; + if (frag > page_nob) + frag = page_nob; + if (frag > iov_nob) + frag = iov_nob; + memcpy (iov_ptr, page_ptr, frag); #if KQSW_CHECKSUM payload_csum = kqsw_csum (payload_csum, iov_ptr, frag); @@ -1415,35 +1490,41 @@ kqswnal_recvmsg (nal_cb_t *nal, "csum_nob %d\n", hdr_csum, payload_csum, csum_frags, csum_nob); #endif - lib_finalize(nal, private, libmsg); + lib_finalize(nal, private, libmsg, PTL_OK); kqswnal_requeue_rx (krx); - return (rlen); + return (PTL_OK); } -static int +static ptl_err_t kqswnal_recv(nal_cb_t *nal, void *private, lib_msg_t *libmsg, unsigned int niov, struct iovec *iov, + size_t offset, size_t mlen, size_t rlen) { - return (kqswnal_recvmsg (nal, private, libmsg, niov, iov, NULL, mlen, rlen)); + return (kqswnal_recvmsg(nal, private, libmsg, + niov, iov, NULL, + offset, mlen, rlen)); } -static int +static ptl_err_t kqswnal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *libmsg, unsigned int niov, ptl_kiov_t *kiov, + size_t offset, size_t mlen, size_t rlen) { - return (kqswnal_recvmsg (nal, private, libmsg, niov, NULL, kiov, mlen, rlen)); + return (kqswnal_recvmsg(nal, private, libmsg, + niov, NULL, kiov, + offset, mlen, rlen)); } int diff --git a/lnet/klnds/scimaclnd/scimacnal_cb.c b/lnet/klnds/scimaclnd/scimacnal_cb.c index b31c2ea..52afb98 100644 --- a/lnet/klnds/scimaclnd/scimacnal_cb.c +++ b/lnet/klnds/scimaclnd/scimacnal_cb.c @@ -176,7 +176,8 @@ kscimacnal_txrelease(mac_mblk_t *msg, mac_msg_status_t status, void *context) break; } - lib_finalize(ktx->ktx_nal, ktx->ktx_private, ktx->ktx_cookie); + lib_finalize(ktx->ktx_nal, ktx->ktx_private, ktx->ktx_cookie, + (err == 0) ? 
PTL_OK : PTL_FAIL); PORTAL_FREE(ktx, (sizeof(kscimacnal_tx_t))); } @@ -225,14 +226,14 @@ kscimacnal_sendmsg(nal_cb_t *nal, if (buf_len > mac_get_mtusize(ksci->ksci_machandle)) { CERROR("kscimacnal:request exceeds TX MTU size (%ld).\n", mac_get_mtusize(ksci->ksci_machandle)); - return -EINVAL; + return PTL_FAIL; } /* save transaction info for later finalize and cleanup */ PORTAL_ALLOC(ktx, (sizeof(kscimacnal_tx_t))); if (!ktx) { - return -ENOMEM; + return PTL_NOSPACE; } ktx->ktx_nmapped = 0; /* Start with no mapped pages :) */ @@ -247,7 +248,7 @@ kscimacnal_sendmsg(nal_cb_t *nal, kscimacnal_txrelease, ktx); if (!msg) { PORTAL_FREE(ktx, (sizeof(kscimacnal_tx_t))); - return -ENOMEM; + return PTL_NOSPACE; } mac_put_mblk(msg, sizeof(ptl_hdr_t)); lastblk=msg; @@ -284,7 +285,7 @@ kscimacnal_sendmsg(nal_cb_t *nal, if(!newblk) { mac_free_msg(msg); PORTAL_FREE(ktx, (sizeof(kscimacnal_tx_t))); - return -ENOMEM; + return PTL_NOSPACE; } mac_put_mblk(newblk, nob); mac_link_mblk(lastblk, newblk); @@ -315,10 +316,10 @@ kscimacnal_sendmsg(nal_cb_t *nal, CERROR("kscimacnal: mac_send() failed, rc=%d\n", rc); mac_free_msg(msg); PORTAL_FREE(ktx, (sizeof(kscimacnal_tx_t))); - return rc; + return PTL_FAIL; } - return 0; + return PTL_OK; } @@ -463,12 +464,15 @@ kscimacnal_recvmsg(nal_cb_t *nal, krx->msg, mlen, rlen, niov); /* What was actually received must be >= what sender claims to have - * sent. This is an LASSERT, since lib-move doesn't check cb return - * code yet. Also, rlen seems to be negative when mlen==0 so don't - * assert on that. - */ - LASSERT (mlen==0 || mac_msg_size(krx->msg) >= sizeof(ptl_hdr_t)+rlen); - LASSERT (mlen==0 || mlen <= rlen); + * sent. */ + LASSERT (mlen <= rlen); /* something is wrong if this isn't true */ + if (mac_msg_size(krx->msg) < sizeof(ptl_hdr_t)+mlen) { + /* We didn't receive everything lib thinks we did */ + CERROR("Bad message size: have %d, need %d + %d\n", + mac_msg_size(krx->msg), sizeof(ptl_hdr_t), mlen); + return (PTL_FAIL); + } + /* It must be OK to kmap() if required */ LASSERT (kiov == NULL || !in_interrupt ()); /* Either all pages or all vaddrs */ @@ -545,12 +549,12 @@ kscimacnal_recvmsg(nal_cb_t *nal, CDEBUG(D_NET, "Calling lib_finalize.\n"); PROF_START(lib_finalize); - lib_finalize(nal, private, cookie); + lib_finalize(nal, private, cookie, PTL_OK); PROF_FINISH(lib_finalize); CDEBUG(D_NET, "Done.\n"); - return rlen; + return PTL_OK; } diff --git a/lnet/klnds/socklnd/socklnd.c b/lnet/klnds/socklnd/socklnd.c index f61a2bc..f583620 100644 --- a/lnet/klnds/socklnd/socklnd.c +++ b/lnet/klnds/socklnd/socklnd.c @@ -56,7 +56,7 @@ static ctl_table ksocknal_ctl_table[] = { &ksocknal_data.ksnd_eager_ack, sizeof (int), 0644, NULL, &proc_dointvec}, #if SOCKNAL_ZC - {SOCKNAL_SYSCTL_EAGER_ACK, "zero_copy", + {SOCKNAL_SYSCTL_ZERO_COPY, "zero_copy", &ksocknal_data.ksnd_zc_min_frag, sizeof (int), 0644, NULL, &proc_dointvec}, #endif @@ -996,15 +996,10 @@ ksocknal_destroy_conn (ksock_conn_t *conn) /* complete current receive if any */ switch (conn->ksnc_rx_state) { case SOCKNAL_RX_BODY: -#if 0 - lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie); -#else - CERROR ("Refusing to complete a partial receive from " - LPX64", ip %08x\n", conn->ksnc_peer->ksnp_nid, - conn->ksnc_ipaddr); - CERROR ("This may hang communications and " - "prevent modules from unloading\n"); -#endif + CERROR ("Completing partial receive from "LPX64 + ", ip %08x, with error\n", + conn->ksnc_peer->ksnp_nid, conn->ksnc_ipaddr); + lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_FAIL); break; case 
SOCKNAL_RX_BODY_FWD: ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED); diff --git a/lnet/klnds/socklnd/socklnd_cb.c b/lnet/klnds/socklnd/socklnd_cb.c index 9e04712..dca9818 100644 --- a/lnet/klnds/socklnd/socklnd_cb.c +++ b/lnet/klnds/socklnd/socklnd_cb.c @@ -29,7 +29,7 @@ * LIB functions follow * */ -int +ptl_err_t ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr, size_t len) { @@ -37,10 +37,10 @@ ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr, nal->ni.nid, (long)len, src_addr, dst_addr); memcpy( dst_addr, src_addr, len ); - return 0; + return PTL_OK; } -int +ptl_err_t ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr, size_t len) { @@ -48,20 +48,7 @@ ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, nal->ni.nid, (long)len, src_addr, dst_addr); memcpy( dst_addr, src_addr, len ); - return 0; -} - -int -ksocknal_callback (nal_cb_t * nal, void *private, lib_eq_t *eq, - ptl_event_t *ev) -{ - CDEBUG(D_NET, LPX64": callback eq %p ev %p\n", - nal->ni.nid, eq, ev); - - if (eq->event_callback != NULL) - eq->event_callback(ev); - - return 0; + return PTL_OK; } void * @@ -617,7 +604,8 @@ ksocknal_tx_done (ksock_tx_t *tx, int asynch) if (tx->tx_isfwd) { /* was a forwarded packet? */ kpr_fwd_done (&ksocknal_data.ksnd_router, - KSOCK_TX_2_KPR_FWD_DESC (tx), 0); + KSOCK_TX_2_KPR_FWD_DESC (tx), + (tx->tx_resid == 0) ? 0 : -ECONNABORTED); EXIT; return; } @@ -625,7 +613,8 @@ ksocknal_tx_done (ksock_tx_t *tx, int asynch) /* local send */ ltx = KSOCK_TX_2_KSOCK_LTX (tx); - lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie); + lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie, + (tx->tx_resid == 0) ? PTL_OK : PTL_FAIL); ksocknal_free_ltx (ltx); EXIT; @@ -1011,7 +1000,7 @@ ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid) return (-EHOSTUNREACH); } -int +ptl_err_t ksocknal_sendmsg(nal_cb_t *nal, void *private, lib_msg_t *cookie, @@ -1022,6 +1011,7 @@ ksocknal_sendmsg(nal_cb_t *nal, unsigned int payload_niov, struct iovec *payload_iov, ptl_kiov_t *payload_kiov, + size_t payload_offset, size_t payload_nob) { ksock_ltx_t *ltx; @@ -1084,20 +1074,19 @@ ksocknal_sendmsg(nal_cb_t *nal, ltx->ltx_tx.tx_kiov = NULL; ltx->ltx_tx.tx_nkiov = 0; - ltx->ltx_tx.tx_niov = 1 + payload_niov; - - memcpy(ltx->ltx_iov + 1, payload_iov, - payload_niov * sizeof (*payload_iov)); - + ltx->ltx_tx.tx_niov = + 1 + lib_extract_iov(payload_niov, <x->ltx_iov[1], + payload_niov, payload_iov, + payload_offset, payload_nob); } else { /* payload is all pages */ - ltx->ltx_tx.tx_kiov = ltx->ltx_kiov; - ltx->ltx_tx.tx_nkiov = payload_niov; - ltx->ltx_tx.tx_niov = 1; - memcpy(ltx->ltx_kiov, payload_kiov, - payload_niov * sizeof (*payload_kiov)); + ltx->ltx_tx.tx_kiov = ltx->ltx_kiov; + ltx->ltx_tx.tx_nkiov = + lib_extract_kiov(payload_niov, ltx->ltx_kiov, + payload_niov, payload_kiov, + payload_offset, payload_nob); } rc = ksocknal_launch_packet(<x->ltx_tx, nid); @@ -1108,28 +1097,28 @@ ksocknal_sendmsg(nal_cb_t *nal, return (PTL_FAIL); } -int +ptl_err_t ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie, ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, unsigned int payload_niov, struct iovec *payload_iov, - size_t payload_len) + size_t payload_offset, size_t payload_len) { return (ksocknal_sendmsg(nal, private, cookie, hdr, type, nid, pid, payload_niov, payload_iov, NULL, - payload_len)); + payload_offset, payload_len)); } -int +ptl_err_t ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie, 
ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, unsigned int payload_niov, ptl_kiov_t *payload_kiov, - size_t payload_len) + size_t payload_offset, size_t payload_len) { return (ksocknal_sendmsg(nal, private, cookie, hdr, type, nid, pid, payload_niov, NULL, payload_kiov, - payload_len)); + payload_offset, payload_len)); } void @@ -1576,7 +1565,7 @@ ksocknal_process_receive (ksock_conn_t *conn) case SOCKNAL_RX_BODY: /* payload all received */ - lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie); + lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_OK); /* Fall through */ case SOCKNAL_RX_SLOP: @@ -1612,9 +1601,10 @@ ksocknal_process_receive (ksock_conn_t *conn) return (-EINVAL); /* keep gcc happy */ } -int +ptl_err_t ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, - unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen) + unsigned int niov, struct iovec *iov, + size_t offset, size_t mlen, size_t rlen) { ksock_conn_t *conn = (ksock_conn_t *)private; @@ -1627,20 +1617,22 @@ ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, conn->ksnc_rx_nkiov = 0; conn->ksnc_rx_kiov = NULL; - conn->ksnc_rx_niov = niov; conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov; - memcpy (conn->ksnc_rx_iov, iov, niov * sizeof (*iov)); + conn->ksnc_rx_niov = + lib_extract_iov(PTL_MD_MAX_IOV, conn->ksnc_rx_iov, + niov, iov, offset, mlen); LASSERT (mlen == lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) + lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov)); - return (rlen); + return (PTL_OK); } -int +ptl_err_t ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg, - unsigned int niov, ptl_kiov_t *kiov, size_t mlen, size_t rlen) + unsigned int niov, ptl_kiov_t *kiov, + size_t offset, size_t mlen, size_t rlen) { ksock_conn_t *conn = (ksock_conn_t *)private; @@ -1653,15 +1645,16 @@ ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg, conn->ksnc_rx_niov = 0; conn->ksnc_rx_iov = NULL; - conn->ksnc_rx_nkiov = niov; conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov; - memcpy (conn->ksnc_rx_kiov, kiov, niov * sizeof (*kiov)); + conn->ksnc_rx_nkiov = + lib_extract_kiov(PTL_MD_MAX_IOV, conn->ksnc_rx_kiov, + niov, kiov, offset, mlen); LASSERT (mlen == lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) + lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov)); - return (rlen); + return (PTL_OK); } int ksocknal_scheduler (void *arg) @@ -2681,7 +2674,6 @@ nal_cb_t ksocknal_lib = { cb_recv_pages: ksocknal_recv_pages, cb_read: ksocknal_read, cb_write: ksocknal_write, - cb_callback: ksocknal_callback, cb_malloc: ksocknal_malloc, cb_free: ksocknal_free, cb_printf: ksocknal_printf, diff --git a/lnet/klnds/toelnd/toenal_cb.c b/lnet/klnds/toelnd/toenal_cb.c index 3af9e33..50284b7 100644 --- a/lnet/klnds/toelnd/toenal_cb.c +++ b/lnet/klnds/toelnd/toenal_cb.c @@ -37,7 +37,7 @@ long ktoenal_packets_transmitted; * LIB functions follow * */ -int +ptl_err_t ktoenal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr, size_t len) { @@ -45,10 +45,10 @@ ktoenal_read(nal_cb_t *nal, void *private, void *dst_addr, nal->ni.nid, (long)len, src_addr, dst_addr); memcpy( dst_addr, src_addr, len ); - return 0; + return PTL_OK; } -int +ptl_err_t ktoenal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr, size_t len) { @@ -56,20 +56,7 @@ ktoenal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, nal->ni.nid, (long)len, src_addr, dst_addr); memcpy( dst_addr, src_addr, len ); - return 0; -} - -int -ktoenal_callback (nal_cb_t * nal, void 
*private, lib_eq_t *eq, - ptl_event_t *ev) -{ - CDEBUG(D_NET, LPX64": callback eq %p ev %p\n", - nal->ni.nid, eq, ev); - - if (eq->event_callback != NULL) - eq->event_callback(ev); - - return 0; + return PTL_OK; } void * @@ -322,7 +309,8 @@ ktoenal_process_transmit (ksock_conn_t *conn, unsigned long *irq_flags) { ksock_ltx_t *ltx = KSOCK_TX_2_KSOCK_LTX (tx); - lib_finalize (&ktoenal_lib, ltx->ltx_private, ltx->ltx_cookie); + lib_finalize (&ktoenal_lib, ltx->ltx_private, + ltx->ltx_cookie, PTL_OK); spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags); @@ -400,10 +388,11 @@ ktoenal_launch_packet (ksock_conn_t *conn, ksock_tx_t *tx) spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags); } -int +ptl_err_t ktoenal_send(nal_cb_t *nal, void *private, lib_msg_t *cookie, ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, - unsigned int payload_niov, struct iovec *payload_iov, size_t payload_len) + unsigned int payload_niov, struct iovec *payload_iov, + size_t payload_off, size_t payload_len) { ptl_nid_t gatewaynid; ksock_conn_t *conn; @@ -427,6 +416,9 @@ ktoenal_send(nal_cb_t *nal, void *private, lib_msg_t *cookie, payload_niov > 0 ? payload_iov[0].iov_base : NULL, (int)(payload_niov > 0 ? payload_iov[0].iov_len : 0), nid, pid); + /* XXX not implemented read-only iov with offset */ + LBUG(); + if ((conn = ktoenal_get_conn (nid)) == NULL) { /* It's not a peer; try to find a gateway */ @@ -435,14 +427,14 @@ ktoenal_send(nal_cb_t *nal, void *private, lib_msg_t *cookie, if (rc != 0) { CERROR ("Can't route to "LPX64": router error %d\n", nid, rc); - return (-1); + return (PTL_FAIL); } if ((conn = ktoenal_get_conn (gatewaynid)) == NULL) { CERROR ("Can't route to "LPX64": gateway "LPX64" is not a peer\n", nid, gatewaynid); - return (-1); + return (PTL_FAIL); } } @@ -457,7 +449,7 @@ ktoenal_send(nal_cb_t *nal, void *private, lib_msg_t *cookie, { CERROR ("Can't allocate tx desc\n"); ktoenal_put_conn (conn); - return (-1); + return (PTL_FAIL); } /* Init common (to sends and forwards) packet part */ @@ -483,7 +475,7 @@ ktoenal_send(nal_cb_t *nal, void *private, lib_msg_t *cookie, } ktoenal_launch_packet (conn, <x->ltx_tx); - return (0); + return (PTL_OK); } void @@ -893,7 +885,7 @@ ktoenal_process_receive (ksock_conn_t *conn, unsigned long *irq_flags) case SOCKNAL_RX_BODY: atomic_inc (&ktoenal_packets_received); - lib_finalize(&ktoenal_lib, NULL, conn->ksnc_cookie); /* packet is done now */ + lib_finalize(&ktoenal_lib, NULL, conn->ksnc_cookie, PTL_OK); /* packet is done now */ /* Fall through */ case SOCKNAL_RX_SLOP: @@ -934,13 +926,17 @@ ktoenal_process_receive (ksock_conn_t *conn, unsigned long *irq_flags) list_add_tail (&conn->ksnc_rx_list, &ktoenal_data.ksnd_rx_conns); } -int +ptl_err_t ktoenal_recv(nal_cb_t *nal, void *private, lib_msg_t *msg, - unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen) + unsigned int niov, struct iovec *iov, + size_t offset, size_t mlen, size_t rlen) { ksock_conn_t *conn = (ksock_conn_t *)private; int i; + /* XXX not implemented read-only iov with offset */ + LBUG(); + conn->ksnc_cookie = msg; LASSERT (niov <= PTL_MD_MAX_IOV); @@ -954,7 +950,7 @@ ktoenal_recv(nal_cb_t *nal, void *private, lib_msg_t *msg, conn->ksnc_rx_nob_wanted = mlen; conn->ksnc_rx_nob_left = rlen; - return (rlen); + return (PTL_OK); } int @@ -1192,7 +1188,6 @@ nal_cb_t ktoenal_lib = { cb_recv: ktoenal_recv, cb_read: ktoenal_read, cb_write: ktoenal_write, - cb_callback: ktoenal_callback, cb_malloc: ktoenal_malloc, cb_free: ktoenal_free, cb_printf: ktoenal_printf, diff --git 
a/lnet/libcfs/module.c b/lnet/libcfs/module.c index 55e1935..a9d9786 100644 --- a/lnet/libcfs/module.c +++ b/lnet/libcfs/module.c @@ -817,9 +817,11 @@ EXPORT_SYMBOL(PtlMDBind); EXPORT_SYMBOL(lib_iov_nob); EXPORT_SYMBOL(lib_copy_iov2buf); EXPORT_SYMBOL(lib_copy_buf2iov); +EXPORT_SYMBOL(lib_extract_iov); EXPORT_SYMBOL(lib_kiov_nob); EXPORT_SYMBOL(lib_copy_kiov2buf); EXPORT_SYMBOL(lib_copy_buf2kiov); +EXPORT_SYMBOL(lib_extract_kiov); EXPORT_SYMBOL(lib_finalize); EXPORT_SYMBOL(lib_parse); EXPORT_SYMBOL(lib_fake_reply_msg); diff --git a/lnet/lnet/api-eq.c b/lnet/lnet/api-eq.c index e73d525..964b9d8 100644 --- a/lnet/lnet/api-eq.c +++ b/lnet/lnet/api-eq.c @@ -81,12 +81,6 @@ int PtlEQGet(ptl_handle_eq_t eventq, ptl_event_t * ev) *ev = *new_event; - /* Set the unlinked_me interface number if there is one to pass - * back, since the NAL hasn't a clue what it is and therefore can't - * set it. */ - if (!PtlHandleEqual (ev->unlinked_me, PTL_HANDLE_NONE)) - ev->unlinked_me.nal_idx = eventq.nal_idx; - /* ensure event is delivered correctly despite possible races with lib_finalize */ if (eq->sequence != new_event->sequence) { diff --git a/lnet/lnet/api-errno.c b/lnet/lnet/api-errno.c index 026c93b..b5e7aa1 100644 --- a/lnet/lnet/api-errno.c +++ b/lnet/lnet/api-errno.c @@ -50,6 +50,5 @@ const char *ptl_err_str[] = { "PTL_IOV_TOO_SMALL", "PTL_EQ_INUSE", - "PTL_MD_INUSE" }; /* If you change these, you must update the number table in portals/errno.h */ diff --git a/lnet/lnet/api-ni.c b/lnet/lnet/api-ni.c index b2e069e..18eea91 100644 --- a/lnet/lnet/api-ni.c +++ b/lnet/lnet/api-ni.c @@ -125,7 +125,7 @@ int PtlNIInit(ptl_interface_t interface, ptl_pt_index_t ptl_size, if (ptl_interfaces[i] == nal) { nal->refct++; handle->nal_idx = (NI_HANDLE_MAGIC & ~NI_HANDLE_MASK) | i; - fprintf(stderr, "Returning existing NAL (%d)\n", i); + CDEBUG(D_OTHER, "Returning existing NAL (%d)\n", i); ptl_ni_init_mutex_exit (); return PTL_OK; } diff --git a/lnet/lnet/api-wrap.c b/lnet/lnet/api-wrap.c index e54707f..d23a6aa 100644 --- a/lnet/lnet/api-wrap.c +++ b/lnet/lnet/api-wrap.c @@ -32,7 +32,7 @@ static int do_forward(ptl_handle_any_t any_h, int cmd, void *argbuf, nal_t *nal; if (!ptl_init) { - fprintf(stderr, "PtlGetId: Not initialized\n"); + CERROR("Not initialized\n"); return PTL_NOINIT; } @@ -262,7 +262,7 @@ static int validate_md(ptl_handle_any_t current_in, ptl_md_t md_in) int i; if (!ptl_init) { - fprintf(stderr, "PtlMDAttach/Bind/Update: Not initialized\n"); + CERROR("PtlMDAttach/Bind/Update: Not initialized\n"); return PTL_NOINIT; } diff --git a/lnet/lnet/lib-init.c b/lnet/lnet/lib-init.c index ab223d6..d4d8860 100644 --- a/lnet/lnet/lib-init.c +++ b/lnet/lnet/lib-init.c @@ -38,125 +38,17 @@ # include #endif -#ifdef PTL_USE_SLAB_CACHE -static int ptl_slab_users; - -kmem_cache_t *ptl_md_slab; -kmem_cache_t *ptl_msg_slab; -kmem_cache_t *ptl_me_slab; -kmem_cache_t *ptl_eq_slab; - -atomic_t md_in_use_count; -atomic_t msg_in_use_count; -atomic_t me_in_use_count; -atomic_t eq_in_use_count; - -/* NB zeroing in ctor and on freeing ensures items that - * kmem_cache_validate() OK, but haven't been initialised - * as an MD/ME/EQ can't have valid handles - */ -static void -ptl_md_slab_ctor (void *obj, kmem_cache_t *slab, unsigned long flags) -{ - memset (obj, 0, sizeof (lib_md_t)); -} - -static void -ptl_me_slab_ctor (void *obj, kmem_cache_t *slab, unsigned long flags) -{ - memset (obj, 0, sizeof (lib_me_t)); -} - -static void -ptl_eq_slab_ctor (void *obj, kmem_cache_t *slab, unsigned long flags) -{ - memset (obj, 0, sizeof 
(lib_eq_t)); -} +#ifndef PTL_USE_LIB_FREELIST int kportal_descriptor_setup (nal_cb_t *nal) { - /* NB on failure caller must still call kportal_descriptor_cleanup */ - /* ****** */ - - /* We'll have 1 set of slabs for ALL the nals :) */ - - if (ptl_slab_users++) - return 0; - - ptl_md_slab = kmem_cache_create("portals_MD", - sizeof(lib_md_t), 0, - SLAB_HWCACHE_ALIGN, - ptl_md_slab_ctor, NULL); - if (!ptl_md_slab) { - CERROR("couldn't allocate ptl_md_t slab"); - RETURN (PTL_NOSPACE); - } - - /* NB no ctor for msgs; they don't need handle verification */ - ptl_msg_slab = kmem_cache_create("portals_MSG", - sizeof(lib_msg_t), 0, - SLAB_HWCACHE_ALIGN, - NULL, NULL); - if (!ptl_msg_slab) { - CERROR("couldn't allocate ptl_msg_t slab"); - RETURN (PTL_NOSPACE); - } - - ptl_me_slab = kmem_cache_create("portals_ME", - sizeof(lib_me_t), 0, - SLAB_HWCACHE_ALIGN, - ptl_me_slab_ctor, NULL); - if (!ptl_me_slab) { - CERROR("couldn't allocate ptl_me_t slab"); - RETURN (PTL_NOSPACE); - } - - ptl_eq_slab = kmem_cache_create("portals_EQ", - sizeof(lib_eq_t), 0, - SLAB_HWCACHE_ALIGN, - ptl_eq_slab_ctor, NULL); - if (!ptl_eq_slab) { - CERROR("couldn't allocate ptl_eq_t slab"); - RETURN (PTL_NOSPACE); - } - - RETURN(PTL_OK); + return PTL_OK; } void kportal_descriptor_cleanup (nal_cb_t *nal) { - int rc; - - if (--ptl_slab_users != 0) - return; - - LASSERT (atomic_read (&md_in_use_count) == 0); - LASSERT (atomic_read (&me_in_use_count) == 0); - LASSERT (atomic_read (&eq_in_use_count) == 0); - LASSERT (atomic_read (&msg_in_use_count) == 0); - - if (ptl_md_slab != NULL) { - rc = kmem_cache_destroy(ptl_md_slab); - if (rc != 0) - CERROR("unable to free MD slab\n"); - } - if (ptl_msg_slab != NULL) { - rc = kmem_cache_destroy(ptl_msg_slab); - if (rc != 0) - CERROR("unable to free MSG slab\n"); - } - if (ptl_me_slab != NULL) { - rc = kmem_cache_destroy(ptl_me_slab); - if (rc != 0) - CERROR("unable to free ME slab\n"); - } - if (ptl_eq_slab != NULL) { - rc = kmem_cache_destroy(ptl_eq_slab); - if (rc != 0) - CERROR("unable to free EQ slab\n"); - } } #else diff --git a/lnet/lnet/lib-md.c b/lnet/lnet/lib-md.c index be6949c..a1ed583 100644 --- a/lnet/lnet/lib-md.c +++ b/lnet/lnet/lib-md.c @@ -83,7 +83,7 @@ static int lib_md_build(nal_cb_t *nal, lib_md_t *new, void *private, int rc; int i; - /* NB we are passes an allocated, but uninitialised/active md. + /* NB we are passed an allocated, but uninitialised/active md. * if we return success, caller may lib_md_unlink() it. * otherwise caller may only lib_md_free() it. */ @@ -94,9 +94,10 @@ static int lib_md_build(nal_cb_t *nal, lib_md_t *new, void *private, return PTL_INV_EQ; } - if ((md->options & PTL_MD_IOV) != 0 && /* discontiguous MD */ - md->niov > PTL_MD_MAX_IOV) /* too many fragments */ - return PTL_IOV_TOO_MANY; + /* Must check this _before_ allocation. Also, note that non-iov + * MDs must set md_niov to 0. 
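/* [Editorial sketch, not part of the patch]  With the slab caches gone,
 * lib_md_alloc() can size each descriptor by its fragment count.  The
 * exact layout is an assumption here; the point is only that the
 * iov/kiov array is the trailing member of lib_md_t, so an MD costs
 * 'header + niov fragments' instead of a fixed worst-case size:
 */
        static lib_md_t *
        md_alloc_sized (nal_cb_t *nal, ptl_md_t *umd)  /* umd: caller's ptl_md_t */
        {
                lib_md_t *md;
                int       niov =
                        ((umd->options & (PTL_MD_IOV | PTL_MD_KIOV)) != 0) ?
                        umd->niov : 0;

                /* frag array assumed last, so the allocation shrinks to fit */
                PORTAL_ALLOC (md, offsetof (lib_md_t, md_iov.iov[niov]));
                return (md);
        }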
*/ + LASSERT((md->options & (PTL_MD_IOV | PTL_MD_KIOV)) == 0 || + md->niov <= PTL_MD_MAX_IOV); if ((md->options & max_size_opts) != 0 && /* max size used */ (md->max_size < 0 || md->max_size > md->length)) // illegal max_size @@ -239,7 +240,11 @@ int do_PtlMDAttach(nal_cb_t * nal, void *private, void *v_args, void *v_ret) lib_md_t *md; unsigned long flags; - md = lib_md_alloc (nal); + if ((args->md_in.options & (PTL_MD_KIOV | PTL_MD_IOV)) != 0 && + args->md_in.niov > PTL_MD_MAX_IOV) /* too many fragments */ + return (ret->rc = PTL_IOV_TOO_MANY); + + md = lib_md_alloc(nal, &args->md_in); if (md == NULL) return (ret->rc = PTL_NOSPACE); @@ -287,7 +292,11 @@ int do_PtlMDBind(nal_cb_t * nal, void *private, void *v_args, void *v_ret) lib_md_t *md; unsigned long flags; - md = lib_md_alloc (nal); + if ((args->md_in.options & (PTL_MD_KIOV | PTL_MD_IOV)) != 0 && + args->md_in.niov > PTL_MD_MAX_IOV) /* too many fragments */ + return (ret->rc = PTL_IOV_TOO_MANY); + + md = lib_md_alloc(nal, &args->md_in); if (md == NULL) return (ret->rc = PTL_NOSPACE); @@ -311,34 +320,43 @@ int do_PtlMDBind(nal_cb_t * nal, void *private, void *v_args, void *v_ret) int do_PtlMDUnlink(nal_cb_t * nal, void *private, void *v_args, void *v_ret) { - PtlMDUnlink_in *args = v_args; + PtlMDUnlink_in *args = v_args; PtlMDUnlink_out *ret = v_ret; - - lib_md_t *md; - unsigned long flags; + ptl_event_t ev; + lib_md_t *md; + unsigned long flags; state_lock(nal, &flags); md = ptl_handle2md(&args->md_in, nal); if (md == NULL) { - ret->rc = PTL_INV_MD; - } else if (md->pending != 0) { /* being filled/spilled */ - ret->rc = PTL_MD_INUSE; - } else { - /* Callers attempting to unlink a busy MD which will get - * unlinked once the net op completes should see INUSE, - * before completion and INV_MD thereafter. LASSERT we've - * got that right... */ - LASSERT ((md->md_flags & PTL_MD_FLAG_UNLINK) == 0); - - lib_md_deconstruct(nal, md, &ret->status_out); - lib_md_unlink(nal, md); - ret->rc = PTL_OK; + state_unlock(nal, &flags); + return (ret->rc = PTL_INV_MD); + } + + /* If the MD is busy, lib_md_unlink just marks it for deletion, and + * when the NAL is done, the completion event flags that the MD was + * unlinked. Otherwise, we enqueue an event now... 
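/* [Editorial sketch, not part of the patch]  The caller-side view of
 * the new semantics: PtlMDUnlink() never blocks and never reports
 * "in use"; destruction is signalled through the MD's event queue.
 * Assuming the MD has an EQ attached, the EQ is private to this MD,
 * and the standard PtlEQWait() is available ('mdh'/'eqh' are the MD
 * and EQ handles):
 */
        ptl_event_t ev;
        int         rc;

        (void) PtlMDUnlink (mdh);            /* commits the MD for destruction */

        do {
                rc = PtlEQWait (eqh, &ev);   /* blocking event wait */
                LASSERT (rc == PTL_OK);
        } while (!ev.unlinked);              /* set on the MD's final event */

        /* 'mdh' is invalid from here on; any in-flight I/O completed first */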
*/ + + if (md->eq != NULL && + md->pending == 0) { + memset(&ev, 0, sizeof(ev)); + + ev.type = PTL_EVENT_UNLINK; + ev.status = PTL_OK; + ev.unlinked = 1; + lib_md_deconstruct(nal, md, &ev.mem_desc); + + lib_enq_event_locked(nal, private, md->eq, &ev); } + lib_md_deconstruct(nal, md, &ret->status_out); + lib_md_unlink(nal, md); + ret->rc = PTL_OK; + state_unlock(nal, &flags); - return (ret->rc); + return (PTL_OK); } int do_PtlMDUpdate_internal(nal_cb_t * nal, void *private, void *v_args, @@ -379,6 +397,23 @@ int do_PtlMDUpdate_internal(nal_cb_t * nal, void *private, void *v_args, goto out; } + /* XXX fttb, the new MD must be the same type wrt fragmentation */ + if (((new->options ^ md->options) & + (PTL_MD_IOV | PTL_MD_KIOV)) != 0) { + ret->rc = PTL_INV_MD; + goto out; + } + + if (new->niov > md->md_niov) { + ret->rc = PTL_IOV_TOO_MANY; + goto out; + } + + if (new->niov < md->md_niov) { + ret->rc = PTL_IOV_TOO_SMALL; + goto out; + } + if (!PtlHandleEqual (args->testq_in, PTL_EQ_NONE)) { test_eq = ptl_handle2eq(&args->testq_in, nal); if (test_eq == NULL) { diff --git a/lnet/lnet/lib-move.c b/lnet/lnet/lib-move.c index 6e904ba..b0a2c947 100644 --- a/lnet/lnet/lib-move.c +++ b/lnet/lnet/lib-move.c @@ -258,55 +258,78 @@ lib_iov_nob (int niov, struct iovec *iov) } void -lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, ptl_size_t len) +lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, + ptl_size_t offset, ptl_size_t len) { ptl_size_t nob; - while (len > 0) - { + if (len == 0) + return; + + /* skip complete frags before 'offset' */ + LASSERT (niov > 0); + while (offset >= iov->iov_len) { + offset -= iov->iov_len; + iov++; + niov--; + LASSERT (niov > 0); + } + + do { LASSERT (niov > 0); - nob = MIN (iov->iov_len, len); - memcpy (dest, iov->iov_base, nob); + nob = MIN (iov->iov_len - offset, len); + memcpy (dest, iov->iov_base + offset, nob); len -= nob; dest += nob; niov--; iov++; - } + offset = 0; + } while (len > 0); } void -lib_copy_buf2iov (int niov, struct iovec *iov, char *src, ptl_size_t len) +lib_copy_buf2iov (int niov, struct iovec *iov, ptl_size_t offset, + char *src, ptl_size_t len) { ptl_size_t nob; - while (len > 0) - { + if (len == 0) + return; + + /* skip complete frags before 'offset' */ + LASSERT (niov > 0); + while (offset >= iov->iov_len) { + offset -= iov->iov_len; + iov++; + niov--; LASSERT (niov > 0); - nob = MIN (iov->iov_len, len); - memcpy (iov->iov_base, src, nob); + } + + do { + LASSERT (niov > 0); + nob = MIN (iov->iov_len - offset, len); + memcpy (iov->iov_base + offset, src, nob); len -= nob; src += nob; niov--; iov++; - } + offset = 0; + } while (len > 0); } -static int -lib_extract_iov (struct iovec *dst, lib_md_t *md, +int +lib_extract_iov (int dst_niov, struct iovec *dst, + int src_niov, struct iovec *src, ptl_size_t offset, ptl_size_t len) { /* Initialise 'dst' to the subset of 'src' starting at 'offset', * for exactly 'len' bytes, and return the number of entries. 
* NB not destructive to 'src' */ - int src_niov = md->md_niov; - struct iovec *src = md->md_iov.iov; ptl_size_t frag_len; - int dst_niov; + int niov; - LASSERT (offset + len <= md->length); - if (len == 0) /* no data => */ return (0); /* no frags */ @@ -318,17 +341,17 @@ lib_extract_iov (struct iovec *dst, lib_md_t *md, LASSERT (src_niov > 0); } - dst_niov = 1; + niov = 1; for (;;) { LASSERT (src_niov > 0); - LASSERT (dst_niov <= PTL_MD_MAX_IOV); + LASSERT (niov <= dst_niov); frag_len = src->iov_len - offset; dst->iov_base = ((char *)src->iov_base) + offset; if (len <= frag_len) { dst->iov_len = len; - return (dst_niov); + return (niov); } dst->iov_len = frag_len; @@ -336,7 +359,7 @@ lib_extract_iov (struct iovec *dst, lib_md_t *md, len -= frag_len; dst++; src++; - dst_niov++; + niov++; src_niov--; offset = 0; } @@ -351,19 +374,22 @@ lib_kiov_nob (int niov, ptl_kiov_t *kiov) } void -lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, ptl_size_t len) +lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, + ptl_size_t offset, ptl_size_t len) { LASSERT (0); } void -lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, char *dest, ptl_size_t len) +lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, ptl_size_t offset, + char *src, ptl_size_t len) { LASSERT (0); } -static int -lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md, +int +lib_extract_kiov (int dst_niov, ptl_kiov_t *dst, + int src_niov, ptl_kiov_t *src, ptl_size_t offset, ptl_size_t len) { LASSERT (0); @@ -383,18 +409,30 @@ lib_kiov_nob (int niov, ptl_kiov_t *kiov) } void -lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, ptl_size_t len) +lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, + ptl_size_t offset, ptl_size_t len) { ptl_size_t nob; char *addr; + + if (len == 0) + return; LASSERT (!in_interrupt ()); - while (len > 0) - { + + LASSERT (niov > 0); + while (offset > kiov->kiov_len) { + offset -= kiov->kiov_len; + kiov++; + niov--; + LASSERT (niov > 0); + } + + do{ LASSERT (niov > 0); - nob = MIN (kiov->kiov_len, len); + nob = MIN (kiov->kiov_len - offset, len); - addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset; + addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset; memcpy (dest, addr, nob); kunmap (kiov->kiov_page); @@ -402,22 +440,35 @@ lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, ptl_size_t len) dest += nob; niov--; kiov++; - } + offset = 0; + } while (len > 0); } void -lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, char *src, ptl_size_t len) +lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, ptl_size_t offset, + char *src, ptl_size_t len) { ptl_size_t nob; char *addr; + if (len == 0) + return; + LASSERT (!in_interrupt ()); - while (len > 0) - { + + LASSERT (niov > 0); + while (offset >= kiov->kiov_len) { + offset -= kiov->kiov_len; + kiov++; + niov--; + LASSERT (niov > 0); + } + + do { LASSERT (niov > 0); - nob = MIN (kiov->kiov_len, len); + nob = MIN (kiov->kiov_len - offset, len); - addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset; + addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset; memcpy (addr, src, nob); kunmap (kiov->kiov_page); @@ -425,23 +476,21 @@ lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, char *src, ptl_size_t len) src += nob; niov--; kiov++; - } + offset = 0; + } while (len > 0); } -static int -lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md, +int +lib_extract_kiov (int dst_niov, ptl_kiov_t *dst, + int src_niov, ptl_kiov_t *src, ptl_size_t offset, ptl_size_t len) { /* Initialise 'dst' to the subset of 'src' starting at 
'offset', * for exactly 'len' bytes, and return the number of entries. * NB not destructive to 'src' */ - int src_niov = md->md_niov; - ptl_kiov_t *src = md->md_iov.kiov; ptl_size_t frag_len; - int dst_niov; + int niov; - LASSERT (offset + len <= md->length); - if (len == 0) /* no data => */ return (0); /* no frags */ @@ -453,10 +502,10 @@ lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md, LASSERT (src_niov > 0); } - dst_niov = 1; + niov = 1; for (;;) { LASSERT (src_niov > 0); - LASSERT (dst_niov <= PTL_MD_MAX_IOV); + LASSERT (niov <= dst_niov); frag_len = src->kiov_len - offset; dst->kiov_page = src->kiov_page; @@ -465,7 +514,7 @@ lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md, if (len <= frag_len) { dst->kiov_len = len; LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE); - return (dst_niov); + return (niov); } dst->kiov_len = frag_len; @@ -474,73 +523,66 @@ lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md, len -= frag_len; dst++; src++; - dst_niov++; + niov++; src_niov--; offset = 0; } } #endif -void +ptl_err_t lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md, ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen) { - int niov; - if (mlen == 0) - nal->cb_recv (nal, private, msg, 0, NULL, 0, rlen); - else if ((md->options & PTL_MD_KIOV) == 0) { - niov = lib_extract_iov (msg->msg_iov.iov, md, offset, mlen); - nal->cb_recv (nal, private, msg, - niov, msg->msg_iov.iov, mlen, rlen); - } else { - niov = lib_extract_kiov (msg->msg_iov.kiov, md, offset, mlen); - nal->cb_recv_pages (nal, private, msg, - niov, msg->msg_iov.kiov, mlen, rlen); - } + return (nal->cb_recv(nal, private, msg, + 0, NULL, + offset, mlen, rlen)); + + if ((md->options & PTL_MD_KIOV) == 0) + return (nal->cb_recv(nal, private, msg, + md->md_niov, md->md_iov.iov, + offset, mlen, rlen)); + + return (nal->cb_recv_pages(nal, private, msg, + md->md_niov, md->md_iov.kiov, + offset, mlen, rlen)); } -int +ptl_err_t lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg, ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, lib_md_t *md, ptl_size_t offset, ptl_size_t len) { - int niov; - if (len == 0) - return (nal->cb_send (nal, private, msg, - hdr, type, nid, pid, - 0, NULL, 0)); + return (nal->cb_send(nal, private, msg, + hdr, type, nid, pid, + 0, NULL, + offset, len)); - if ((md->options & PTL_MD_KIOV) == 0) { - niov = lib_extract_iov (msg->msg_iov.iov, md, offset, len); - return (nal->cb_send (nal, private, msg, - hdr, type, nid, pid, - niov, msg->msg_iov.iov, len)); - } - - niov = lib_extract_kiov (msg->msg_iov.kiov, md, offset, len); - return (nal->cb_send_pages (nal, private, msg, - hdr, type, nid, pid, - niov, msg->msg_iov.kiov, len)); + if ((md->options & PTL_MD_KIOV) == 0) + return (nal->cb_send(nal, private, msg, + hdr, type, nid, pid, + md->md_niov, md->md_iov.iov, + offset, len)); + + return (nal->cb_send_pages(nal, private, msg, + hdr, type, nid, pid, + md->md_niov, md->md_iov.kiov, + offset, len)); } -static lib_msg_t * -get_new_msg (nal_cb_t *nal, lib_md_t *md) +static void +lib_commit_md (nal_cb_t *nal, lib_md_t *md, lib_msg_t *msg) { /* ALWAYS called holding the state_lock */ lib_counters_t *counters = &nal->ni.counters; - lib_msg_t *msg = lib_msg_alloc (nal); - - if (msg == NULL) - return (NULL); - - memset (msg, 0, sizeof (*msg)); - - msg->send_ack = 0; + /* Here, we commit the MD to a network OP by marking it busy and + * decrementing its threshold. Come what may, the network "owns" + * the MD until a call to lib_finalize() signals completion. 
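/* [Editorial sketch, not part of the patch]  What lib_commit_md()
 * means for a NAL: once a message is committed, cb_send()/cb_recv()
 * must return PTL_OK if and only if they guarantee a later
 * lib_finalize() for it; on a failure return they must not touch the
 * message again, since the caller finalizes it with the error.
 * 'mynal_queue_tx()' is a hypothetical transport hook:
 */
        static ptl_err_t
        mynal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
                    ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                    unsigned int niov, struct iovec *iov,
                    size_t offset, size_t len)
        {
                if (mynal_queue_tx (hdr, niov, iov, offset, len, cookie) != 0)
                        return (PTL_FAIL);   /* lib_finalize() will NOT run */

                /* completion path (later, possibly from an interrupt):
                 *   lib_finalize (nal, private, cookie,
                 *                 error ? PTL_FAIL : PTL_OK);
                 */
                return (PTL_OK);
        }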
*/ msg->md = md; - do_gettimeofday(&msg->ev.arrival_time); + md->pending++; if (md->threshold != PTL_MD_THRESH_INF) { LASSERT (md->threshold > 0); @@ -552,8 +594,24 @@ get_new_msg (nal_cb_t *nal, lib_md_t *md) counters->msgs_max = counters->msgs_alloc; list_add (&msg->msg_list, &nal->ni.ni_active_msgs); +} - return (msg); +static void +lib_drop_message (nal_cb_t *nal, void *private, ptl_hdr_t *hdr) +{ + unsigned long flags; + + /* CAVEAT EMPTOR: this only drops messages that we've not committed + * to receive (init_msg() not called) and therefore can't cause an + * event. */ + + state_lock(nal, &flags); + nal->ni.counters.drop_count++; + nal->ni.counters.drop_length += hdr->payload_length; + state_unlock(nal, &flags); + + /* NULL msg => if NAL calls lib_finalize it will be a noop */ + (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length); } /* @@ -563,17 +621,18 @@ get_new_msg (nal_cb_t *nal, lib_md_t *md) * of long messages. * */ -static int parse_put(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) +static ptl_err_t +parse_put(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg) { lib_ni_t *ni = &nal->ni; ptl_size_t mlength = 0; ptl_size_t offset = 0; int unlink = 0; + ptl_err_t rc; lib_me_t *me; lib_md_t *md; - lib_msg_t *msg; unsigned long flags; - + /* Convert put fields to host byte order */ hdr->msg.put.match_bits = NTOH__u64 (hdr->msg.put.match_bits); hdr->msg.put.ptl_index = NTOH__u32 (hdr->msg.put.ptl_index); @@ -586,8 +645,10 @@ static int parse_put(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) hdr->payload_length, hdr->msg.put.offset, hdr->msg.put.match_bits, &mlength, &offset, &unlink); - if (me == NULL) - goto drop; + if (me == NULL) { + state_unlock(nal, &flags); + return (PTL_FAIL); + } md = me->md; CDEBUG(D_NET, "Incoming put index %x from "LPU64"/%u of length %d/%d " @@ -595,69 +656,46 @@ static int parse_put(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) hdr->src_nid, hdr->src_pid, mlength, hdr->payload_length, md->md_lh.lh_cookie, md->md_niov, offset); - msg = get_new_msg (nal, md); - if (msg == NULL) { - CERROR(LPU64": Dropping PUT from "LPU64": can't allocate msg\n", - ni->nid, hdr->src_nid); - goto drop; - } + lib_commit_md(nal, md, msg); + + msg->ev.type = PTL_EVENT_PUT; + msg->ev.initiator.nid = hdr->src_nid; + msg->ev.initiator.pid = hdr->src_pid; + msg->ev.portal = hdr->msg.put.ptl_index; + msg->ev.match_bits = hdr->msg.put.match_bits; + msg->ev.rlength = hdr->payload_length; + msg->ev.mlength = mlength; + msg->ev.offset = offset; + msg->ev.hdr_data = hdr->msg.put.hdr_data; + + lib_md_deconstruct(nal, md, &msg->ev.mem_desc); if (!ptl_is_wire_handle_none(&hdr->msg.put.ack_wmd) && !(md->options & PTL_MD_ACK_DISABLE)) { - msg->send_ack = 1; msg->ack_wmd = hdr->msg.put.ack_wmd; - msg->nid = hdr->src_nid; - msg->pid = hdr->src_pid; - msg->ev.match_bits = hdr->msg.put.match_bits; - } - - if (md->eq) { - msg->ev.type = PTL_EVENT_PUT; - msg->ev.initiator.nid = hdr->src_nid; - msg->ev.initiator.pid = hdr->src_pid; - msg->ev.portal = hdr->msg.put.ptl_index; - msg->ev.match_bits = hdr->msg.put.match_bits; - msg->ev.rlength = hdr->payload_length; - msg->ev.mlength = mlength; - msg->ev.offset = offset; - msg->ev.hdr_data = hdr->msg.put.hdr_data; - - /* NB if this match has exhausted the MD, we can't be sure - * that this event will the the last one associated with - * this MD in the event queue (another message already - * matching this ME/MD could end up being last). 
So we - * remember the ME handle anyway and check again when we're - * allocating our slot in the event queue. - */ - ptl_me2handle (&msg->ev.unlinked_me, me); - - lib_md_deconstruct(nal, md, &msg->ev.mem_desc); } ni->counters.recv_count++; ni->counters.recv_length += mlength; - /* only unlink after MD's pending count has been bumped - * in get_new_msg() otherwise lib_me_unlink() will nuke it */ - if (unlink) { - md->md_flags |= PTL_MD_FLAG_AUTO_UNLINKED; + /* only unlink after MD's pending count has been bumped in + * lib_commit_md() otherwise lib_me_unlink() will nuke it */ + if (unlink) lib_me_unlink (nal, me); - } state_unlock(nal, &flags); - lib_recv (nal, private, msg, md, offset, mlength, hdr->payload_length); - return 0; + rc = lib_recv(nal, private, msg, md, offset, mlength, + hdr->payload_length); + if (rc != PTL_OK) + CERROR(LPU64": error on receiving PUT from "LPU64": %d\n", + ni->nid, hdr->src_nid, rc); - drop: - nal->ni.counters.drop_count++; - nal->ni.counters.drop_length += hdr->payload_length; - state_unlock (nal, &flags); - lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length); - return -1; + return (rc); } -static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) +static ptl_err_t +parse_get(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg) { lib_ni_t *ni = &nal->ni; ptl_size_t mlength = 0; @@ -665,7 +703,6 @@ static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) int unlink = 0; lib_me_t *me; lib_md_t *md; - lib_msg_t *msg; ptl_hdr_t reply; unsigned long flags; int rc; @@ -683,8 +720,10 @@ static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) hdr->msg.get.sink_length, hdr->msg.get.src_offset, hdr->msg.get.match_bits, &mlength, &offset, &unlink); - if (me == NULL) - goto drop; + if (me == NULL) { + state_unlock(nal, &flags); + return (PTL_FAIL); + } md = me->md; CDEBUG(D_NET, "Incoming get index %d from "LPU64".%u of length %d/%d " @@ -692,45 +731,27 @@ static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) hdr->src_nid, hdr->src_pid, mlength, hdr->payload_length, md->md_lh.lh_cookie, md->md_niov, offset); - msg = get_new_msg (nal, md); - if (msg == NULL) { - CERROR(LPU64": Dropping GET from "LPU64": can't allocate msg\n", - ni->nid, hdr->src_nid); - goto drop; - } + lib_commit_md(nal, md, msg); - if (md->eq) { - msg->ev.type = PTL_EVENT_GET; - msg->ev.initiator.nid = hdr->src_nid; - msg->ev.initiator.pid = hdr->src_pid; - msg->ev.portal = hdr->msg.get.ptl_index; - msg->ev.match_bits = hdr->msg.get.match_bits; - msg->ev.rlength = hdr->payload_length; - msg->ev.mlength = mlength; - msg->ev.offset = offset; - msg->ev.hdr_data = 0; - - /* NB if this match has exhausted the MD, we can't be sure - * that this event will the the last one associated with - * this MD in the event queue (another message already - * matching this ME/MD could end up being last). So we - * remember the ME handle anyway and check again when we're - * allocating our slot in the event queue. 
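/* [Editorial note, not part of the patch]  The receive-side lifecycle
 * after this change, as a sketch:
 *
 *   lib_parse()           alloc msg, byte-swap and sanity-check header
 *     parse_put()         match ME, lib_commit_md(), fill msg->ev
 *       lib_recv()        pick cb_recv()/cb_recv_pages() by MD type
 *         NAL I/O ...     completes later, possibly with an error
 *           lib_finalize()  set ev.status/ev.unlinked, queue the event,
 *                           unlink the MD if it is now idle and spent
 */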
- */ - ptl_me2handle (&msg->ev.unlinked_me, me); - - lib_md_deconstruct(nal, md, &msg->ev.mem_desc); - } + msg->ev.type = PTL_EVENT_GET; + msg->ev.initiator.nid = hdr->src_nid; + msg->ev.initiator.pid = hdr->src_pid; + msg->ev.portal = hdr->msg.get.ptl_index; + msg->ev.match_bits = hdr->msg.get.match_bits; + msg->ev.rlength = hdr->payload_length; + msg->ev.mlength = mlength; + msg->ev.offset = offset; + msg->ev.hdr_data = 0; + + lib_md_deconstruct(nal, md, &msg->ev.mem_desc); ni->counters.send_count++; ni->counters.send_length += mlength; - /* only unlink after MD's refcount has been bumped - * in get_new_msg() otherwise lib_me_unlink() will nuke it */ - if (unlink) { - md->md_flags |= PTL_MD_FLAG_AUTO_UNLINKED; + /* only unlink after MD's refcount has been bumped in + * lib_commit_md() otherwise lib_me_unlink() will nuke it */ + if (unlink) lib_me_unlink (nal, me); - } state_unlock(nal, &flags); @@ -746,36 +767,25 @@ static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) rc = lib_send (nal, private, msg, &reply, PTL_MSG_REPLY, hdr->src_nid, hdr->src_pid, md, offset, mlength); - if (rc != PTL_OK) { - CERROR(LPU64": Dropping GET from "LPU64": send REPLY failed\n", - ni->nid, hdr->src_nid); - /* Hmm, this will create a GET event and make believe - * the reply completed, which it kind of did, only the - * source won't get her reply */ - lib_finalize (nal, private, msg); - state_lock (nal, &flags); - goto drop; - } + if (rc != PTL_OK) + CERROR(LPU64": Unable to send REPLY for GET from "LPU64": %d\n", + ni->nid, hdr->src_nid, rc); + + /* Discard any junk after the hdr */ + (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length); - /* Complete the incoming message */ - lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length); return (rc); - drop: - ni->counters.drop_count++; - ni->counters.drop_length += hdr->msg.get.sink_length; - state_unlock(nal, &flags); - lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length); - return -1; } -static int parse_reply(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) +static ptl_err_t +parse_reply(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg) { lib_ni_t *ni = &nal->ni; lib_md_t *md; int rlength; int length; - lib_msg_t *msg; unsigned long flags; + ptl_err_t rc; state_lock(nal, &flags); @@ -787,7 +797,9 @@ static int parse_reply(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) md == NULL ? 
"invalid" : "inactive", hdr->msg.reply.dst_wmd.wh_interface_cookie, hdr->msg.reply.dst_wmd.wh_object_cookie); - goto drop; + + state_unlock(nal, &flags); + return (PTL_FAIL); } LASSERT (md->offset == 0); @@ -801,7 +813,8 @@ static int parse_reply(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) ni->nid, hdr->src_nid, length, hdr->msg.reply.dst_wmd.wh_object_cookie, md->length); - goto drop; + state_unlock(nal, &flags); + return (PTL_FAIL); } length = md->length; } @@ -810,46 +823,36 @@ static int parse_reply(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) hdr->src_nid, length, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie); - msg = get_new_msg (nal, md); - if (msg == NULL) { - CERROR(LPU64": Dropping REPLY from "LPU64": can't " - "allocate msg\n", ni->nid, hdr->src_nid); - goto drop; - } + lib_commit_md(nal, md, msg); - if (md->eq) { - msg->ev.type = PTL_EVENT_REPLY; - msg->ev.initiator.nid = hdr->src_nid; - msg->ev.initiator.pid = hdr->src_pid; - msg->ev.rlength = rlength; - msg->ev.mlength = length; - msg->ev.offset = 0; + msg->ev.type = PTL_EVENT_REPLY; + msg->ev.initiator.nid = hdr->src_nid; + msg->ev.initiator.pid = hdr->src_pid; + msg->ev.rlength = rlength; + msg->ev.mlength = length; + msg->ev.offset = 0; - lib_md_deconstruct(nal, md, &msg->ev.mem_desc); - } + lib_md_deconstruct(nal, md, &msg->ev.mem_desc); ni->counters.recv_count++; ni->counters.recv_length += length; state_unlock(nal, &flags); - lib_recv (nal, private, msg, md, 0, length, rlength); - return 0; + rc = lib_recv(nal, private, msg, md, 0, length, rlength); + if (rc != PTL_OK) + CERROR(LPU64": error on receiving REPLY from "LPU64": %d\n", + ni->nid, hdr->src_nid, rc); - drop: - nal->ni.counters.drop_count++; - nal->ni.counters.drop_length += hdr->payload_length; - state_unlock (nal, &flags); - lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length); - return -1; + return (rc); } -static int parse_ack(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) +static ptl_err_t +parse_ack(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg) { - lib_ni_t *ni = &nal->ni; - lib_md_t *md; - lib_msg_t *msg = NULL; - unsigned long flags; + lib_ni_t *ni = &nal->ni; + lib_md_t *md; + unsigned long flags; /* Convert ack fields to host byte order */ hdr->msg.ack.match_bits = NTOH__u64 (hdr->msg.ack.match_bits); @@ -865,40 +868,37 @@ static int parse_ack(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) (md == NULL) ? 
"invalid" : "inactive", hdr->msg.ack.dst_wmd.wh_interface_cookie, hdr->msg.ack.dst_wmd.wh_object_cookie); - goto drop; + + state_unlock(nal, &flags); + return (PTL_FAIL); } CDEBUG(D_NET, LPU64": ACK from "LPU64" into md "LPX64"\n", ni->nid, hdr->src_nid, hdr->msg.ack.dst_wmd.wh_object_cookie); - msg = get_new_msg (nal, md); - if (msg == NULL) { - CERROR(LPU64": Dropping ACK from "LPU64": can't allocate msg\n", - ni->nid, hdr->src_nid); - goto drop; - } + lib_commit_md(nal, md, msg); - if (md->eq) { - msg->ev.type = PTL_EVENT_ACK; - msg->ev.initiator.nid = hdr->src_nid; - msg->ev.initiator.pid = hdr->src_pid; - msg->ev.mlength = hdr->msg.ack.mlength; - msg->ev.match_bits = hdr->msg.ack.match_bits; + msg->ev.type = PTL_EVENT_ACK; + msg->ev.initiator.nid = hdr->src_nid; + msg->ev.initiator.pid = hdr->src_pid; + msg->ev.mlength = hdr->msg.ack.mlength; + msg->ev.match_bits = hdr->msg.ack.match_bits; - lib_md_deconstruct(nal, md, &msg->ev.mem_desc); - } + lib_md_deconstruct(nal, md, &msg->ev.mem_desc); ni->counters.recv_count++; - state_unlock(nal, &flags); - lib_recv (nal, private, msg, NULL, 0, 0, hdr->payload_length); - return 0; - drop: - nal->ni.counters.drop_count++; - state_unlock (nal, &flags); - lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length); - return -1; + state_unlock(nal, &flags); + + /* We have received and matched up the ack OK, create the + * completion event now... */ + lib_finalize(nal, private, msg, PTL_OK); + + /* ...and now discard any junk after the hdr */ + (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length); + + return (PTL_OK); } static char * @@ -980,10 +980,13 @@ void print_hdr(nal_cb_t * nal, ptl_hdr_t * hdr) } /* end of print_hdr() */ -int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) +void +lib_parse(nal_cb_t *nal, ptl_hdr_t *hdr, void *private) { unsigned long flags; - + ptl_err_t rc; + lib_msg_t *msg; + /* convert common fields to host byte order */ hdr->dest_nid = NTOH__u64 (hdr->dest_nid); hdr->src_nid = NTOH__u64 (hdr->src_nid); @@ -1005,22 +1008,16 @@ int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) nal->ni.nid, mv->magic, mv->version_major, mv->version_minor, hdr->src_nid); - lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length); - return (-1); + lib_drop_message(nal, private, hdr); + return; } if (hdr->dest_nid != nal->ni.nid) { CERROR(LPU64": Dropping %s message from "LPU64" to "LPU64 " (not me)\n", nal->ni.nid, hdr_type_string (hdr), hdr->src_nid, hdr->dest_nid); - - state_lock (nal, &flags); - nal->ni.counters.drop_count++; - nal->ni.counters.drop_length += hdr->payload_length; - state_unlock (nal, &flags); - - lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length); - return (-1); + lib_drop_message(nal, private, hdr); + return; } if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */ @@ -1030,34 +1027,59 @@ int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) ": simulated failure\n", nal->ni.nid, hdr_type_string (hdr), hdr->src_nid); - lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length); - return (-1); + lib_drop_message(nal, private, hdr); + return; } - + + msg = lib_msg_alloc(nal); + if (msg == NULL) { + CERROR(LPU64": Dropping incoming %s from "LPU64 + ": can't allocate a lib_msg_t\n", + nal->ni.nid, hdr_type_string (hdr), + hdr->src_nid); + lib_drop_message(nal, private, hdr); + return; + } + + do_gettimeofday(&msg->ev.arrival_time); + switch (hdr->type) { case PTL_MSG_ACK: - return (parse_ack(nal, hdr, private)); + rc = parse_ack(nal, hdr, 
private, msg); + break; case PTL_MSG_PUT: - return (parse_put(nal, hdr, private)); + rc = parse_put(nal, hdr, private, msg); break; case PTL_MSG_GET: - return (parse_get(nal, hdr, private)); + rc = parse_get(nal, hdr, private, msg); break; case PTL_MSG_REPLY: - return (parse_reply(nal, hdr, private)); + rc = parse_reply(nal, hdr, private, msg); break; default: CERROR(LPU64": Dropping message from "LPU64 ": Bad type=0x%x\n", nal->ni.nid, hdr->src_nid, hdr->type); - - lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length); - return (-1); + rc = PTL_FAIL; + break; + } + + if (rc != PTL_OK) { + if (msg->md != NULL) { + /* committed... */ + lib_finalize(nal, private, msg, rc); + } else { + state_lock(nal, &flags); + lib_msg_free(nal, msg); /* expects state_lock held */ + state_unlock(nal, &flags); + + lib_drop_message(nal, private, hdr); + } } } - -int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret) +int +do_PtlPut(nal_cb_t *nal, void *private, void *v_args, void *v_ret) { /* * Incoming: @@ -1072,16 +1094,15 @@ int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret) * Outgoing: */ - PtlPut_in *args = v_args; - PtlPut_out *ret = v_ret; - ptl_hdr_t hdr; - - lib_ni_t *ni = &nal->ni; - lib_md_t *md; - lib_msg_t *msg = NULL; + PtlPut_in *args = v_args; ptl_process_id_t *id = &args->target_in; - unsigned long flags; - int rc; + PtlPut_out *ret = v_ret; + lib_ni_t *ni = &nal->ni; + lib_msg_t *msg; + ptl_hdr_t hdr; + lib_md_t *md; + unsigned long flags; + int rc; if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */ fail_peer (nal, id->nid, 1)) /* shall we now? */ @@ -1090,13 +1111,22 @@ int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret) nal->ni.nid, id->nid); return (ret->rc = PTL_INV_PROC); } - - ret->rc = PTL_OK; + + msg = lib_msg_alloc(nal); + if (msg == NULL) { + CERROR(LPU64": Dropping PUT to "LPU64": ENOMEM on lib_msg_t\n", + ni->nid, id->nid); + return (ret->rc = PTL_NOSPACE); + } + state_lock(nal, &flags); + md = ptl_handle2md(&args->md_in, nal); - if (md == NULL || !md->threshold) { + if (md == NULL || md->threshold == 0) { + lib_msg_free(nal, msg); state_unlock(nal, &flags); - return ret->rc = PTL_INV_MD; + + return (ret->rc = PTL_INV_MD); } CDEBUG(D_NET, "PtlPut -> %Lu: %lu\n", (unsigned long long)id->nid, @@ -1123,57 +1153,39 @@ int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret) hdr.msg.put.offset = HTON__u32 (args->offset_in); hdr.msg.put.hdr_data = args->hdr_data_in; + lib_commit_md(nal, md, msg); + + msg->ev.type = PTL_EVENT_SENT; + msg->ev.initiator.nid = ni->nid; + msg->ev.initiator.pid = ni->pid; + msg->ev.portal = args->portal_in; + msg->ev.match_bits = args->match_bits_in; + msg->ev.rlength = md->length; + msg->ev.mlength = md->length; + msg->ev.offset = args->offset_in; + msg->ev.hdr_data = args->hdr_data_in; + + lib_md_deconstruct(nal, md, &msg->ev.mem_desc); + ni->counters.send_count++; ni->counters.send_length += md->length; - msg = get_new_msg (nal, md); - if (msg == NULL) { - CERROR("BAD: could not allocate msg!\n"); - state_unlock(nal, &flags); - return ret->rc = PTL_NOSPACE; - } - - /* - * If this memory descriptor has an event queue associated with - * it we need to allocate a message state object and record the - * information about this operation that will be recorded into - * event queue once the message has been completed. - * - * NB. We're now committed to the GET, since we just marked the MD - * busy. 
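/* [Editorial note, not part of the patch]  The error convention in
 * lib_parse() above: a parse_*() routine that fails before
 * lib_commit_md() leaves msg->md NULL, so lib_parse() frees the msg
 * and lib_drop_message() discards the payload without generating an
 * event.  A failure after the commit point must instead go through
 * lib_finalize(), so the MD's pending count is dropped and the
 * completion event carries the error in ev.status.
 */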
Callers who observe this (by getting PTL_MD_INUSE from - * PtlMDUnlink()) expect a completion event to tell them when the - * MD becomes idle. - */ - if (md->eq) { - msg->ev.type = PTL_EVENT_SENT; - msg->ev.initiator.nid = ni->nid; - msg->ev.initiator.pid = ni->pid; - msg->ev.portal = args->portal_in; - msg->ev.match_bits = args->match_bits_in; - msg->ev.rlength = md->length; - msg->ev.mlength = md->length; - msg->ev.offset = args->offset_in; - msg->ev.hdr_data = args->hdr_data_in; - - lib_md_deconstruct(nal, md, &msg->ev.mem_desc); - } - state_unlock(nal, &flags); rc = lib_send (nal, private, msg, &hdr, PTL_MSG_PUT, id->nid, id->pid, md, 0, md->length); if (rc != PTL_OK) { - /* get_new_msg() committed us to sending by decrementing - * md->threshold, so we have to act like we did send, but - * the network dropped it. */ - lib_finalize (nal, private, msg); + CERROR(LPU64": error sending PUT to "LPU64": %d\n", + ni->nid, id->nid, rc); + lib_finalize (nal, private, msg, rc); } + /* completion will be signalled by an event */ return ret->rc = PTL_OK; } -lib_msg_t * lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, - lib_md_t *getmd) +lib_msg_t * +lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, lib_md_t *getmd) { /* The NAL can DMA direct to the GET md (i.e. no REPLY msg). This * returns a msg the NAL can pass to lib_finalize() so that a REPLY @@ -1185,39 +1197,38 @@ lib_msg_t * lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, * lib_finalize() of the original GET. */ lib_ni_t *ni = &nal->ni; - lib_msg_t *msg; + lib_msg_t *msg = lib_msg_alloc(nal); unsigned long flags; state_lock(nal, &flags); LASSERT (getmd->pending > 0); + if (msg == NULL) { + CERROR ("Dropping REPLY from "LPU64": can't allocate msg\n", + peer_nid); + goto drop; + } + if (getmd->threshold == 0) { CERROR ("Dropping REPLY from "LPU64" for inactive MD %p\n", peer_nid, getmd); - goto drop; + goto drop_msg; } LASSERT (getmd->offset == 0); CDEBUG(D_NET, "Reply from "LPU64" md %p\n", peer_nid, getmd); - msg = get_new_msg (nal, getmd); - if (msg == NULL) { - CERROR("Dropping REPLY from "LPU64" md %p: can't allocate msg\n", - peer_nid, getmd); - goto drop; - } + lib_commit_md (nal, getmd, msg); - if (getmd->eq) { - msg->ev.type = PTL_EVENT_REPLY; - msg->ev.initiator.nid = peer_nid; - msg->ev.initiator.pid = 0; /* XXX FIXME!!! */ - msg->ev.rlength = msg->ev.mlength = getmd->length; - msg->ev.offset = 0; + msg->ev.type = PTL_EVENT_REPLY; + msg->ev.initiator.nid = peer_nid; + msg->ev.initiator.pid = 0; /* XXX FIXME!!! 
*/ + msg->ev.rlength = msg->ev.mlength = getmd->length; + msg->ev.offset = 0; - lib_md_deconstruct(nal, getmd, &msg->ev.mem_desc); - } + lib_md_deconstruct(nal, getmd, &msg->ev.mem_desc); ni->counters.recv_count++; ni->counters.recv_length += getmd->length; @@ -1225,7 +1236,9 @@ lib_msg_t * lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, state_unlock(nal, &flags); return msg; - + + drop_msg: + lib_msg_free(nal, msg); drop: nal->ni.counters.drop_count++; nal->ni.counters.drop_length += getmd->length; @@ -1235,7 +1248,8 @@ lib_msg_t * lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, return NULL; } -int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret) +int +do_PtlGet(nal_cb_t *nal, void *private, void *v_args, void *v_ret) { /* * Incoming: @@ -1249,15 +1263,15 @@ int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret) * Outgoing: */ - PtlGet_in *args = v_args; - PtlGet_out *ret = v_ret; - ptl_hdr_t hdr; - lib_msg_t *msg = NULL; - lib_ni_t *ni = &nal->ni; + PtlGet_in *args = v_args; ptl_process_id_t *id = &args->target_in; - lib_md_t *md; - unsigned long flags; - int rc; + PtlGet_out *ret = v_ret; + lib_ni_t *ni = &nal->ni; + lib_msg_t *msg; + ptl_hdr_t hdr; + lib_md_t *md; + unsigned long flags; + int rc; if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */ fail_peer (nal, id->nid, 1)) /* shall we now? */ @@ -1266,16 +1280,24 @@ int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret) nal->ni.nid, id->nid); return (ret->rc = PTL_INV_PROC); } - + + msg = lib_msg_alloc(nal); + if (msg == NULL) { + CERROR(LPU64": Dropping GET to "LPU64": ENOMEM on lib_msg_t\n", + ni->nid, id->nid); + return (ret->rc = PTL_NOSPACE); + } + state_lock(nal, &flags); + md = ptl_handle2md(&args->md_in, nal); if (md == NULL || !md->threshold) { + lib_msg_free(nal, msg); state_unlock(nal, &flags); + return ret->rc = PTL_INV_MD; } - LASSERT (md->offset == 0); - CDEBUG(D_NET, "PtlGet -> %Lu: %lu\n", (unsigned long long)id->nid, (unsigned long)id->pid); @@ -1296,51 +1318,33 @@ int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret) hdr.msg.get.src_offset = HTON__u32 (args->offset_in); hdr.msg.get.sink_length = HTON__u32 (md->length); - ni->counters.send_count++; + lib_commit_md(nal, md, msg); - msg = get_new_msg (nal, md); - if (msg == NULL) { - CERROR("do_PtlGet: BAD - could not allocate cookie!\n"); - state_unlock(nal, &flags); - return ret->rc = PTL_NOSPACE; - } + msg->ev.type = PTL_EVENT_SENT; + msg->ev.initiator.nid = ni->nid; + msg->ev.initiator.pid = ni->pid; + msg->ev.portal = args->portal_in; + msg->ev.match_bits = args->match_bits_in; + msg->ev.rlength = md->length; + msg->ev.mlength = md->length; + msg->ev.offset = args->offset_in; + msg->ev.hdr_data = 0; - /* - * If this memory descriptor has an event queue associated with - * it we must allocate a message state object that will record - * the information to be filled in once the message has been - * completed. More information is in the do_PtlPut() comments. - * - * NB. We're now committed to the GET, since we just marked the MD - * busy. Callers who observe this (by getting PTL_MD_INUSE from - * PtlMDUnlink()) expect a completion event to tell them when the - * MD becomes idle. 
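/* [Editorial sketch, not part of the patch]  With events now filled
 * unconditionally and carrying a completion status, an initiator's
 * handler should check ev.status first.  'my_eq_handler' is a
 * hypothetical EQ callback:
 */
        static void
        my_eq_handler (ptl_event_t *ev)
        {
                if (ev->status != PTL_OK) {
                        CERROR ("event type %d failed: %d\n",
                                ev->type, ev->status);
                        return;
                }

                switch (ev->type) {
                case PTL_EVENT_SENT:    /* PUT/GET request left this node */
                        break;
                case PTL_EVENT_REPLY:   /* GET reply landed in the local MD */
                        break;
                default:
                        break;
                }
        }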
- */ - if (md->eq) { - msg->ev.type = PTL_EVENT_SENT; - msg->ev.initiator.nid = ni->nid; - msg->ev.initiator.pid = ni->pid; - msg->ev.portal = args->portal_in; - msg->ev.match_bits = args->match_bits_in; - msg->ev.rlength = md->length; - msg->ev.mlength = md->length; - msg->ev.offset = args->offset_in; - msg->ev.hdr_data = 0; - - lib_md_deconstruct(nal, md, &msg->ev.mem_desc); - } + lib_md_deconstruct(nal, md, &msg->ev.mem_desc); + + ni->counters.send_count++; state_unlock(nal, &flags); rc = lib_send (nal, private, msg, &hdr, PTL_MSG_GET, id->nid, id->pid, NULL, 0, 0); if (rc != PTL_OK) { - /* get_new_msg() committed us to sending by decrementing - * md->threshold, so we have to act like we did send, but - * the network dropped it. */ - lib_finalize (nal, private, msg); + CERROR(LPU64": error sending GET to "LPU64": %d\n", + ni->nid, id->nid, rc); + lib_finalize (nal, private, msg, rc); } + /* completion will be signalled by an event */ return ret->rc = PTL_OK; } diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c index 9840ff5..36be55c 100644 --- a/lnet/lnet/lib-msg.c +++ b/lnet/lnet/lib-msg.c @@ -32,32 +32,81 @@ #include -int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t *msg) +void +lib_enq_event_locked (nal_cb_t *nal, void *private, + lib_eq_t *eq, ptl_event_t *ev) { - lib_md_t *md; - lib_eq_t *eq; + ptl_event_t *eq_slot; int rc; + + ev->sequence = eq->sequence++; /* Allocate the next queue slot */ + + /* size must be a power of 2 to handle a wrapped sequence # */ + LASSERT (eq->size != 0 && + eq->size == LOWEST_BIT_SET (eq->size)); + eq_slot = eq->base + (ev->sequence & (eq->size - 1)); + + /* Copy the event into the allocated slot, ensuring all the rest of + * the event's contents have been copied _before_ the sequence + * number gets updated. A processes 'getting' an event waits on + * the next queue slot's sequence to be 'new'. When it is, _all_ + * other event fields had better be consistent. I assert + * 'sequence' is the last member, so I only need a 2 stage copy. 
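/* [Editorial sketch, not part of the patch]  The consumer's side of
 * that 2-stage copy: spin on the slot's sequence number, and once it
 * matches, the rest of the event is already consistent because the
 * producer wrote the body first.  Overflow/lost-event handling is
 * omitted (see PtlEQGet() for that), and on SMP a read barrier
 * pairing with the producer's barrier() belongs between the two
 * steps.  Given lib_eq_t *eq, the expected sequence 'seq', and a
 * ptl_event_t *ev to fill:
 */
        ptl_event_t *slot = eq->base + (seq & (eq->size - 1));

        while (slot->sequence != seq)   /* producer publishes this last */
                continue;

        *ev = *slot;                    /* body was written before 'sequence' */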
*/ + + LASSERT(sizeof (ptl_event_t) == + offsetof(ptl_event_t, sequence) + sizeof(ev->sequence)); + + rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev, + offsetof (ptl_event_t, sequence)); + LASSERT (rc == PTL_OK); + +#ifdef __KERNEL__ + barrier(); +#endif + /* Updating the sequence number is what makes the event 'new' NB if + * the cb_write below isn't atomic, this could cause a race with + * PtlEQGet */ + rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence, + (void *)&ev->sequence,sizeof (ev->sequence)); + LASSERT (rc == PTL_OK); + +#ifdef __KERNEL__ + barrier(); +#endif + + if (nal->cb_callback != NULL) + nal->cb_callback(nal, private, eq, ev); + else if (eq->event_callback != NULL) + eq->event_callback(ev); +} + +void +lib_finalize(nal_cb_t *nal, void *private, lib_msg_t *msg, int status) +{ + lib_md_t *md; + int unlink; unsigned long flags; + int rc; + ptl_hdr_t ack; /* ni went down while processing this message */ - if (nal->ni.up == 0) { - return -1; - } + if (nal->ni.up == 0) + return; if (msg == NULL) - return 0; + return; - rc = 0; - if (msg->send_ack) { - ptl_hdr_t ack; + /* Only send an ACK if the PUT completed successfully */ + if (status == PTL_OK && + !ptl_is_wire_handle_none(&msg->ack_wmd)) { - LASSERT (!ptl_is_wire_handle_none (&msg->ack_wmd)); + LASSERT(msg->ev.type == PTL_EVENT_PUT); memset (&ack, 0, sizeof (ack)); ack.type = HTON__u32 (PTL_MSG_ACK); - ack.dest_nid = HTON__u64 (msg->nid); + ack.dest_nid = HTON__u64 (msg->ev.initiator.nid); ack.src_nid = HTON__u64 (nal->ni.nid); - ack.dest_pid = HTON__u32 (msg->pid); + ack.dest_pid = HTON__u32 (msg->ev.initiator.pid); ack.src_pid = HTON__u32 (nal->ni.pid); ack.payload_length = 0; @@ -66,92 +115,35 @@ int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t *msg) ack.msg.ack.mlength = HTON__u32 (msg->ev.mlength); rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK, - msg->nid, msg->pid, NULL, 0, 0); - /* If this send fails, there's nothing else to clean up */ + msg->ev.initiator.nid, msg->ev.initiator.pid, + NULL, 0, 0); + if (rc != PTL_OK) { + /* send failed: there's nothing else to clean up. */ + CERROR("Error %d sending ACK to "LPX64"\n", + rc, msg->ev.initiator.nid); + } } md = msg->md; - LASSERT (md->pending > 0); /* I've not dropped my ref yet */ - eq = md->eq; state_lock(nal, &flags); - if (eq != NULL) { - ptl_event_t *ev = &msg->ev; - ptl_event_t *eq_slot; - - /* I have to hold the lock while I bump the sequence number - * and copy the event into the queue. If not, and I was - * interrupted after bumping the sequence number, other - * events could fill the queue, including the slot I just - * allocated to this event. On resuming, I would overwrite - * a more 'recent' event with old event state, and - * processes taking events off the queue would not detect - * overflow correctly. - */ - - ev->sequence = eq->sequence++;/* Allocate the next queue slot */ - - /* size must be a power of 2 to handle a wrapped sequence # */ - LASSERT (eq->size != 0 && - eq->size == LOWEST_BIT_SET (eq->size)); - eq_slot = eq->base + (ev->sequence & (eq->size - 1)); - - /* Invalidate unlinked_me unless this is the last - * event for an auto-unlinked MD. 
Note that if md was - * auto-unlinked, md->pending can only decrease - */ - if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 || /* not auto-unlinked */ - md->pending != 1) /* not last ref */ - ev->unlinked_me = PTL_HANDLE_NONE; - - /* Copy the event into the allocated slot, ensuring all the - * rest of the event's contents have been copied _before_ - * the sequence number gets updated. A processes 'getting' - * an event waits on the next queue slot's sequence to be - * 'new'. When it is, _all_ other event fields had better - * be consistent. I assert 'sequence' is the last member, - * so I only need a 2 stage copy. - */ - LASSERT(sizeof (ptl_event_t) == - offsetof(ptl_event_t, sequence) + sizeof(ev->sequence)); - - rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev, - offsetof (ptl_event_t, sequence)); - LASSERT (rc == 0); - -#ifdef __KERNEL__ - barrier(); -#endif - /* Updating the sequence number is what makes the event 'new' */ - - /* cb_write is not necessarily atomic, so this could - cause a race with PtlEQGet */ - rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence, - (void *)&ev->sequence,sizeof (ev->sequence)); - LASSERT (rc == 0); + /* Now it's safe to drop my caller's ref */ + md->pending--; + LASSERT (md->pending >= 0); -#ifdef __KERNEL__ - barrier(); -#endif + /* Should I unlink this MD? */ + unlink = (md->pending == 0 && /* No other refs */ + (md->threshold == 0 || /* All ops done */ + md->md_flags & PTL_MD_FLAG_UNLINK) != 0); /* black spot */ - /* I must also ensure that (a) callbacks are made in the - * same order as the events land in the queue, and (b) the - * callback occurs before the event can be removed from the - * queue, so I can't drop the lock during the callback. */ - if (nal->cb_callback != NULL) - nal->cb_callback(nal, private, eq, ev); - else if (eq->event_callback != NULL) - (void)((eq->event_callback) (ev)); - } + msg->ev.status = status; + msg->ev.unlinked = unlink; - LASSERT ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 || - (md->md_flags & PTL_MD_FLAG_UNLINK) != 0); + if (md->eq != NULL) + lib_enq_event_locked(nal, private, md->eq, &msg->ev); - md->pending--; - if (md->pending == 0 && /* no more outstanding operations on this md */ - (md->threshold == 0 || /* done its business */ - (md->md_flags & PTL_MD_FLAG_UNLINK) != 0)) /* marked for death */ + if (unlink) lib_md_unlink(nal, md); list_del (&msg->msg_list); @@ -159,6 +151,4 @@ int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t *msg) lib_msg_free(nal, msg); state_unlock(nal, &flags); - - return rc; } diff --git a/lnet/ulnds/proclib.c b/lnet/ulnds/proclib.c index 2627253..2a5ba0d 100644 --- a/lnet/ulnds/proclib.c +++ b/lnet/ulnds/proclib.c @@ -43,24 +43,24 @@ /* the following functions are stubs to satisfy the nal definition without doing anything particularily useful*/ -static int nal_write(nal_cb_t *nal, - void *private, - user_ptr dst_addr, - void *src_addr, - size_t len) +static ptl_err_t nal_write(nal_cb_t *nal, + void *private, + user_ptr dst_addr, + void *src_addr, + size_t len) { memcpy(dst_addr, src_addr, len); - return 0; + return PTL_OK; } -static int nal_read(nal_cb_t * nal, - void *private, - void *dst_addr, - user_ptr src_addr, - size_t len) +static ptl_err_t nal_read(nal_cb_t * nal, + void *private, + void *dst_addr, + user_ptr src_addr, + size_t len) { memcpy(dst_addr, src_addr, len); - return 0; + return PTL_OK; } static void *nal_malloc(nal_cb_t *nal, diff --git a/lnet/ulnds/socklnd/proclib.c b/lnet/ulnds/socklnd/proclib.c index 2627253..2a5ba0d 100644 --- 
a/lnet/ulnds/socklnd/proclib.c +++ b/lnet/ulnds/socklnd/proclib.c @@ -43,24 +43,24 @@ /* the following functions are stubs to satisfy the nal definition without doing anything particularily useful*/ -static int nal_write(nal_cb_t *nal, - void *private, - user_ptr dst_addr, - void *src_addr, - size_t len) +static ptl_err_t nal_write(nal_cb_t *nal, + void *private, + user_ptr dst_addr, + void *src_addr, + size_t len) { memcpy(dst_addr, src_addr, len); - return 0; + return PTL_OK; } -static int nal_read(nal_cb_t * nal, - void *private, - void *dst_addr, - user_ptr src_addr, - size_t len) +static ptl_err_t nal_read(nal_cb_t * nal, + void *private, + void *dst_addr, + user_ptr src_addr, + size_t len) { memcpy(dst_addr, src_addr, len); - return 0; + return PTL_OK; } static void *nal_malloc(nal_cb_t *nal, diff --git a/lnet/ulnds/socklnd/tcplnd.c b/lnet/ulnds/socklnd/tcplnd.c index bf28a058..8e84142 100644 --- a/lnet/ulnds/socklnd/tcplnd.c +++ b/lnet/ulnds/socklnd/tcplnd.c @@ -55,70 +55,69 @@ * * sends a packet to the peer, after insuring that a connection exists */ -int tcpnal_send(nal_cb_t *n, - void *private, - lib_msg_t *cookie, - ptl_hdr_t *hdr, - int type, - ptl_nid_t nid, - ptl_pid_t pid, - unsigned int niov, - struct iovec *iov, - size_t len) +ptl_err_t tcpnal_send(nal_cb_t *n, + void *private, + lib_msg_t *cookie, + ptl_hdr_t *hdr, + int type, + ptl_nid_t nid, + ptl_pid_t pid, + unsigned int niov, + struct iovec *iov, + size_t offset, + size_t len) { connection c; bridge b=(bridge)n->nal_data; struct iovec tiov[257]; static pthread_mutex_t send_lock = PTHREAD_MUTEX_INITIALIZER; - int rc; + ptl_err_t rc = PTL_OK; + int sysrc; int total; + int ntiov; int i; if (!(c=force_tcp_connection((manager)b->lower, PNAL_IP(nid,b), PNAL_PORT(nid,pid), - b->local))) - return(1); + b->local))) + return(PTL_FAIL); -#if 0 /* TODO: these results should be checked. furthermore, provision must be made for the SIGPIPE which is delivered when writing on a tcp socket which has closed underneath the application. 
there is a linux flag in the sendmsg call which turns off the signally behaviour, but its nonstandard */ - syscall(SYS_write, c->fd,hdr,sizeof(ptl_hdr_t)); - LASSERT (niov <= 1); - if (len) syscall(SYS_write, c->fd,iov[0].iov_base,len); -#else + LASSERT (niov <= 256); tiov[0].iov_base = hdr; tiov[0].iov_len = sizeof(ptl_hdr_t); + ntiov = 1 + lib_extract_iov(256, &tiov[1], niov, iov, offset, len); - if (niov > 0) - memcpy(&tiov[1], iov, niov * sizeof(struct iovec)); pthread_mutex_lock(&send_lock); #if 1 - for (i = total = 0; i <= niov; i++) + for (i = total = 0; i < ntiov; i++) total += tiov[i].iov_len; - rc = syscall(SYS_writev, c->fd, tiov, niov+1); - if (rc != total) { + sysrc = syscall(SYS_writev, c->fd, tiov, ntiov); + if (sysrc != total) { - fprintf (stderr, "BAD SEND rc %d != %d, errno %d\n", rc, total, errno); + fprintf (stderr, "BAD SEND rc %d != %d, errno %d\n", sysrc, total, errno); - abort(); + rc = PTL_FAIL; } #else - for (i = total = 0; i <= niov; i++) { + for (i = total = 0; i < ntiov; i++) { rc = send(c->fd, tiov[i].iov_base, tiov[i].iov_len, 0); if (rc != tiov[i].iov_len) { fprintf (stderr, "BAD SEND rc %d != %d, errno %d\n", rc, tiov[i].iov_len, errno); - abort(); + rc = PTL_FAIL; + break; } - total != rc; + total += rc; } #endif #if 0 @@ -131,10 +130,14 @@ int tcpnal_send(nal_cb_t *n, total, niov + 1); #endif pthread_mutex_unlock(&send_lock); -#endif - lib_finalize(n, private, cookie); - - return(0); + + if (rc == PTL_OK) { + /* NB the NAL only calls lib_finalize() if it returns PTL_OK + * from cb_send() */ + lib_finalize(n, private, cookie, PTL_OK); + } + + return(rc); } @@ -151,15 +154,18 @@ int tcpnal_send(nal_cb_t *n, * blocking read of the requested data. must drain out the * difference of mainpulated and requested lengths from the network */ -int tcpnal_recv(nal_cb_t *n, - void *private, - lib_msg_t *cookie, - unsigned int niov, - struct iovec *iov, - size_t mlen, - size_t rlen) +ptl_err_t tcpnal_recv(nal_cb_t *n, + void *private, + lib_msg_t *cookie, + unsigned int niov, + struct iovec *iov, + size_t offset, + size_t mlen, + size_t rlen) { + struct iovec tiov[256]; + int ntiov; int i; if (!niov) @@ -169,16 +175,19 @@ int tcpnal_recv(nal_cb_t *n, LASSERT(rlen); LASSERT(rlen >= mlen); + ntiov = lib_extract_iov(256, tiov, niov, iov, offset, mlen); + /* FIXME * 1. Is this effecient enough? change to use readv() directly? * 2. need check return from read_connection() * - MeiJia */ - for (i = 0; i < niov; i++) - read_connection(private, iov[i].iov_base, iov[i].iov_len); + for (i = 0; i < ntiov; i++) + read_connection(private, tiov[i].iov_base, tiov[i].iov_len); finalize: - lib_finalize(n, private, cookie); + /* FIXME; we always assume success here...
*/ + lib_finalize(n, private, cookie, PTL_OK); if (mlen!=rlen){ char *trash=malloc(rlen-mlen); @@ -188,7 +197,7 @@ finalize: free(trash); } - return(rlen); + return(PTL_OK); } diff --git a/lnet/ulnds/tcplnd.c b/lnet/ulnds/tcplnd.c index bf28a058..8e84142 100644 --- a/lnet/ulnds/tcplnd.c +++ b/lnet/ulnds/tcplnd.c @@ -55,70 +55,69 @@ * * sends a packet to the peer, after insuring that a connection exists */ -int tcpnal_send(nal_cb_t *n, - void *private, - lib_msg_t *cookie, - ptl_hdr_t *hdr, - int type, - ptl_nid_t nid, - ptl_pid_t pid, - unsigned int niov, - struct iovec *iov, - size_t len) +ptl_err_t tcpnal_send(nal_cb_t *n, + void *private, + lib_msg_t *cookie, + ptl_hdr_t *hdr, + int type, + ptl_nid_t nid, + ptl_pid_t pid, + unsigned int niov, + struct iovec *iov, + size_t offset, + size_t len) { connection c; bridge b=(bridge)n->nal_data; struct iovec tiov[257]; static pthread_mutex_t send_lock = PTHREAD_MUTEX_INITIALIZER; - int rc; + ptl_err_t rc = PTL_OK; + int sysrc; int total; + int ntiov; int i; if (!(c=force_tcp_connection((manager)b->lower, PNAL_IP(nid,b), PNAL_PORT(nid,pid), - b->local))) - return(1); + b->local))) + return(PTL_FAIL); -#if 0 /* TODO: these results should be checked. furthermore, provision must be made for the SIGPIPE which is delivered when writing on a tcp socket which has closed underneath the application. there is a linux flag in the sendmsg call which turns off the signally behaviour, but its nonstandard */ - syscall(SYS_write, c->fd,hdr,sizeof(ptl_hdr_t)); - LASSERT (niov <= 1); - if (len) syscall(SYS_write, c->fd,iov[0].iov_base,len); -#else + LASSERT (niov <= 256); tiov[0].iov_base = hdr; tiov[0].iov_len = sizeof(ptl_hdr_t); + ntiov = 1 + lib_extract_iov(256, &tiov[1], niov, iov, offset, len); - if (niov > 0) - memcpy(&tiov[1], iov, niov * sizeof(struct iovec)); pthread_mutex_lock(&send_lock); #if 1 - for (i = total = 0; i <= niov; i++) + for (i = total = 0; i < ntiov; i++) total += tiov[i].iov_len; - rc = syscall(SYS_writev, c->fd, tiov, niov+1); - if (rc != total) { + sysrc = syscall(SYS_writev, c->fd, tiov, ntiov); + if (sysrc != total) { - fprintf (stderr, "BAD SEND rc %d != %d, errno %d\n", rc, total, errno); + fprintf (stderr, "BAD SEND rc %d != %d, errno %d\n", sysrc, total, errno); - abort(); + rc = PTL_FAIL; } #else - for (i = total = 0; i <= niov; i++) { + for (i = total = 0; i < ntiov; i++) { rc = send(c->fd, tiov[i].iov_base, tiov[i].iov_len, 0); if (rc != tiov[i].iov_len) { fprintf (stderr, "BAD SEND rc %d != %d, errno %d\n", rc, tiov[i].iov_len, errno); - abort(); + rc = PTL_FAIL; + break; } - total != rc; + total += rc; } #endif #if 0 @@ -131,10 +130,14 @@ int tcpnal_send(nal_cb_t *n, total, niov + 1); #endif pthread_mutex_unlock(&send_lock); -#endif - lib_finalize(n, private, cookie); - - return(0); + + if (rc == PTL_OK) { + /* NB the NAL only calls lib_finalize() if it returns PTL_OK + * from cb_send() */ + lib_finalize(n, private, cookie, PTL_OK); + } + + return(rc); } @@ -151,15 +154,18 @@ int tcpnal_send(nal_cb_t *n, * blocking read of the requested data.
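Both copies of tcpnal_send() and tcpnal_recv() lean on the new lib_extract_iov() helper to turn (offset, len) into a sub-vector. Its contract, as these callers rely on it, is sketched below; this is an illustration under the assumption that the source vector covers at least offset + len bytes, not the portals library source.

/* Fill 'dst' with at most 'dst_niov' fragments describing the 'len'
 * bytes of 'src' that start at byte 'offset'; return the number of
 * fragments actually used. */
static int
sketch_extract_iov(int dst_niov, struct iovec *dst,
                   int src_niov, struct iovec *src,
                   size_t offset, size_t len)
{
        int n;

        if (len == 0)                         /* no data, no fragments */
                return (0);

        /* skip whole source fragments that lie before 'offset' */
        while (offset >= src->iov_len) {
                offset -= src->iov_len;
                src++;
                src_niov--;
                LASSERT(src_niov > 0);
        }

        for (n = 0; n < dst_niov; n++, src++, src_niov--) {
                LASSERT(src_niov > 0);
                dst[n].iov_base = (char *)src->iov_base + offset;
                dst[n].iov_len = src->iov_len - offset;
                offset = 0;

                if (dst[n].iov_len >= len) {  /* final fragment */
                        dst[n].iov_len = len;
                        return (n + 1);
                }
                len -= dst[n].iov_len;
        }

        LASSERT(0);                           /* 'dst' too small */
        return (0);
}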
must drain out the * difference of mainpulated and requested lengths from the network */ -int tcpnal_recv(nal_cb_t *n, - void *private, - lib_msg_t *cookie, - unsigned int niov, - struct iovec *iov, - size_t mlen, - size_t rlen) +ptl_err_t tcpnal_recv(nal_cb_t *n, + void *private, + lib_msg_t *cookie, + unsigned int niov, + struct iovec *iov, + size_t offset, + size_t mlen, + size_t rlen) { + struct iovec tiov[256]; + int ntiov; int i; if (!niov) @@ -169,16 +175,19 @@ int tcpnal_recv(nal_cb_t *n, LASSERT(rlen); LASSERT(rlen >= mlen); + ntiov = lib_extract_iov(256, tiov, niov, iov, offset, mlen); + /* FIXME * 1. Is this effecient enough? change to use readv() directly? * 2. need check return from read_connection() * - MeiJia */ - for (i = 0; i < niov; i++) - read_connection(private, iov[i].iov_base, iov[i].iov_len); + for (i = 0; i < ntiov; i++) + read_connection(private, tiov[i].iov_base, tiov[i].iov_len); finalize: - lib_finalize(n, private, cookie); + /* FIXME; we always assume success here... */ + lib_finalize(n, private, cookie, PTL_OK); if (mlen!=rlen){ char *trash=malloc(rlen-mlen); @@ -188,7 +197,7 @@ finalize: free(trash); } - return(rlen); + return(PTL_OK); } diff --git a/lustre/portals/include/linux/kp30.h b/lustre/portals/include/linux/kp30.h index 3d60631..7c3d824 100644 --- a/lustre/portals/include/linux/kp30.h +++ b/lustre/portals/include/linux/kp30.h @@ -326,43 +326,6 @@ do { \ s, (ptr), atomic_read(&portal_kmemory)); \ } while (0) -#ifndef SLAB_MEMALLOC -#define SLAB_MEMALLOC 0 -#endif - -#define PORTAL_SLAB_ALLOC(ptr, slab, size) \ -do { \ - LASSERT(!in_interrupt()); \ - (ptr) = kmem_cache_alloc((slab), (SLAB_KERNEL | SLAB_MEMALLOC)); \ - if ((ptr) == NULL) { \ - CERROR("PORTALS: out of memory at %s:%d (tried to alloc" \ - " '" #ptr "' from slab '" #slab "')\n", __FILE__, \ - __LINE__); \ - CERROR("PORTALS: %d total bytes allocated by portals\n", \ - atomic_read(&portal_kmemory)); \ - } else { \ - portal_kmem_inc((ptr), (size)); \ - memset((ptr), 0, (size)); \ - } \ - CDEBUG(D_MALLOC, "kmalloced '" #ptr "': %d at %p (tot %d).\n", \ - (int)(size), (ptr), atomic_read(&portal_kmemory)); \ -} while (0) - -#define PORTAL_SLAB_FREE(ptr, slab, size) \ -do { \ - int s = (size); \ - if ((ptr) == NULL) { \ - CERROR("PORTALS: free NULL '" #ptr "' (%d bytes) at " \ - "%s:%d\n", s, __FILE__, __LINE__); \ - break; \ - } \ - memset((ptr), 0x5a, s); \ - kmem_cache_free((slab), ptr); \ - portal_kmem_dec((ptr), s); \ - CDEBUG(D_MALLOC, "kfreed '" #ptr "': %d at %p (tot %d).\n", \ - s, (ptr), atomic_read (&portal_kmemory)); \ -} while (0) - /* ------------------------------------------------------------------- */ #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) diff --git a/lustre/portals/include/portals/errno.h b/lustre/portals/include/portals/errno.h index 817936a..08f084a 100644 --- a/lustre/portals/include/portals/errno.h +++ b/lustre/portals/include/portals/errno.h @@ -50,9 +50,8 @@ typedef enum { PTL_IOV_TOO_SMALL = 31, PTL_EQ_INUSE = 32, - PTL_MD_INUSE = 33, - PTL_MAX_ERRNO = 33 + PTL_MAX_ERRNO = 32 } ptl_err_t; /* If you change these, you must update the string table in api-errno.c */ diff --git a/lustre/portals/include/portals/lib-nal.h b/lustre/portals/include/portals/lib-nal.h index 4052c0c..0bf557e 100644 --- a/lustre/portals/include/portals/lib-nal.h +++ b/lustre/portals/include/portals/lib-nal.h @@ -18,47 +18,60 @@ struct nal_cb_t { lib_ni_t ni; void *nal_data; /* - * send: Sends a preformatted header and user data to a - * specified remote process. - * Can overwrite iov. 
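The rewritten comment that follows pins down the new cb_send() contract. In outline, a conforming NAL behaves like the hedged sketch below, where every 'mynal_*' name and MYNAL_MAX_IOV are hypothetical: either fail having touched nothing, or return PTL_OK and guarantee exactly one lib_finalize() when the send completes.

static ptl_err_t
mynal_cb_send(nal_cb_t *nal, void *private, lib_msg_t *cookie,
              ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
              unsigned int niov, struct iovec *iov,
              size_t offset, size_t mlen)
{
        mynal_tx_t *tx = mynal_get_idle_tx();   /* hypothetical */

        if (tx == NULL)             /* nothing committed yet: just fail */
                return (PTL_NOSPACE);

        /* 'iov' is read-only now, so extract the [offset, offset+mlen)
         * subsection into a private descriptor instead of editing it */
        tx->tx_niov = lib_extract_iov(MYNAL_MAX_IOV, tx->tx_iov,
                                      niov, iov, offset, mlen);
        tx->tx_cookie = cookie;

        mynal_queue_tx(tx, hdr, nid, pid);      /* hypothetical async send */

        /* PTL_OK is a commitment: when tx completes the NAL must call
         * lib_finalize(nal, private, cookie, status) exactly once */
        return (PTL_OK);
}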
+ * send: Sends a preformatted header and payload data to a + * specified remote process. The payload is scattered over 'niov' + * fragments described by iov, starting at 'offset' for 'mlen' + * bytes. + * NB the NAL may NOT overwrite iov. + * PTL_OK on success => NAL has committed to send and will call + * lib_finalize on completion */ - int (*cb_send) (nal_cb_t * nal, void *private, lib_msg_t * cookie, - ptl_hdr_t * hdr, int type, ptl_nid_t nid, ptl_pid_t pid, - unsigned int niov, struct iovec *iov, size_t mlen); + ptl_err_t (*cb_send) (nal_cb_t * nal, void *private, lib_msg_t * cookie, + ptl_hdr_t * hdr, int type, ptl_nid_t nid, ptl_pid_t pid, + unsigned int niov, struct iovec *iov, + size_t offset, size_t mlen); /* as send, but with a set of page fragments (NULL if not supported) */ - int (*cb_send_pages) (nal_cb_t * nal, void *private, lib_msg_t * cookie, - ptl_hdr_t * hdr, int type, ptl_nid_t nid, ptl_pid_t pid, - unsigned int niov, ptl_kiov_t *iov, size_t mlen); + ptl_err_t (*cb_send_pages) (nal_cb_t * nal, void *private, lib_msg_t * cookie, + ptl_hdr_t * hdr, int type, ptl_nid_t nid, ptl_pid_t pid, + unsigned int niov, ptl_kiov_t *iov, + size_t offset, size_t mlen); /* - * recv: Receives an incoming message from a remote process - * Type of iov depends on options. Can overwrite iov. + * recv: Receives an incoming message from a remote process. The + * payload is to be received into the scattered buffer of 'niov' + * fragments described by iov, starting at 'offset' for 'mlen' + * bytes. Payload bytes after 'mlen' up to 'rlen' are to be + * discarded. + * NB the NAL may NOT overwrite iov. + * PTL_OK on success => NAL has committed to receive and will call + * lib_finalize on completion */ - int (*cb_recv) (nal_cb_t * nal, void *private, lib_msg_t * cookie, - unsigned int niov, struct iovec *iov, size_t mlen, - size_t rlen); + ptl_err_t (*cb_recv) (nal_cb_t * nal, void *private, lib_msg_t * cookie, + unsigned int niov, struct iovec *iov, + size_t offset, size_t mlen, size_t rlen); /* as recv, but with a set of page fragments (NULL if not supported) */ - int (*cb_recv_pages) (nal_cb_t * nal, void *private, lib_msg_t * cookie, - unsigned int niov, ptl_kiov_t *iov, size_t mlen, - size_t rlen); + ptl_err_t (*cb_recv_pages) (nal_cb_t * nal, void *private, lib_msg_t * cookie, + unsigned int niov, ptl_kiov_t *iov, + size_t offset, size_t mlen, size_t rlen); /* * read: Reads a block of data from a specified user address */ - int (*cb_read) (nal_cb_t * nal, void *private, void *dst_addr, - user_ptr src_addr, size_t len); + ptl_err_t (*cb_read) (nal_cb_t * nal, void *private, void *dst_addr, + user_ptr src_addr, size_t len); /* * write: Writes a block of data into a specified user address */ - int (*cb_write) (nal_cb_t * nal, void *private, user_ptr dsr_addr, - void *src_addr, size_t len); + ptl_err_t (*cb_write) (nal_cb_t * nal, void *private, user_ptr dsr_addr, + void *src_addr, size_t len); /* * callback: Calls an event callback + * NULL => lib calls eq's callback (if any) directly. */ - int (*cb_callback) (nal_cb_t * nal, void *private, lib_eq_t *eq, - ptl_event_t *ev); + void (*cb_callback) (nal_cb_t * nal, void *private, lib_eq_t *eq, + ptl_event_t *ev); /* * malloc: Acquire a block of memory in a system independent @@ -74,14 +87,14 @@ struct nal_cb_t { * type of *iov depends on options. * Set to NULL if not required. 
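With cb_callback now optional, lib-side delivery reduces to the dispatch below; this is a sketch consistent with the callback logic removed from lib-msg.c earlier in this patch, and the function name is illustrative only.

static void
deliver_event(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev)
{
        /* a NAL-supplied callback takes precedence; otherwise the lib
         * invokes the event queue's own callback directly */
        if (nal->cb_callback != NULL)
                nal->cb_callback(nal, private, eq, ev);
        else if (eq->event_callback != NULL)
                eq->event_callback(ev);
}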
*/ - int (*cb_map) (nal_cb_t * nal, unsigned int niov, struct iovec *iov, - void **addrkey); + ptl_err_t (*cb_map) (nal_cb_t * nal, unsigned int niov, struct iovec *iov, + void **addrkey); void (*cb_unmap) (nal_cb_t * nal, unsigned int niov, struct iovec *iov, void **addrkey); /* as (un)map, but with a set of page fragments */ - int (*cb_map_pages) (nal_cb_t * nal, unsigned int niov, ptl_kiov_t *iov, - void **addrkey); + ptl_err_t (*cb_map_pages) (nal_cb_t * nal, unsigned int niov, ptl_kiov_t *iov, + void **addrkey); void (*cb_unmap_pages) (nal_cb_t * nal, unsigned int niov, ptl_kiov_t *iov, void **addrkey); diff --git a/lustre/portals/include/portals/lib-p30.h b/lustre/portals/include/portals/lib-p30.h index c402828..10a7bc7 100644 --- a/lustre/portals/include/portals/lib-p30.h +++ b/lustre/portals/include/portals/lib-p30.h @@ -19,7 +19,6 @@ #include #include #include -#include #include #include #include @@ -42,7 +41,7 @@ do { \ nal->cb_sti(nal, flagsp); \ } -#ifndef PTL_USE_SLAB_CACHE +#ifdef PTL_USE_LIB_FREELIST #define MAX_MES 2048 #define MAX_MDS 2048 @@ -98,7 +97,7 @@ lib_eq_free (nal_cb_t *nal, lib_eq_t *eq) } static inline lib_md_t * -lib_md_alloc (nal_cb_t *nal) +lib_md_alloc (nal_cb_t *nal, ptl_md_t *umd) { /* NEVER called with statelock held */ unsigned long flags; @@ -142,8 +141,20 @@ lib_me_free (nal_cb_t *nal, lib_me_t *me) static inline lib_msg_t * lib_msg_alloc (nal_cb_t *nal) { - /* ALWAYS called with statelock held */ - return ((lib_msg_t *)lib_freelist_alloc (&nal->ni.ni_free_msgs)); + /* NEVER called with statelock held */ + unsigned long flags; + lib_msg_t *msg; + + state_lock (nal, &flags); + msg = (lib_msg_t *)lib_freelist_alloc (&nal->ni.ni_free_msgs); + state_unlock (nal, &flags); + + if (msg != NULL) { + /* NULL pointers, clear flags etc */ + memset (msg, 0, sizeof (*msg)); + msg->ack_wmd = PTL_WIRE_HANDLE_NONE; + } + return(msg); } static inline void @@ -155,26 +166,13 @@ lib_msg_free (nal_cb_t *nal, lib_msg_t *msg) #else -extern kmem_cache_t *ptl_md_slab; -extern kmem_cache_t *ptl_msg_slab; -extern kmem_cache_t *ptl_me_slab; -extern kmem_cache_t *ptl_eq_slab; -extern atomic_t md_in_use_count; -extern atomic_t msg_in_use_count; -extern atomic_t me_in_use_count; -extern atomic_t eq_in_use_count; - static inline lib_eq_t * lib_eq_alloc (nal_cb_t *nal) { /* NEVER called with statelock held */ lib_eq_t *eq; - PORTAL_SLAB_ALLOC(eq, ptl_eq_slab, sizeof(*eq)); - - if (eq == NULL) - return (NULL); - atomic_inc (&eq_in_use_count); + PORTAL_ALLOC(eq, sizeof(*eq)); return (eq); } @@ -182,21 +180,34 @@ static inline void lib_eq_free (nal_cb_t *nal, lib_eq_t *eq) { /* ALWAYS called with statelock held */ - atomic_dec (&eq_in_use_count); - PORTAL_SLAB_FREE(eq, ptl_eq_slab, sizeof(*eq)); + PORTAL_FREE(eq, sizeof(*eq)); } static inline lib_md_t * -lib_md_alloc (nal_cb_t *nal) +lib_md_alloc (nal_cb_t *nal, ptl_md_t *umd) { /* NEVER called with statelock held */ lib_md_t *md; - PORTAL_SLAB_ALLOC(md, ptl_md_slab, sizeof(*md)); - - if (md == NULL) - return (NULL); - - atomic_inc (&md_in_use_count); + int size; + int niov; + + if ((umd->options & PTL_MD_KIOV) != 0) { + niov = umd->niov; + size = offsetof(lib_md_t, md_iov.kiov[niov]); + } else { + niov = ((umd->options & PTL_MD_IOV) != 0) ? 
+ umd->niov : 1; + size = offsetof(lib_md_t, md_iov.iov[niov]); + } + + PORTAL_ALLOC(md, size); + + if (md != NULL) { + /* Set here in case of early free */ + md->options = umd->options; + md->md_niov = niov; + } + return (md); } @@ -204,8 +215,14 @@ static inline void lib_md_free (nal_cb_t *nal, lib_md_t *md) { /* ALWAYS called with statelock held */ - atomic_dec (&md_in_use_count); - PORTAL_SLAB_FREE(md, ptl_md_slab, sizeof(*md)); + int size; + + if ((md->options & PTL_MD_KIOV) != 0) + size = offsetof(lib_md_t, md_iov.kiov[md->md_niov]); + else + size = offsetof(lib_md_t, md_iov.iov[md->md_niov]); + + PORTAL_FREE(md, size); } static inline lib_me_t * @@ -213,12 +230,8 @@ lib_me_alloc (nal_cb_t *nal) { /* NEVER called with statelock held */ lib_me_t *me; - PORTAL_SLAB_ALLOC(me, ptl_me_slab, sizeof(*me)); - - if (me == NULL) - return (NULL); - atomic_inc (&me_in_use_count); + PORTAL_ALLOC(me, sizeof(*me)); return (me); } @@ -226,21 +239,21 @@ static inline void lib_me_free(nal_cb_t *nal, lib_me_t *me) { /* ALWAYS called with statelock held */ - atomic_dec (&me_in_use_count); - PORTAL_SLAB_FREE(me, ptl_me_slab, sizeof(*me)); + PORTAL_FREE(me, sizeof(*me)); } static inline lib_msg_t * lib_msg_alloc(nal_cb_t *nal) { - /* ALWAYS called with statelock held */ + /* NEVER called with statelock held */ lib_msg_t *msg; - PORTAL_SLAB_ALLOC(msg, ptl_msg_slab, sizeof(*msg)); - if (msg == NULL) - return (NULL); - - atomic_inc (&msg_in_use_count); + PORTAL_ALLOC(msg, sizeof(*msg)); + if (msg != NULL) { + /* NULL pointers, clear flags etc */ + memset (msg, 0, sizeof (*msg)); + msg->ack_wmd = PTL_WIRE_HANDLE_NONE; + } return (msg); } @@ -248,8 +261,7 @@ static inline void lib_msg_free(nal_cb_t *nal, lib_msg_t *msg) { /* ALWAYS called with statelock held */ - atomic_dec (&msg_in_use_count); - PORTAL_SLAB_FREE(msg, ptl_msg_slab, sizeof(*msg)); + PORTAL_FREE(msg, sizeof(*msg)); } #endif @@ -348,26 +360,40 @@ extern char *dispatch_name(int index); * Call backs will be made to write events, send acks or * replies and so on. 
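The lib_md_alloc()/lib_md_free() pair above sizes each descriptor with offsetof() over the fragment count, so an MD carries exactly the iovec or kiov slots it needs and no more. A standalone userspace illustration of the idiom (demo_* names are hypothetical; the zero-length trailing array is a GNU C idiom consistent with the offsetof() sizing above):

#include <stddef.h>
#include <stdlib.h>
#include <sys/uio.h>

struct demo_md {
        int          options;
        int          md_niov;
        struct iovec md_iov[0];     /* sized at allocation time */
};

static struct demo_md *
demo_md_alloc(int niov)
{
        /* header plus exactly 'niov' fragment descriptors: the same
         * arithmetic as offsetof(lib_md_t, md_iov.iov[niov]) */
        struct demo_md *md = malloc(offsetof(struct demo_md, md_iov[niov]));

        if (md != NULL)
                md->md_niov = niov; /* the free side needs the size back */
        return (md);
}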
*/ -extern int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private); -extern int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t * msg); +extern void lib_enq_event_locked (nal_cb_t *nal, void *private, + lib_eq_t *eq, ptl_event_t *ev); +extern void lib_finalize (nal_cb_t *nal, void *private, lib_msg_t *msg, int status); +extern void lib_parse (nal_cb_t *nal, ptl_hdr_t *hdr, void *private); extern lib_msg_t *lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, lib_md_t *getmd); -extern void print_hdr(nal_cb_t * nal, ptl_hdr_t * hdr); +extern void print_hdr (nal_cb_t * nal, ptl_hdr_t * hdr); + extern ptl_size_t lib_iov_nob (int niov, struct iovec *iov); -extern void lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, ptl_size_t len); -extern void lib_copy_buf2iov (int niov, struct iovec *iov, char *dest, ptl_size_t len); +extern void lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, + ptl_size_t offset, ptl_size_t len); +extern void lib_copy_buf2iov (int niov, struct iovec *iov, ptl_size_t offset, + char *src, ptl_size_t len); +extern int lib_extract_iov (int dst_niov, struct iovec *dst, + int src_niov, struct iovec *src, + ptl_size_t offset, ptl_size_t len); extern ptl_size_t lib_kiov_nob (int niov, ptl_kiov_t *iov); -extern void lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *iov, ptl_size_t len); -extern void lib_copy_buf2kiov (int niov, ptl_kiov_t *iov, char *src, ptl_size_t len); +extern void lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, + ptl_size_t offset, ptl_size_t len); +extern void lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, ptl_size_t offset, + char *src, ptl_size_t len); +extern int lib_extract_kiov (int dst_niov, ptl_kiov_t *dst, + int src_niov, ptl_kiov_t *src, + ptl_size_t offset, ptl_size_t len); + extern void lib_assert_wire_constants (void); -extern void lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md, - ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen); -extern int lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg, - ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, - lib_md_t *md, ptl_size_t offset, ptl_size_t len); +extern ptl_err_t lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md, + ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen); +extern ptl_err_t lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg, + ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, + lib_md_t *md, ptl_size_t offset, ptl_size_t len); extern void lib_md_deconstruct(nal_cb_t * nal, lib_md_t * md_in, ptl_md_t * md_out); diff --git a/lustre/portals/include/portals/lib-types.h b/lustre/portals/include/portals/lib-types.h index 30e56af..904204b 100644 --- a/lustre/portals/include/portals/lib-types.h +++ b/lustre/portals/include/portals/lib-types.h @@ -12,11 +12,11 @@ #include #ifdef __KERNEL__ -# define PTL_USE_SLAB_CACHE # include # include # include #else +# define PTL_USE_LIB_FREELIST # include #endif @@ -139,16 +139,9 @@ typedef struct { struct lib_msg_t { struct list_head msg_list; - int send_ack; lib_md_t *md; - ptl_nid_t nid; - ptl_pid_t pid; - ptl_event_t ev; ptl_handle_wire_t ack_wmd; - union { - struct iovec iov[PTL_MD_MAX_IOV]; - ptl_kiov_t kiov[PTL_MD_MAX_IOV]; - } msg_iov; + ptl_event_t ev; }; struct lib_ptl_t { @@ -212,9 +205,8 @@ struct lib_md_t { }; #define PTL_MD_FLAG_UNLINK (1 << 0) -#define PTL_MD_FLAG_AUTO_UNLINKED (1 << 1) -#ifndef PTL_USE_SLAB_CACHE +#ifdef PTL_USE_LIB_FREELIST typedef struct { void *fl_objs; /* single contiguous array of objects */ @@ -262,7 +254,7 
@@ typedef struct { struct list_head ni_test_peers; -#ifndef PTL_USE_SLAB_CACHE +#ifdef PTL_USE_LIB_FREELIST lib_freelist_t ni_free_mes; lib_freelist_t ni_free_msgs; lib_freelist_t ni_free_mds; diff --git a/lustre/portals/include/portals/p30.h b/lustre/portals/include/portals/p30.h index a4ea39b..8b1495e 100644 --- a/lustre/portals/include/portals/p30.h +++ b/lustre/portals/include/portals/p30.h @@ -21,7 +21,6 @@ #include #include #include -#include #include extern int __p30_initialized; /* for libraries & test codes */ diff --git a/lustre/portals/include/portals/types.h b/lustre/portals/include/portals/types.h index e4ccebf..7ffe797 100644 --- a/lustre/portals/include/portals/types.h +++ b/lustre/portals/include/portals/types.h @@ -17,6 +17,8 @@ typedef u_int64_t __u64; # define do_gettimeofday(tv) gettimeofday(tv, NULL) #endif +#include + typedef __u64 ptl_nid_t; typedef __u32 ptl_pid_t; typedef __u32 ptl_pt_index_t; @@ -97,7 +99,8 @@ typedef enum { PTL_EVENT_PUT, PTL_EVENT_REPLY, PTL_EVENT_ACK, - PTL_EVENT_SENT + PTL_EVENT_SENT, + PTL_EVENT_UNLINK, } ptl_event_kind_t; #define PTL_SEQ_BASETYPE long @@ -112,15 +115,19 @@ typedef unsigned PTL_SEQ_BASETYPE ptl_seq_t; #pragma pack(push, 4) #endif typedef struct { - ptl_event_kind_t type; - ptl_process_id_t initiator; - ptl_pt_index_t portal; - ptl_match_bits_t match_bits; - ptl_size_t rlength, mlength, offset; - ptl_handle_me_t unlinked_me; - ptl_md_t mem_desc; - ptl_hdr_data_t hdr_data; - struct timeval arrival_time; + ptl_event_kind_t type; + ptl_err_t status; + int unlinked; + ptl_process_id_t initiator; + ptl_pt_index_t portal; + ptl_match_bits_t match_bits; + ptl_size_t rlength; + ptl_size_t mlength; + ptl_size_t offset; + ptl_md_t mem_desc; + ptl_hdr_data_t hdr_data; + struct timeval arrival_time; + volatile ptl_seq_t sequence; } ptl_event_t; #ifdef __CYGWIN__ diff --git a/lustre/portals/knals/gmnal/gmnal.h b/lustre/portals/knals/gmnal/gmnal.h index 53757ab..cdde5b7 100644 --- a/lustre/portals/knals/gmnal/gmnal.h +++ b/lustre/portals/knals/gmnal/gmnal.h @@ -353,8 +353,6 @@ int gmnal_cb_read(nal_cb_t *, void *private, void *, user_ptr, size_t); int gmnal_cb_write(nal_cb_t *, void *private, user_ptr, void *, size_t); -int gmnal_cb_callback(nal_cb_t *, void *, lib_eq_t *, ptl_event_t *); - void *gmnal_cb_malloc(nal_cb_t *, size_t); void gmnal_cb_free(nal_cb_t *, void *, size_t); @@ -384,7 +382,7 @@ void gmnal_fini(void); a->cb_recv_pages = gmnal_cb_recv_pages; \ a->cb_read = gmnal_cb_read; \ a->cb_write = gmnal_cb_write; \ - a->cb_callback = gmnal_cb_callback; \ + a->cb_callback = NULL; \ a->cb_malloc = gmnal_cb_malloc; \ a->cb_free = gmnal_cb_free; \ a->cb_map = NULL; \ diff --git a/lustre/portals/knals/gmnal/gmnal_cb.c b/lustre/portals/knals/gmnal/gmnal_cb.c index 6ae91db..e055242 100644 --- a/lustre/portals/knals/gmnal/gmnal_cb.c +++ b/lustre/portals/knals/gmnal/gmnal_cb.c @@ -126,7 +126,6 @@ int gmnal_cb_send(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, niov, iov, len); } else { CDEBUG(D_ERROR, "Large message send it is not supported\n"); - lib_finalize(nal_cb, private, cookie); return(PTL_FAIL); gmnal_large_tx(nal_cb, private, cookie, hdr, type, nid, pid, niov, iov, len); @@ -200,18 +199,6 @@ int gmnal_cb_write(nal_cb_t *nal_cb, void *private, user_ptr dst, return(PTL_OK); } -int gmnal_cb_callback(nal_cb_t *nal_cb, void *private, lib_eq_t *eq, - ptl_event_t *ev) -{ - - if (eq->event_callback != NULL) { - CDEBUG(D_INFO, "found callback\n"); - eq->event_callback(ev); - } - - return(PTL_OK); -} - void *gmnal_cb_malloc(nal_cb_t 
*nal_cb, size_t len) { void *ptr = NULL; diff --git a/lustre/portals/knals/ibnal/ibnal_cb.c b/lustre/portals/knals/ibnal/ibnal_cb.c index 2c07cc4..0688062 100644 --- a/lustre/portals/knals/ibnal/ibnal_cb.c +++ b/lustre/portals/knals/ibnal/ibnal_cb.c @@ -306,7 +306,7 @@ kibnal_send(nal_cb_t *nal, if(buf_length > MAX_MSG_SIZE) { CERROR("kibnal_send:request exceeds Transmit data size (%d).\n", MAX_MSG_SIZE); - rc = -1; + rc = PTL_FAIL; return rc; } else { @@ -363,7 +363,7 @@ kibnal_send(nal_cb_t *nal, PROF_FINISH(kibnal_send); // time stapm of send operation - rc = 1; + rc = PTL_OK; return rc; } @@ -386,7 +386,7 @@ int kibnal_send_pages(nal_cb_t * nal, ptl_kiov_t *iov, size_t mlen) { - int rc = 1; + int rc = PTL_FAIL; CDEBUG(D_NET, "kibnal_send_pages\n"); @@ -420,7 +420,7 @@ void kibnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) // // do you need this // -int kibnal_callback(nal_cb_t * nal, +void kibnal_callback(nal_cb_t * nal, void *private, lib_eq_t *eq, ptl_event_t *ev) @@ -507,7 +507,7 @@ kibnal_recv_pages(nal_cb_t * nal, { CDEBUG(D_NET, "recv_pages not implemented\n"); - return PTL_OK; + return PTL_FAIL; } @@ -526,11 +526,12 @@ kibnal_recv(nal_cb_t *nal, CDEBUG(D_NET,"kibnal_recv: mlen=%d, rlen=%d\n", mlen, rlen); /* What was actually received must be >= what sender claims to - * have sent. This is an LASSERT, since lib-move doesn't - * check cb return code yet. */ - LASSERT (krx->krx_len >= sizeof (ptl_hdr_t) + rlen); + * have sent. */ LASSERT (mlen <= rlen); + if (krx->krx_len < sizeof (ptl_hdr_t) + rlen) + return (PTL_FAIL); + PROF_START(kibnal_recv); if(mlen != 0) { @@ -542,12 +543,12 @@ kibnal_recv(nal_cb_t *nal, PROF_START(lib_finalize); - lib_finalize(nal, private, cookie); + lib_finalize(nal, private, cookie, PTL_OK); PROF_FINISH(lib_finalize); PROF_FINISH(kibnal_recv); - return rlen; + return PTL_OK; } // diff --git a/lustre/portals/knals/qswnal/qswnal_cb.c b/lustre/portals/knals/qswnal/qswnal_cb.c index 43926c9..39a4ea4 100644 --- a/lustre/portals/knals/qswnal/qswnal_cb.c +++ b/lustre/portals/knals/qswnal/qswnal_cb.c @@ -30,7 +30,7 @@ * LIB functions follow * */ -static int +static ptl_err_t kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr, size_t len) { @@ -38,10 +38,10 @@ kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr, nal->ni.nid, len, src_addr, dst_addr ); memcpy( dst_addr, src_addr, len ); - return (0); + return (PTL_OK); } -static int +static ptl_err_t kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr, size_t len) { @@ -49,7 +49,7 @@ kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr, nal->ni.nid, len, src_addr, dst_addr ); memcpy( dst_addr, src_addr, len ); - return (0); + return (PTL_OK); } static void * @@ -145,7 +145,7 @@ kqswnal_unmap_tx (kqswnal_tx_t *ktx) } int -kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) +kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov) { int nfrags = ktx->ktx_nfrag; int nmapped = ktx->ktx_nmappedpages; @@ -157,9 +157,17 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) LASSERT (nfrags <= EP_MAXFRAG); LASSERT (niov > 0); LASSERT (nob > 0); - + + /* skip complete frags before 'offset' */ + while (offset >= kiov->kiov_len) { + offset -= kiov->kiov_len; + kiov++; + niov--; + LASSERT (niov > 0); + } + do { - int fraglen = kiov->kiov_len; + int fraglen = kiov->kiov_len - offset; /* nob exactly spans the iovs */ LASSERT (fraglen <= nob); @@ 
-182,7 +190,7 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) /* XXX this is really crap, but we'll have to kmap until * EKC has a page (rather than vaddr) mapping interface */ - ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset; + ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset; CDEBUG(D_NET, "%p[%d] loading %p for %d, page %d, %d total\n", @@ -212,6 +220,7 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) kiov++; niov--; nob -= fraglen; + offset = 0; /* iov must not run out before end of data */ LASSERT (nob == 0 || niov > 0); @@ -226,7 +235,8 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov) } int -kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) +kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob, + int niov, struct iovec *iov) { int nfrags = ktx->ktx_nfrag; int nmapped = ktx->ktx_nmappedpages; @@ -238,8 +248,16 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) LASSERT (niov > 0); LASSERT (nob > 0); + /* skip complete frags before offset */ + while (offset >= iov->iov_len) { + offset -= iov->iov_len; + iov++; + niov--; + LASSERT (niov > 0); + } + do { - int fraglen = iov->iov_len; + int fraglen = iov->iov_len - offset; long npages = kqswnal_pages_spanned (iov->iov_base, fraglen); /* nob exactly spans the iovs */ @@ -260,12 +278,12 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) CDEBUG(D_NET, "%p[%d] loading %p for %d, pages %d for %ld, %d total\n", - ktx, nfrags, iov->iov_base, fraglen, basepage, npages, - nmapped); + ktx, nfrags, iov->iov_base + offset, fraglen, + basepage, npages, nmapped); elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState, kqswnal_data.kqn_eptxdmahandle, - iov->iov_base, fraglen, + iov->iov_base + offset, fraglen, basepage, &ktx->ktx_frags.iov[nfrags].Base); /* keep in loop for failure case */ ktx->ktx_nmappedpages = nmapped; @@ -284,6 +302,7 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov) iov++; niov--; nob -= fraglen; + offset = 0; /* iov must not run out before end of data */ LASSERT (nob == 0 || niov > 0); @@ -410,7 +429,7 @@ void kqswnal_tx_done (kqswnal_tx_t *ktx, int error) { lib_msg_t *msg; - lib_msg_t *repmsg; + lib_msg_t *repmsg = NULL; switch (ktx->ktx_state) { case KTX_FORWARDING: /* router asked me to forward this packet */ @@ -420,22 +439,30 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error) case KTX_SENDING: /* packet sourced locally */ lib_finalize (&kqswnal_lib, ktx->ktx_args[0], - (lib_msg_t *)ktx->ktx_args[1]); + (lib_msg_t *)ktx->ktx_args[1], + (error == 0) ? PTL_OK : + (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL); break; case KTX_GETTING: /* Peer has DMA-ed direct? */ LASSERT (KQSW_OPTIMIZE_GETS); msg = (lib_msg_t *)ktx->ktx_args[1]; - repmsg = NULL; - if (error == 0) + if (error == 0) { repmsg = lib_fake_reply_msg (&kqswnal_lib, ktx->ktx_nid, msg->md); + if (repmsg == NULL) + error = -ENOMEM; + } - lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg); - - if (repmsg != NULL) - lib_finalize (&kqswnal_lib, NULL, repmsg); + if (error == 0) { + lib_finalize (&kqswnal_lib, ktx->ktx_args[0], + msg, PTL_OK); + lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK); + } else { + lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg, + (error == -ENOMEM) ? 
PTL_NOSPACE : PTL_FAIL); + } break; default: @@ -461,7 +488,7 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status) ktx->ktx_nid, status); kqswnal_notify_peer_down(ktx); - status = -EIO; + status = -EHOSTDOWN; } else if (ktx->ktx_state == KTX_GETTING) { /* RPC completed OK; what did our peer put in the status @@ -651,7 +678,8 @@ kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv, int kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, - struct iovec *iov, ptl_kiov_t *kiov, int nob) + struct iovec *iov, ptl_kiov_t *kiov, + int offset, int nob) { kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0]; char *buffer = (char *)page_address(krx->krx_pages[0]); @@ -682,9 +710,9 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, /* Ghastly hack part 1, uses the existing procedures to map the source data... */ ktx->ktx_nfrag = 0; if (kiov != NULL) - rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov); + rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov); else - rc = kqswnal_map_tx_iov (ktx, nob, nfrag, iov); + rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov); if (rc != 0) { CERROR ("Can't map source data: %d\n", rc); @@ -720,7 +748,7 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, return (-ECONNABORTED); } -static int +static ptl_err_t kqswnal_sendmsg (nal_cb_t *nal, void *private, lib_msg_t *libmsg, @@ -731,6 +759,7 @@ kqswnal_sendmsg (nal_cb_t *nal, unsigned int payload_niov, struct iovec *payload_iov, ptl_kiov_t *payload_kiov, + size_t payload_offset, size_t payload_nob) { kqswnal_tx_t *ktx; @@ -739,6 +768,7 @@ kqswnal_sendmsg (nal_cb_t *nal, #if KQSW_CHECKSUM int i; kqsw_csum_t csum; + int sumoff; int sumnob; #endif @@ -802,9 +832,9 @@ kqswnal_sendmsg (nal_cb_t *nal, } /* peer expects RPC completion with GET data */ - rc = kqswnal_dma_reply (ktx, - payload_niov, payload_iov, - payload_kiov, payload_nob); + rc = kqswnal_dma_reply (ktx, payload_niov, + payload_iov, payload_kiov, + payload_offset, payload_nob); if (rc == 0) return (0); @@ -820,22 +850,39 @@ kqswnal_sendmsg (nal_cb_t *nal, #if KQSW_CHECKSUM csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr)); memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum)); - for (csum = 0, i = 0, sumnob = payload_nob; sumnob > 0; i++) { + for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) { + LASSERT(i < niov); if (payload_kiov != NULL) { ptl_kiov_t *kiov = &payload_kiov[i]; - char *addr = ((char *)kmap (kiov->kiov_page)) + - kiov->kiov_offset; - - csum = kqsw_csum (csum, addr, MIN (sumnob, kiov->kiov_len)); - sumnob -= kiov->kiov_len; + + if (sumoff >= kiov->kiov_len) { + sumoff -= kiov->kiov_len; + } else { + char *addr = ((char *)kmap (kiov->kiov_page)) + + kiov->kiov_offset + sumoff; + int fragnob = kiov->kiov_len - sumoff; + + csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob)); + sumnob -= fragnob; + sumoff = 0; + kunmap(kiov->kiov_page); + } } else { struct iovec *iov = &payload_iov[i]; - csum = kqsw_csum (csum, iov->iov_base, MIN (sumnob, kiov->iov_len)); - sumnob -= iov->iov_len; + if (sumoff > iov->iov_len) { + sumoff -= iov->iov_len; + } else { + char *addr = iov->iov_base + sumoff; + int fragnob = iov->iov_len - sumoff; + + csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob)); + sumnob -= fragnob; + sumoff = 0; + } } } - memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum)); + memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum)); #endif /* Set up first frag from pre-mapped buffer (it's at least the @@ -866,10 +913,10 @@ kqswnal_sendmsg (nal_cb_t *nal, * this (and 
eat my hat :) */ if ((libmsg->md->options & PTL_MD_KIOV) != 0) - rc = kqswnal_map_tx_kiov (ktx, md->length, + rc = kqswnal_map_tx_kiov (ktx, 0, md->length, md->md_niov, md->md_iov.kiov); else - rc = kqswnal_map_tx_iov (ktx, md->length, + rc = kqswnal_map_tx_iov (ktx, 0, md->length, md->md_niov, md->md_iov.iov); if (rc < 0) { @@ -894,18 +941,20 @@ kqswnal_sendmsg (nal_cb_t *nal, /* copy payload to ktx_buffer, immediately after hdr */ if (payload_kiov != NULL) lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE, - payload_niov, payload_kiov, payload_nob); + payload_niov, payload_kiov, + payload_offset, payload_nob); else lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE, - payload_niov, payload_iov, payload_nob); + payload_niov, payload_iov, + payload_offset, payload_nob); /* first frag includes payload */ ktx->ktx_frags.iov[0].Len += payload_nob; } else { if (payload_kiov != NULL) - rc = kqswnal_map_tx_kiov (ktx, payload_nob, + rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob, payload_niov, payload_kiov); else - rc = kqswnal_map_tx_iov (ktx, payload_nob, + rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob, payload_niov, payload_iov); if (rc != 0) { kqswnal_put_idle_tx (ktx); @@ -930,7 +979,7 @@ kqswnal_sendmsg (nal_cb_t *nal, return (PTL_OK); } -static int +static ptl_err_t kqswnal_send (nal_cb_t *nal, void *private, lib_msg_t *libmsg, @@ -940,13 +989,15 @@ kqswnal_send (nal_cb_t *nal, ptl_pid_t pid, unsigned int payload_niov, struct iovec *payload_iov, + size_t payload_offset, size_t payload_nob) { return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid, - payload_niov, payload_iov, NULL, payload_nob)); + payload_niov, payload_iov, NULL, + payload_offset, payload_nob)); } -static int +static ptl_err_t kqswnal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *libmsg, @@ -956,10 +1007,12 @@ kqswnal_send_pages (nal_cb_t *nal, ptl_pid_t pid, unsigned int payload_niov, ptl_kiov_t *payload_kiov, + size_t payload_offset, size_t payload_nob) { return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid, - payload_niov, NULL, payload_kiov, payload_nob)); + payload_niov, NULL, payload_kiov, + payload_offset, payload_nob)); } int kqswnal_fwd_copy_contig = 0; @@ -1009,7 +1062,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) nob <= KQSW_TX_BUFFER_SIZE) { /* send from ktx's pre-allocated/mapped contiguous buffer? */ - lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob); + lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, 0, nob); ktx->ktx_frags.iov[0].Base = ktx->ktx_ebuffer; /* already mapped */ ktx->ktx_frags.iov[0].Len = nob; ktx->ktx_nfrag = 1; @@ -1019,7 +1072,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) { /* zero copy */ ktx->ktx_nfrag = 0; /* no frags mapped yet */ - rc = kqswnal_map_tx_iov (ktx, nob, niov, iov); + rc = kqswnal_map_tx_iov (ktx, 0, nob, niov, iov); if (rc != 0) goto failed; @@ -1079,7 +1132,8 @@ kqswnal_reply_complete (EP_RXD *rxd) krx->krx_rpc_completed = 1; kqswnal_requeue_rx (krx); - lib_finalize (&kqswnal_lib, NULL, msg); + lib_finalize (&kqswnal_lib, NULL, msg, + (status == EP_SUCCESS) ? 
PTL_OK : PTL_FAIL); kqswnal_put_idle_tx (ktx); } @@ -1288,13 +1342,14 @@ kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr) } #endif -static int +static ptl_err_t kqswnal_recvmsg (nal_cb_t *nal, void *private, lib_msg_t *libmsg, unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov, + size_t offset, size_t mlen, size_t rlen) { @@ -1325,10 +1380,13 @@ kqswnal_recvmsg (nal_cb_t *nal, #endif CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen); - /* What was actually received must be >= payload. - * This is an LASSERT, as lib_finalize() doesn't have a completion status. */ - LASSERT (krx->krx_nob >= KQSW_HDR_SIZE + mlen); + /* What was actually received must be >= payload. */ LASSERT (mlen <= rlen); + if (krx->krx_nob < KQSW_HDR_SIZE + mlen) { + CERROR("Bad message size: have %d, need %d + %d\n", + krx->krx_nob, KQSW_HDR_SIZE, mlen); + return (PTL_FAIL); + } /* It must be OK to kmap() if required */ LASSERT (kiov == NULL || !in_interrupt ()); @@ -1343,20 +1401,37 @@ kqswnal_recvmsg (nal_cb_t *nal, page_nob = PAGE_SIZE - KQSW_HDR_SIZE; LASSERT (niov > 0); + if (kiov != NULL) { - iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset; - iov_nob = kiov->kiov_len; + /* skip complete frags */ + while (offset >= kiov->kiov_len) { + offset -= kiov->kiov_len; + kiov++; + niov--; + LASSERT (niov > 0); + } + iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset; + iov_nob = kiov->kiov_len - offset; } else { - iov_ptr = iov->iov_base; - iov_nob = iov->iov_len; + /* skip complete frags */ + while (offset >= iov->iov_len) { + offset -= iov->iov_len; + iov++; + niov--; + LASSERT (niov > 0); + } + iov_ptr = iov->iov_base + offset; + iov_nob = iov->iov_len - offset; } - + for (;;) { - /* We expect the iov to exactly match mlen */ - LASSERT (iov_nob <= mlen); - - frag = MIN (page_nob, iov_nob); + frag = mlen; + if (frag > page_nob) + frag = page_nob; + if (frag > iov_nob) + frag = iov_nob; + memcpy (iov_ptr, page_ptr, frag); #if KQSW_CHECKSUM payload_csum = kqsw_csum (payload_csum, iov_ptr, frag); @@ -1415,35 +1490,41 @@ kqswnal_recvmsg (nal_cb_t *nal, "csum_nob %d\n", hdr_csum, payload_csum, csum_frags, csum_nob); #endif - lib_finalize(nal, private, libmsg); + lib_finalize(nal, private, libmsg, PTL_OK); kqswnal_requeue_rx (krx); - return (rlen); + return (PTL_OK); } -static int +static ptl_err_t kqswnal_recv(nal_cb_t *nal, void *private, lib_msg_t *libmsg, unsigned int niov, struct iovec *iov, + size_t offset, size_t mlen, size_t rlen) { - return (kqswnal_recvmsg (nal, private, libmsg, niov, iov, NULL, mlen, rlen)); + return (kqswnal_recvmsg(nal, private, libmsg, + niov, iov, NULL, + offset, mlen, rlen)); } -static int +static ptl_err_t kqswnal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *libmsg, unsigned int niov, ptl_kiov_t *kiov, + size_t offset, size_t mlen, size_t rlen) { - return (kqswnal_recvmsg (nal, private, libmsg, niov, NULL, kiov, mlen, rlen)); + return (kqswnal_recvmsg(nal, private, libmsg, + niov, NULL, kiov, + offset, mlen, rlen)); } int diff --git a/lustre/portals/knals/scimacnal/scimacnal_cb.c b/lustre/portals/knals/scimacnal/scimacnal_cb.c index b31c2ea..52afb98 100644 --- a/lustre/portals/knals/scimacnal/scimacnal_cb.c +++ b/lustre/portals/knals/scimacnal/scimacnal_cb.c @@ -176,7 +176,8 @@ kscimacnal_txrelease(mac_mblk_t *msg, mac_msg_status_t status, void *context) break; } - lib_finalize(ktx->ktx_nal, ktx->ktx_private, ktx->ktx_cookie); + lib_finalize(ktx->ktx_nal, ktx->ktx_private, ktx->ktx_cookie, + (err == 0) ? 
PTL_OK : PTL_FAIL); PORTAL_FREE(ktx, (sizeof(kscimacnal_tx_t))); } @@ -225,14 +226,14 @@ kscimacnal_sendmsg(nal_cb_t *nal, if (buf_len > mac_get_mtusize(ksci->ksci_machandle)) { CERROR("kscimacnal:request exceeds TX MTU size (%ld).\n", mac_get_mtusize(ksci->ksci_machandle)); - return -EINVAL; + return PTL_FAIL; } /* save transaction info for later finalize and cleanup */ PORTAL_ALLOC(ktx, (sizeof(kscimacnal_tx_t))); if (!ktx) { - return -ENOMEM; + return PTL_NOSPACE; } ktx->ktx_nmapped = 0; /* Start with no mapped pages :) */ @@ -247,7 +248,7 @@ kscimacnal_sendmsg(nal_cb_t *nal, kscimacnal_txrelease, ktx); if (!msg) { PORTAL_FREE(ktx, (sizeof(kscimacnal_tx_t))); - return -ENOMEM; + return PTL_NOSPACE; } mac_put_mblk(msg, sizeof(ptl_hdr_t)); lastblk=msg; @@ -284,7 +285,7 @@ kscimacnal_sendmsg(nal_cb_t *nal, if(!newblk) { mac_free_msg(msg); PORTAL_FREE(ktx, (sizeof(kscimacnal_tx_t))); - return -ENOMEM; + return PTL_NOSPACE; } mac_put_mblk(newblk, nob); mac_link_mblk(lastblk, newblk); @@ -315,10 +316,10 @@ kscimacnal_sendmsg(nal_cb_t *nal, CERROR("kscimacnal: mac_send() failed, rc=%d\n", rc); mac_free_msg(msg); PORTAL_FREE(ktx, (sizeof(kscimacnal_tx_t))); - return rc; + return PTL_FAIL; } - return 0; + return PTL_OK; } @@ -463,12 +464,15 @@ kscimacnal_recvmsg(nal_cb_t *nal, krx->msg, mlen, rlen, niov); /* What was actually received must be >= what sender claims to have - * sent. This is an LASSERT, since lib-move doesn't check cb return - * code yet. Also, rlen seems to be negative when mlen==0 so don't - * assert on that. - */ - LASSERT (mlen==0 || mac_msg_size(krx->msg) >= sizeof(ptl_hdr_t)+rlen); - LASSERT (mlen==0 || mlen <= rlen); + * sent. */ + LASSERT (mlen <= rlen); /* something is wrong if this isn't true */ + if (mac_msg_size(krx->msg) < sizeof(ptl_hdr_t)+mlen) { + /* We didn't receive everything lib thinks we did */ + CERROR("Bad message size: have %d, need %d + %d\n", + mac_msg_size(krx->msg), sizeof(ptl_hdr_t), mlen); + return (PTL_FAIL); + } + /* It must be OK to kmap() if required */ LASSERT (kiov == NULL || !in_interrupt ()); /* Either all pages or all vaddrs */ @@ -545,12 +549,12 @@ kscimacnal_recvmsg(nal_cb_t *nal, CDEBUG(D_NET, "Calling lib_finalize.\n"); PROF_START(lib_finalize); - lib_finalize(nal, private, cookie); + lib_finalize(nal, private, cookie, PTL_OK); PROF_FINISH(lib_finalize); CDEBUG(D_NET, "Done.\n"); - return rlen; + return PTL_OK; } diff --git a/lustre/portals/knals/socknal/socknal.c b/lustre/portals/knals/socknal/socknal.c index f61a2bc..f583620 100644 --- a/lustre/portals/knals/socknal/socknal.c +++ b/lustre/portals/knals/socknal/socknal.c @@ -56,7 +56,7 @@ static ctl_table ksocknal_ctl_table[] = { &ksocknal_data.ksnd_eager_ack, sizeof (int), 0644, NULL, &proc_dointvec}, #if SOCKNAL_ZC - {SOCKNAL_SYSCTL_EAGER_ACK, "zero_copy", + {SOCKNAL_SYSCTL_ZERO_COPY, "zero_copy", &ksocknal_data.ksnd_zc_min_frag, sizeof (int), 0644, NULL, &proc_dointvec}, #endif @@ -996,15 +996,10 @@ ksocknal_destroy_conn (ksock_conn_t *conn) /* complete current receive if any */ switch (conn->ksnc_rx_state) { case SOCKNAL_RX_BODY: -#if 0 - lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie); -#else - CERROR ("Refusing to complete a partial receive from " - LPX64", ip %08x\n", conn->ksnc_peer->ksnp_nid, - conn->ksnc_ipaddr); - CERROR ("This may hang communications and " - "prevent modules from unloading\n"); -#endif + CERROR ("Completing partial receive from "LPX64 + ", ip %08x, with error\n", + conn->ksnc_peer->ksnp_nid, conn->ksnc_ipaddr); + lib_finalize (&ksocknal_lib, NULL, 
conn->ksnc_cookie, PTL_FAIL); break; case SOCKNAL_RX_BODY_FWD: ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED); diff --git a/lustre/portals/knals/socknal/socknal_cb.c b/lustre/portals/knals/socknal/socknal_cb.c index 9e04712..dca9818 100644 --- a/lustre/portals/knals/socknal/socknal_cb.c +++ b/lustre/portals/knals/socknal/socknal_cb.c @@ -29,7 +29,7 @@ * LIB functions follow * */ -int +ptl_err_t ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr, size_t len) { @@ -37,10 +37,10 @@ ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr, nal->ni.nid, (long)len, src_addr, dst_addr); memcpy( dst_addr, src_addr, len ); - return 0; + return PTL_OK; } -int +ptl_err_t ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr, size_t len) { @@ -48,20 +48,7 @@ ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, nal->ni.nid, (long)len, src_addr, dst_addr); memcpy( dst_addr, src_addr, len ); - return 0; -} - -int -ksocknal_callback (nal_cb_t * nal, void *private, lib_eq_t *eq, - ptl_event_t *ev) -{ - CDEBUG(D_NET, LPX64": callback eq %p ev %p\n", - nal->ni.nid, eq, ev); - - if (eq->event_callback != NULL) - eq->event_callback(ev); - - return 0; + return PTL_OK; } void * @@ -617,7 +604,8 @@ ksocknal_tx_done (ksock_tx_t *tx, int asynch) if (tx->tx_isfwd) { /* was a forwarded packet? */ kpr_fwd_done (&ksocknal_data.ksnd_router, - KSOCK_TX_2_KPR_FWD_DESC (tx), 0); + KSOCK_TX_2_KPR_FWD_DESC (tx), + (tx->tx_resid == 0) ? 0 : -ECONNABORTED); EXIT; return; } @@ -625,7 +613,8 @@ ksocknal_tx_done (ksock_tx_t *tx, int asynch) /* local send */ ltx = KSOCK_TX_2_KSOCK_LTX (tx); - lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie); + lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie, + (tx->tx_resid == 0) ? 
PTL_OK : PTL_FAIL); ksocknal_free_ltx (ltx); EXIT; @@ -1011,7 +1000,7 @@ ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid) return (-EHOSTUNREACH); } -int +ptl_err_t ksocknal_sendmsg(nal_cb_t *nal, void *private, lib_msg_t *cookie, @@ -1022,6 +1011,7 @@ ksocknal_sendmsg(nal_cb_t *nal, unsigned int payload_niov, struct iovec *payload_iov, ptl_kiov_t *payload_kiov, + size_t payload_offset, size_t payload_nob) { ksock_ltx_t *ltx; @@ -1084,20 +1074,19 @@ ksocknal_sendmsg(nal_cb_t *nal, ltx->ltx_tx.tx_kiov = NULL; ltx->ltx_tx.tx_nkiov = 0; - ltx->ltx_tx.tx_niov = 1 + payload_niov; - - memcpy(ltx->ltx_iov + 1, payload_iov, - payload_niov * sizeof (*payload_iov)); - + ltx->ltx_tx.tx_niov = + 1 + lib_extract_iov(payload_niov, &ltx->ltx_iov[1], + payload_niov, payload_iov, + payload_offset, payload_nob); } else { /* payload is all pages */ - ltx->ltx_tx.tx_kiov = ltx->ltx_kiov; - ltx->ltx_tx.tx_nkiov = payload_niov; - ltx->ltx_tx.tx_niov = 1; - memcpy(ltx->ltx_kiov, payload_kiov, - payload_niov * sizeof (*payload_kiov)); + ltx->ltx_tx.tx_kiov = ltx->ltx_kiov; + ltx->ltx_tx.tx_nkiov = + lib_extract_kiov(payload_niov, ltx->ltx_kiov, + payload_niov, payload_kiov, + payload_offset, payload_nob); } rc = ksocknal_launch_packet(&ltx->ltx_tx, nid); @@ -1108,28 +1097,28 @@ return (PTL_FAIL); } -int +ptl_err_t ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie, ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, unsigned int payload_niov, struct iovec *payload_iov, - size_t payload_len) + size_t payload_offset, size_t payload_len) { return (ksocknal_sendmsg(nal, private, cookie, hdr, type, nid, pid, payload_niov, payload_iov, NULL, - payload_len)); + payload_offset, payload_len)); } -int +ptl_err_t ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie, ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, unsigned int payload_niov, ptl_kiov_t *payload_kiov, - size_t payload_len) + size_t payload_offset, size_t payload_len) { return (ksocknal_sendmsg(nal, private, cookie, hdr, type, nid, pid, payload_niov, NULL, payload_kiov, - payload_len)); + payload_offset, payload_len)); } void @@ -1576,7 +1565,7 @@ ksocknal_process_receive (ksock_conn_t *conn) case SOCKNAL_RX_BODY: /* payload all received */ - lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie); + lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_OK); /* Fall through */ case SOCKNAL_RX_SLOP: @@ -1612,9 +1601,10 @@ ksocknal_process_receive (ksock_conn_t *conn) return (-EINVAL); /* keep gcc happy */ } -int +ptl_err_t ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, - unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen) + unsigned int niov, struct iovec *iov, + size_t offset, size_t mlen, size_t rlen) { ksock_conn_t *conn = (ksock_conn_t *)private; @@ -1627,20 +1617,22 @@ ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, conn->ksnc_rx_nkiov = 0; conn->ksnc_rx_kiov = NULL; - conn->ksnc_rx_niov = niov; conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov; - memcpy (conn->ksnc_rx_iov, iov, niov * sizeof (*iov)); + conn->ksnc_rx_niov = + lib_extract_iov(PTL_MD_MAX_IOV, conn->ksnc_rx_iov, + niov, iov, offset, mlen); LASSERT (mlen == lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) + lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov)); - return (rlen); + return (PTL_OK); } -int +ptl_err_t ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg, - unsigned int niov, ptl_kiov_t *kiov,
+ size_t offset, size_t mlen, size_t rlen) { ksock_conn_t *conn = (ksock_conn_t *)private; @@ -1653,15 +1645,16 @@ ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg, conn->ksnc_rx_niov = 0; conn->ksnc_rx_iov = NULL; - conn->ksnc_rx_nkiov = niov; conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov; - memcpy (conn->ksnc_rx_kiov, kiov, niov * sizeof (*kiov)); + conn->ksnc_rx_nkiov = + lib_extract_kiov(PTL_MD_MAX_IOV, conn->ksnc_rx_kiov, + niov, kiov, offset, mlen); LASSERT (mlen == lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) + lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov)); - return (rlen); + return (PTL_OK); } int ksocknal_scheduler (void *arg) @@ -2681,7 +2674,6 @@ nal_cb_t ksocknal_lib = { cb_recv_pages: ksocknal_recv_pages, cb_read: ksocknal_read, cb_write: ksocknal_write, - cb_callback: ksocknal_callback, cb_malloc: ksocknal_malloc, cb_free: ksocknal_free, cb_printf: ksocknal_printf, diff --git a/lustre/portals/knals/toenal/toenal_cb.c b/lustre/portals/knals/toenal/toenal_cb.c index 3af9e33..50284b7 100644 --- a/lustre/portals/knals/toenal/toenal_cb.c +++ b/lustre/portals/knals/toenal/toenal_cb.c @@ -37,7 +37,7 @@ long ktoenal_packets_transmitted; * LIB functions follow * */ -int +ptl_err_t ktoenal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr, size_t len) { @@ -45,10 +45,10 @@ ktoenal_read(nal_cb_t *nal, void *private, void *dst_addr, nal->ni.nid, (long)len, src_addr, dst_addr); memcpy( dst_addr, src_addr, len ); - return 0; + return PTL_OK; } -int +ptl_err_t ktoenal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr, size_t len) { @@ -56,20 +56,7 @@ ktoenal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, nal->ni.nid, (long)len, src_addr, dst_addr); memcpy( dst_addr, src_addr, len ); - return 0; -} - -int -ktoenal_callback (nal_cb_t * nal, void *private, lib_eq_t *eq, - ptl_event_t *ev) -{ - CDEBUG(D_NET, LPX64": callback eq %p ev %p\n", - nal->ni.nid, eq, ev); - - if (eq->event_callback != NULL) - eq->event_callback(ev); - - return 0; + return PTL_OK; } void * @@ -322,7 +309,8 @@ ktoenal_process_transmit (ksock_conn_t *conn, unsigned long *irq_flags) { ksock_ltx_t *ltx = KSOCK_TX_2_KSOCK_LTX (tx); - lib_finalize (&ktoenal_lib, ltx->ltx_private, ltx->ltx_cookie); + lib_finalize (&ktoenal_lib, ltx->ltx_private, + ltx->ltx_cookie, PTL_OK); spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags); @@ -400,10 +388,11 @@ ktoenal_launch_packet (ksock_conn_t *conn, ksock_tx_t *tx) spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags); } -int +ptl_err_t ktoenal_send(nal_cb_t *nal, void *private, lib_msg_t *cookie, ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, - unsigned int payload_niov, struct iovec *payload_iov, size_t payload_len) + unsigned int payload_niov, struct iovec *payload_iov, + size_t payload_off, size_t payload_len) { ptl_nid_t gatewaynid; ksock_conn_t *conn; @@ -427,6 +416,9 @@ ktoenal_send(nal_cb_t *nal, void *private, lib_msg_t *cookie, payload_niov > 0 ? payload_iov[0].iov_base : NULL, (int)(payload_niov > 0 ? 
payload_iov[0].iov_len : 0), nid, pid); + /* XXX not implemented read-only iov with offset */ + LBUG(); + if ((conn = ktoenal_get_conn (nid)) == NULL) { /* It's not a peer; try to find a gateway */ @@ -435,14 +427,14 @@ ktoenal_send(nal_cb_t *nal, void *private, lib_msg_t *cookie, if (rc != 0) { CERROR ("Can't route to "LPX64": router error %d\n", nid, rc); - return (-1); + return (PTL_FAIL); } if ((conn = ktoenal_get_conn (gatewaynid)) == NULL) { CERROR ("Can't route to "LPX64": gateway "LPX64" is not a peer\n", nid, gatewaynid); - return (-1); + return (PTL_FAIL); } } @@ -457,7 +449,7 @@ { CERROR ("Can't allocate tx desc\n"); ktoenal_put_conn (conn); - return (-1); + return (PTL_FAIL); } /* Init common (to sends and forwards) packet part */ @@ -483,7 +475,7 @@ } ktoenal_launch_packet (conn, &ltx->ltx_tx); - return (0); + return (PTL_OK); } void @@ -893,7 +885,7 @@ ktoenal_process_receive (ksock_conn_t *conn, unsigned long *irq_flags) case SOCKNAL_RX_BODY: atomic_inc (&ktoenal_packets_received); - lib_finalize(&ktoenal_lib, NULL, conn->ksnc_cookie); /* packet is done now */ + lib_finalize(&ktoenal_lib, NULL, conn->ksnc_cookie, PTL_OK); /* packet is done now */ /* Fall through */ case SOCKNAL_RX_SLOP: @@ -934,13 +926,17 @@ ktoenal_process_receive (ksock_conn_t *conn, unsigned long *irq_flags) list_add_tail (&conn->ksnc_rx_list, &ktoenal_data.ksnd_rx_conns); } -int +ptl_err_t ktoenal_recv(nal_cb_t *nal, void *private, lib_msg_t *msg, - unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen) + unsigned int niov, struct iovec *iov, + size_t offset, size_t mlen, size_t rlen) { ksock_conn_t *conn = (ksock_conn_t *)private; int i; + /* XXX not implemented read-only iov with offset */ + LBUG(); + conn->ksnc_cookie = msg; LASSERT (niov <= PTL_MD_MAX_IOV); @@ -954,7 +950,7 @@ ktoenal_recv(nal_cb_t *nal, void *private, lib_msg_t *msg, conn->ksnc_rx_nob_wanted = mlen; conn->ksnc_rx_nob_left = rlen; - return (rlen); + return (PTL_OK); } int @@ -1192,7 +1188,6 @@ nal_cb_t ktoenal_lib = { cb_recv: ktoenal_recv, cb_read: ktoenal_read, cb_write: ktoenal_write, - cb_callback: ktoenal_callback, cb_malloc: ktoenal_malloc, cb_free: ktoenal_free, cb_printf: ktoenal_printf, diff --git a/lustre/portals/libcfs/module.c b/lustre/portals/libcfs/module.c index 55e1935..a9d9786 100644 --- a/lustre/portals/libcfs/module.c +++ b/lustre/portals/libcfs/module.c @@ -817,9 +817,11 @@ EXPORT_SYMBOL(PtlMDBind); EXPORT_SYMBOL(lib_iov_nob); EXPORT_SYMBOL(lib_copy_iov2buf); EXPORT_SYMBOL(lib_copy_buf2iov); +EXPORT_SYMBOL(lib_extract_iov); EXPORT_SYMBOL(lib_kiov_nob); EXPORT_SYMBOL(lib_copy_kiov2buf); EXPORT_SYMBOL(lib_copy_buf2kiov); +EXPORT_SYMBOL(lib_extract_kiov); EXPORT_SYMBOL(lib_finalize); EXPORT_SYMBOL(lib_parse); EXPORT_SYMBOL(lib_fake_reply_msg); diff --git a/lustre/portals/portals/api-eq.c b/lustre/portals/portals/api-eq.c index e73d525..964b9d8 100644 --- a/lustre/portals/portals/api-eq.c +++ b/lustre/portals/portals/api-eq.c @@ -81,12 +81,6 @@ int PtlEQGet(ptl_handle_eq_t eventq, ptl_event_t * ev) *ev = *new_event; - /* Set the unlinked_me interface number if there is one to pass - * back, since the NAL hasn't a clue what it is and therefore can't - * set it.
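With unlinked_me gone, event consumers key off the two new fields instead: 'status' is PTL_OK or another portals errno, and 'unlinked' marks the event that signals the MD's destruction. A hedged sketch of an event queue callback under the new scheme (the callback name is illustrative):

static void
demo_eq_callback(ptl_event_t *ev)
{
        if (ev->status != PTL_OK)   /* abnormal completion */
                CWARN("event type %d failed with status %d\n",
                      ev->type, ev->status);

        if (ev->unlinked) {
                /* last event for this MD: its handle is now invalid
                 * and the buffers it described may be reclaimed */
        }
}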
*/ - if (!PtlHandleEqual (ev->unlinked_me, PTL_HANDLE_NONE)) - ev->unlinked_me.nal_idx = eventq.nal_idx; - /* ensure event is delivered correctly despite possible races with lib_finalize */ if (eq->sequence != new_event->sequence) { diff --git a/lustre/portals/portals/api-errno.c b/lustre/portals/portals/api-errno.c index 026c93b..b5e7aa1 100644 --- a/lustre/portals/portals/api-errno.c +++ b/lustre/portals/portals/api-errno.c @@ -50,6 +50,5 @@ const char *ptl_err_str[] = { "PTL_IOV_TOO_SMALL", "PTL_EQ_INUSE", - "PTL_MD_INUSE" }; /* If you change these, you must update the number table in portals/errno.h */ diff --git a/lustre/portals/portals/api-ni.c b/lustre/portals/portals/api-ni.c index b2e069e..18eea91 100644 --- a/lustre/portals/portals/api-ni.c +++ b/lustre/portals/portals/api-ni.c @@ -125,7 +125,7 @@ int PtlNIInit(ptl_interface_t interface, ptl_pt_index_t ptl_size, if (ptl_interfaces[i] == nal) { nal->refct++; handle->nal_idx = (NI_HANDLE_MAGIC & ~NI_HANDLE_MASK) | i; - fprintf(stderr, "Returning existing NAL (%d)\n", i); + CDEBUG(D_OTHER, "Returning existing NAL (%d)\n", i); ptl_ni_init_mutex_exit (); return PTL_OK; } diff --git a/lustre/portals/portals/api-wrap.c b/lustre/portals/portals/api-wrap.c index e54707f..d23a6aa 100644 --- a/lustre/portals/portals/api-wrap.c +++ b/lustre/portals/portals/api-wrap.c @@ -32,7 +32,7 @@ static int do_forward(ptl_handle_any_t any_h, int cmd, void *argbuf, nal_t *nal; if (!ptl_init) { - fprintf(stderr, "PtlGetId: Not initialized\n"); + CERROR("Not initialized\n"); return PTL_NOINIT; } @@ -262,7 +262,7 @@ static int validate_md(ptl_handle_any_t current_in, ptl_md_t md_in) int i; if (!ptl_init) { - fprintf(stderr, "PtlMDAttach/Bind/Update: Not initialized\n"); + CERROR("PtlMDAttach/Bind/Update: Not initialized\n"); return PTL_NOINIT; } diff --git a/lustre/portals/portals/lib-init.c b/lustre/portals/portals/lib-init.c index ab223d6..d4d8860 100644 --- a/lustre/portals/portals/lib-init.c +++ b/lustre/portals/portals/lib-init.c @@ -38,125 +38,17 @@ # include #endif -#ifdef PTL_USE_SLAB_CACHE -static int ptl_slab_users; - -kmem_cache_t *ptl_md_slab; -kmem_cache_t *ptl_msg_slab; -kmem_cache_t *ptl_me_slab; -kmem_cache_t *ptl_eq_slab; - -atomic_t md_in_use_count; -atomic_t msg_in_use_count; -atomic_t me_in_use_count; -atomic_t eq_in_use_count; - -/* NB zeroing in ctor and on freeing ensures items that - * kmem_cache_validate() OK, but haven't been initialised - * as an MD/ME/EQ can't have valid handles - */ -static void -ptl_md_slab_ctor (void *obj, kmem_cache_t *slab, unsigned long flags) -{ - memset (obj, 0, sizeof (lib_md_t)); -} - -static void -ptl_me_slab_ctor (void *obj, kmem_cache_t *slab, unsigned long flags) -{ - memset (obj, 0, sizeof (lib_me_t)); -} - -static void -ptl_eq_slab_ctor (void *obj, kmem_cache_t *slab, unsigned long flags) -{ - memset (obj, 0, sizeof (lib_eq_t)); -} +#ifndef PTL_USE_LIB_FREELIST int kportal_descriptor_setup (nal_cb_t *nal) { - /* NB on failure caller must still call kportal_descriptor_cleanup */ - /* ****** */ - - /* We'll have 1 set of slabs for ALL the nals :) */ - - if (ptl_slab_users++) - return 0; - - ptl_md_slab = kmem_cache_create("portals_MD", - sizeof(lib_md_t), 0, - SLAB_HWCACHE_ALIGN, - ptl_md_slab_ctor, NULL); - if (!ptl_md_slab) { - CERROR("couldn't allocate ptl_md_t slab"); - RETURN (PTL_NOSPACE); - } - - /* NB no ctor for msgs; they don't need handle verification */ - ptl_msg_slab = kmem_cache_create("portals_MSG", - sizeof(lib_msg_t), 0, - SLAB_HWCACHE_ALIGN, - NULL, NULL); - if (!ptl_msg_slab) { - 
CERROR("couldn't allocate ptl_msg_t slab"); - RETURN (PTL_NOSPACE); - } - - ptl_me_slab = kmem_cache_create("portals_ME", - sizeof(lib_me_t), 0, - SLAB_HWCACHE_ALIGN, - ptl_me_slab_ctor, NULL); - if (!ptl_me_slab) { - CERROR("couldn't allocate ptl_me_t slab"); - RETURN (PTL_NOSPACE); - } - - ptl_eq_slab = kmem_cache_create("portals_EQ", - sizeof(lib_eq_t), 0, - SLAB_HWCACHE_ALIGN, - ptl_eq_slab_ctor, NULL); - if (!ptl_eq_slab) { - CERROR("couldn't allocate ptl_eq_t slab"); - RETURN (PTL_NOSPACE); - } - - RETURN(PTL_OK); + return PTL_OK; } void kportal_descriptor_cleanup (nal_cb_t *nal) { - int rc; - - if (--ptl_slab_users != 0) - return; - - LASSERT (atomic_read (&md_in_use_count) == 0); - LASSERT (atomic_read (&me_in_use_count) == 0); - LASSERT (atomic_read (&eq_in_use_count) == 0); - LASSERT (atomic_read (&msg_in_use_count) == 0); - - if (ptl_md_slab != NULL) { - rc = kmem_cache_destroy(ptl_md_slab); - if (rc != 0) - CERROR("unable to free MD slab\n"); - } - if (ptl_msg_slab != NULL) { - rc = kmem_cache_destroy(ptl_msg_slab); - if (rc != 0) - CERROR("unable to free MSG slab\n"); - } - if (ptl_me_slab != NULL) { - rc = kmem_cache_destroy(ptl_me_slab); - if (rc != 0) - CERROR("unable to free ME slab\n"); - } - if (ptl_eq_slab != NULL) { - rc = kmem_cache_destroy(ptl_eq_slab); - if (rc != 0) - CERROR("unable to free EQ slab\n"); - } } #else diff --git a/lustre/portals/portals/lib-md.c b/lustre/portals/portals/lib-md.c index be6949c..a1ed583 100644 --- a/lustre/portals/portals/lib-md.c +++ b/lustre/portals/portals/lib-md.c @@ -83,7 +83,7 @@ static int lib_md_build(nal_cb_t *nal, lib_md_t *new, void *private, int rc; int i; - /* NB we are passes an allocated, but uninitialised/active md. + /* NB we are passed an allocated, but uninitialised/active md. * if we return success, caller may lib_md_unlink() it. * otherwise caller may only lib_md_free() it. */ @@ -94,9 +94,10 @@ static int lib_md_build(nal_cb_t *nal, lib_md_t *new, void *private, return PTL_INV_EQ; } - if ((md->options & PTL_MD_IOV) != 0 && /* discontiguous MD */ - md->niov > PTL_MD_MAX_IOV) /* too many fragments */ - return PTL_IOV_TOO_MANY; + /* Must check this _before_ allocation. Also, note that non-iov + * MDs must set md_niov to 0. 
*/ + LASSERT((md->options & (PTL_MD_IOV | PTL_MD_KIOV)) == 0 || + md->niov <= PTL_MD_MAX_IOV); if ((md->options & max_size_opts) != 0 && /* max size used */ (md->max_size < 0 || md->max_size > md->length)) // illegal max_size @@ -239,7 +240,11 @@ int do_PtlMDAttach(nal_cb_t * nal, void *private, void *v_args, void *v_ret) lib_md_t *md; unsigned long flags; - md = lib_md_alloc (nal); + if ((args->md_in.options & (PTL_MD_KIOV | PTL_MD_IOV)) != 0 && + args->md_in.niov > PTL_MD_MAX_IOV) /* too many fragments */ + return (ret->rc = PTL_IOV_TOO_MANY); + + md = lib_md_alloc(nal, &args->md_in); if (md == NULL) return (ret->rc = PTL_NOSPACE); @@ -287,7 +292,11 @@ int do_PtlMDBind(nal_cb_t * nal, void *private, void *v_args, void *v_ret) lib_md_t *md; unsigned long flags; - md = lib_md_alloc (nal); + if ((args->md_in.options & (PTL_MD_KIOV | PTL_MD_IOV)) != 0 && + args->md_in.niov > PTL_MD_MAX_IOV) /* too many fragments */ + return (ret->rc = PTL_IOV_TOO_MANY); + + md = lib_md_alloc(nal, &args->md_in); if (md == NULL) return (ret->rc = PTL_NOSPACE); @@ -311,34 +320,43 @@ int do_PtlMDBind(nal_cb_t * nal, void *private, void *v_args, void *v_ret) int do_PtlMDUnlink(nal_cb_t * nal, void *private, void *v_args, void *v_ret) { - PtlMDUnlink_in *args = v_args; + PtlMDUnlink_in *args = v_args; PtlMDUnlink_out *ret = v_ret; - - lib_md_t *md; - unsigned long flags; + ptl_event_t ev; + lib_md_t *md; + unsigned long flags; state_lock(nal, &flags); md = ptl_handle2md(&args->md_in, nal); if (md == NULL) { - ret->rc = PTL_INV_MD; - } else if (md->pending != 0) { /* being filled/spilled */ - ret->rc = PTL_MD_INUSE; - } else { - /* Callers attempting to unlink a busy MD which will get - * unlinked once the net op completes should see INUSE, - * before completion and INV_MD thereafter. LASSERT we've - * got that right... */ - LASSERT ((md->md_flags & PTL_MD_FLAG_UNLINK) == 0); - - lib_md_deconstruct(nal, md, &ret->status_out); - lib_md_unlink(nal, md); - ret->rc = PTL_OK; + state_unlock(nal, &flags); + return (ret->rc = PTL_INV_MD); + } + + /* If the MD is busy, lib_md_unlink just marks it for deletion, and + * when the NAL is done, the completion event flags that the MD was + * unlinked. Otherwise, we enqueue an event now... 
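[Editor's note: this hunk changes the caller-visible contract of PtlMDUnlink(): it now always commits the MD for destruction, so callers no longer poll for an "in use" return. Teardown is signalled either by an immediate PTL_EVENT_UNLINK (idle MD with an EQ, enqueued just below) or by the 'unlinked' flag on the MD's final completion event. A sketch of a consumer under the new contract, where 'mdh' and 'eqh' are hypothetical handles and PtlEQWait() is the standard blocking event fetch:

	ptl_event_t ev;

	(void)PtlMDUnlink(mdh);		/* commits the MD for destruction */

	do {
		if (PtlEQWait(eqh, &ev) != PTL_OK)
			break;
		if (ev.status != PTL_OK)
			CWARN("operation failed: %d\n", ev.status);
	} while (!ev.unlinked);		/* the MD's last event has it set */
]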
*/ + + if (md->eq != NULL && + md->pending == 0) { + memset(&ev, 0, sizeof(ev)); + + ev.type = PTL_EVENT_UNLINK; + ev.status = PTL_OK; + ev.unlinked = 1; + lib_md_deconstruct(nal, md, &ev.mem_desc); + + lib_enq_event_locked(nal, private, md->eq, &ev); } + lib_md_deconstruct(nal, md, &ret->status_out); + lib_md_unlink(nal, md); + ret->rc = PTL_OK; + state_unlock(nal, &flags); - return (ret->rc); + return (PTL_OK); } int do_PtlMDUpdate_internal(nal_cb_t * nal, void *private, void *v_args, @@ -379,6 +397,23 @@ int do_PtlMDUpdate_internal(nal_cb_t * nal, void *private, void *v_args, goto out; } + /* XXX fttb, the new MD must be the same type wrt fragmentation */ + if (((new->options ^ md->options) & + (PTL_MD_IOV | PTL_MD_KIOV)) != 0) { + ret->rc = PTL_INV_MD; + goto out; + } + + if (new->niov > md->md_niov) { + ret->rc = PTL_IOV_TOO_MANY; + goto out; + } + + if (new->niov < md->md_niov) { + ret->rc = PTL_IOV_TOO_SMALL; + goto out; + } + if (!PtlHandleEqual (args->testq_in, PTL_EQ_NONE)) { test_eq = ptl_handle2eq(&args->testq_in, nal); if (test_eq == NULL) { diff --git a/lustre/portals/portals/lib-move.c b/lustre/portals/portals/lib-move.c index 6e904ba..b0a2c947 100644 --- a/lustre/portals/portals/lib-move.c +++ b/lustre/portals/portals/lib-move.c @@ -258,55 +258,78 @@ lib_iov_nob (int niov, struct iovec *iov) } void -lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, ptl_size_t len) +lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, + ptl_size_t offset, ptl_size_t len) { ptl_size_t nob; - while (len > 0) - { + if (len == 0) + return; + + /* skip complete frags before 'offset' */ + LASSERT (niov > 0); + while (offset >= iov->iov_len) { + offset -= iov->iov_len; + iov++; + niov--; + LASSERT (niov > 0); + } + + do { LASSERT (niov > 0); - nob = MIN (iov->iov_len, len); - memcpy (dest, iov->iov_base, nob); + nob = MIN (iov->iov_len - offset, len); + memcpy (dest, iov->iov_base + offset, nob); len -= nob; dest += nob; niov--; iov++; - } + offset = 0; + } while (len > 0); } void -lib_copy_buf2iov (int niov, struct iovec *iov, char *src, ptl_size_t len) +lib_copy_buf2iov (int niov, struct iovec *iov, ptl_size_t offset, + char *src, ptl_size_t len) { ptl_size_t nob; - while (len > 0) - { + if (len == 0) + return; + + /* skip complete frags before 'offset' */ + LASSERT (niov > 0); + while (offset >= iov->iov_len) { + offset -= iov->iov_len; + iov++; + niov--; LASSERT (niov > 0); - nob = MIN (iov->iov_len, len); - memcpy (iov->iov_base, src, nob); + } + + do { + LASSERT (niov > 0); + nob = MIN (iov->iov_len - offset, len); + memcpy (iov->iov_base + offset, src, nob); len -= nob; src += nob; niov--; iov++; - } + offset = 0; + } while (len > 0); } -static int -lib_extract_iov (struct iovec *dst, lib_md_t *md, +int +lib_extract_iov (int dst_niov, struct iovec *dst, + int src_niov, struct iovec *src, ptl_size_t offset, ptl_size_t len) { /* Initialise 'dst' to the subset of 'src' starting at 'offset', * for exactly 'len' bytes, and return the number of entries. 
* NB not destructive to 'src' */ - int src_niov = md->md_niov; - struct iovec *src = md->md_iov.iov; ptl_size_t frag_len; - int dst_niov; + int niov; - LASSERT (offset + len <= md->length); - if (len == 0) /* no data => */ return (0); /* no frags */ @@ -318,17 +341,17 @@ lib_extract_iov (struct iovec *dst, lib_md_t *md, LASSERT (src_niov > 0); } - dst_niov = 1; + niov = 1; for (;;) { LASSERT (src_niov > 0); - LASSERT (dst_niov <= PTL_MD_MAX_IOV); + LASSERT (niov <= dst_niov); frag_len = src->iov_len - offset; dst->iov_base = ((char *)src->iov_base) + offset; if (len <= frag_len) { dst->iov_len = len; - return (dst_niov); + return (niov); } dst->iov_len = frag_len; @@ -336,7 +359,7 @@ len -= frag_len; dst++; src++; - dst_niov++; + niov++; src_niov--; offset = 0; } @@ -351,19 +374,22 @@ lib_kiov_nob (int niov, ptl_kiov_t *kiov) } void -lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, ptl_size_t len) +lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, + ptl_size_t offset, ptl_size_t len) { LASSERT (0); } void -lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, char *dest, ptl_size_t len) +lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, ptl_size_t offset, + char *src, ptl_size_t len) { LASSERT (0); } -static int -lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md, +int +lib_extract_kiov (int dst_niov, ptl_kiov_t *dst, + int src_niov, ptl_kiov_t *src, ptl_size_t offset, ptl_size_t len) { LASSERT (0); @@ -383,18 +409,30 @@ lib_kiov_nob (int niov, ptl_kiov_t *kiov) } void -lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, ptl_size_t len) +lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, + ptl_size_t offset, ptl_size_t len) { ptl_size_t nob; char *addr; + + if (len == 0) + return; LASSERT (!in_interrupt ()); - while (len > 0) - { + + LASSERT (niov > 0); + while (offset >= kiov->kiov_len) { + offset -= kiov->kiov_len; + kiov++; + niov--; + LASSERT (niov > 0); + } + + do { LASSERT (niov > 0); - nob = MIN (kiov->kiov_len, len); + nob = MIN (kiov->kiov_len - offset, len); - addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset; + addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset; memcpy (dest, addr, nob); kunmap (kiov->kiov_page); @@ -402,22 +440,35 @@ lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, ptl_size_t len) dest += nob; niov--; kiov++; - } + offset = 0; + } while (len > 0); } void -lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, char *src, ptl_size_t len) +lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, ptl_size_t offset, + char *src, ptl_size_t len) { ptl_size_t nob; char *addr; + if (len == 0) + return; + LASSERT (!in_interrupt ()); - while (len > 0) - { + + LASSERT (niov > 0); + while (offset >= kiov->kiov_len) { + offset -= kiov->kiov_len; + kiov++; + niov--; + LASSERT (niov > 0); + } + + do { LASSERT (niov > 0); - nob = MIN (kiov->kiov_len, len); + nob = MIN (kiov->kiov_len - offset, len); - addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset; + addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset; memcpy (addr, src, nob); kunmap (kiov->kiov_page); @@ -425,23 +476,21 @@ lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, char *src, ptl_size_t len) src += nob; niov--; kiov++; - } + offset = 0; + } while (len > 0); } -static int -lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md, +int +lib_extract_kiov (int dst_niov, ptl_kiov_t *dst, + int src_niov, ptl_kiov_t *src, ptl_size_t offset, ptl_size_t len) { /* Initialise 'dst' to the subset of 'src' starting at
'offset', * for exactly 'len' bytes, and return the number of entries. * NB not destructive to 'src' */ - int src_niov = md->md_niov; - ptl_kiov_t *src = md->md_iov.kiov; ptl_size_t frag_len; - int dst_niov; + int niov; - LASSERT (offset + len <= md->length); - if (len == 0) /* no data => */ return (0); /* no frags */ @@ -453,10 +502,10 @@ lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md, LASSERT (src_niov > 0); } - dst_niov = 1; + niov = 1; for (;;) { LASSERT (src_niov > 0); - LASSERT (dst_niov <= PTL_MD_MAX_IOV); + LASSERT (niov <= dst_niov); frag_len = src->kiov_len - offset; dst->kiov_page = src->kiov_page; @@ -465,7 +514,7 @@ lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md, if (len <= frag_len) { dst->kiov_len = len; LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE); - return (dst_niov); + return (niov); } dst->kiov_len = frag_len; @@ -474,73 +523,66 @@ lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md, len -= frag_len; dst++; src++; - dst_niov++; + niov++; src_niov--; offset = 0; } } #endif -void +ptl_err_t lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md, ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen) { - int niov; - if (mlen == 0) - nal->cb_recv (nal, private, msg, 0, NULL, 0, rlen); - else if ((md->options & PTL_MD_KIOV) == 0) { - niov = lib_extract_iov (msg->msg_iov.iov, md, offset, mlen); - nal->cb_recv (nal, private, msg, - niov, msg->msg_iov.iov, mlen, rlen); - } else { - niov = lib_extract_kiov (msg->msg_iov.kiov, md, offset, mlen); - nal->cb_recv_pages (nal, private, msg, - niov, msg->msg_iov.kiov, mlen, rlen); - } + return (nal->cb_recv(nal, private, msg, + 0, NULL, + offset, mlen, rlen)); + + if ((md->options & PTL_MD_KIOV) == 0) + return (nal->cb_recv(nal, private, msg, + md->md_niov, md->md_iov.iov, + offset, mlen, rlen)); + + return (nal->cb_recv_pages(nal, private, msg, + md->md_niov, md->md_iov.kiov, + offset, mlen, rlen)); } -int +ptl_err_t lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg, ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, lib_md_t *md, ptl_size_t offset, ptl_size_t len) { - int niov; - if (len == 0) - return (nal->cb_send (nal, private, msg, - hdr, type, nid, pid, - 0, NULL, 0)); + return (nal->cb_send(nal, private, msg, + hdr, type, nid, pid, + 0, NULL, + offset, len)); - if ((md->options & PTL_MD_KIOV) == 0) { - niov = lib_extract_iov (msg->msg_iov.iov, md, offset, len); - return (nal->cb_send (nal, private, msg, - hdr, type, nid, pid, - niov, msg->msg_iov.iov, len)); - } - - niov = lib_extract_kiov (msg->msg_iov.kiov, md, offset, len); - return (nal->cb_send_pages (nal, private, msg, - hdr, type, nid, pid, - niov, msg->msg_iov.kiov, len)); + if ((md->options & PTL_MD_KIOV) == 0) + return (nal->cb_send(nal, private, msg, + hdr, type, nid, pid, + md->md_niov, md->md_iov.iov, + offset, len)); + + return (nal->cb_send_pages(nal, private, msg, + hdr, type, nid, pid, + md->md_niov, md->md_iov.kiov, + offset, len)); } -static lib_msg_t * -get_new_msg (nal_cb_t *nal, lib_md_t *md) +static void +lib_commit_md (nal_cb_t *nal, lib_md_t *md, lib_msg_t *msg) { /* ALWAYS called holding the state_lock */ lib_counters_t *counters = &nal->ni.counters; - lib_msg_t *msg = lib_msg_alloc (nal); - - if (msg == NULL) - return (NULL); - - memset (msg, 0, sizeof (*msg)); - - msg->send_ack = 0; + /* Here, we commit the MD to a network OP by marking it busy and + * decrementing its threshold. Come what may, the network "owns" + * the MD until a call to lib_finalize() signals completion. 
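[Editor's note: lib_recv()/lib_send() above now hand the NAL the MD's whole fragment list plus (offset, len) instead of pre-cut iovecs, so a NAL that wants a private, offset-adjusted copy builds one with the newly exported helpers. A minimal usage sketch with hypothetical local names (compare the ksocknal_recv_pages() hunk earlier in this patch):

	struct iovec frag[PTL_MD_MAX_IOV];
	int          nfrag;

	/* frag[] ends up covering exactly [offset, offset + mlen) of the
	 * source fragment list; the source iovec itself is not modified. */
	nfrag = lib_extract_iov(PTL_MD_MAX_IOV, frag,
	                        niov, iov, offset, mlen);
]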
*/ msg->md = md; - do_gettimeofday(&msg->ev.arrival_time); + md->pending++; if (md->threshold != PTL_MD_THRESH_INF) { LASSERT (md->threshold > 0); @@ -552,8 +594,24 @@ get_new_msg (nal_cb_t *nal, lib_md_t *md) counters->msgs_max = counters->msgs_alloc; list_add (&msg->msg_list, &nal->ni.ni_active_msgs); +} - return (msg); +static void +lib_drop_message (nal_cb_t *nal, void *private, ptl_hdr_t *hdr) +{ + unsigned long flags; + + /* CAVEAT EMPTOR: this only drops messages that we've not committed + * to receive (init_msg() not called) and therefore can't cause an + * event. */ + + state_lock(nal, &flags); + nal->ni.counters.drop_count++; + nal->ni.counters.drop_length += hdr->payload_length; + state_unlock(nal, &flags); + + /* NULL msg => if NAL calls lib_finalize it will be a noop */ + (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length); } /* @@ -563,17 +621,18 @@ get_new_msg (nal_cb_t *nal, lib_md_t *md) * of long messages. * */ -static int parse_put(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) +static ptl_err_t +parse_put(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg) { lib_ni_t *ni = &nal->ni; ptl_size_t mlength = 0; ptl_size_t offset = 0; int unlink = 0; + ptl_err_t rc; lib_me_t *me; lib_md_t *md; - lib_msg_t *msg; unsigned long flags; - + /* Convert put fields to host byte order */ hdr->msg.put.match_bits = NTOH__u64 (hdr->msg.put.match_bits); hdr->msg.put.ptl_index = NTOH__u32 (hdr->msg.put.ptl_index); @@ -586,8 +645,10 @@ static int parse_put(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) hdr->payload_length, hdr->msg.put.offset, hdr->msg.put.match_bits, &mlength, &offset, &unlink); - if (me == NULL) - goto drop; + if (me == NULL) { + state_unlock(nal, &flags); + return (PTL_FAIL); + } md = me->md; CDEBUG(D_NET, "Incoming put index %x from "LPU64"/%u of length %d/%d " @@ -595,69 +656,46 @@ static int parse_put(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) hdr->src_nid, hdr->src_pid, mlength, hdr->payload_length, md->md_lh.lh_cookie, md->md_niov, offset); - msg = get_new_msg (nal, md); - if (msg == NULL) { - CERROR(LPU64": Dropping PUT from "LPU64": can't allocate msg\n", - ni->nid, hdr->src_nid); - goto drop; - } + lib_commit_md(nal, md, msg); + + msg->ev.type = PTL_EVENT_PUT; + msg->ev.initiator.nid = hdr->src_nid; + msg->ev.initiator.pid = hdr->src_pid; + msg->ev.portal = hdr->msg.put.ptl_index; + msg->ev.match_bits = hdr->msg.put.match_bits; + msg->ev.rlength = hdr->payload_length; + msg->ev.mlength = mlength; + msg->ev.offset = offset; + msg->ev.hdr_data = hdr->msg.put.hdr_data; + + lib_md_deconstruct(nal, md, &msg->ev.mem_desc); if (!ptl_is_wire_handle_none(&hdr->msg.put.ack_wmd) && !(md->options & PTL_MD_ACK_DISABLE)) { - msg->send_ack = 1; msg->ack_wmd = hdr->msg.put.ack_wmd; - msg->nid = hdr->src_nid; - msg->pid = hdr->src_pid; - msg->ev.match_bits = hdr->msg.put.match_bits; - } - - if (md->eq) { - msg->ev.type = PTL_EVENT_PUT; - msg->ev.initiator.nid = hdr->src_nid; - msg->ev.initiator.pid = hdr->src_pid; - msg->ev.portal = hdr->msg.put.ptl_index; - msg->ev.match_bits = hdr->msg.put.match_bits; - msg->ev.rlength = hdr->payload_length; - msg->ev.mlength = mlength; - msg->ev.offset = offset; - msg->ev.hdr_data = hdr->msg.put.hdr_data; - - /* NB if this match has exhausted the MD, we can't be sure - * that this event will the the last one associated with - * this MD in the event queue (another message already - * matching this ME/MD could end up being last). 
So we - * remember the ME handle anyway and check again when we're - * allocating our slot in the event queue. - */ - ptl_me2handle (&msg->ev.unlinked_me, me); - - lib_md_deconstruct(nal, md, &msg->ev.mem_desc); } ni->counters.recv_count++; ni->counters.recv_length += mlength; - /* only unlink after MD's pending count has been bumped - * in get_new_msg() otherwise lib_me_unlink() will nuke it */ - if (unlink) { - md->md_flags |= PTL_MD_FLAG_AUTO_UNLINKED; + /* only unlink after MD's pending count has been bumped in + * lib_commit_md() otherwise lib_me_unlink() will nuke it */ + if (unlink) lib_me_unlink (nal, me); - } state_unlock(nal, &flags); - lib_recv (nal, private, msg, md, offset, mlength, hdr->payload_length); - return 0; + rc = lib_recv(nal, private, msg, md, offset, mlength, + hdr->payload_length); + if (rc != PTL_OK) + CERROR(LPU64": error on receiving PUT from "LPU64": %d\n", + ni->nid, hdr->src_nid, rc); - drop: - nal->ni.counters.drop_count++; - nal->ni.counters.drop_length += hdr->payload_length; - state_unlock (nal, &flags); - lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length); - return -1; + return (rc); } -static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) +static ptl_err_t +parse_get(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg) { lib_ni_t *ni = &nal->ni; ptl_size_t mlength = 0; @@ -665,7 +703,6 @@ static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) int unlink = 0; lib_me_t *me; lib_md_t *md; - lib_msg_t *msg; ptl_hdr_t reply; unsigned long flags; int rc; @@ -683,8 +720,10 @@ static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) hdr->msg.get.sink_length, hdr->msg.get.src_offset, hdr->msg.get.match_bits, &mlength, &offset, &unlink); - if (me == NULL) - goto drop; + if (me == NULL) { + state_unlock(nal, &flags); + return (PTL_FAIL); + } md = me->md; CDEBUG(D_NET, "Incoming get index %d from "LPU64".%u of length %d/%d " @@ -692,45 +731,27 @@ static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) hdr->src_nid, hdr->src_pid, mlength, hdr->payload_length, md->md_lh.lh_cookie, md->md_niov, offset); - msg = get_new_msg (nal, md); - if (msg == NULL) { - CERROR(LPU64": Dropping GET from "LPU64": can't allocate msg\n", - ni->nid, hdr->src_nid); - goto drop; - } + lib_commit_md(nal, md, msg); - if (md->eq) { - msg->ev.type = PTL_EVENT_GET; - msg->ev.initiator.nid = hdr->src_nid; - msg->ev.initiator.pid = hdr->src_pid; - msg->ev.portal = hdr->msg.get.ptl_index; - msg->ev.match_bits = hdr->msg.get.match_bits; - msg->ev.rlength = hdr->payload_length; - msg->ev.mlength = mlength; - msg->ev.offset = offset; - msg->ev.hdr_data = 0; - - /* NB if this match has exhausted the MD, we can't be sure - * that this event will the the last one associated with - * this MD in the event queue (another message already - * matching this ME/MD could end up being last). So we - * remember the ME handle anyway and check again when we're - * allocating our slot in the event queue. 
- */ - ptl_me2handle (&msg->ev.unlinked_me, me); - - lib_md_deconstruct(nal, md, &msg->ev.mem_desc); - } + msg->ev.type = PTL_EVENT_GET; + msg->ev.initiator.nid = hdr->src_nid; + msg->ev.initiator.pid = hdr->src_pid; + msg->ev.portal = hdr->msg.get.ptl_index; + msg->ev.match_bits = hdr->msg.get.match_bits; + msg->ev.rlength = hdr->payload_length; + msg->ev.mlength = mlength; + msg->ev.offset = offset; + msg->ev.hdr_data = 0; + + lib_md_deconstruct(nal, md, &msg->ev.mem_desc); ni->counters.send_count++; ni->counters.send_length += mlength; - /* only unlink after MD's refcount has been bumped - * in get_new_msg() otherwise lib_me_unlink() will nuke it */ - if (unlink) { - md->md_flags |= PTL_MD_FLAG_AUTO_UNLINKED; + /* only unlink after MD's refcount has been bumped in + * lib_commit_md() otherwise lib_me_unlink() will nuke it */ + if (unlink) lib_me_unlink (nal, me); - } state_unlock(nal, &flags); @@ -746,36 +767,25 @@ static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) rc = lib_send (nal, private, msg, &reply, PTL_MSG_REPLY, hdr->src_nid, hdr->src_pid, md, offset, mlength); - if (rc != PTL_OK) { - CERROR(LPU64": Dropping GET from "LPU64": send REPLY failed\n", - ni->nid, hdr->src_nid); - /* Hmm, this will create a GET event and make believe - * the reply completed, which it kind of did, only the - * source won't get her reply */ - lib_finalize (nal, private, msg); - state_lock (nal, &flags); - goto drop; - } + if (rc != PTL_OK) + CERROR(LPU64": Unable to send REPLY for GET from "LPU64": %d\n", + ni->nid, hdr->src_nid, rc); + + /* Discard any junk after the hdr */ + (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length); - /* Complete the incoming message */ - lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length); return (rc); - drop: - ni->counters.drop_count++; - ni->counters.drop_length += hdr->msg.get.sink_length; - state_unlock(nal, &flags); - lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length); - return -1; } -static int parse_reply(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) +static ptl_err_t +parse_reply(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg) { lib_ni_t *ni = &nal->ni; lib_md_t *md; int rlength; int length; - lib_msg_t *msg; unsigned long flags; + ptl_err_t rc; state_lock(nal, &flags); @@ -787,7 +797,9 @@ static int parse_reply(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) md == NULL ? 
"invalid" : "inactive", hdr->msg.reply.dst_wmd.wh_interface_cookie, hdr->msg.reply.dst_wmd.wh_object_cookie); - goto drop; + + state_unlock(nal, &flags); + return (PTL_FAIL); } LASSERT (md->offset == 0); @@ -801,7 +813,8 @@ static int parse_reply(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) ni->nid, hdr->src_nid, length, hdr->msg.reply.dst_wmd.wh_object_cookie, md->length); - goto drop; + state_unlock(nal, &flags); + return (PTL_FAIL); } length = md->length; } @@ -810,46 +823,36 @@ static int parse_reply(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) hdr->src_nid, length, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie); - msg = get_new_msg (nal, md); - if (msg == NULL) { - CERROR(LPU64": Dropping REPLY from "LPU64": can't " - "allocate msg\n", ni->nid, hdr->src_nid); - goto drop; - } + lib_commit_md(nal, md, msg); - if (md->eq) { - msg->ev.type = PTL_EVENT_REPLY; - msg->ev.initiator.nid = hdr->src_nid; - msg->ev.initiator.pid = hdr->src_pid; - msg->ev.rlength = rlength; - msg->ev.mlength = length; - msg->ev.offset = 0; + msg->ev.type = PTL_EVENT_REPLY; + msg->ev.initiator.nid = hdr->src_nid; + msg->ev.initiator.pid = hdr->src_pid; + msg->ev.rlength = rlength; + msg->ev.mlength = length; + msg->ev.offset = 0; - lib_md_deconstruct(nal, md, &msg->ev.mem_desc); - } + lib_md_deconstruct(nal, md, &msg->ev.mem_desc); ni->counters.recv_count++; ni->counters.recv_length += length; state_unlock(nal, &flags); - lib_recv (nal, private, msg, md, 0, length, rlength); - return 0; + rc = lib_recv(nal, private, msg, md, 0, length, rlength); + if (rc != PTL_OK) + CERROR(LPU64": error on receiving REPLY from "LPU64": %d\n", + ni->nid, hdr->src_nid, rc); - drop: - nal->ni.counters.drop_count++; - nal->ni.counters.drop_length += hdr->payload_length; - state_unlock (nal, &flags); - lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length); - return -1; + return (rc); } -static int parse_ack(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) +static ptl_err_t +parse_ack(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg) { - lib_ni_t *ni = &nal->ni; - lib_md_t *md; - lib_msg_t *msg = NULL; - unsigned long flags; + lib_ni_t *ni = &nal->ni; + lib_md_t *md; + unsigned long flags; /* Convert ack fields to host byte order */ hdr->msg.ack.match_bits = NTOH__u64 (hdr->msg.ack.match_bits); @@ -865,40 +868,37 @@ static int parse_ack(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) (md == NULL) ? 
"invalid" : "inactive", hdr->msg.ack.dst_wmd.wh_interface_cookie, hdr->msg.ack.dst_wmd.wh_object_cookie); - goto drop; + + state_unlock(nal, &flags); + return (PTL_FAIL); } CDEBUG(D_NET, LPU64": ACK from "LPU64" into md "LPX64"\n", ni->nid, hdr->src_nid, hdr->msg.ack.dst_wmd.wh_object_cookie); - msg = get_new_msg (nal, md); - if (msg == NULL) { - CERROR(LPU64": Dropping ACK from "LPU64": can't allocate msg\n", - ni->nid, hdr->src_nid); - goto drop; - } + lib_commit_md(nal, md, msg); - if (md->eq) { - msg->ev.type = PTL_EVENT_ACK; - msg->ev.initiator.nid = hdr->src_nid; - msg->ev.initiator.pid = hdr->src_pid; - msg->ev.mlength = hdr->msg.ack.mlength; - msg->ev.match_bits = hdr->msg.ack.match_bits; + msg->ev.type = PTL_EVENT_ACK; + msg->ev.initiator.nid = hdr->src_nid; + msg->ev.initiator.pid = hdr->src_pid; + msg->ev.mlength = hdr->msg.ack.mlength; + msg->ev.match_bits = hdr->msg.ack.match_bits; - lib_md_deconstruct(nal, md, &msg->ev.mem_desc); - } + lib_md_deconstruct(nal, md, &msg->ev.mem_desc); ni->counters.recv_count++; - state_unlock(nal, &flags); - lib_recv (nal, private, msg, NULL, 0, 0, hdr->payload_length); - return 0; - drop: - nal->ni.counters.drop_count++; - state_unlock (nal, &flags); - lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length); - return -1; + state_unlock(nal, &flags); + + /* We have received and matched up the ack OK, create the + * completion event now... */ + lib_finalize(nal, private, msg, PTL_OK); + + /* ...and now discard any junk after the hdr */ + (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length); + + return (PTL_OK); } static char * @@ -980,10 +980,13 @@ void print_hdr(nal_cb_t * nal, ptl_hdr_t * hdr) } /* end of print_hdr() */ -int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) +void +lib_parse(nal_cb_t *nal, ptl_hdr_t *hdr, void *private) { unsigned long flags; - + ptl_err_t rc; + lib_msg_t *msg; + /* convert common fields to host byte order */ hdr->dest_nid = NTOH__u64 (hdr->dest_nid); hdr->src_nid = NTOH__u64 (hdr->src_nid); @@ -1005,22 +1008,16 @@ int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) nal->ni.nid, mv->magic, mv->version_major, mv->version_minor, hdr->src_nid); - lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length); - return (-1); + lib_drop_message(nal, private, hdr); + return; } if (hdr->dest_nid != nal->ni.nid) { CERROR(LPU64": Dropping %s message from "LPU64" to "LPU64 " (not me)\n", nal->ni.nid, hdr_type_string (hdr), hdr->src_nid, hdr->dest_nid); - - state_lock (nal, &flags); - nal->ni.counters.drop_count++; - nal->ni.counters.drop_length += hdr->payload_length; - state_unlock (nal, &flags); - - lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length); - return (-1); + lib_drop_message(nal, private, hdr); + return; } if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */ @@ -1030,34 +1027,59 @@ int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private) ": simulated failure\n", nal->ni.nid, hdr_type_string (hdr), hdr->src_nid); - lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length); - return (-1); + lib_drop_message(nal, private, hdr); + return; } - + + msg = lib_msg_alloc(nal); + if (msg == NULL) { + CERROR(LPU64": Dropping incoming %s from "LPU64 + ": can't allocate a lib_msg_t\n", + nal->ni.nid, hdr_type_string (hdr), + hdr->src_nid); + lib_drop_message(nal, private, hdr); + return; + } + + do_gettimeofday(&msg->ev.arrival_time); + switch (hdr->type) { case PTL_MSG_ACK: - return (parse_ack(nal, hdr, private)); + rc = parse_ack(nal, hdr, 
private, msg); + break; case PTL_MSG_PUT: - return (parse_put(nal, hdr, private)); + rc = parse_put(nal, hdr, private, msg); break; case PTL_MSG_GET: - return (parse_get(nal, hdr, private)); + rc = parse_get(nal, hdr, private, msg); break; case PTL_MSG_REPLY: - return (parse_reply(nal, hdr, private)); + rc = parse_reply(nal, hdr, private, msg); break; default: CERROR(LPU64": Dropping message from "LPU64 ": Bad type=0x%x\n", nal->ni.nid, hdr->src_nid, hdr->type); - - lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length); - return (-1); + rc = PTL_FAIL; + break; + } + + if (rc != PTL_OK) { + if (msg->md != NULL) { + /* committed... */ + lib_finalize(nal, private, msg, rc); + } else { + state_lock(nal, &flags); + lib_msg_free(nal, msg); /* expects state_lock held */ + state_unlock(nal, &flags); + + lib_drop_message(nal, private, hdr); + } } } - -int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret) +int +do_PtlPut(nal_cb_t *nal, void *private, void *v_args, void *v_ret) { /* * Incoming: @@ -1072,16 +1094,15 @@ int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret) * Outgoing: */ - PtlPut_in *args = v_args; - PtlPut_out *ret = v_ret; - ptl_hdr_t hdr; - - lib_ni_t *ni = &nal->ni; - lib_md_t *md; - lib_msg_t *msg = NULL; + PtlPut_in *args = v_args; ptl_process_id_t *id = &args->target_in; - unsigned long flags; - int rc; + PtlPut_out *ret = v_ret; + lib_ni_t *ni = &nal->ni; + lib_msg_t *msg; + ptl_hdr_t hdr; + lib_md_t *md; + unsigned long flags; + int rc; if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */ fail_peer (nal, id->nid, 1)) /* shall we now? */ @@ -1090,13 +1111,22 @@ int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret) nal->ni.nid, id->nid); return (ret->rc = PTL_INV_PROC); } - - ret->rc = PTL_OK; + + msg = lib_msg_alloc(nal); + if (msg == NULL) { + CERROR(LPU64": Dropping PUT to "LPU64": ENOMEM on lib_msg_t\n", + ni->nid, id->nid); + return (ret->rc = PTL_NOSPACE); + } + state_lock(nal, &flags); + md = ptl_handle2md(&args->md_in, nal); - if (md == NULL || !md->threshold) { + if (md == NULL || md->threshold == 0) { + lib_msg_free(nal, msg); state_unlock(nal, &flags); - return ret->rc = PTL_INV_MD; + + return (ret->rc = PTL_INV_MD); } CDEBUG(D_NET, "PtlPut -> %Lu: %lu\n", (unsigned long long)id->nid, @@ -1123,57 +1153,39 @@ int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret) hdr.msg.put.offset = HTON__u32 (args->offset_in); hdr.msg.put.hdr_data = args->hdr_data_in; + lib_commit_md(nal, md, msg); + + msg->ev.type = PTL_EVENT_SENT; + msg->ev.initiator.nid = ni->nid; + msg->ev.initiator.pid = ni->pid; + msg->ev.portal = args->portal_in; + msg->ev.match_bits = args->match_bits_in; + msg->ev.rlength = md->length; + msg->ev.mlength = md->length; + msg->ev.offset = args->offset_in; + msg->ev.hdr_data = args->hdr_data_in; + + lib_md_deconstruct(nal, md, &msg->ev.mem_desc); + ni->counters.send_count++; ni->counters.send_length += md->length; - msg = get_new_msg (nal, md); - if (msg == NULL) { - CERROR("BAD: could not allocate msg!\n"); - state_unlock(nal, &flags); - return ret->rc = PTL_NOSPACE; - } - - /* - * If this memory descriptor has an event queue associated with - * it we need to allocate a message state object and record the - * information about this operation that will be recorded into - * event queue once the message has been completed. - * - * NB. We're now committed to the GET, since we just marked the MD - * busy. 
Callers who observe this (by getting PTL_MD_INUSE from - * PtlMDUnlink()) expect a completion event to tell them when the - * MD becomes idle. - */ - if (md->eq) { - msg->ev.type = PTL_EVENT_SENT; - msg->ev.initiator.nid = ni->nid; - msg->ev.initiator.pid = ni->pid; - msg->ev.portal = args->portal_in; - msg->ev.match_bits = args->match_bits_in; - msg->ev.rlength = md->length; - msg->ev.mlength = md->length; - msg->ev.offset = args->offset_in; - msg->ev.hdr_data = args->hdr_data_in; - - lib_md_deconstruct(nal, md, &msg->ev.mem_desc); - } - state_unlock(nal, &flags); rc = lib_send (nal, private, msg, &hdr, PTL_MSG_PUT, id->nid, id->pid, md, 0, md->length); if (rc != PTL_OK) { - /* get_new_msg() committed us to sending by decrementing - * md->threshold, so we have to act like we did send, but - * the network dropped it. */ - lib_finalize (nal, private, msg); + CERROR(LPU64": error sending PUT to "LPU64": %d\n", + ni->nid, id->nid, rc); + lib_finalize (nal, private, msg, rc); } + /* completion will be signalled by an event */ return ret->rc = PTL_OK; } -lib_msg_t * lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, - lib_md_t *getmd) +lib_msg_t * +lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, lib_md_t *getmd) { /* The NAL can DMA direct to the GET md (i.e. no REPLY msg). This * returns a msg the NAL can pass to lib_finalize() so that a REPLY @@ -1185,39 +1197,38 @@ lib_msg_t * lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, * lib_finalize() of the original GET. */ lib_ni_t *ni = &nal->ni; - lib_msg_t *msg; + lib_msg_t *msg = lib_msg_alloc(nal); unsigned long flags; state_lock(nal, &flags); LASSERT (getmd->pending > 0); + if (msg == NULL) { + CERROR ("Dropping REPLY from "LPU64": can't allocate msg\n", + peer_nid); + goto drop; + } + if (getmd->threshold == 0) { CERROR ("Dropping REPLY from "LPU64" for inactive MD %p\n", peer_nid, getmd); - goto drop; + goto drop_msg; } LASSERT (getmd->offset == 0); CDEBUG(D_NET, "Reply from "LPU64" md %p\n", peer_nid, getmd); - msg = get_new_msg (nal, getmd); - if (msg == NULL) { - CERROR("Dropping REPLY from "LPU64" md %p: can't allocate msg\n", - peer_nid, getmd); - goto drop; - } + lib_commit_md (nal, getmd, msg); - if (getmd->eq) { - msg->ev.type = PTL_EVENT_REPLY; - msg->ev.initiator.nid = peer_nid; - msg->ev.initiator.pid = 0; /* XXX FIXME!!! */ - msg->ev.rlength = msg->ev.mlength = getmd->length; - msg->ev.offset = 0; + msg->ev.type = PTL_EVENT_REPLY; + msg->ev.initiator.nid = peer_nid; + msg->ev.initiator.pid = 0; /* XXX FIXME!!! 
*/ + msg->ev.rlength = msg->ev.mlength = getmd->length; + msg->ev.offset = 0; - lib_md_deconstruct(nal, getmd, &msg->ev.mem_desc); - } + lib_md_deconstruct(nal, getmd, &msg->ev.mem_desc); ni->counters.recv_count++; ni->counters.recv_length += getmd->length; @@ -1225,7 +1236,9 @@ lib_msg_t * lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, state_unlock(nal, &flags); return msg; - + + drop_msg: + lib_msg_free(nal, msg); drop: nal->ni.counters.drop_count++; nal->ni.counters.drop_length += getmd->length; @@ -1235,7 +1248,8 @@ lib_msg_t * lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, return NULL; } -int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret) +int +do_PtlGet(nal_cb_t *nal, void *private, void *v_args, void *v_ret) { /* * Incoming: @@ -1249,15 +1263,15 @@ int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret) * Outgoing: */ - PtlGet_in *args = v_args; - PtlGet_out *ret = v_ret; - ptl_hdr_t hdr; - lib_msg_t *msg = NULL; - lib_ni_t *ni = &nal->ni; + PtlGet_in *args = v_args; ptl_process_id_t *id = &args->target_in; - lib_md_t *md; - unsigned long flags; - int rc; + PtlGet_out *ret = v_ret; + lib_ni_t *ni = &nal->ni; + lib_msg_t *msg; + ptl_hdr_t hdr; + lib_md_t *md; + unsigned long flags; + int rc; if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */ fail_peer (nal, id->nid, 1)) /* shall we now? */ @@ -1266,16 +1280,24 @@ int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret) nal->ni.nid, id->nid); return (ret->rc = PTL_INV_PROC); } - + + msg = lib_msg_alloc(nal); + if (msg == NULL) { + CERROR(LPU64": Dropping GET to "LPU64": ENOMEM on lib_msg_t\n", + ni->nid, id->nid); + return (ret->rc = PTL_NOSPACE); + } + state_lock(nal, &flags); + md = ptl_handle2md(&args->md_in, nal); if (md == NULL || !md->threshold) { + lib_msg_free(nal, msg); state_unlock(nal, &flags); + return ret->rc = PTL_INV_MD; } - LASSERT (md->offset == 0); - CDEBUG(D_NET, "PtlGet -> %Lu: %lu\n", (unsigned long long)id->nid, (unsigned long)id->pid); @@ -1296,51 +1318,33 @@ int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret) hdr.msg.get.src_offset = HTON__u32 (args->offset_in); hdr.msg.get.sink_length = HTON__u32 (md->length); - ni->counters.send_count++; + lib_commit_md(nal, md, msg); - msg = get_new_msg (nal, md); - if (msg == NULL) { - CERROR("do_PtlGet: BAD - could not allocate cookie!\n"); - state_unlock(nal, &flags); - return ret->rc = PTL_NOSPACE; - } + msg->ev.type = PTL_EVENT_SENT; + msg->ev.initiator.nid = ni->nid; + msg->ev.initiator.pid = ni->pid; + msg->ev.portal = args->portal_in; + msg->ev.match_bits = args->match_bits_in; + msg->ev.rlength = md->length; + msg->ev.mlength = md->length; + msg->ev.offset = args->offset_in; + msg->ev.hdr_data = 0; - /* - * If this memory descriptor has an event queue associated with - * it we must allocate a message state object that will record - * the information to be filled in once the message has been - * completed. More information is in the do_PtlPut() comments. - * - * NB. We're now committed to the GET, since we just marked the MD - * busy. Callers who observe this (by getting PTL_MD_INUSE from - * PtlMDUnlink()) expect a completion event to tell them when the - * MD becomes idle. 
- */ - if (md->eq) { - msg->ev.type = PTL_EVENT_SENT; - msg->ev.initiator.nid = ni->nid; - msg->ev.initiator.pid = ni->pid; - msg->ev.portal = args->portal_in; - msg->ev.match_bits = args->match_bits_in; - msg->ev.rlength = md->length; - msg->ev.mlength = md->length; - msg->ev.offset = args->offset_in; - msg->ev.hdr_data = 0; - - lib_md_deconstruct(nal, md, &msg->ev.mem_desc); - } + lib_md_deconstruct(nal, md, &msg->ev.mem_desc); + + ni->counters.send_count++; state_unlock(nal, &flags); rc = lib_send (nal, private, msg, &hdr, PTL_MSG_GET, id->nid, id->pid, NULL, 0, 0); if (rc != PTL_OK) { - /* get_new_msg() committed us to sending by decrementing - * md->threshold, so we have to act like we did send, but - * the network dropped it. */ - lib_finalize (nal, private, msg); + CERROR(LPU64": error sending GET to "LPU64": %d\n", + ni->nid, id->nid, rc); + lib_finalize (nal, private, msg, rc); } + /* completion will be signalled by an event */ return ret->rc = PTL_OK; } diff --git a/lustre/portals/portals/lib-msg.c b/lustre/portals/portals/lib-msg.c index 9840ff5..36be55c 100644 --- a/lustre/portals/portals/lib-msg.c +++ b/lustre/portals/portals/lib-msg.c @@ -32,32 +32,81 @@ #include -int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t *msg) +void +lib_enq_event_locked (nal_cb_t *nal, void *private, + lib_eq_t *eq, ptl_event_t *ev) { - lib_md_t *md; - lib_eq_t *eq; + ptl_event_t *eq_slot; int rc; + + ev->sequence = eq->sequence++; /* Allocate the next queue slot */ + + /* size must be a power of 2 to handle a wrapped sequence # */ + LASSERT (eq->size != 0 && + eq->size == LOWEST_BIT_SET (eq->size)); + eq_slot = eq->base + (ev->sequence & (eq->size - 1)); + + /* Copy the event into the allocated slot, ensuring all the rest of + * the event's contents have been copied _before_ the sequence + * number gets updated. A process 'getting' an event waits on + * the next queue slot's sequence to be 'new'. When it is, _all_ + * other event fields had better be consistent. I assert + * 'sequence' is the last member, so I only need a 2 stage copy.
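[Editor's note: the two cb_write() calls below make the sequence number the publication point: slot payload first, sequence last, with barrier() in between. The consumer mirrors this by reading the slot's sequence before trusting the payload. A simplified reader-side sketch, where 'next' is the consumer's expected sequence number (a hypothetical local) and 'eq'/'ev' are as in PtlEQGet(); the real PtlEQGet() in api-eq.c additionally re-checks the sequence after copying, to detect an overwrite racing with lib_finalize():

	ptl_event_t *slot = eq->base + (next & (eq->size - 1));

	if (slot->sequence != next)	/* producer's 2nd write not visible */
		return PTL_EQ_EMPTY;	/* nothing new yet */

	*ev = *slot;			/* payload is now safe to copy */
]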
*/ + + LASSERT(sizeof (ptl_event_t) == + offsetof(ptl_event_t, sequence) + sizeof(ev->sequence)); + + rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev, + offsetof (ptl_event_t, sequence)); + LASSERT (rc == PTL_OK); + +#ifdef __KERNEL__ + barrier(); +#endif + /* Updating the sequence number is what makes the event 'new'. NB if + * the cb_write below isn't atomic, this could cause a race with + * PtlEQGet */ + rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence, + (void *)&ev->sequence,sizeof (ev->sequence)); + LASSERT (rc == PTL_OK); + +#ifdef __KERNEL__ + barrier(); +#endif + + if (nal->cb_callback != NULL) + nal->cb_callback(nal, private, eq, ev); + else if (eq->event_callback != NULL) + eq->event_callback(ev); +} + +void +lib_finalize(nal_cb_t *nal, void *private, lib_msg_t *msg, int status) +{ + lib_md_t *md; + int unlink; unsigned long flags; + int rc; + ptl_hdr_t ack; /* ni went down while processing this message */ - if (nal->ni.up == 0) { - return -1; - } + if (nal->ni.up == 0) + return; if (msg == NULL) - return 0; + return; - rc = 0; - if (msg->send_ack) { - ptl_hdr_t ack; + /* Only send an ACK if the PUT completed successfully */ + if (status == PTL_OK && + !ptl_is_wire_handle_none(&msg->ack_wmd)) { - LASSERT (!ptl_is_wire_handle_none (&msg->ack_wmd)); + LASSERT(msg->ev.type == PTL_EVENT_PUT); memset (&ack, 0, sizeof (ack)); ack.type = HTON__u32 (PTL_MSG_ACK); - ack.dest_nid = HTON__u64 (msg->nid); + ack.dest_nid = HTON__u64 (msg->ev.initiator.nid); ack.src_nid = HTON__u64 (nal->ni.nid); - ack.dest_pid = HTON__u32 (msg->pid); + ack.dest_pid = HTON__u32 (msg->ev.initiator.pid); ack.src_pid = HTON__u32 (nal->ni.pid); ack.payload_length = 0; @@ -66,92 +115,35 @@ int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t *msg) ack.msg.ack.mlength = HTON__u32 (msg->ev.mlength); rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK, - msg->nid, msg->pid, NULL, 0, 0); - /* If this send fails, there's nothing else to clean up */ + msg->ev.initiator.nid, msg->ev.initiator.pid, + NULL, 0, 0); + if (rc != PTL_OK) { + /* send failed: there's nothing else to clean up. */ + CERROR("Error %d sending ACK to "LPX64"\n", + rc, msg->ev.initiator.nid); + } } md = msg->md; - LASSERT (md->pending > 0); /* I've not dropped my ref yet */ - eq = md->eq; state_lock(nal, &flags); - if (eq != NULL) { - ptl_event_t *ev = &msg->ev; - ptl_event_t *eq_slot; - - /* I have to hold the lock while I bump the sequence number - * and copy the event into the queue. If not, and I was - * interrupted after bumping the sequence number, other - * events could fill the queue, including the slot I just - * allocated to this event. On resuming, I would overwrite - * a more 'recent' event with old event state, and - * processes taking events off the queue would not detect - * overflow correctly. - */ - - ev->sequence = eq->sequence++;/* Allocate the next queue slot */ - - /* size must be a power of 2 to handle a wrapped sequence # */ - LASSERT (eq->size != 0 && - eq->size == LOWEST_BIT_SET (eq->size)); - eq_slot = eq->base + (ev->sequence & (eq->size - 1)); - - /* Invalidate unlinked_me unless this is the last - * event for an auto-unlinked MD. Note that if md was - * auto-unlinked, md->pending can only decrease
Note that if md was - * auto-unlinked, md->pending can only decrease - */ - if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 || /* not auto-unlinked */ - md->pending != 1) /* not last ref */ - ev->unlinked_me = PTL_HANDLE_NONE; - - /* Copy the event into the allocated slot, ensuring all the - * rest of the event's contents have been copied _before_ - * the sequence number gets updated. A processes 'getting' - * an event waits on the next queue slot's sequence to be - * 'new'. When it is, _all_ other event fields had better - * be consistent. I assert 'sequence' is the last member, - * so I only need a 2 stage copy. - */ - LASSERT(sizeof (ptl_event_t) == - offsetof(ptl_event_t, sequence) + sizeof(ev->sequence)); - - rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev, - offsetof (ptl_event_t, sequence)); - LASSERT (rc == 0); - -#ifdef __KERNEL__ - barrier(); -#endif - /* Updating the sequence number is what makes the event 'new' */ - - /* cb_write is not necessarily atomic, so this could - cause a race with PtlEQGet */ - rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence, - (void *)&ev->sequence,sizeof (ev->sequence)); - LASSERT (rc == 0); + /* Now it's safe to drop my caller's ref */ + md->pending--; + LASSERT (md->pending >= 0); -#ifdef __KERNEL__ - barrier(); -#endif + /* Should I unlink this MD? */ + unlink = (md->pending == 0 && /* No other refs */ + (md->threshold == 0 || /* All ops done */ + md->md_flags & PTL_MD_FLAG_UNLINK) != 0); /* black spot */ - /* I must also ensure that (a) callbacks are made in the - * same order as the events land in the queue, and (b) the - * callback occurs before the event can be removed from the - * queue, so I can't drop the lock during the callback. */ - if (nal->cb_callback != NULL) - nal->cb_callback(nal, private, eq, ev); - else if (eq->event_callback != NULL) - (void)((eq->event_callback) (ev)); - } + msg->ev.status = status; + msg->ev.unlinked = unlink; - LASSERT ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 || - (md->md_flags & PTL_MD_FLAG_UNLINK) != 0); + if (md->eq != NULL) + lib_enq_event_locked(nal, private, md->eq, &msg->ev); - md->pending--; - if (md->pending == 0 && /* no more outstanding operations on this md */ - (md->threshold == 0 || /* done its business */ - (md->md_flags & PTL_MD_FLAG_UNLINK) != 0)) /* marked for death */ + if (unlink) lib_md_unlink(nal, md); list_del (&msg->msg_list); @@ -159,6 +151,4 @@ int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t *msg) lib_msg_free(nal, msg); state_unlock(nal, &flags); - - return rc; } diff --git a/lustre/portals/unals/proclib.c b/lustre/portals/unals/proclib.c index 2627253..2a5ba0d 100644 --- a/lustre/portals/unals/proclib.c +++ b/lustre/portals/unals/proclib.c @@ -43,24 +43,24 @@ /* the following functions are stubs to satisfy the nal definition without doing anything particularily useful*/ -static int nal_write(nal_cb_t *nal, - void *private, - user_ptr dst_addr, - void *src_addr, - size_t len) +static ptl_err_t nal_write(nal_cb_t *nal, + void *private, + user_ptr dst_addr, + void *src_addr, + size_t len) { memcpy(dst_addr, src_addr, len); - return 0; + return PTL_OK; } -static int nal_read(nal_cb_t * nal, - void *private, - void *dst_addr, - user_ptr src_addr, - size_t len) +static ptl_err_t nal_read(nal_cb_t * nal, + void *private, + void *dst_addr, + user_ptr src_addr, + size_t len) { memcpy(dst_addr, src_addr, len); - return 0; + return PTL_OK; } static void *nal_malloc(nal_cb_t *nal, diff --git a/lustre/portals/unals/tcpnal.c 
b/lustre/portals/unals/tcpnal.c index bf28a058..8e84142 100644 --- a/lustre/portals/unals/tcpnal.c +++ b/lustre/portals/unals/tcpnal.c @@ -55,70 +55,69 @@ * * sends a packet to the peer, after ensuring that a connection exists */ -int tcpnal_send(nal_cb_t *n, - void *private, - lib_msg_t *cookie, - ptl_hdr_t *hdr, - int type, - ptl_nid_t nid, - ptl_pid_t pid, - unsigned int niov, - struct iovec *iov, - size_t len) +ptl_err_t tcpnal_send(nal_cb_t *n, + void *private, + lib_msg_t *cookie, + ptl_hdr_t *hdr, + int type, + ptl_nid_t nid, + ptl_pid_t pid, + unsigned int niov, + struct iovec *iov, + size_t offset, + size_t len) { connection c; bridge b=(bridge)n->nal_data; struct iovec tiov[257]; static pthread_mutex_t send_lock = PTHREAD_MUTEX_INITIALIZER; - int rc; + ptl_err_t rc = PTL_OK; + int sysrc; int total; + int ntiov; int i; if (!(c=force_tcp_connection((manager)b->lower, PNAL_IP(nid,b), PNAL_PORT(nid,pid), - b->local))) - return(1); + b->local))) + return(PTL_FAIL); -#if 0 /* TODO: these results should be checked. furthermore, provision must be made for the SIGPIPE which is delivered when writing on a tcp socket which has closed underneath the application. there is a linux flag in the sendmsg call which turns off the signalling behaviour, but it's nonstandard */ - syscall(SYS_write, c->fd,hdr,sizeof(ptl_hdr_t)); - LASSERT (niov <= 1); - if (len) syscall(SYS_write, c->fd,iov[0].iov_base,len); -#else + LASSERT (niov <= 256); tiov[0].iov_base = hdr; tiov[0].iov_len = sizeof(ptl_hdr_t); + ntiov = 1 + lib_extract_iov(256, &tiov[1], niov, iov, offset, len); - if (niov > 0) - memcpy(&tiov[1], iov, niov * sizeof(struct iovec)); pthread_mutex_lock(&send_lock); #if 1 - for (i = total = 0; i <= niov; i++) + for (i = total = 0; i < ntiov; i++) total += tiov[i].iov_len; - rc = syscall(SYS_writev, c->fd, tiov, niov+1); - if (rc != total) { + sysrc = syscall(SYS_writev, c->fd, tiov, ntiov); + if (sysrc != total) { - fprintf (stderr, "BAD SEND rc %d != %d, errno %d\n", rc, total, errno); + fprintf (stderr, "BAD SEND rc %d != %d, errno %d\n", sysrc, total, errno); - abort(); + rc = PTL_FAIL; } #else - for (i = total = 0; i <= niov; i++) { + for (i = total = 0; i < ntiov; i++) { rc = send(c->fd, tiov[i].iov_base, tiov[i].iov_len, 0); if (rc != tiov[i].iov_len) { fprintf (stderr, "BAD SEND rc %d != %d, errno %d\n", rc, tiov[i].iov_len, errno); - abort(); + rc = PTL_FAIL; + break; } - total != rc; + total += rc; } #endif #if 0 @@ -131,10 +130,14 @@ int tcpnal_send(nal_cb_t *n, total, niov + 1); #endif pthread_mutex_unlock(&send_lock); -#endif - lib_finalize(n, private, cookie); - - return(0); + + if (rc == PTL_OK) { + /* NB the NAL only calls lib_finalize() if it returns PTL_OK + * from cb_send() */ + lib_finalize(n, private, cookie, PTL_OK); + } + + return(rc); } @@ -151,15 +154,18 @@ int tcpnal_send(nal_cb_t *n, * blocking read of the requested data. must drain out the * difference of manipulated and requested lengths from the network */ -int tcpnal_recv(nal_cb_t *n, - void *private, - lib_msg_t *cookie, - unsigned int niov, - struct iovec *iov, - size_t mlen, - size_t rlen) +ptl_err_t tcpnal_recv(nal_cb_t *n, + void *private, + lib_msg_t *cookie, + unsigned int niov, + struct iovec *iov, + size_t offset, + size_t mlen, + size_t rlen) { + struct iovec tiov[256]; + int ntiov; int i; if (!niov) @@ -169,16 +175,19 @@ int tcpnal_recv(nal_cb_t *n, LASSERT(rlen); LASSERT(rlen >= mlen); + ntiov = lib_extract_iov(256, tiov, niov, iov, offset, mlen); + /* FIXME * 1. Is this efficient enough? change to use readv() directly? * 2.
need to check the return from read_connection() * - MeiJia */ - for (i = 0; i < niov; i++) - read_connection(private, iov[i].iov_base, iov[i].iov_len); + for (i = 0; i < ntiov; i++) + read_connection(private, tiov[i].iov_base, tiov[i].iov_len); finalize: - lib_finalize(n, private, cookie); + /* FIXME; we always assume success here... */ + lib_finalize(n, private, cookie, PTL_OK); if (mlen!=rlen){ char *trash=malloc(rlen-mlen); /*TODO: check error status*/ read_connection(private,trash,rlen-mlen); free(trash); } - return(rlen); + return(PTL_OK); } -- 1.8.3.1
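[Editor's note: tcpnal_send() above illustrates the revised cb_send()/cb_recv() contract used throughout this patch: return PTL_OK only when the NAL commits to calling lib_finalize() for the message (possibly later, and possibly with a failure status); on any other return the portals lib keeps ownership of the msg and cleans up itself. A minimal conforming skeleton, where mynal_queue_tx() is a hypothetical transmit-queue helper:

	static ptl_err_t
	mynal_send(nal_cb_t *nal, void *private, lib_msg_t *cookie,
	           ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
	           unsigned int niov, struct iovec *iov,
	           size_t offset, size_t len)
	{
		if (mynal_queue_tx(hdr, niov, iov, offset, len) != 0)
			return PTL_FAIL;	/* lib still owns 'cookie' */

		/* Committed: every completion path from here must end in
		 * lib_finalize(nal, private, cookie, status). */
		return PTL_OK;
	}
]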