Whamcloud - gitweb
land b_eq on HEAD
authorrread <rread>
Sat, 14 Feb 2004 03:16:22 +0000 (03:16 +0000)
committerrread <rread>
Sat, 14 Feb 2004 03:16:22 +0000 (03:16 +0000)
168 files changed:
lnet/archdep.m4
lnet/include/lnet/errno.h
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-nal.h
lnet/include/lnet/lib-p30.h
lnet/include/lnet/lib-types.h
lnet/include/lnet/lnet.h
lnet/include/lnet/p30.h
lnet/include/lnet/types.h
lnet/klnds/gmlnd/gmlnd.h
lnet/klnds/gmlnd/gmlnd_cb.c
lnet/klnds/gmlnd/gmlnd_comm.c
lnet/klnds/iblnd/ibnal_cb.c
lnet/klnds/qswlnd/qswlnd_cb.c
lnet/klnds/scimaclnd/scimacnal_cb.c
lnet/klnds/socklnd/socklnd.c
lnet/klnds/socklnd/socklnd_cb.c
lnet/libcfs/module.c
lnet/lnet/Makefile.am
lnet/lnet/api-eq.c
lnet/lnet/api-errno.c
lnet/lnet/api-ni.c
lnet/lnet/api-wrap.c
lnet/lnet/lib-init.c
lnet/lnet/lib-md.c
lnet/lnet/lib-move.c
lnet/lnet/lib-msg.c
lnet/ulnds/Makefile.am
lnet/ulnds/bridge.h
lnet/ulnds/connection.c
lnet/ulnds/connection.h
lnet/ulnds/procapi.c
lnet/ulnds/procbridge.h
lnet/ulnds/proclib.c
lnet/ulnds/select.c
lnet/ulnds/socklnd/Makefile.am
lnet/ulnds/socklnd/bridge.h
lnet/ulnds/socklnd/connection.c
lnet/ulnds/socklnd/connection.h
lnet/ulnds/socklnd/procapi.c
lnet/ulnds/socklnd/procbridge.h
lnet/ulnds/socklnd/proclib.c
lnet/ulnds/socklnd/select.c
lnet/ulnds/socklnd/tcplnd.c
lnet/ulnds/tcplnd.c
lnet/utils/Makefile.am
lnet/utils/l_ioctl.c
lnet/utils/portals.c
lustre/ChangeLog
lustre/Makefile.am
lustre/configure.in
lustre/include/liblustre.h
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_export.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_net.h
lustre/include/linux/obd.h
lustre/kernel_patches/patches/2.6.0-test6-mm4.patch
lustre/kernel_patches/patches/bproc-patch-2.4.20
lustre/kernel_patches/patches/ext3-xattr-ptr-arith-fix.patch
lustre/ldlm/Makefile.am
lustre/ldlm/ldlm_lib.c
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/liblustre/Makefile.am
lustre/liblustre/dir.c [new file with mode: 0644]
lustre/liblustre/file.c
lustre/liblustre/genlib.sh
lustre/liblustre/libtest.c [deleted file]
lustre/liblustre/llite_lib.c
lustre/liblustre/llite_lib.h
lustre/liblustre/namei.c
lustre/liblustre/rw.c
lustre/liblustre/super.c
lustre/liblustre/tests/.cvsignore [new file with mode: 0644]
lustre/liblustre/tests/Makefile.am [new file with mode: 0644]
lustre/liblustre/tests/echo_test.c [new file with mode: 0644]
lustre/liblustre/tests/recovery_small.c [moved from lustre/liblustre/recovery_small.c with 99% similarity]
lustre/liblustre/tests/replay_ost_single.c [new file with mode: 0644]
lustre/liblustre/tests/replay_single.c [moved from lustre/liblustre/replay_single.c with 99% similarity, mode: 0644]
lustre/liblustre/tests/sanity.c [moved from lustre/liblustre/lltest.c with 81% similarity]
lustre/liblustre/tests/test_common.c [moved from lustre/liblustre/test_common.c with 91% similarity]
lustre/liblustre/tests/test_common.h [moved from lustre/liblustre/test_common.h with 91% similarity]
lustre/liblustre/tests/test_lock_cancel.c [moved from lustre/liblustre/test_lock_cancel.c with 100% similarity]
lustre/llite/llite_lib.c
lustre/lov/Makefile.am
lustre/lvfs/Makefile.am
lustre/mdc/Makefile.am
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/mds_internal.h
lustre/mds/mds_log.c
lustre/mds/mds_open.c
lustre/mds/mds_reint.c
lustre/mds/mds_unlink_open.c
lustre/mgmt/mgmt_svc.c
lustre/obdclass/Makefile.am
lustre/obdclass/class_obd.c
lustre/obdclass/genops.c
lustre/obdclass/llog_lvfs.c
lustre/obdclass/lprocfs_status.c
lustre/obdclass/obd_config.c
lustre/obdecho/Makefile.am
lustre/obdecho/echo.c
lustre/obdecho/echo_client.c
lustre/osc/Makefile.am
lustre/osc/osc_internal.h
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/portals/archdep.m4
lustre/portals/include/portals/errno.h
lustre/portals/include/portals/lib-nal.h
lustre/portals/include/portals/lib-p30.h
lustre/portals/include/portals/lib-types.h
lustre/portals/include/portals/p30.h
lustre/portals/include/portals/types.h
lustre/portals/knals/gmnal/gmnal.h
lustre/portals/knals/gmnal/gmnal_cb.c
lustre/portals/knals/gmnal/gmnal_comm.c
lustre/portals/knals/ibnal/ibnal_cb.c
lustre/portals/knals/qswnal/qswnal_cb.c
lustre/portals/knals/scimacnal/scimacnal_cb.c
lustre/portals/knals/socknal/socknal.c
lustre/portals/knals/socknal/socknal_cb.c
lustre/portals/libcfs/module.c
lustre/portals/portals/Makefile.am
lustre/portals/portals/api-eq.c
lustre/portals/portals/api-errno.c
lustre/portals/portals/api-ni.c
lustre/portals/portals/api-wrap.c
lustre/portals/portals/lib-init.c
lustre/portals/portals/lib-md.c
lustre/portals/portals/lib-move.c
lustre/portals/portals/lib-msg.c
lustre/portals/unals/Makefile.am
lustre/portals/unals/bridge.h
lustre/portals/unals/connection.c
lustre/portals/unals/connection.h
lustre/portals/unals/procapi.c
lustre/portals/unals/procbridge.h
lustre/portals/unals/proclib.c
lustre/portals/unals/select.c
lustre/portals/unals/tcpnal.c
lustre/portals/utils/Makefile.am
lustre/portals/utils/l_ioctl.c
lustre/portals/utils/portals.c
lustre/ptlbd/rpc.c
lustre/ptlbd/server.c
lustre/ptlrpc/Makefile.am
lustre/ptlrpc/client.c
lustre/ptlrpc/events.c
lustre/ptlrpc/import.c
lustre/ptlrpc/llog_net.c
lustre/ptlrpc/llog_server.c
lustre/ptlrpc/lproc_ptlrpc.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/pinger.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/ptlrpc/ptlrpc_module.c
lustre/ptlrpc/ptlrpcd.c
lustre/ptlrpc/recover.c
lustre/ptlrpc/service.c
lustre/scripts/lustre.spec.in
lustre/tests/conf-sanity.sh
lustre/tests/replay-ost-single.sh
lustre/utils/Makefile.am
lustre/utils/lrun

index e955c33..c06bc8a 100644 (file)
@@ -333,6 +333,7 @@ AC_SUBST(SCIMACNAL)
 CFLAGS="$KCFLAGS"
 CPPFLAGS="$KINCFLAGS $KCPPFLAGS $MFLAGS $enable_zerocopy $enable_affinity $with_quadrics $with_gm $with_scamac $with_ib"
 
+AM_CONDITIONAL(LIBLUSTRE, test x$host_cpu = xlib)
 AC_SUBST(MOD_LINK)
 AC_SUBST(LINUX25)
 AM_CONDITIONAL(LIBLUSTRE, test x$host_cpu = xlib)
index 817936a..08f084a 100644 (file)
@@ -50,9 +50,8 @@ typedef enum {
         PTL_IOV_TOO_SMALL   = 31,
 
        PTL_EQ_INUSE        = 32,
-       PTL_MD_INUSE        = 33,
 
-        PTL_MAX_ERRNO       = 33
+        PTL_MAX_ERRNO       = 32
 } ptl_err_t;
 /* If you change these, you must update the string table in api-errno.c */
 
index 3582b94..e9e4635 100644 (file)
@@ -19,7 +19,6 @@
 #include <portals/types.h>
 #include <linux/kp30.h>
 #include <portals/p30.h>
-#include <portals/errno.h>
 #include <portals/lib-types.h>
 #include <portals/lib-nal.h>
 #include <portals/lib-dispatch.h>
@@ -42,7 +41,7 @@ do {                                                    \
         nal->cb_sti(nal, flagsp);                       \
 }
 
-#ifdef PTL_USE_DESC_LISTS
+#ifdef PTL_USE_LIB_FREELIST
 
 #define MAX_MES         2048
 #define MAX_MDS         2048
@@ -98,7 +97,7 @@ lib_eq_free (nal_cb_t *nal, lib_eq_t *eq)
 }
 
 static inline lib_md_t *
-lib_md_alloc (nal_cb_t *nal)
+lib_md_alloc (nal_cb_t *nal, ptl_md_t *umd)
 {
         /* NEVER called with statelock held */
         unsigned long  flags;
@@ -142,8 +141,20 @@ lib_me_free (nal_cb_t *nal, lib_me_t *me)
 static inline lib_msg_t *
 lib_msg_alloc (nal_cb_t *nal)
 {
-        /* ALWAYS called with statelock held */
-        return ((lib_msg_t *)lib_freelist_alloc (&nal->ni.ni_free_msgs));
+        /* NEVER called with statelock held */
+        unsigned long  flags;
+        lib_msg_t     *msg;
+        
+        state_lock (nal, &flags);
+        msg = (lib_msg_t *)lib_freelist_alloc (&nal->ni.ni_free_msgs);
+        state_unlock (nal, &flags);
+
+        if (msg != NULL) {
+                /* NULL pointers, clear flags etc */
+                memset (msg, 0, sizeof (*msg));
+                msg->ack_wmd = PTL_WIRE_HANDLE_NONE;
+        }
+        return(msg);
 }
 
 static inline void
@@ -155,22 +166,13 @@ lib_msg_free (nal_cb_t *nal, lib_msg_t *msg)
 
 #else
 
-extern atomic_t      md_in_use_count;
-extern atomic_t      msg_in_use_count;
-extern atomic_t      me_in_use_count;
-extern atomic_t      eq_in_use_count;
-
 static inline lib_eq_t *
 lib_eq_alloc (nal_cb_t *nal)
 {
         /* NEVER called with statelock held */
         lib_eq_t *eq;
-        PORTAL_ALLOC(eq, sizeof(*eq));
-
-        if (eq == NULL)
-                return (NULL);
 
-        atomic_inc (&eq_in_use_count);
+        PORTAL_ALLOC(eq, sizeof(*eq));
         return (eq);
 }
 
@@ -178,21 +180,34 @@ static inline void
 lib_eq_free (nal_cb_t *nal, lib_eq_t *eq)
 {
         /* ALWAYS called with statelock held */
-        atomic_dec (&eq_in_use_count);
         PORTAL_FREE(eq, sizeof(*eq));
 }
 
 static inline lib_md_t *
-lib_md_alloc (nal_cb_t *nal)
+lib_md_alloc (nal_cb_t *nal, ptl_md_t *umd)
 {
         /* NEVER called with statelock held */
         lib_md_t *md;
-        PORTAL_ALLOC(md, sizeof(*md));
-
-        if (md == NULL)
-                return (NULL);
-
-        atomic_inc (&md_in_use_count);
+        int       size;
+        int       niov;
+
+        if ((umd->options & PTL_MD_KIOV) != 0) {
+                niov = umd->niov;
+                size = offsetof(lib_md_t, md_iov.kiov[niov]);
+        } else {
+                niov = ((umd->options & PTL_MD_IOV) != 0) ?
+                       umd->niov : 1;
+                size = offsetof(lib_md_t, md_iov.iov[niov]);
+        }
+
+        PORTAL_ALLOC(md, size);
+
+        if (md != NULL) {
+                /* Set here in case of early free */
+                md->options = umd->options;
+                md->md_niov = niov;
+        }
+        
         return (md);
 }
 
@@ -200,8 +215,14 @@ static inline void
 lib_md_free (nal_cb_t *nal, lib_md_t *md)
 {
         /* ALWAYS called with statelock held */
-        atomic_dec (&md_in_use_count);
-        PORTAL_FREE(md, sizeof(*md));
+        int       size;
+
+        if ((md->options & PTL_MD_KIOV) != 0)
+                size = offsetof(lib_md_t, md_iov.kiov[md->md_niov]);
+        else
+                size = offsetof(lib_md_t, md_iov.iov[md->md_niov]);
+
+        PORTAL_FREE(md, size);
 }
 
 static inline lib_me_t *
@@ -209,12 +230,8 @@ lib_me_alloc (nal_cb_t *nal)
 {
         /* NEVER called with statelock held */
         lib_me_t *me;
-        PORTAL_ALLOC(me, sizeof(*me));
-
-        if (me == NULL)
-                return (NULL);
 
-        atomic_inc (&me_in_use_count);
+        PORTAL_ALLOC(me, sizeof(*me));
         return (me);
 }
 
@@ -222,21 +239,21 @@ static inline void
 lib_me_free(nal_cb_t *nal, lib_me_t *me)
 {
         /* ALWAYS called with statelock held */
-        atomic_dec (&me_in_use_count);
         PORTAL_FREE(me, sizeof(*me));
 }
 
 static inline lib_msg_t *
 lib_msg_alloc(nal_cb_t *nal)
 {
-        /* ALWAYS called with statelock held */
+        /* NEVER called with statelock held */
         lib_msg_t *msg;
-        PORTAL_ALLOC_ATOMIC(msg, sizeof(*msg));
 
-        if (msg == NULL)
-                return (NULL);
-        
-        atomic_inc (&msg_in_use_count);
+        PORTAL_ALLOC(msg, sizeof(*msg));
+        if (msg != NULL) {
+                /* NULL pointers, clear flags etc */
+                memset (msg, 0, sizeof (*msg));
+                msg->ack_wmd = PTL_WIRE_HANDLE_NONE;
+        }
         return (msg);
 }
 
@@ -244,7 +261,6 @@ static inline void
 lib_msg_free(nal_cb_t *nal, lib_msg_t *msg)
 {
         /* ALWAYS called with statelock held */
-        atomic_dec (&msg_in_use_count);
         PORTAL_FREE(msg, sizeof(*msg));
 }
 #endif
@@ -344,26 +360,41 @@ extern char *dispatch_name(int index);
  * Call backs will be made to write events, send acks or
  * replies and so on.
  */
-extern int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private);
-extern int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t * msg);
+extern void lib_enq_event_locked (nal_cb_t *nal, void *private,
+                                  lib_eq_t *eq, ptl_event_t *ev);
+extern void lib_finalize (nal_cb_t *nal, void *private, lib_msg_t *msg, 
+                          ptl_err_t status);
+extern void lib_parse (nal_cb_t *nal, ptl_hdr_t *hdr, void *private);
 extern lib_msg_t *lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, 
                                       lib_md_t *getmd);
-extern void print_hdr(nal_cb_t * nal, ptl_hdr_t * hdr);
+extern void print_hdr (nal_cb_t * nal, ptl_hdr_t * hdr);
+
 
 extern ptl_size_t lib_iov_nob (int niov, struct iovec *iov);
-extern void lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, ptl_size_t len);
-extern void lib_copy_buf2iov (int niov, struct iovec *iov, char *dest, ptl_size_t len);
+extern void lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, 
+                              ptl_size_t offset, ptl_size_t len);
+extern void lib_copy_buf2iov (int niov, struct iovec *iov, ptl_size_t offset, 
+                              char *src, ptl_size_t len);
+extern int lib_extract_iov (int dst_niov, struct iovec *dst,
+                            int src_niov, struct iovec *src,
+                            ptl_size_t offset, ptl_size_t len);
 
 extern ptl_size_t lib_kiov_nob (int niov, ptl_kiov_t *iov);
-extern void lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *iov, ptl_size_t len);
-extern void lib_copy_buf2kiov (int niov, ptl_kiov_t *iov, char *src, ptl_size_t len);
+extern void lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, 
+                               ptl_size_t offset, ptl_size_t len);
+extern void lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, ptl_size_t offset,
+                               char *src, ptl_size_t len);
+extern int lib_extract_kiov (int dst_niov, ptl_kiov_t *dst, 
+                             int src_niov, ptl_kiov_t *src,
+                             ptl_size_t offset, ptl_size_t len);
+
 extern void lib_assert_wire_constants (void);
 
-extern void lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
-                      ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen);
-extern int lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg,
-                     ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
-                     lib_md_t *md, ptl_size_t offset, ptl_size_t len);
+extern ptl_err_t lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
+                           ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen);
+extern ptl_err_t lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg,
+                           ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
+                           lib_md_t *md, ptl_size_t offset, ptl_size_t len);
 
 extern void lib_md_deconstruct(nal_cb_t * nal, lib_md_t * md_in,
                                ptl_md_t * md_out);
index 4052c0c..0bf557e 100644 (file)
@@ -18,47 +18,60 @@ struct nal_cb_t {
        lib_ni_t ni;
        void *nal_data;
        /*
-        * send:  Sends a preformatted header and user data to a
-        * specified remote process.
-        * Can overwrite iov.
+        * send: Sends a preformatted header and payload data to a
+        * specified remote process. The payload is scattered over 'niov'
+        * fragments described by iov, starting at 'offset' for 'mlen'
+        * bytes.  
+        * NB the NAL may NOT overwrite iov.  
+        * PTL_OK on success => NAL has committed to send and will call
+        * lib_finalize on completion
         */
-       int (*cb_send) (nal_cb_t * nal, void *private, lib_msg_t * cookie, 
-                       ptl_hdr_t * hdr, int type, ptl_nid_t nid, ptl_pid_t pid, 
-                       unsigned int niov, struct iovec *iov, size_t mlen);
+       ptl_err_t (*cb_send) (nal_cb_t * nal, void *private, lib_msg_t * cookie, 
+                             ptl_hdr_t * hdr, int type, ptl_nid_t nid, ptl_pid_t pid, 
+                             unsigned int niov, struct iovec *iov, 
+                             size_t offset, size_t mlen);
 
        /* as send, but with a set of page fragments (NULL if not supported) */
-       int (*cb_send_pages) (nal_cb_t * nal, void *private, lib_msg_t * cookie, 
-                             ptl_hdr_t * hdr, int type, ptl_nid_t nid, ptl_pid_t pid, 
-                             unsigned int niov, ptl_kiov_t *iov, size_t mlen);
+       ptl_err_t (*cb_send_pages) (nal_cb_t * nal, void *private, lib_msg_t * cookie, 
+                                   ptl_hdr_t * hdr, int type, ptl_nid_t nid, ptl_pid_t pid, 
+                                   unsigned int niov, ptl_kiov_t *iov, 
+                                   size_t offset, size_t mlen);
        /*
-        * recv: Receives an incoming message from a remote process
-        * Type of iov depends on options.  Can overwrite iov.
+        * recv: Receives an incoming message from a remote process.  The
+        * payload is to be received into the scattered buffer of 'niov'
+        * fragments described by iov, starting at 'offset' for 'mlen'
+        * bytes.  Payload bytes after 'mlen' up to 'rlen' are to be
+        * discarded.  
+        * NB the NAL may NOT overwrite iov.
+        * PTL_OK on success => NAL has committed to receive and will call
+        * lib_finalize on completion
         */
-       int (*cb_recv) (nal_cb_t * nal, void *private, lib_msg_t * cookie,
-                       unsigned int niov, struct iovec *iov, size_t mlen
-                       size_t rlen);
+       ptl_err_t (*cb_recv) (nal_cb_t * nal, void *private, lib_msg_t * cookie,
+                             unsigned int niov, struct iovec *iov
+                             size_t offset, size_t mlen, size_t rlen);
 
        /* as recv, but with a set of page fragments (NULL if not supported) */
-       int (*cb_recv_pages) (nal_cb_t * nal, void *private, lib_msg_t * cookie,
-                             unsigned int niov, ptl_kiov_t *iov, size_t mlen
-                             size_t rlen);
+       ptl_err_t (*cb_recv_pages) (nal_cb_t * nal, void *private, lib_msg_t * cookie,
+                                   unsigned int niov, ptl_kiov_t *iov
+                                   size_t offset, size_t mlen, size_t rlen);
        /*
         * read: Reads a block of data from a specified user address
         */
-       int (*cb_read) (nal_cb_t * nal, void *private, void *dst_addr,
-                       user_ptr src_addr, size_t len);
+       ptl_err_t (*cb_read) (nal_cb_t * nal, void *private, void *dst_addr,
+                             user_ptr src_addr, size_t len);
 
        /*
         * write: Writes a block of data into a specified user address
         */
-       int (*cb_write) (nal_cb_t * nal, void *private, user_ptr dsr_addr,
-                        void *src_addr, size_t len);
+       ptl_err_t (*cb_write) (nal_cb_t * nal, void *private, user_ptr dsr_addr,
+                              void *src_addr, size_t len);
 
        /*
         * callback: Calls an event callback
+        * NULL => lib calls eq's callback (if any) directly.
         */
-       int (*cb_callback) (nal_cb_t * nal, void *private, lib_eq_t *eq,
-                        ptl_event_t *ev);
+       void (*cb_callback) (nal_cb_t * nal, void *private, lib_eq_t *eq,
+                            ptl_event_t *ev);
 
        /*
         *  malloc: Acquire a block of memory in a system independent
@@ -74,14 +87,14 @@ struct nal_cb_t {
         * type of *iov depends on options.
         * Set to NULL if not required.
         */
-       int (*cb_map) (nal_cb_t * nal, unsigned int niov, struct iovec *iov, 
-                      void **addrkey);
+       ptl_err_t (*cb_map) (nal_cb_t * nal, unsigned int niov, struct iovec *iov, 
+                            void **addrkey);
        void (*cb_unmap) (nal_cb_t * nal, unsigned int niov, struct iovec *iov, 
                          void **addrkey);
 
        /* as (un)map, but with a set of page fragments */
-       int (*cb_map_pages) (nal_cb_t * nal, unsigned int niov, ptl_kiov_t *iov, 
-                            void **addrkey);
+       ptl_err_t (*cb_map_pages) (nal_cb_t * nal, unsigned int niov, ptl_kiov_t *iov, 
+                                  void **addrkey);
        void (*cb_unmap_pages) (nal_cb_t * nal, unsigned int niov, ptl_kiov_t *iov, 
                          void **addrkey);
 
index 3582b94..e9e4635 100644 (file)
@@ -19,7 +19,6 @@
 #include <portals/types.h>
 #include <linux/kp30.h>
 #include <portals/p30.h>
-#include <portals/errno.h>
 #include <portals/lib-types.h>
 #include <portals/lib-nal.h>
 #include <portals/lib-dispatch.h>
@@ -42,7 +41,7 @@ do {                                                    \
         nal->cb_sti(nal, flagsp);                       \
 }
 
-#ifdef PTL_USE_DESC_LISTS
+#ifdef PTL_USE_LIB_FREELIST
 
 #define MAX_MES         2048
 #define MAX_MDS         2048
@@ -98,7 +97,7 @@ lib_eq_free (nal_cb_t *nal, lib_eq_t *eq)
 }
 
 static inline lib_md_t *
-lib_md_alloc (nal_cb_t *nal)
+lib_md_alloc (nal_cb_t *nal, ptl_md_t *umd)
 {
         /* NEVER called with statelock held */
         unsigned long  flags;
@@ -142,8 +141,20 @@ lib_me_free (nal_cb_t *nal, lib_me_t *me)
 static inline lib_msg_t *
 lib_msg_alloc (nal_cb_t *nal)
 {
-        /* ALWAYS called with statelock held */
-        return ((lib_msg_t *)lib_freelist_alloc (&nal->ni.ni_free_msgs));
+        /* NEVER called with statelock held */
+        unsigned long  flags;
+        lib_msg_t     *msg;
+        
+        state_lock (nal, &flags);
+        msg = (lib_msg_t *)lib_freelist_alloc (&nal->ni.ni_free_msgs);
+        state_unlock (nal, &flags);
+
+        if (msg != NULL) {
+                /* NULL pointers, clear flags etc */
+                memset (msg, 0, sizeof (*msg));
+                msg->ack_wmd = PTL_WIRE_HANDLE_NONE;
+        }
+        return(msg);
 }
 
 static inline void
@@ -155,22 +166,13 @@ lib_msg_free (nal_cb_t *nal, lib_msg_t *msg)
 
 #else
 
-extern atomic_t      md_in_use_count;
-extern atomic_t      msg_in_use_count;
-extern atomic_t      me_in_use_count;
-extern atomic_t      eq_in_use_count;
-
 static inline lib_eq_t *
 lib_eq_alloc (nal_cb_t *nal)
 {
         /* NEVER called with statelock held */
         lib_eq_t *eq;
-        PORTAL_ALLOC(eq, sizeof(*eq));
-
-        if (eq == NULL)
-                return (NULL);
 
-        atomic_inc (&eq_in_use_count);
+        PORTAL_ALLOC(eq, sizeof(*eq));
         return (eq);
 }
 
@@ -178,21 +180,34 @@ static inline void
 lib_eq_free (nal_cb_t *nal, lib_eq_t *eq)
 {
         /* ALWAYS called with statelock held */
-        atomic_dec (&eq_in_use_count);
         PORTAL_FREE(eq, sizeof(*eq));
 }
 
 static inline lib_md_t *
-lib_md_alloc (nal_cb_t *nal)
+lib_md_alloc (nal_cb_t *nal, ptl_md_t *umd)
 {
         /* NEVER called with statelock held */
         lib_md_t *md;
-        PORTAL_ALLOC(md, sizeof(*md));
-
-        if (md == NULL)
-                return (NULL);
-
-        atomic_inc (&md_in_use_count);
+        int       size;
+        int       niov;
+
+        if ((umd->options & PTL_MD_KIOV) != 0) {
+                niov = umd->niov;
+                size = offsetof(lib_md_t, md_iov.kiov[niov]);
+        } else {
+                niov = ((umd->options & PTL_MD_IOV) != 0) ?
+                       umd->niov : 1;
+                size = offsetof(lib_md_t, md_iov.iov[niov]);
+        }
+
+        PORTAL_ALLOC(md, size);
+
+        if (md != NULL) {
+                /* Set here in case of early free */
+                md->options = umd->options;
+                md->md_niov = niov;
+        }
+        
         return (md);
 }
 
@@ -200,8 +215,14 @@ static inline void
 lib_md_free (nal_cb_t *nal, lib_md_t *md)
 {
         /* ALWAYS called with statelock held */
-        atomic_dec (&md_in_use_count);
-        PORTAL_FREE(md, sizeof(*md));
+        int       size;
+
+        if ((md->options & PTL_MD_KIOV) != 0)
+                size = offsetof(lib_md_t, md_iov.kiov[md->md_niov]);
+        else
+                size = offsetof(lib_md_t, md_iov.iov[md->md_niov]);
+
+        PORTAL_FREE(md, size);
 }
 
 static inline lib_me_t *
@@ -209,12 +230,8 @@ lib_me_alloc (nal_cb_t *nal)
 {
         /* NEVER called with statelock held */
         lib_me_t *me;
-        PORTAL_ALLOC(me, sizeof(*me));
-
-        if (me == NULL)
-                return (NULL);
 
-        atomic_inc (&me_in_use_count);
+        PORTAL_ALLOC(me, sizeof(*me));
         return (me);
 }
 
@@ -222,21 +239,21 @@ static inline void
 lib_me_free(nal_cb_t *nal, lib_me_t *me)
 {
         /* ALWAYS called with statelock held */
-        atomic_dec (&me_in_use_count);
         PORTAL_FREE(me, sizeof(*me));
 }
 
 static inline lib_msg_t *
 lib_msg_alloc(nal_cb_t *nal)
 {
-        /* ALWAYS called with statelock held */
+        /* NEVER called with statelock held */
         lib_msg_t *msg;
-        PORTAL_ALLOC_ATOMIC(msg, sizeof(*msg));
 
-        if (msg == NULL)
-                return (NULL);
-        
-        atomic_inc (&msg_in_use_count);
+        PORTAL_ALLOC(msg, sizeof(*msg));
+        if (msg != NULL) {
+                /* NULL pointers, clear flags etc */
+                memset (msg, 0, sizeof (*msg));
+                msg->ack_wmd = PTL_WIRE_HANDLE_NONE;
+        }
         return (msg);
 }
 
@@ -244,7 +261,6 @@ static inline void
 lib_msg_free(nal_cb_t *nal, lib_msg_t *msg)
 {
         /* ALWAYS called with statelock held */
-        atomic_dec (&msg_in_use_count);
         PORTAL_FREE(msg, sizeof(*msg));
 }
 #endif
@@ -344,26 +360,41 @@ extern char *dispatch_name(int index);
  * Call backs will be made to write events, send acks or
  * replies and so on.
  */
-extern int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private);
-extern int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t * msg);
+extern void lib_enq_event_locked (nal_cb_t *nal, void *private,
+                                  lib_eq_t *eq, ptl_event_t *ev);
+extern void lib_finalize (nal_cb_t *nal, void *private, lib_msg_t *msg, 
+                          ptl_err_t status);
+extern void lib_parse (nal_cb_t *nal, ptl_hdr_t *hdr, void *private);
 extern lib_msg_t *lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, 
                                       lib_md_t *getmd);
-extern void print_hdr(nal_cb_t * nal, ptl_hdr_t * hdr);
+extern void print_hdr (nal_cb_t * nal, ptl_hdr_t * hdr);
+
 
 extern ptl_size_t lib_iov_nob (int niov, struct iovec *iov);
-extern void lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, ptl_size_t len);
-extern void lib_copy_buf2iov (int niov, struct iovec *iov, char *dest, ptl_size_t len);
+extern void lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, 
+                              ptl_size_t offset, ptl_size_t len);
+extern void lib_copy_buf2iov (int niov, struct iovec *iov, ptl_size_t offset, 
+                              char *src, ptl_size_t len);
+extern int lib_extract_iov (int dst_niov, struct iovec *dst,
+                            int src_niov, struct iovec *src,
+                            ptl_size_t offset, ptl_size_t len);
 
 extern ptl_size_t lib_kiov_nob (int niov, ptl_kiov_t *iov);
-extern void lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *iov, ptl_size_t len);
-extern void lib_copy_buf2kiov (int niov, ptl_kiov_t *iov, char *src, ptl_size_t len);
+extern void lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, 
+                               ptl_size_t offset, ptl_size_t len);
+extern void lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, ptl_size_t offset,
+                               char *src, ptl_size_t len);
+extern int lib_extract_kiov (int dst_niov, ptl_kiov_t *dst, 
+                             int src_niov, ptl_kiov_t *src,
+                             ptl_size_t offset, ptl_size_t len);
+
 extern void lib_assert_wire_constants (void);
 
-extern void lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
-                      ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen);
-extern int lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg,
-                     ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
-                     lib_md_t *md, ptl_size_t offset, ptl_size_t len);
+extern ptl_err_t lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
+                           ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen);
+extern ptl_err_t lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg,
+                           ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
+                           lib_md_t *md, ptl_size_t offset, ptl_size_t len);
 
 extern void lib_md_deconstruct(nal_cb_t * nal, lib_md_t * md_in,
                                ptl_md_t * md_out);
index d9e3c11..904204b 100644 (file)
@@ -16,7 +16,7 @@
 # include <linux/smp_lock.h>
 # include <linux/types.h>
 #else
-# define PTL_USE_DESC_LISTS
+# define PTL_USE_LIB_FREELIST
 # include <sys/types.h>
 #endif
 
@@ -139,16 +139,9 @@ typedef struct {
 
 struct lib_msg_t {
         struct list_head  msg_list;
-        int               send_ack;
         lib_md_t         *md;
-        ptl_nid_t         nid;
-        ptl_pid_t         pid;
-        ptl_event_t       ev;
         ptl_handle_wire_t ack_wmd;
-        union {
-                struct iovec  iov[PTL_MD_MAX_IOV];
-                ptl_kiov_t    kiov[PTL_MD_MAX_IOV];
-        } msg_iov;
+        ptl_event_t       ev;
 };
 
 struct lib_ptl_t {
@@ -212,9 +205,8 @@ struct lib_md_t {
 };
 
 #define PTL_MD_FLAG_UNLINK            (1 << 0)
-#define PTL_MD_FLAG_AUTO_UNLINKED     (1 << 1)
 
-#ifdef PTL_USE_DESC_LISTS
+#ifdef PTL_USE_LIB_FREELIST
 typedef struct
 {
         void             *fl_objs;             /* single contiguous array of objects */
@@ -262,7 +254,7 @@ typedef struct {
         
         struct list_head ni_test_peers;
         
-#ifdef PTL_USE_DESC_LISTS
+#ifdef PTL_USE_LIB_FREELIST
         lib_freelist_t   ni_free_mes;
         lib_freelist_t   ni_free_msgs;
         lib_freelist_t   ni_free_mds;
index a4ea39b..8b1495e 100644 (file)
@@ -21,7 +21,6 @@
 #include <portals/types.h>
 #include <portals/nal.h>
 #include <portals/api.h>
-#include <portals/errno.h>
 #include <portals/nalids.h>
 
 extern int __p30_initialized;  /* for libraries & test codes  */
index a4ea39b..8b1495e 100644 (file)
@@ -21,7 +21,6 @@
 #include <portals/types.h>
 #include <portals/nal.h>
 #include <portals/api.h>
-#include <portals/errno.h>
 #include <portals/nalids.h>
 
 extern int __p30_initialized;  /* for libraries & test codes  */
index e4ccebf..7ffe797 100644 (file)
@@ -17,6 +17,8 @@ typedef u_int64_t __u64;
 # define do_gettimeofday(tv) gettimeofday(tv, NULL)
 #endif
 
+#include <portals/errno.h>
+
 typedef __u64 ptl_nid_t;
 typedef __u32 ptl_pid_t;
 typedef __u32 ptl_pt_index_t;
@@ -97,7 +99,8 @@ typedef enum {
         PTL_EVENT_PUT,
         PTL_EVENT_REPLY,
         PTL_EVENT_ACK,
-        PTL_EVENT_SENT
+        PTL_EVENT_SENT,
+       PTL_EVENT_UNLINK,
 } ptl_event_kind_t;
 
 #define PTL_SEQ_BASETYPE       long
@@ -112,15 +115,19 @@ typedef unsigned PTL_SEQ_BASETYPE ptl_seq_t;
 #pragma pack(push, 4)
 #endif
 typedef struct {
-        ptl_event_kind_t type;
-        ptl_process_id_t initiator;
-        ptl_pt_index_t portal;
-        ptl_match_bits_t match_bits;
-        ptl_size_t rlength, mlength, offset;
-        ptl_handle_me_t unlinked_me;
-        ptl_md_t mem_desc;
-        ptl_hdr_data_t hdr_data;
-        struct timeval arrival_time;
+        ptl_event_kind_t   type;
+       ptl_err_t          status;
+       int                unlinked;
+        ptl_process_id_t   initiator;
+        ptl_pt_index_t     portal;
+        ptl_match_bits_t   match_bits;
+        ptl_size_t         rlength;
+       ptl_size_t         mlength;
+       ptl_size_t         offset;
+        ptl_md_t           mem_desc;
+        ptl_hdr_data_t     hdr_data;
+        struct timeval     arrival_time;
+
         volatile ptl_seq_t sequence;
 } ptl_event_t;
 #ifdef __CYGWIN__
index 53757ab..cdde5b7 100644 (file)
@@ -353,8 +353,6 @@ int gmnal_cb_read(nal_cb_t *, void *private, void *, user_ptr, size_t);
 
 int gmnal_cb_write(nal_cb_t *, void *private, user_ptr, void *, size_t);
 
-int gmnal_cb_callback(nal_cb_t *, void *, lib_eq_t *, ptl_event_t *);
-
 void *gmnal_cb_malloc(nal_cb_t *, size_t);
 
 void gmnal_cb_free(nal_cb_t *, void *, size_t);
@@ -384,7 +382,7 @@ void  gmnal_fini(void);
                                a->cb_recv_pages = gmnal_cb_recv_pages; \
                                a->cb_read = gmnal_cb_read; \
                                a->cb_write = gmnal_cb_write; \
-                               a->cb_callback = gmnal_cb_callback; \
+                               a->cb_callback = NULL; \
                                a->cb_malloc = gmnal_cb_malloc; \
                                a->cb_free = gmnal_cb_free; \
                                a->cb_map = NULL; \
index 6ae91db..e055242 100644 (file)
@@ -126,7 +126,6 @@ int gmnal_cb_send(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
                                niov, iov, len);
        } else {
                CDEBUG(D_ERROR, "Large message send it is not supported\n");
-               lib_finalize(nal_cb, private, cookie);
                return(PTL_FAIL);
                gmnal_large_tx(nal_cb, private, cookie, hdr, type, nid, pid, 
                                niov, iov, len);
@@ -200,18 +199,6 @@ int gmnal_cb_write(nal_cb_t *nal_cb, void *private, user_ptr dst,
        return(PTL_OK);
 }
 
-int gmnal_cb_callback(nal_cb_t *nal_cb, void *private, lib_eq_t *eq, 
-                      ptl_event_t *ev)
-{
-
-       if (eq->event_callback != NULL) {
-               CDEBUG(D_INFO, "found callback\n");
-               eq->event_callback(ev);
-       }
-       
-       return(PTL_OK);
-}
-
 void *gmnal_cb_malloc(nal_cb_t *nal_cb, size_t len)
 {
        void *ptr = NULL;
index 4171df6..a0d3530 100644 (file)
@@ -321,7 +321,6 @@ gmnal_small_rx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
 
        if (!private) {
                CDEBUG(D_ERROR, "gmnal_small_rx no context\n");
-               lib_finalize(nal_cb, private, cookie);
                return(PTL_FAIL);
        }
 
@@ -343,10 +342,8 @@ gmnal_small_rx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
         *      let portals library know receive is complete
         */
        CDEBUG(D_PORTALS, "calling lib_finalize\n");
-       if (lib_finalize(nal_cb, private, cookie) != PTL_OK) {
-               /* TO DO what to do with failed lib_finalise? */
-               CDEBUG(D_INFO, "lib_finalize failed\n");
-       }
+       lib_finalize(nal_cb, private, cookie, PTL_OK);
+
        /*
         *      return buffer so it can be used again
         */
@@ -590,10 +587,8 @@ gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
                return;
        }
        gmnal_return_stxd(nal_data, stxd);
-       if (lib_finalize(nal_cb, stxd, cookie) != PTL_OK) {
-               CDEBUG(D_INFO, "Call to lib_finalize failed for stxd [%p]\n", 
-                      stxd);
-       }
+       lib_finalize(nal_cb, stxd, cookie, PTL_OK);
+
        return;
 }
 
@@ -817,7 +812,6 @@ gmnal_large_rx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
 
        if (!srxd) {
                CDEBUG(D_ERROR, "gmnal_large_rx no context\n");
-               lib_finalize(nal_cb, private, cookie);
                return(PTL_FAIL);
        }
 
@@ -1114,10 +1108,7 @@ gmnal_remote_get_callback(gm_port_t *gm_port, void *context,
         *      Let our client application proceed
         */     
        CDEBUG(D_ERROR, "final callback context[%p]\n", srxd);
-       if (lib_finalize(nal_cb, srxd, srxd->cookie) != PTL_OK) {
-               CDEBUG(D_INFO, "Call to lib_finalize failed for srxd [%p]\n", 
-                      srxd);
-       }
+       lib_finalize(nal_cb, srxd, srxd->cookie, PTL_OK);
 
        /*
         *      send an ack to the sender to let him know we got the data
@@ -1282,10 +1273,7 @@ gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
 
        CDEBUG(D_INFO, "gmnal_large_tx_ack_received stxd [%p]\n", stxd);
 
-       if (lib_finalize(nal_cb, stxd, stxd->cookie) != PTL_OK) {
-               CDEBUG(D_INFO, "Call to lib_finalize failed for stxd [%p]\n", 
-                      stxd);
-       }
+       lib_finalize(nal_cb, stxd, stxd->cookie, PTL_OK);
 
        /*
         *      extract the iovec from the stxd, deregister the memory.
index 2c07cc4..0688062 100644 (file)
@@ -306,7 +306,7 @@ kibnal_send(nal_cb_t        *nal,
           if(buf_length > MAX_MSG_SIZE) { 
              CERROR("kibnal_send:request exceeds Transmit data size (%d).\n",
                       MAX_MSG_SIZE);
-             rc = -1;
+             rc = PTL_FAIL;
              return rc;
           }
           else {
@@ -363,7 +363,7 @@ kibnal_send(nal_cb_t        *nal,
 
         PROF_FINISH(kibnal_send); // time stapm of send operation 
 
-        rc = 1;
+        rc = PTL_OK;
 
         return rc; 
 }
@@ -386,7 +386,7 @@ int kibnal_send_pages(nal_cb_t * nal,
                       ptl_kiov_t *iov, 
                       size_t mlen)
 {
-   int rc = 1;
+   int rc = PTL_FAIL;
 
    CDEBUG(D_NET, "kibnal_send_pages\n");
 
@@ -420,7 +420,7 @@ void kibnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 //
 // do you need this 
 //
-int kibnal_callback(nal_cb_t * nal, 
+void kibnal_callback(nal_cb_t * nal, 
                            void *private, 
                            lib_eq_t *eq,
                            ptl_event_t *ev)
@@ -507,7 +507,7 @@ kibnal_recv_pages(nal_cb_t * nal,
 {
 
   CDEBUG(D_NET, "recv_pages not implemented\n");
-  return PTL_OK;
+  return PTL_FAIL;
        
 }
 
@@ -526,11 +526,12 @@ kibnal_recv(nal_cb_t     *nal,
         CDEBUG(D_NET,"kibnal_recv: mlen=%d, rlen=%d\n", mlen, rlen);
 
         /* What was actually received must be >= what sender claims to
-         * have sent.  This is an LASSERT, since lib-move doesn't
-         * check cb return code yet. */
-        LASSERT (krx->krx_len >= sizeof (ptl_hdr_t) + rlen);
+         * have sent. */
         LASSERT (mlen <= rlen);
 
+        if (krx->krx_len < sizeof (ptl_hdr_t) + rlen)
+                return (PTL_FAIL);
+
         PROF_START(kibnal_recv);
 
         if(mlen != 0) {
@@ -542,12 +543,12 @@ kibnal_recv(nal_cb_t     *nal,
 
         PROF_START(lib_finalize);
         
-        lib_finalize(nal, private, cookie);
+        lib_finalize(nal, private, cookie, PTL_OK);
         
         PROF_FINISH(lib_finalize);
         PROF_FINISH(kibnal_recv);
 
-        return rlen;
+        return PTL_OK;
 }
 
 //
index 96749cd..4c2bd6a 100644 (file)
@@ -33,7 +33,7 @@ EP_STATUSBLK  kqswnal_rpc_failed;
  *  LIB functions follow
  *
  */
-static int
+static ptl_err_t
 kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
              size_t len)
 {
@@ -41,10 +41,10 @@ kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
                 nal->ni.nid, len, src_addr, dst_addr );
         memcpy( dst_addr, src_addr, len );
 
-        return (0);
+        return (PTL_OK);
 }
 
-static int
+static ptl_err_t
 kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
               size_t len)
 {
@@ -52,7 +52,7 @@ kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
                 nal->ni.nid, len, src_addr, dst_addr );
         memcpy( dst_addr, src_addr, len );
 
-        return (0);
+        return (PTL_OK);
 }
 
 static void *
@@ -157,13 +157,12 @@ kqswnal_unmap_tx (kqswnal_tx_t *ktx)
         elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                           kqswnal_data.kqn_eptxdmahandle,
                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
-
 #endif
         ktx->ktx_nmappedpages = 0;
 }
 
 int
-kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
+kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
 {
         int       nfrags    = ktx->ktx_nfrag;
         int       nmapped   = ktx->ktx_nmappedpages;
@@ -188,8 +187,16 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
         LASSERT (niov > 0);
         LASSERT (nob > 0);
 
+        /* skip complete frags before 'offset' */
+        while (offset >= kiov->kiov_len) {
+                offset -= kiov->kiov_len;
+                kiov++;
+                niov--;
+                LASSERT (niov > 0);
+        }
+
         do {
-                int  fraglen = kiov->kiov_len;
+                int  fraglen = kiov->kiov_len - offset;
 
                 /* nob exactly spans the iovs */
                 LASSERT (fraglen <= nob);
@@ -212,7 +219,7 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
                 /* XXX this is really crap, but we'll have to kmap until
                  * EKC has a page (rather than vaddr) mapping interface */
 
-                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
+                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
 
                 CDEBUG(D_NET,
                        "%p[%d] loading %p for %d, page %d, %d total\n",
@@ -257,6 +264,7 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
                 kiov++;
                 niov--;
                 nob -= fraglen;
+                offset = 0;
 
                 /* iov must not run out before end of data */
                 LASSERT (nob == 0 || niov > 0);
@@ -271,7 +279,8 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
 }
 
 int
-kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
+kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob, 
+                    int niov, struct iovec *iov)
 {
         int       nfrags    = ktx->ktx_nfrag;
         int       nmapped   = ktx->ktx_nmappedpages;
@@ -295,8 +304,16 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
         LASSERT (niov > 0);
         LASSERT (nob > 0);
 
+        /* skip complete frags before offset */
+        while (offset >= iov->iov_len) {
+                offset -= iov->iov_len;
+                iov++;
+                niov--;
+                LASSERT (niov > 0);
+        }
+        
         do {
-                int  fraglen = iov->iov_len;
+                int  fraglen = iov->iov_len - offset;
                 long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);
 
                 /* nob exactly spans the iovs */
@@ -317,12 +334,12 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
 
                 CDEBUG(D_NET,
                        "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
-                        ktx, nfrags, iov->iov_base, fraglen, basepage, npages,
-                        nmapped);
+                       ktx, nfrags, iov->iov_base + offset, fraglen, 
+                       basepage, npages, nmapped);
 
 #if MULTIRAIL_EKC
                 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
-                             iov->iov_base, fraglen,
+                             iov->iov_base + offset, fraglen,
                              kqswnal_data.kqn_ep_tx_nmh, basepage,
                              &railmask, &ktx->ktx_frags[nfrags]);
 
@@ -336,7 +353,7 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
 #else
                 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                        kqswnal_data.kqn_eptxdmahandle,
-                                       iov->iov_base, fraglen,
+                                       iov->iov_base + offset, fraglen,
                                        basepage, &ktx->ktx_frags[nfrags].Base);
 
                 if (nfrags > 0 &&                /* previous frag mapped */
@@ -357,6 +374,7 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
                 iov++;
                 niov--;
                 nob -= fraglen;
+                offset = 0;
 
                 /* iov must not run out before end of data */
                 LASSERT (nob == 0 || niov > 0);
@@ -483,7 +501,7 @@ void
 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
 {
         lib_msg_t     *msg;
-        lib_msg_t     *repmsg;
+        lib_msg_t     *repmsg = NULL;
 
         switch (ktx->ktx_state) {
         case KTX_FORWARDING:       /* router asked me to forward this packet */
@@ -493,21 +511,29 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
 
         case KTX_SENDING:          /* packet sourced locally */
                 lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
-                              (lib_msg_t *)ktx->ktx_args[1]);
+                              (lib_msg_t *)ktx->ktx_args[1],
+                              (error == 0) ? PTL_OK : 
+                              (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
                 break;
 
         case KTX_GETTING:          /* Peer has DMA-ed direct? */
                 msg = (lib_msg_t *)ktx->ktx_args[1];
-                repmsg = NULL;
 
-                if (error == 0) 
+                if (error == 0) {
                         repmsg = lib_fake_reply_msg (&kqswnal_lib, 
                                                      ktx->ktx_nid, msg->md);
+                        if (repmsg == NULL)
+                                error = -ENOMEM;
+                }
                 
-                lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg);
-
-                if (repmsg != NULL) 
-                        lib_finalize (&kqswnal_lib, NULL, repmsg);
+                if (error == 0) {
+                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0], 
+                                      msg, PTL_OK);
+                        lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
+                } else {
+                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
+                                      (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
+                }
                 break;
 
         default:
@@ -533,7 +559,7 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
                         ktx->ktx_nid, status);
 
                 kqswnal_notify_peer_down(ktx);
-                status = -EIO;
+                status = -EHOSTDOWN;
 
         } else if (ktx->ktx_state == KTX_GETTING) {
                 /* RPC completed OK; what did our peer put in the status
@@ -745,7 +771,8 @@ kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
 
 int
 kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, 
-                   struct iovec *iov, ptl_kiov_t *kiov, int nob)
+                   struct iovec *iov, ptl_kiov_t *kiov, 
+                   int offset, int nob)
 {
         kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
         char               *buffer = (char *)page_address(krx->krx_pages[0]);
@@ -779,9 +806,9 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
         /* Map the source data... */
         ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
         if (kiov != NULL)
-                rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov);
+                rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
         else
-                rc = kqswnal_map_tx_iov (ktx, nob, nfrag, iov);
+                rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);
 
         if (rc != 0) {
                 CERROR ("Can't map source data: %d\n", rc);
@@ -846,7 +873,7 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
         return (-ECONNABORTED);
 }
 
-static int
+static ptl_err_t
 kqswnal_sendmsg (nal_cb_t     *nal,
                  void         *private,
                  lib_msg_t    *libmsg,
@@ -857,6 +884,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                  unsigned int  payload_niov,
                  struct iovec *payload_iov,
                  ptl_kiov_t   *payload_kiov,
+                 size_t        payload_offset,
                  size_t        payload_nob)
 {
         kqswnal_tx_t      *ktx;
@@ -865,6 +893,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
 #if KQSW_CHECKSUM
         int                i;
         kqsw_csum_t        csum;
+        int                sumoff;
         int                sumnob;
 #endif
         
@@ -928,9 +957,9 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 }
 
                 /* peer expects RPC completion with GET data */
-                rc = kqswnal_dma_reply (ktx,
-                                        payload_niov, payload_iov, 
-                                        payload_kiov, payload_nob);
+                rc = kqswnal_dma_reply (ktx, payload_niov, 
+                                        payload_iov, payload_kiov, 
+                                        payload_offset, payload_nob);
                 if (rc == 0)
                         return (PTL_OK);
                 
@@ -945,22 +974,39 @@ kqswnal_sendmsg (nal_cb_t     *nal,
 #if KQSW_CHECKSUM
         csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
         memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
-        for (csum = 0, i = 0, sumnob = payload_nob; sumnob > 0; i++) {
+        for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
+                LASSERT(i < niov);
                 if (payload_kiov != NULL) {
                         ptl_kiov_t *kiov = &payload_kiov[i];
-                        char       *addr = ((char *)kmap (kiov->kiov_page)) +
-                                           kiov->kiov_offset;
-                        
-                        csum = kqsw_csum (csum, addr, MIN (sumnob, kiov->kiov_len));
-                        sumnob -= kiov->kiov_len;
+
+                        if (sumoff >= kiov->kiov_len) {
+                                sumoff -= kiov->kiov_len;
+                        } else {
+                                char *addr = ((char *)kmap (kiov->kiov_page)) +
+                                             kiov->kiov_offset + sumoff;
+                                int   fragnob = kiov->kiov_len - sumoff;
+
+                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
+                                sumnob -= fragnob;
+                                sumoff = 0;
+                                kunmap(kiov->kiov_page);
+                        }
                 } else {
                         struct iovec *iov = &payload_iov[i];
 
-                        csum = kqsw_csum (csum, iov->iov_base, MIN (sumnob, kiov->iov_len));
-                        sumnob -= iov->iov_len;
+                        if (sumoff > iov->iov_len) {
+                                sumoff -= iov->iov_len;
+                        } else {
+                                char *addr = iov->iov_base + sumoff;
+                                int   fragnob = iov->iov_len - sumoff;
+                                
+                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
+                                sumnob -= fragnob;
+                                sumoff = 0;
+                        }
                 }
         }
-        memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum));
+        memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
 #endif
         
         if (kqswnal_data.kqn_optimized_gets &&
@@ -987,10 +1033,10 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 ktx->ktx_state = KTX_GETTING;
 
                 if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
-                        rc = kqswnal_map_tx_kiov (ktx, md->length,
+                        rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
                                                   md->md_niov, md->md_iov.kiov);
                 else
-                        rc = kqswnal_map_tx_iov (ktx, md->length,
+                        rc = kqswnal_map_tx_iov (ktx, 0, md->length,
                                                  md->md_niov, md->md_iov.iov);
 
                 if (rc < 0) {
@@ -1033,10 +1079,12 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 if (payload_nob > 0) {
                         if (payload_kiov != NULL)
                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
-                                                   payload_niov, payload_kiov, payload_nob);
+                                                   payload_niov, payload_kiov, 
+                                                   payload_offset, payload_nob);
                         else
                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
-                                                  payload_niov, payload_iov, payload_nob);
+                                                  payload_niov, payload_iov, 
+                                                  payload_offset, payload_nob);
                 }
         } else {
 
@@ -1052,10 +1100,10 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
 #endif
                 if (payload_kiov != NULL)
-                        rc = kqswnal_map_tx_kiov (ktx, payload_nob, 
+                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob, 
                                                   payload_niov, payload_kiov);
                 else
-                        rc = kqswnal_map_tx_iov (ktx, payload_nob,
+                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                  payload_niov, payload_iov);
                 if (rc != 0) {
                         kqswnal_put_idle_tx (ktx);
@@ -1078,7 +1126,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         return (PTL_OK);
 }
 
-static int
+static ptl_err_t
 kqswnal_send (nal_cb_t     *nal,
               void         *private,
               lib_msg_t    *libmsg,
@@ -1088,13 +1136,15 @@ kqswnal_send (nal_cb_t     *nal,
               ptl_pid_t     pid,
               unsigned int  payload_niov,
               struct iovec *payload_iov,
+              size_t        payload_offset,
               size_t        payload_nob)
 {
         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
-                                 payload_niov, payload_iov, NULL, payload_nob));
+                                 payload_niov, payload_iov, NULL, 
+                                 payload_offset, payload_nob));
 }
 
-static int
+static ptl_err_t
 kqswnal_send_pages (nal_cb_t     *nal,
                     void         *private,
                     lib_msg_t    *libmsg,
@@ -1104,10 +1154,12 @@ kqswnal_send_pages (nal_cb_t     *nal,
                     ptl_pid_t     pid,
                     unsigned int  payload_niov,
                     ptl_kiov_t   *payload_kiov,
+                    size_t        payload_offset,
                     size_t        payload_nob)
 {
         return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
-                                 payload_niov, NULL, payload_kiov, payload_nob));
+                                 payload_niov, NULL, payload_kiov, 
+                                 payload_offset, payload_nob));
 }
 
 void
@@ -1161,7 +1213,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
             nob <= KQSW_TX_BUFFER_SIZE) 
         {
                 /* send from ktx's pre-mapped contiguous buffer? */
-                lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob);
+                lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, 0, nob);
 #if MULTIRAIL_EKC
                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                               0, nob);
@@ -1176,7 +1228,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
         {
                 /* zero copy */
                 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
-                rc = kqswnal_map_tx_iov (ktx, nob, niov, iov);
+                rc = kqswnal_map_tx_iov (ktx, 0, nob, niov, iov);
                 if (rc != 0)
                         goto failed;
 
@@ -1231,7 +1283,8 @@ kqswnal_dma_reply_complete (EP_RXD *rxd)
         krx->krx_rpc_reply_needed = 0;
         kqswnal_rx_done (krx);
 
-        lib_finalize (&kqswnal_lib, NULL, msg);
+        lib_finalize (&kqswnal_lib, NULL, msg,
+                      (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
         kqswnal_put_idle_tx (ktx);
 }
 
@@ -1461,13 +1514,14 @@ kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
 }
 #endif
 
-static int
+static ptl_err_t
 kqswnal_recvmsg (nal_cb_t     *nal,
                  void         *private,
                  lib_msg_t    *libmsg,
                  unsigned int  niov,
                  struct iovec *iov,
                  ptl_kiov_t   *kiov,
+                 size_t        offset,
                  size_t        mlen,
                  size_t        rlen)
 {
@@ -1498,10 +1552,13 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 #endif
         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
 
-        /* What was actually received must be >= payload.
-         * This is an LASSERT, as lib_finalize() doesn't have a completion status. */
-        LASSERT (krx->krx_nob >= KQSW_HDR_SIZE + mlen);
+        /* What was actually received must be >= payload. */
         LASSERT (mlen <= rlen);
+        if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
+                CERROR("Bad message size: have %d, need %d + %d\n",
+                       krx->krx_nob, KQSW_HDR_SIZE, mlen);
+                return (PTL_FAIL);
+        }
 
         /* It must be OK to kmap() if required */
         LASSERT (kiov == NULL || !in_interrupt ());
@@ -1516,20 +1573,37 @@ kqswnal_recvmsg (nal_cb_t     *nal,
                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
 
                 LASSERT (niov > 0);
+                
                 if (kiov != NULL) {
-                        iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
-                        iov_nob = kiov->kiov_len;
+                        /* skip complete frags */
+                        while (offset >= kiov->kiov_len) {
+                                offset -= kiov->kiov_len;
+                                kiov++;
+                                niov--;
+                                LASSERT (niov > 0);
+                        }
+                        iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
+                        iov_nob = kiov->kiov_len - offset;
                 } else {
-                        iov_ptr = iov->iov_base;
-                        iov_nob = iov->iov_len;
+                        /* skip complete frags */
+                        while (offset >= iov->iov_len) {
+                                offset -= iov->iov_len;
+                                iov++;
+                                niov--;
+                                LASSERT (niov > 0);
+                        }
+                        iov_ptr = iov->iov_base + offset;
+                        iov_nob = iov->iov_len - offset;
                 }
-
+                
                 for (;;)
                 {
-                        /* We expect the iov to exactly match mlen */
-                        LASSERT (iov_nob <= mlen);
-                        
-                        frag = MIN (page_nob, iov_nob);
+                        frag = mlen;
+                        if (frag > page_nob)
+                                frag = page_nob;
+                        if (frag > iov_nob)
+                                frag = iov_nob;
+
                         memcpy (iov_ptr, page_ptr, frag);
 #if KQSW_CHECKSUM
                         payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
@@ -1588,33 +1662,39 @@ kqswnal_recvmsg (nal_cb_t     *nal,
                        "csum_nob %d\n",
                         hdr_csum, payload_csum, csum_frags, csum_nob);
 #endif
-        lib_finalize(nal, private, libmsg);
+        lib_finalize(nal, private, libmsg, PTL_OK);
 
-        return (rlen);
+        return (PTL_OK);
 }
 
-static int
+static ptl_err_t
 kqswnal_recv(nal_cb_t     *nal,
              void         *private,
              lib_msg_t    *libmsg,
              unsigned int  niov,
              struct iovec *iov,
+             size_t        offset,
              size_t        mlen,
              size_t        rlen)
 {
-        return (kqswnal_recvmsg (nal, private, libmsg, niov, iov, NULL, mlen, rlen));
+        return (kqswnal_recvmsg(nal, private, libmsg, 
+                                niov, iov, NULL, 
+                                offset, mlen, rlen));
 }
 
-static int
+static ptl_err_t
 kqswnal_recv_pages (nal_cb_t     *nal,
                     void         *private,
                     lib_msg_t    *libmsg,
                     unsigned int  niov,
                     ptl_kiov_t   *kiov,
+                    size_t        offset,
                     size_t        mlen,
                     size_t        rlen)
 {
-        return (kqswnal_recvmsg (nal, private, libmsg, niov, NULL, kiov, mlen, rlen));
+        return (kqswnal_recvmsg(nal, private, libmsg, 
+                                niov, NULL, kiov, 
+                                offset, mlen, rlen));
 }
 
 int
index b31c2ea..52afb98 100644 (file)
@@ -176,7 +176,8 @@ kscimacnal_txrelease(mac_mblk_t *msg, mac_msg_status_t status, void *context)
                         break;
         }
 
-        lib_finalize(ktx->ktx_nal, ktx->ktx_private, ktx->ktx_cookie);
+        lib_finalize(ktx->ktx_nal, ktx->ktx_private, ktx->ktx_cookie,
+                     (err == 0) ? PTL_OK : PTL_FAIL);
 
         PORTAL_FREE(ktx, (sizeof(kscimacnal_tx_t)));
 }
@@ -225,14 +226,14 @@ kscimacnal_sendmsg(nal_cb_t        *nal,
         if (buf_len > mac_get_mtusize(ksci->ksci_machandle)) {
                 CERROR("kscimacnal:request exceeds TX MTU size (%ld).\n",
                                 mac_get_mtusize(ksci->ksci_machandle));
-                return -EINVAL;
+                return PTL_FAIL;
         }
 
 
         /* save transaction info for later finalize and cleanup */
         PORTAL_ALLOC(ktx, (sizeof(kscimacnal_tx_t)));
         if (!ktx) {
-                return -ENOMEM;
+                return PTL_NOSPACE;
         }
 
         ktx->ktx_nmapped = 0; /* Start with no mapped pages :) */
@@ -247,7 +248,7 @@ kscimacnal_sendmsg(nal_cb_t        *nal,
                         kscimacnal_txrelease, ktx);
         if (!msg) {
                 PORTAL_FREE(ktx, (sizeof(kscimacnal_tx_t)));
-                return -ENOMEM;
+                return PTL_NOSPACE;
         }
         mac_put_mblk(msg, sizeof(ptl_hdr_t));
         lastblk=msg;
@@ -284,7 +285,7 @@ kscimacnal_sendmsg(nal_cb_t        *nal,
                 if(!newblk) {
                         mac_free_msg(msg);
                         PORTAL_FREE(ktx, (sizeof(kscimacnal_tx_t)));
-                        return -ENOMEM;
+                        return PTL_NOSPACE;
                 }
                 mac_put_mblk(newblk, nob);
                 mac_link_mblk(lastblk, newblk);
@@ -315,10 +316,10 @@ kscimacnal_sendmsg(nal_cb_t        *nal,
                 CERROR("kscimacnal: mac_send() failed, rc=%d\n", rc);
                 mac_free_msg(msg);
                 PORTAL_FREE(ktx, (sizeof(kscimacnal_tx_t)));
-                return rc;
+                return PTL_FAIL;
         }
 
-        return 0;
+        return PTL_OK;
 }
 
 
@@ -463,12 +464,15 @@ kscimacnal_recvmsg(nal_cb_t     *nal,
                         krx->msg, mlen, rlen, niov);
 
         /* What was actually received must be >= what sender claims to have
-         * sent.  This is an LASSERT, since lib-move doesn't check cb return
-         * code yet. Also, rlen seems to be negative when mlen==0 so don't
-         * assert on that.
-         */
-        LASSERT (mlen==0 || mac_msg_size(krx->msg) >= sizeof(ptl_hdr_t)+rlen);
-        LASSERT (mlen==0 || mlen <= rlen);
+         * sent. */
+        LASSERT (mlen <= rlen); /* something is wrong if this isn't true */
+        if (mac_msg_size(krx->msg) < sizeof(ptl_hdr_t)+mlen) {
+                /* We didn't receive everything lib thinks we did */
+                CERROR("Bad message size: have %d, need %d + %d\n",
+                       mac_msg_size(krx->msg), sizeof(ptl_hdr_t), mlen);
+                return (PTL_FAIL);
+        }
+
         /* It must be OK to kmap() if required */
         LASSERT (kiov == NULL || !in_interrupt ());
         /* Either all pages or all vaddrs */
@@ -545,12 +549,12 @@ kscimacnal_recvmsg(nal_cb_t     *nal,
         CDEBUG(D_NET, "Calling lib_finalize.\n");
 
         PROF_START(lib_finalize);
-        lib_finalize(nal, private, cookie);
+        lib_finalize(nal, private, cookie, PTL_OK);
         PROF_FINISH(lib_finalize);
 
         CDEBUG(D_NET, "Done.\n");
 
-        return rlen;
+        return PTL_OK;
 }
 
 
index 9ae1c87..c47dcb4 100644 (file)
@@ -993,15 +993,11 @@ ksocknal_destroy_conn (ksock_conn_t *conn)
         /* complete current receive if any */
         switch (conn->ksnc_rx_state) {
         case SOCKNAL_RX_BODY:
-#if 0
-                lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie);
-#else
-                CERROR ("Refusing to complete a partial receive from "
-                        LPX64", ip %d.%d.%d.%d:%d\n", conn->ksnc_peer->ksnp_nid,
-                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
-                CERROR ("This may hang communications and "
-                        "prevent modules from unloading\n");
-#endif
+                CERROR("Completing partial receive from "LPX64
+                       ", ip %d.%d.%d.%d:%d, with error\n",
+                       conn->ksnc_peer->ksnp_nid,
+                       HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
+                lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_FAIL);
                 break;
         case SOCKNAL_RX_BODY_FWD:
                 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
index 82d4c64..3ecead1 100644 (file)
@@ -29,7 +29,7 @@
  *  LIB functions follow
  *
  */
-int
+ptl_err_t
 ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr,
               user_ptr src_addr, size_t len)
 {
@@ -37,10 +37,10 @@ ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr,
                nal->ni.nid, (long)len, src_addr, dst_addr);
 
         memcpy( dst_addr, src_addr, len );
-        return 0;
+        return PTL_OK;
 }
 
-int
+ptl_err_t
 ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
                void *src_addr, size_t len)
 {
@@ -48,20 +48,7 @@ ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
                nal->ni.nid, (long)len, src_addr, dst_addr);
 
         memcpy( dst_addr, src_addr, len );
-        return 0;
-}
-
-int
-ksocknal_callback (nal_cb_t * nal, void *private, lib_eq_t *eq,
-                         ptl_event_t *ev)
-{
-        CDEBUG(D_NET, LPX64": callback eq %p ev %p\n",
-               nal->ni.nid, eq, ev);
-
-        if (eq->event_callback != NULL)
-                eq->event_callback(ev);
-
-        return 0;
+        return PTL_OK;
 }
 
 void *
@@ -617,7 +604,8 @@ ksocknal_tx_done (ksock_tx_t *tx, int asynch)
 
         if (tx->tx_isfwd) {             /* was a forwarded packet? */
                 kpr_fwd_done (&ksocknal_data.ksnd_router,
-                              KSOCK_TX_2_KPR_FWD_DESC (tx), 0);
+                              KSOCK_TX_2_KPR_FWD_DESC (tx), 
+                              (tx->tx_resid == 0) ? 0 : -ECONNABORTED);
                 EXIT;
                 return;
         }
@@ -625,7 +613,8 @@ ksocknal_tx_done (ksock_tx_t *tx, int asynch)
         /* local send */
         ltx = KSOCK_TX_2_KSOCK_LTX (tx);
 
-        lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie);
+        lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie,
+                      (tx->tx_resid == 0) ? PTL_OK : PTL_FAIL);
 
         ksocknal_free_ltx (ltx);
         EXIT;
@@ -694,17 +683,17 @@ ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
         LASSERT (rc < 0);
 
         if (!conn->ksnc_closing)
-                CERROR ("[%p] Error %d on write to "LPX64
-                        " ip %d.%d.%d.%d:%d\n",conn, rc, 
-                        conn->ksnc_peer->ksnp_nid,
-                        HIPQUAD(conn->ksnc_ipaddr),
-                        conn->ksnc_port);
+                CERROR("[%p] Error %d on write to "LPX64
+                       " ip %d.%d.%d.%d:%d\n", conn, rc,
+                       conn->ksnc_peer->ksnp_nid,
+                       HIPQUAD(conn->ksnc_ipaddr),
+                       conn->ksnc_port);
 
         ksocknal_close_conn_and_siblings (conn, rc);
         ksocknal_tx_launched (tx);
-        
+
         return (rc);
-} 
+}
 
 void
 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
@@ -742,21 +731,21 @@ ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
         ptl_nid_t     target_nid;
         int           rc;
         ksock_peer_t *peer = ksocknal_find_peer_locked (nid);
-        
+
         if (peer != NULL)
                 return (peer);
-        
+
         if (tx->tx_isfwd) {
                 CERROR ("Can't send packet to "LPX64
-                        " %s: routed target is not a peer\n", 
+                       " %s: routed target is not a peer\n",
                         nid, portals_nid2str(SOCKNAL, nid, ipbuf));
                 return (NULL);
         }
-        
+
         rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, tx->tx_nob,
                          &target_nid);
         if (rc != 0) {
-                CERROR ("Can't route to "LPX64" %s: router error %d\n", 
+                CERROR ("Can't route to "LPX64" %s: router error %d\n",
                         nid, portals_nid2str(SOCKNAL, nid, ipbuf), rc);
                 return (NULL);
         }
@@ -1018,7 +1007,7 @@ ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
         return (-EHOSTUNREACH);
 }
 
-int
+ptl_err_t
 ksocknal_sendmsg(nal_cb_t     *nal, 
                  void         *private, 
                  lib_msg_t    *cookie,
@@ -1029,6 +1018,7 @@ ksocknal_sendmsg(nal_cb_t     *nal,
                  unsigned int  payload_niov, 
                  struct iovec *payload_iov, 
                  ptl_kiov_t   *payload_kiov,
+                 size_t        payload_offset,
                  size_t        payload_nob)
 {
         ksock_ltx_t  *ltx;
@@ -1091,20 +1081,19 @@ ksocknal_sendmsg(nal_cb_t     *nal,
                 ltx->ltx_tx.tx_kiov  = NULL;
                 ltx->ltx_tx.tx_nkiov = 0;
 
-                ltx->ltx_tx.tx_niov = 1 + payload_niov;
-
-                memcpy(ltx->ltx_iov + 1, payload_iov,
-                       payload_niov * sizeof (*payload_iov));
-
+                ltx->ltx_tx.tx_niov = 
+                        1 + lib_extract_iov(payload_niov, &ltx->ltx_iov[1],
+                                            payload_niov, payload_iov,
+                                            payload_offset, payload_nob);
         } else {
                 /* payload is all pages */
-                ltx->ltx_tx.tx_kiov = ltx->ltx_kiov;
-                ltx->ltx_tx.tx_nkiov = payload_niov;
-
                 ltx->ltx_tx.tx_niov = 1;
 
-                memcpy(ltx->ltx_kiov, payload_kiov, 
-                       payload_niov * sizeof (*payload_kiov));
+                ltx->ltx_tx.tx_kiov = ltx->ltx_kiov;
+                ltx->ltx_tx.tx_nkiov =
+                        lib_extract_kiov(payload_niov, ltx->ltx_kiov,
+                                         payload_niov, payload_kiov,
+                                         payload_offset, payload_nob);
         }
 
         rc = ksocknal_launch_packet(&ltx->ltx_tx, nid);
@@ -1115,28 +1104,28 @@ ksocknal_sendmsg(nal_cb_t     *nal,
         return (PTL_FAIL);
 }
 
-int
+ptl_err_t
 ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                unsigned int payload_niov, struct iovec *payload_iov,
-               size_t payload_len)
+               size_t payload_offset, size_t payload_len)
 {
         return (ksocknal_sendmsg(nal, private, cookie,
                                  hdr, type, nid, pid,
                                  payload_niov, payload_iov, NULL,
-                                 payload_len));
+                                 payload_offset, payload_len));
 }
 
-int
+ptl_err_t
 ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie, 
                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
-                     size_t payload_len)
+                     size_t payload_offset, size_t payload_len)
 {
         return (ksocknal_sendmsg(nal, private, cookie,
                                  hdr, type, nid, pid,
                                  payload_niov, NULL, payload_kiov,
-                                 payload_len));
+                                 payload_offset, payload_len));
 }
 
 void
@@ -1208,7 +1197,7 @@ ksocknal_fmb_callback (void *arg, int error)
 
         /* drop peer ref taken on init */
         ksocknal_put_peer (fmb->fmb_peer);
-        
+
         spin_lock_irqsave (&fmp->fmp_lock, flags);
 
         list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);
@@ -1591,7 +1580,7 @@ ksocknal_process_receive (ksock_conn_t *conn)
 
         case SOCKNAL_RX_BODY:
                 /* payload all received */
-                lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie);
+                lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_OK);
                 /* Fall through */
 
         case SOCKNAL_RX_SLOP:
@@ -1627,9 +1616,10 @@ ksocknal_process_receive (ksock_conn_t *conn)
         return (-EINVAL);                       /* keep gcc happy */
 }
 
-int
+ptl_err_t
 ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
-               unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
+               unsigned int niov, struct iovec *iov, 
+               size_t offset, size_t mlen, size_t rlen)
 {
         ksock_conn_t *conn = (ksock_conn_t *)private;
 
@@ -1642,20 +1632,22 @@ ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
 
         conn->ksnc_rx_nkiov = 0;
         conn->ksnc_rx_kiov = NULL;
-        conn->ksnc_rx_niov = niov;
         conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
-        memcpy (conn->ksnc_rx_iov, iov, niov * sizeof (*iov));
+        conn->ksnc_rx_niov =
+                lib_extract_iov(PTL_MD_MAX_IOV, conn->ksnc_rx_iov,
+                                niov, iov, offset, mlen);
 
         LASSERT (mlen == 
                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
 
-        return (rlen);
+        return (PTL_OK);
 }
 
-int
+ptl_err_t
 ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
-                     unsigned int niov, ptl_kiov_t *kiov, size_t mlen, size_t rlen)
+                     unsigned int niov, ptl_kiov_t *kiov, 
+                     size_t offset, size_t mlen, size_t rlen)
 {
         ksock_conn_t *conn = (ksock_conn_t *)private;
 
@@ -1668,15 +1660,16 @@ ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
 
         conn->ksnc_rx_niov = 0;
         conn->ksnc_rx_iov  = NULL;
-        conn->ksnc_rx_nkiov = niov;
         conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
-        memcpy (conn->ksnc_rx_kiov, kiov, niov * sizeof (*kiov));
+        conn->ksnc_rx_nkiov = 
+                lib_extract_kiov(PTL_MD_MAX_IOV, conn->ksnc_rx_kiov,
+                                 niov, kiov, offset, mlen);
 
         LASSERT (mlen == 
                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
 
-        return (rlen);
+        return (PTL_OK);
 }
 
 int ksocknal_scheduler (void *arg)
@@ -2064,7 +2057,7 @@ ksocknal_hello (struct socket *sock, ptl_nid_t *nid, int *type, __u64 *incarnati
                         rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
                 return (rc);
         }
-        
+
         if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
                 CERROR ("Bad magic %#08x (%#08x expected) from "LPX64" %s\n",
                         __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, *nid,
@@ -2118,7 +2111,7 @@ ksocknal_hello (struct socket *sock, ptl_nid_t *nid, int *type, __u64 *incarnati
         } else if (*nid != __le64_to_cpu (hdr.src_nid)) {
                 CERROR ("Connected to nid "LPX64" %s, but expecting "LPX64" %s\n",
                         __le64_to_cpu (hdr.src_nid),
-                        portals_nid2str(SOCKNAL, 
+                        portals_nid2str(SOCKNAL,
                                         __le64_to_cpu(hdr.src_nid),
                                         ipbuf),
                         *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
@@ -2139,7 +2132,7 @@ ksocknal_hello (struct socket *sock, ptl_nid_t *nid, int *type, __u64 *incarnati
                         *type = SOCKNAL_CONN_BULK_IN;
                         break;
                 default:
-                        CERROR ("Unexpected type %d from "LPX64" %s\n", 
+                        CERROR ("Unexpected type %d from "LPX64" %s\n",
                                 *type, *nid,
                                 portals_nid2str(SOCKNAL, *nid, ipbuf));
                         return (-EPROTO);
@@ -2346,8 +2339,8 @@ ksocknal_connect_peer (ksock_route_t *route, int type)
         if (rc != 0) {
                 CERROR ("Error %d connecting to "LPX64" %s\n", rc,
                         route->ksnr_peer->ksnp_nid,
-                        portals_nid2str(SOCKNAL, 
-                                        route->ksnr_peer->ksnp_nid, 
+                        portals_nid2str(SOCKNAL,
+                                        route->ksnr_peer->ksnp_nid,
                                         ipbuf));
                 goto out;
         }
@@ -2432,7 +2425,7 @@ ksocknal_autoconnect (ksock_route_t *route)
         while (!list_empty (&zombies)) {
                 char ipbuf[PTL_NALFMT_SIZE];
                 tx = list_entry (zombies.next, ksock_tx_t, tx_list);
-                
+
                 CERROR ("Deleting packet type %d len %d ("LPX64" %s->"LPX64" %s)\n",
                         NTOH__u32 (tx->tx_hdr->type),
                         NTOH__u32 (tx->tx_hdr->payload_length),
@@ -2719,7 +2712,6 @@ nal_cb_t ksocknal_lib = {
         cb_recv_pages:   ksocknal_recv_pages,
         cb_read:         ksocknal_read,
         cb_write:        ksocknal_write,
-        cb_callback:     ksocknal_callback,
         cb_malloc:       ksocknal_malloc,
         cb_free:         ksocknal_free,
         cb_printf:       ksocknal_printf,
index 2768c8d..2f5a852 100644 (file)
@@ -812,9 +812,11 @@ EXPORT_SYMBOL(PtlMDBind);
 EXPORT_SYMBOL(lib_iov_nob);
 EXPORT_SYMBOL(lib_copy_iov2buf);
 EXPORT_SYMBOL(lib_copy_buf2iov);
+EXPORT_SYMBOL(lib_extract_iov);
 EXPORT_SYMBOL(lib_kiov_nob);
 EXPORT_SYMBOL(lib_copy_kiov2buf);
 EXPORT_SYMBOL(lib_copy_buf2kiov);
+EXPORT_SYMBOL(lib_extract_kiov);
 EXPORT_SYMBOL(lib_finalize);
 EXPORT_SYMBOL(lib_parse);
 EXPORT_SYMBOL(lib_fake_reply_msg);
index 8c03749..d17db61 100644 (file)
@@ -6,5 +6,9 @@
 
 CPPFLAGS=
 INCLUDES=-I$(top_srcdir)/portals/include -I$(top_srcdir)/include
-lib_LIBRARIES= libportals.a
+noinst_LIBRARIES= libportals.a
 libportals_a_SOURCES= api-eq.c api-init.c api-me.c api-errno.c api-ni.c api-wrap.c lib-dispatch.c lib-init.c lib-me.c lib-msg.c lib-eq.c lib-md.c lib-move.c lib-ni.c lib-pid.c
+
+if LIBLUSTRE
+libportals_a_CFLAGS= -fPIC
+endif
index 9bc9c36..964b9d8 100644 (file)
@@ -81,12 +81,6 @@ int PtlEQGet(ptl_handle_eq_t eventq, ptl_event_t * ev)
 
         *ev = *new_event;
 
-        /* Set the unlinked_me interface number if there is one to pass
-         * back, since the NAL hasn't a clue what it is and therefore can't
-         * set it. */
-        if (!PtlHandleEqual (ev->unlinked_me, PTL_HANDLE_NONE))
-                ev->unlinked_me.nal_idx = eventq.nal_idx;
-        
         /* ensure event is delivered correctly despite possible 
            races with lib_finalize */
         if (eq->sequence != new_event->sequence) {
@@ -119,6 +113,7 @@ int PtlEQWait(ptl_handle_eq_t eventq_in, ptl_event_t *event_out)
 }
 
 #ifndef __KERNEL__
+#if 0
 static jmp_buf eq_jumpbuf;
 
 static void eq_timeout(int signal)
@@ -162,6 +157,46 @@ int PtlEQWait_timeout(ptl_handle_eq_t eventq_in, ptl_event_t * event_out,
 
         return rc;
 }
+#else
+#include <errno.h>
 
-#endif
+/* FIXME
+ * Here timeout need a trick with tcpnal, definitely unclean but OK for
+ * this moment.
+ */
+
+/* global variables defined by tcpnal */
+extern int __tcpnal_eqwait_timeout_value;
+extern int __tcpnal_eqwait_timedout;
+
+int PtlEQWait_timeout(ptl_handle_eq_t eventq_in, ptl_event_t * event_out,
+                      int timeout)
+{
+        int rc;
 
+        if (!timeout)
+                return PtlEQWait(eventq_in, event_out);
+
+        __tcpnal_eqwait_timeout_value = timeout;
+
+        while ((rc = PtlEQGet(eventq_in, event_out)) == PTL_EQ_EMPTY) {
+                nal_t *nal = ptl_hndl2nal(&eventq_in);
+                
+                if (nal->yield)
+                        nal->yield(nal);
+
+                if (__tcpnal_eqwait_timedout) {
+                        if (__tcpnal_eqwait_timedout != ETIMEDOUT)
+                                printf("Warning: yield return error %d\n",
+                                        __tcpnal_eqwait_timedout);
+                        rc = PTL_EQ_EMPTY;
+                        break;
+                }
+        }
+
+        __tcpnal_eqwait_timeout_value = 0;
+
+        return rc;
+}
+#endif
+#endif /* __KERNEL__ */
index 026c93b..b5e7aa1 100644 (file)
@@ -50,6 +50,5 @@ const char *ptl_err_str[] = {
         "PTL_IOV_TOO_SMALL",
 
         "PTL_EQ_INUSE",
-        "PTL_MD_INUSE"
 };
 /* If you change these, you must update the number table in portals/errno.h */
index b2e069e..18eea91 100644 (file)
@@ -125,7 +125,7 @@ int PtlNIInit(ptl_interface_t interface, ptl_pt_index_t ptl_size,
                 if (ptl_interfaces[i] == nal) {
                         nal->refct++;
                         handle->nal_idx = (NI_HANDLE_MAGIC & ~NI_HANDLE_MASK) | i;
-                        fprintf(stderr, "Returning existing NAL (%d)\n", i);
+                        CDEBUG(D_OTHER, "Returning existing NAL (%d)\n", i);
                         ptl_ni_init_mutex_exit ();
                         return PTL_OK;
                 }
index e54707f..d23a6aa 100644 (file)
@@ -32,7 +32,7 @@ static int do_forward(ptl_handle_any_t any_h, int cmd, void *argbuf,
         nal_t *nal;
 
         if (!ptl_init) {
-                fprintf(stderr, "PtlGetId: Not initialized\n");
+                CERROR("Not initialized\n");
                 return PTL_NOINIT;
         }
 
@@ -262,7 +262,7 @@ static int validate_md(ptl_handle_any_t current_in, ptl_md_t md_in)
         int i;
 
         if (!ptl_init) {
-                fprintf(stderr, "PtlMDAttach/Bind/Update: Not initialized\n");
+                CERROR("PtlMDAttach/Bind/Update: Not initialized\n");
                 return PTL_NOINIT;
         }
 
index 0765498..d4d8860 100644 (file)
 # include <sys/time.h>
 #endif
 
-#ifndef PTL_USE_DESC_LISTS
-static int ptl_slab_users;
-
-atomic_t md_in_use_count = ATOMIC_INIT(0);
-atomic_t msg_in_use_count = ATOMIC_INIT(0);
-atomic_t me_in_use_count = ATOMIC_INIT(0);
-atomic_t eq_in_use_count = ATOMIC_INIT(0);
+#ifndef PTL_USE_LIB_FREELIST
 
 int
 kportal_descriptor_setup (nal_cb_t *nal)
 {
-        ptl_slab_users++;
-        RETURN(PTL_OK);
+        return PTL_OK;
 }
 
 void
 kportal_descriptor_cleanup (nal_cb_t *nal)
 {
-        if (--ptl_slab_users != 0)
-                return;
-
-        LASSERT (atomic_read (&md_in_use_count) == 0);
-        LASSERT (atomic_read (&me_in_use_count) == 0);
-        LASSERT (atomic_read (&eq_in_use_count) == 0);
-        LASSERT (atomic_read (&msg_in_use_count) == 0);
 }
 #else
 
index be6949c..a1ed583 100644 (file)
@@ -83,7 +83,7 @@ static int lib_md_build(nal_cb_t *nal, lib_md_t *new, void *private,
         int           rc;
         int           i;
 
-        /* NB we are passes an allocated, but uninitialised/active md.
+        /* NB we are passed an allocated, but uninitialised/active md.
          * if we return success, caller may lib_md_unlink() it.
          * otherwise caller may only lib_md_free() it.
          */
@@ -94,9 +94,10 @@ static int lib_md_build(nal_cb_t *nal, lib_md_t *new, void *private,
                         return PTL_INV_EQ;
         }
 
-        if ((md->options & PTL_MD_IOV) != 0 &&  /* discontiguous MD */
-            md->niov > PTL_MD_MAX_IOV)          /* too many fragments */
-                return PTL_IOV_TOO_MANY;
+        /* Must check this _before_ allocation.  Also, note that non-iov
+         * MDs must set md_niov to 0. */
+        LASSERT((md->options & (PTL_MD_IOV | PTL_MD_KIOV)) == 0 ||
+                md->niov <= PTL_MD_MAX_IOV);
 
         if ((md->options & max_size_opts) != 0 && /* max size used */
             (md->max_size < 0 || md->max_size > md->length)) // illegal max_size
@@ -239,7 +240,11 @@ int do_PtlMDAttach(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
         lib_md_t *md;
         unsigned long flags;
 
-        md = lib_md_alloc (nal);
+        if ((args->md_in.options & (PTL_MD_KIOV | PTL_MD_IOV)) != 0 &&
+            args->md_in.niov > PTL_MD_MAX_IOV) /* too many fragments */
+                return (ret->rc = PTL_IOV_TOO_MANY);
+
+        md = lib_md_alloc(nal, &args->md_in);
         if (md == NULL)
                 return (ret->rc = PTL_NOSPACE);
 
@@ -287,7 +292,11 @@ int do_PtlMDBind(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
         lib_md_t *md;
         unsigned long flags;
 
-        md = lib_md_alloc (nal);
+        if ((args->md_in.options & (PTL_MD_KIOV | PTL_MD_IOV)) != 0 &&
+            args->md_in.niov > PTL_MD_MAX_IOV) /* too many fragments */
+                return (ret->rc = PTL_IOV_TOO_MANY);
+
+        md = lib_md_alloc(nal, &args->md_in);
         if (md == NULL)
                 return (ret->rc = PTL_NOSPACE);
 
@@ -311,34 +320,43 @@ int do_PtlMDBind(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
 
 int do_PtlMDUnlink(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
 {
-        PtlMDUnlink_in *args = v_args;
+        PtlMDUnlink_in  *args = v_args;
         PtlMDUnlink_out *ret = v_ret;
-
-        lib_md_t *md;
-        unsigned long flags;
+        ptl_event_t      ev;
+        lib_md_t        *md;
+        unsigned long    flags;
 
         state_lock(nal, &flags);
 
         md = ptl_handle2md(&args->md_in, nal);
         if (md == NULL) {
-                ret->rc = PTL_INV_MD;
-        } else if (md->pending != 0) {           /* being filled/spilled */
-                ret->rc = PTL_MD_INUSE;
-        } else {
-                /* Callers attempting to unlink a busy MD which will get
-                 * unlinked once the net op completes should see INUSE,
-                 * before completion and INV_MD thereafter.  LASSERT we've
-                 * got that right... */
-                LASSERT ((md->md_flags & PTL_MD_FLAG_UNLINK) == 0);
-
-                lib_md_deconstruct(nal, md, &ret->status_out);
-                lib_md_unlink(nal, md);
-                ret->rc = PTL_OK;
+                state_unlock(nal, &flags);
+                return (ret->rc = PTL_INV_MD);
+        }
+
+        /* If the MD is busy, lib_md_unlink just marks it for deletion, and
+         * when the NAL is done, the completion event flags that the MD was
+         * unlinked.  Otherwise, we enqueue an event now... */
+
+        if (md->eq != NULL &&
+            md->pending == 0) {
+                memset(&ev, 0, sizeof(ev));
+
+                ev.type = PTL_EVENT_UNLINK;
+                ev.status = PTL_OK;
+                ev.unlinked = 1;
+                lib_md_deconstruct(nal, md, &ev.mem_desc);
+                
+                lib_enq_event_locked(nal, private, md->eq, &ev);
         }
 
+        lib_md_deconstruct(nal, md, &ret->status_out);
+        lib_md_unlink(nal, md);
+        ret->rc = PTL_OK;
+
         state_unlock(nal, &flags);
 
-        return (ret->rc);
+        return (PTL_OK);
 }
 
 int do_PtlMDUpdate_internal(nal_cb_t * nal, void *private, void *v_args,
@@ -379,6 +397,23 @@ int do_PtlMDUpdate_internal(nal_cb_t * nal, void *private, void *v_args,
                 goto out;
         }
 
+        /* XXX fttb, the new MD must be the same type wrt fragmentation */
+        if (((new->options ^ md->options) & 
+             (PTL_MD_IOV | PTL_MD_KIOV)) != 0) {
+                ret->rc = PTL_INV_MD;
+                goto out;
+        }
+
+        if (new->niov > md->md_niov) {
+                ret->rc = PTL_IOV_TOO_MANY;
+                goto out;
+        } 
+
+        if (new->niov < md->md_niov) {
+                ret->rc = PTL_IOV_TOO_SMALL;
+                goto out;
+        }
+
         if (!PtlHandleEqual (args->testq_in, PTL_EQ_NONE)) {
                 test_eq = ptl_handle2eq(&args->testq_in, nal);
                 if (test_eq == NULL) {
index d844a7a..ecd543c 100644 (file)
@@ -258,55 +258,78 @@ lib_iov_nob (int niov, struct iovec *iov)
 }
 
 void
-lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, ptl_size_t len)
+lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, 
+                  ptl_size_t offset, ptl_size_t len)
 {
         ptl_size_t nob;
 
-        while (len > 0)
-        {
+        if (len == 0)
+                return;
+        
+        /* skip complete frags before 'offset' */
+        LASSERT (niov > 0);
+        while (offset >= iov->iov_len) {
+                offset -= iov->iov_len;
+                iov++;
+                niov--;
+                LASSERT (niov > 0);
+        }
+                
+        do {
                 LASSERT (niov > 0);
-                nob = MIN (iov->iov_len, len);
-                memcpy (dest, iov->iov_base, nob);
+                nob = MIN (iov->iov_len - offset, len);
+                memcpy (dest, iov->iov_base + offset, nob);
 
                 len -= nob;
                 dest += nob;
                 niov--;
                 iov++;
-        }
+                offset = 0;
+        } while (len > 0);
 }
 
 void
-lib_copy_buf2iov (int niov, struct iovec *iov, char *src, ptl_size_t len)
+lib_copy_buf2iov (int niov, struct iovec *iov, ptl_size_t offset, 
+                  char *src, ptl_size_t len)
 {
         ptl_size_t nob;
 
-        while (len > 0)
-        {
+        if (len == 0)
+                return;
+
+        /* skip complete frags before 'offset' */
+        LASSERT (niov > 0);
+        while (offset >= iov->iov_len) {
+                offset -= iov->iov_len;
+                iov++;
+                niov--;
                 LASSERT (niov > 0);
-                nob = MIN (iov->iov_len, len);
-                memcpy (iov->iov_base, src, nob);
+        }
+        
+        do {
+                LASSERT (niov > 0);
+                nob = MIN (iov->iov_len - offset, len);
+                memcpy (iov->iov_base + offset, src, nob);
                 
                 len -= nob;
                 src += nob;
                 niov--;
                 iov++;
-        }
+                offset = 0;
+        } while (len > 0);
 }
 
-static int
-lib_extract_iov (struct iovec *dst, lib_md_t *md,
+int
+lib_extract_iov (int dst_niov, struct iovec *dst,
+                 int src_niov, struct iovec *src,
                  ptl_size_t offset, ptl_size_t len)
 {
         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
          * for exactly 'len' bytes, and return the number of entries.
          * NB not destructive to 'src' */
-        int             src_niov = md->md_niov;  
-        struct iovec   *src = md->md_iov.iov;
         ptl_size_t      frag_len;
-        int             dst_niov;
+        int             niov;
 
-        LASSERT (offset + len <= md->length);
-        
         if (len == 0)                           /* no data => */
                 return (0);                     /* no frags */
 
@@ -318,17 +341,17 @@ lib_extract_iov (struct iovec *dst, lib_md_t *md,
                 LASSERT (src_niov > 0);
         }
 
-        dst_niov = 1;
+        niov = 1;
         for (;;) {
                 LASSERT (src_niov > 0);
-                LASSERT (dst_niov <= PTL_MD_MAX_IOV);
+                LASSERT (niov <= dst_niov);
                 
                 frag_len = src->iov_len - offset;
                 dst->iov_base = ((char *)src->iov_base) + offset;
 
                 if (len <= frag_len) {
                         dst->iov_len = len;
-                        return (dst_niov);
+                        return (niov);
                 }
                 
                 dst->iov_len = frag_len;
@@ -336,7 +359,7 @@ lib_extract_iov (struct iovec *dst, lib_md_t *md,
                 len -= frag_len;
                 dst++;
                 src++;
-                dst_niov++;
+                niov++;
                 src_niov--;
                 offset = 0;
         }
@@ -351,19 +374,22 @@ lib_kiov_nob (int niov, ptl_kiov_t *kiov)
 }
 
 void
-lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, ptl_size_t len)
+lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, 
+                   ptl_size_t offset, ptl_size_t len)
 {
         LASSERT (0);
 }
 
 void
-lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, char *dest, ptl_size_t len)
+lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, ptl_size_t offset,
+                   char *src, ptl_size_t len)
 {
         LASSERT (0);
 }
 
-static int
-lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md,
+int
+lib_extract_kiov (int dst_niov, ptl_kiov_t *dst, 
+                  int src_niov, ptl_kiov_t *src,
                   ptl_size_t offset, ptl_size_t len)
 {
         LASSERT (0);
@@ -383,18 +409,30 @@ lib_kiov_nob (int niov, ptl_kiov_t *kiov)
 }
 
 void
-lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, ptl_size_t len)
+lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, 
+                   ptl_size_t offset, ptl_size_t len)
 {
         ptl_size_t  nob;
         char       *addr;
+
+        if (len == 0)
+                return;
         
         LASSERT (!in_interrupt ());
-        while (len > 0)
-        {
+
+        LASSERT (niov > 0);
+        while (offset > kiov->kiov_len) {
+                offset -= kiov->kiov_len;
+                kiov++;
+                niov--;
+                LASSERT (niov > 0);
+        }
+        
+        do{
                 LASSERT (niov > 0);
-                nob = MIN (kiov->kiov_len, len);
+                nob = MIN (kiov->kiov_len - offset, len);
                 
-                addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
+                addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
                 memcpy (dest, addr, nob);
                 kunmap (kiov->kiov_page);
                 
@@ -402,22 +440,35 @@ lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, ptl_size_t len)
                 dest += nob;
                 niov--;
                 kiov++;
-        }
+                offset = 0;
+        } while (len > 0);
 }
 
 void
-lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, char *src, ptl_size_t len)
+lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, ptl_size_t offset,
+                   char *src, ptl_size_t len)
 {
         ptl_size_t  nob;
         char       *addr;
 
+        if (len == 0)
+                return;
+
         LASSERT (!in_interrupt ());
-        while (len > 0)
-        {
+
+        LASSERT (niov > 0);
+        while (offset >= kiov->kiov_len) {
+                offset -= kiov->kiov_len;
+                kiov++;
+                niov--;
+                LASSERT (niov > 0);
+        }
+        
+        do {
                 LASSERT (niov > 0);
-                nob = MIN (kiov->kiov_len, len);
+                nob = MIN (kiov->kiov_len - offset, len);
                 
-                addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
+                addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
                 memcpy (addr, src, nob);
                 kunmap (kiov->kiov_page);
                 
@@ -425,23 +476,21 @@ lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, char *src, ptl_size_t len)
                 src += nob;
                 niov--;
                 kiov++;
-        }
+                offset = 0;
+        } while (len > 0);
 }
 
-static int
-lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md,
+int
+lib_extract_kiov (int dst_niov, ptl_kiov_t *dst, 
+                  int src_niov, ptl_kiov_t *src,
                   ptl_size_t offset, ptl_size_t len)
 {
         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
          * for exactly 'len' bytes, and return the number of entries.
          * NB not destructive to 'src' */
-        int             src_niov = md->md_niov;  
-        ptl_kiov_t     *src = md->md_iov.kiov;
         ptl_size_t      frag_len;
-        int             dst_niov;
+        int             niov;
 
-        LASSERT (offset + len <= md->length);
-        
         if (len == 0)                           /* no data => */
                 return (0);                     /* no frags */
 
@@ -453,10 +502,10 @@ lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md,
                 LASSERT (src_niov > 0);
         }
 
-        dst_niov = 1;
+        niov = 1;
         for (;;) {
                 LASSERT (src_niov > 0);
-                LASSERT (dst_niov <= PTL_MD_MAX_IOV);
+                LASSERT (niov <= dst_niov);
                 
                 frag_len = src->kiov_len - offset;
                 dst->kiov_page = src->kiov_page;
@@ -465,7 +514,7 @@ lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md,
                 if (len <= frag_len) {
                         dst->kiov_len = len;
                         LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
-                        return (dst_niov);
+                        return (niov);
                 }
 
                 dst->kiov_len = frag_len;
@@ -474,73 +523,66 @@ lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md,
                 len -= frag_len;
                 dst++;
                 src++;
-                dst_niov++;
+                niov++;
                 src_niov--;
                 offset = 0;
         }
 }
 #endif
 
-void
+ptl_err_t
 lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
           ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen)
 {
-        int   niov;
-
         if (mlen == 0)
-                nal->cb_recv (nal, private, msg, 0, NULL, 0, rlen);
-        else if ((md->options & PTL_MD_KIOV) == 0) {
-                niov = lib_extract_iov (msg->msg_iov.iov, md, offset, mlen);
-                nal->cb_recv (nal, private, msg,
-                              niov, msg->msg_iov.iov, mlen, rlen);
-        } else {
-                niov = lib_extract_kiov (msg->msg_iov.kiov, md, offset, mlen);
-                nal->cb_recv_pages (nal, private, msg, 
-                                    niov, msg->msg_iov.kiov, mlen, rlen);
-        }
+                return (nal->cb_recv(nal, private, msg,
+                                     0, NULL,
+                                     offset, mlen, rlen));
+
+        if ((md->options & PTL_MD_KIOV) == 0)
+                return (nal->cb_recv(nal, private, msg,
+                                     md->md_niov, md->md_iov.iov, 
+                                     offset, mlen, rlen));
+
+        return (nal->cb_recv_pages(nal, private, msg, 
+                                   md->md_niov, md->md_iov.kiov,
+                                   offset, mlen, rlen));
 }
 
-int
+ptl_err_t
 lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg,
           ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
           lib_md_t *md, ptl_size_t offset, ptl_size_t len) 
 {
-        int   niov;
-
         if (len == 0)
-                return (nal->cb_send (nal, private, msg, 
-                                      hdr, type, nid, pid,
-                                      0, NULL, 0));
+                return (nal->cb_send(nal, private, msg,
+                                     hdr, type, nid, pid,
+                                     0, NULL,
+                                     offset, len));
         
-        if ((md->options & PTL_MD_KIOV) == 0) {
-                niov = lib_extract_iov (msg->msg_iov.iov, md, offset, len);
-                return (nal->cb_send (nal, private, msg, 
-                                      hdr, type, nid, pid,
-                                      niov, msg->msg_iov.iov, len));
-        }
-
-        niov = lib_extract_kiov (msg->msg_iov.kiov, md, offset, len);
-        return (nal->cb_send_pages (nal, private, msg, 
-                                    hdr, type, nid, pid,
-                                    niov, msg->msg_iov.kiov, len));
+        if ((md->options & PTL_MD_KIOV) == 0)
+                return (nal->cb_send(nal, private, msg, 
+                                     hdr, type, nid, pid,
+                                     md->md_niov, md->md_iov.iov,
+                                     offset, len));
+
+        return (nal->cb_send_pages(nal, private, msg, 
+                                   hdr, type, nid, pid,
+                                   md->md_niov, md->md_iov.kiov,
+                                   offset, len));
 }
 
-static lib_msg_t *
-get_new_msg (nal_cb_t *nal, lib_md_t *md)
+static void
+lib_commit_md (nal_cb_t *nal, lib_md_t *md, lib_msg_t *msg)
 {
         /* ALWAYS called holding the state_lock */
         lib_counters_t *counters = &nal->ni.counters;
-        lib_msg_t      *msg      = lib_msg_alloc (nal);
-
-        if (msg == NULL)
-                return (NULL);
-
-        memset (msg, 0, sizeof (*msg));
-
-        msg->send_ack = 0;
 
+        /* Here, we commit the MD to a network OP by marking it busy and
+         * decrementing its threshold.  Come what may, the network "owns"
+         * the MD until a call to lib_finalize() signals completion. */
         msg->md = md;
-        do_gettimeofday(&msg->ev.arrival_time);
+         
         md->pending++;
         if (md->threshold != PTL_MD_THRESH_INF) {
                 LASSERT (md->threshold > 0);
@@ -552,8 +594,24 @@ get_new_msg (nal_cb_t *nal, lib_md_t *md)
                 counters->msgs_max = counters->msgs_alloc;
 
         list_add (&msg->msg_list, &nal->ni.ni_active_msgs);
+}
 
-        return (msg);
+static void
+lib_drop_message (nal_cb_t *nal, void *private, ptl_hdr_t *hdr)
+{
+        unsigned long flags;
+
+        /* CAVEAT EMPTOR: this only drops messages that we've not committed
+         * to receive (init_msg() not called) and therefore can't cause an
+         * event. */
+        
+        state_lock(nal, &flags);
+        nal->ni.counters.drop_count++;
+        nal->ni.counters.drop_length += hdr->payload_length;
+        state_unlock(nal, &flags);
+
+        /* NULL msg => if NAL calls lib_finalize it will be a noop */
+        (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
 }
 
 /*
@@ -563,17 +621,18 @@ get_new_msg (nal_cb_t *nal, lib_md_t *md)
  * of long messages.
  *
  */
-static int parse_put(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
+static ptl_err_t
+parse_put(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
 {
         lib_ni_t        *ni = &nal->ni;
         ptl_size_t       mlength = 0;
         ptl_size_t       offset = 0;
         int              unlink = 0;
+        ptl_err_t        rc;
         lib_me_t        *me;
         lib_md_t        *md;
-        lib_msg_t       *msg;
         unsigned long    flags;
-
+                
         /* Convert put fields to host byte order */
         hdr->msg.put.match_bits = NTOH__u64 (hdr->msg.put.match_bits);
         hdr->msg.put.ptl_index = NTOH__u32 (hdr->msg.put.ptl_index);
@@ -586,8 +645,10 @@ static int parse_put(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
                          hdr->payload_length, hdr->msg.put.offset,
                          hdr->msg.put.match_bits,
                          &mlength, &offset, &unlink);
-        if (me == NULL)
-                goto drop;
+        if (me == NULL) {
+                state_unlock(nal, &flags);
+                return (PTL_FAIL);
+        }
 
         md = me->md;
         CDEBUG(D_NET, "Incoming put index %x from "LPU64"/%u of length %d/%d "
@@ -595,69 +656,46 @@ static int parse_put(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
                hdr->src_nid, hdr->src_pid, mlength, hdr->payload_length, 
                md->md_lh.lh_cookie, md->md_niov, offset);
 
-        msg = get_new_msg (nal, md);
-        if (msg == NULL) {
-                CERROR(LPU64": Dropping PUT from "LPU64": can't allocate msg\n",
-                       ni->nid, hdr->src_nid);
-                goto drop;
-        }
+        lib_commit_md(nal, md, msg);
+
+        msg->ev.type = PTL_EVENT_PUT;
+        msg->ev.initiator.nid = hdr->src_nid;
+        msg->ev.initiator.pid = hdr->src_pid;
+        msg->ev.portal = hdr->msg.put.ptl_index;
+        msg->ev.match_bits = hdr->msg.put.match_bits;
+        msg->ev.rlength = hdr->payload_length;
+        msg->ev.mlength = mlength;
+        msg->ev.offset = offset;
+        msg->ev.hdr_data = hdr->msg.put.hdr_data;
+
+        lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
 
         if (!ptl_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
             !(md->options & PTL_MD_ACK_DISABLE)) {
-                msg->send_ack = 1;
                 msg->ack_wmd = hdr->msg.put.ack_wmd;
-                msg->nid = hdr->src_nid;
-                msg->pid = hdr->src_pid;
-                msg->ev.match_bits = hdr->msg.put.match_bits;
-        }
-
-        if (md->eq) {
-                msg->ev.type = PTL_EVENT_PUT;
-                msg->ev.initiator.nid = hdr->src_nid;
-                msg->ev.initiator.pid = hdr->src_pid;
-                msg->ev.portal = hdr->msg.put.ptl_index;
-                msg->ev.match_bits = hdr->msg.put.match_bits;
-                msg->ev.rlength = hdr->payload_length;
-                msg->ev.mlength = mlength;
-                msg->ev.offset = offset;
-                msg->ev.hdr_data = hdr->msg.put.hdr_data;
-
-                /* NB if this match has exhausted the MD, we can't be sure
-                 * that this event will the the last one associated with
-                 * this MD in the event queue (another message already
-                 * matching this ME/MD could end up being last).  So we
-                 * remember the ME handle anyway and check again when we're
-                 * allocating our slot in the event queue.
-                 */
-                ptl_me2handle (&msg->ev.unlinked_me, me);
-
-                lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
         }
 
         ni->counters.recv_count++;
         ni->counters.recv_length += mlength;
 
-        /* only unlink after MD's pending count has been bumped
-         * in get_new_msg() otherwise lib_me_unlink() will nuke it */
-        if (unlink) {
-                md->md_flags |= PTL_MD_FLAG_AUTO_UNLINKED;
+        /* only unlink after MD's pending count has been bumped in
+         * lib_commit_md() otherwise lib_me_unlink() will nuke it */
+        if (unlink)
                 lib_me_unlink (nal, me);
-        }
 
         state_unlock(nal, &flags);
 
-        lib_recv (nal, private, msg, md, offset, mlength, hdr->payload_length);
-        return 0;
+        rc = lib_recv(nal, private, msg, md, offset, mlength,
+                      hdr->payload_length);
+        if (rc != PTL_OK)
+                CERROR(LPU64": error on receiving PUT from "LPU64": %d\n",
+                       ni->nid, hdr->src_nid, rc);
 
- drop:
-        nal->ni.counters.drop_count++;
-        nal->ni.counters.drop_length += hdr->payload_length;
-        state_unlock (nal, &flags);
-        lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length);
-        return -1;
+        return (rc);
 }
 
-static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
+static ptl_err_t
+parse_get(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
 {
         lib_ni_t        *ni = &nal->ni;
         ptl_size_t       mlength = 0;
@@ -665,7 +703,6 @@ static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
         int              unlink = 0;
         lib_me_t        *me;
         lib_md_t        *md;
-        lib_msg_t       *msg;
         ptl_hdr_t        reply;
         unsigned long    flags;
         int              rc;
@@ -683,8 +720,10 @@ static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
                          hdr->msg.get.sink_length, hdr->msg.get.src_offset,
                          hdr->msg.get.match_bits,
                          &mlength, &offset, &unlink);
-        if (me == NULL)
-                goto drop;
+        if (me == NULL) {
+                state_unlock(nal, &flags);
+                return (PTL_FAIL);
+        }
 
         md = me->md;
         CDEBUG(D_NET, "Incoming get index %d from "LPU64".%u of length %d/%d "
@@ -692,45 +731,27 @@ static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
                hdr->src_nid, hdr->src_pid, mlength, hdr->payload_length, 
                md->md_lh.lh_cookie, md->md_niov, offset);
 
-        msg = get_new_msg (nal, md);
-        if (msg == NULL) {
-                CERROR(LPU64": Dropping GET from "LPU64": can't allocate msg\n",
-                       ni->nid, hdr->src_nid);
-                goto drop;
-        }
+        lib_commit_md(nal, md, msg);
 
-        if (md->eq) {
-                msg->ev.type = PTL_EVENT_GET;
-                msg->ev.initiator.nid = hdr->src_nid;
-                msg->ev.initiator.pid = hdr->src_pid;
-                msg->ev.portal = hdr->msg.get.ptl_index;
-                msg->ev.match_bits = hdr->msg.get.match_bits;
-                msg->ev.rlength = hdr->payload_length;
-                msg->ev.mlength = mlength;
-                msg->ev.offset = offset;
-                msg->ev.hdr_data = 0;
-
-                /* NB if this match has exhausted the MD, we can't be sure
-                 * that this event will the the last one associated with
-                 * this MD in the event queue (another message already
-                 * matching this ME/MD could end up being last).  So we
-                 * remember the ME handle anyway and check again when we're
-                 * allocating our slot in the event queue.
-                 */
-                ptl_me2handle (&msg->ev.unlinked_me, me);
-
-                lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
-        }
+        msg->ev.type = PTL_EVENT_GET;
+        msg->ev.initiator.nid = hdr->src_nid;
+        msg->ev.initiator.pid = hdr->src_pid;
+        msg->ev.portal = hdr->msg.get.ptl_index;
+        msg->ev.match_bits = hdr->msg.get.match_bits;
+        msg->ev.rlength = hdr->payload_length;
+        msg->ev.mlength = mlength;
+        msg->ev.offset = offset;
+        msg->ev.hdr_data = 0;
+
+        lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
 
         ni->counters.send_count++;
         ni->counters.send_length += mlength;
 
-        /* only unlink after MD's refcount has been bumped
-         * in get_new_msg() otherwise lib_me_unlink() will nuke it */
-        if (unlink) {
-                md->md_flags |= PTL_MD_FLAG_AUTO_UNLINKED;
+        /* only unlink after MD's refcount has been bumped in
+         * lib_commit_md() otherwise lib_me_unlink() will nuke it */
+        if (unlink)
                 lib_me_unlink (nal, me);
-        }
 
         state_unlock(nal, &flags);
 
@@ -749,36 +770,25 @@ static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
 
         rc = lib_send (nal, private, msg, &reply, PTL_MSG_REPLY, 
                        hdr->src_nid, hdr->src_pid, md, offset, mlength);
-        if (rc != PTL_OK) {
-                CERROR(LPU64": Dropping GET from "LPU64": send REPLY failed\n",
-                       ni->nid, hdr->src_nid);
-                /* Hmm, this will create a GET event and make believe
-                 * the reply completed, which it kind of did, only the
-                 * source won't get her reply */
-                lib_finalize (nal, private, msg);
-                state_lock (nal, &flags);
-                goto drop;
-        }
+        if (rc != PTL_OK)
+                CERROR(LPU64": Unable to send REPLY for GET from "LPU64": %d\n",
+                       ni->nid, hdr->src_nid, rc);
+
+        /* Discard any junk after the hdr */
+        (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
 
-        /* Complete the incoming message */
-        lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length);
         return (rc);
- drop:
-        ni->counters.drop_count++;
-        ni->counters.drop_length += hdr->msg.get.sink_length;
-        state_unlock(nal, &flags);
-        lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length);
-        return -1;
 }
 
-static int parse_reply(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
+static ptl_err_t
+parse_reply(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
 {
         lib_ni_t        *ni = &nal->ni;
         lib_md_t        *md;
         int              rlength;
         int              length;
-        lib_msg_t       *msg;
         unsigned long    flags;
+        ptl_err_t        rc;
 
         state_lock(nal, &flags);
 
@@ -790,7 +800,9 @@ static int parse_reply(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
                         md == NULL ? "invalid" : "inactive",
                         hdr->msg.reply.dst_wmd.wh_interface_cookie,
                         hdr->msg.reply.dst_wmd.wh_object_cookie);
-                goto drop;
+
+                state_unlock(nal, &flags);
+                return (PTL_FAIL);
         }
 
         LASSERT (md->offset == 0);
@@ -804,7 +816,8 @@ static int parse_reply(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
                                 ni->nid, hdr->src_nid, length,
                                 hdr->msg.reply.dst_wmd.wh_object_cookie,
                                 md->length);
-                        goto drop;
+                        state_unlock(nal, &flags);
+                        return (PTL_FAIL);
                 }
                 length = md->length;
         }
@@ -813,46 +826,36 @@ static int parse_reply(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
                hdr->src_nid, length, rlength, 
                hdr->msg.reply.dst_wmd.wh_object_cookie);
 
-        msg = get_new_msg (nal, md);
-        if (msg == NULL) {
-                CERROR(LPU64": Dropping REPLY from "LPU64": can't "
-                       "allocate msg\n", ni->nid, hdr->src_nid);
-                goto drop;
-        }
+        lib_commit_md(nal, md, msg);
 
-        if (md->eq) {
-                msg->ev.type = PTL_EVENT_REPLY;
-                msg->ev.initiator.nid = hdr->src_nid;
-                msg->ev.initiator.pid = hdr->src_pid;
-                msg->ev.rlength = rlength;
-                msg->ev.mlength = length;
-                msg->ev.offset = 0;
+        msg->ev.type = PTL_EVENT_REPLY;
+        msg->ev.initiator.nid = hdr->src_nid;
+        msg->ev.initiator.pid = hdr->src_pid;
+        msg->ev.rlength = rlength;
+        msg->ev.mlength = length;
+        msg->ev.offset = 0;
 
-                lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
-        }
+        lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
 
         ni->counters.recv_count++;
         ni->counters.recv_length += length;
 
         state_unlock(nal, &flags);
 
-        lib_recv (nal, private, msg, md, 0, length, rlength);
-        return 0;
+        rc = lib_recv(nal, private, msg, md, 0, length, rlength);
+        if (rc != PTL_OK)
+                CERROR(LPU64": error on receiving REPLY from "LPU64": %d\n",
+                       ni->nid, hdr->src_nid, rc);
 
- drop:
-        nal->ni.counters.drop_count++;
-        nal->ni.counters.drop_length += hdr->payload_length;
-        state_unlock (nal, &flags);
-        lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length);
-        return -1;
+        return (rc);
 }
 
-static int parse_ack(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
+static ptl_err_t
+parse_ack(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
 {
-        lib_ni_t *ni = &nal->ni;
-        lib_md_t *md;
-        lib_msg_t *msg = NULL;
-        unsigned long flags;
+        lib_ni_t      *ni = &nal->ni;
+        lib_md_t      *md;
+        unsigned long  flags;
 
         /* Convert ack fields to host byte order */
         hdr->msg.ack.match_bits = NTOH__u64 (hdr->msg.ack.match_bits);
@@ -868,40 +871,37 @@ static int parse_ack(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
                        (md == NULL) ? "invalid" : "inactive",
                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
                        hdr->msg.ack.dst_wmd.wh_object_cookie);
-                goto drop;
+
+                state_unlock(nal, &flags);
+                return (PTL_FAIL);
         }
 
         CDEBUG(D_NET, LPU64": ACK from "LPU64" into md "LPX64"\n",
                ni->nid, hdr->src_nid, 
                hdr->msg.ack.dst_wmd.wh_object_cookie);
 
-        msg = get_new_msg (nal, md);
-        if (msg == NULL) {
-                CERROR(LPU64": Dropping ACK from "LPU64": can't allocate msg\n",
-                       ni->nid, hdr->src_nid);
-                goto drop;
-        }
+        lib_commit_md(nal, md, msg);
 
-        if (md->eq) {
-                msg->ev.type = PTL_EVENT_ACK;
-                msg->ev.initiator.nid = hdr->src_nid;
-                msg->ev.initiator.pid = hdr->src_pid;
-                msg->ev.mlength = hdr->msg.ack.mlength;
-                msg->ev.match_bits = hdr->msg.ack.match_bits;
+        msg->ev.type = PTL_EVENT_ACK;
+        msg->ev.initiator.nid = hdr->src_nid;
+        msg->ev.initiator.pid = hdr->src_pid;
+        msg->ev.mlength = hdr->msg.ack.mlength;
+        msg->ev.match_bits = hdr->msg.ack.match_bits;
 
-                lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
-        }
+        lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
 
         ni->counters.recv_count++;
-        state_unlock(nal, &flags);
-        lib_recv (nal, private, msg, NULL, 0, 0, hdr->payload_length);
-        return 0;
 
- drop:
-        nal->ni.counters.drop_count++;
-        state_unlock (nal, &flags);
-        lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length);
-        return -1;
+        state_unlock(nal, &flags);
+        
+        /* We have received and matched up the ack OK, create the
+         * completion event now... */
+        lib_finalize(nal, private, msg, PTL_OK);
+
+        /* ...and now discard any junk after the hdr */
+        (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
+       return (PTL_OK);
 }
 
 static char *
@@ -983,10 +983,13 @@ void print_hdr(nal_cb_t * nal, ptl_hdr_t * hdr)
 }                               /* end of print_hdr() */
 
 
-int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
+void 
+lib_parse(nal_cb_t *nal, ptl_hdr_t *hdr, void *private)
 {
         unsigned long  flags;
-
+        ptl_err_t      rc;
+        lib_msg_t     *msg;
+        
         /* convert common fields to host byte order */
         hdr->dest_nid = NTOH__u64 (hdr->dest_nid);
         hdr->src_nid = NTOH__u64 (hdr->src_nid);
@@ -1008,22 +1011,16 @@ int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
                         nal->ni.nid, mv->magic, 
                         mv->version_major, mv->version_minor,
                         hdr->src_nid);
-                lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length);
-                return (-1);
+                lib_drop_message(nal, private, hdr);
+                return;
         }
         
         if (hdr->dest_nid != nal->ni.nid) {
                 CERROR(LPU64": Dropping %s message from "LPU64" to "LPU64
                        " (not me)\n", nal->ni.nid, hdr_type_string (hdr),
                        hdr->src_nid, hdr->dest_nid);
-
-                state_lock (nal, &flags);
-                nal->ni.counters.drop_count++;
-                nal->ni.counters.drop_length += hdr->payload_length;
-                state_unlock (nal, &flags);
-
-                lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length);
-                return (-1);
+                lib_drop_message(nal, private, hdr);
+                return;
         }
 
         if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
@@ -1033,34 +1030,59 @@ int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
                        ": simulated failure\n",
                        nal->ni.nid, hdr_type_string (hdr), 
                        hdr->src_nid);
-                lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length);
-                return (-1);
+                lib_drop_message(nal, private, hdr);
+                return;
         }
-        
+
+        msg = lib_msg_alloc(nal);
+        if (msg == NULL) {
+                CERROR(LPU64": Dropping incoming %s from "LPU64
+                       ": can't allocate a lib_msg_t\n",
+                       nal->ni.nid, hdr_type_string (hdr), 
+                       hdr->src_nid);
+                lib_drop_message(nal, private, hdr);
+                return;
+        }
+
+        do_gettimeofday(&msg->ev.arrival_time);
+
         switch (hdr->type) {
         case PTL_MSG_ACK:
-                return (parse_ack(nal, hdr, private));
+                rc = parse_ack(nal, hdr, private, msg);
+                break;
         case PTL_MSG_PUT:
-                return (parse_put(nal, hdr, private));
+                rc = parse_put(nal, hdr, private, msg);
                 break;
         case PTL_MSG_GET:
-                return (parse_get(nal, hdr, private));
+                rc = parse_get(nal, hdr, private, msg);
                 break;
         case PTL_MSG_REPLY:
-                return (parse_reply(nal, hdr, private));
+                rc = parse_reply(nal, hdr, private, msg);
                 break;
         default:
                 CERROR(LPU64": Dropping <unknown> message from "LPU64
                        ": Bad type=0x%x\n",  nal->ni.nid, hdr->src_nid,
                        hdr->type);
-
-                lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length);
-                return (-1);
+                rc = PTL_FAIL;
+                break;
+        }
+                
+        if (rc != PTL_OK) {
+                if (msg->md != NULL) {
+                        /* committed... */
+                        lib_finalize(nal, private, msg, rc);
+                } else {
+                        state_lock(nal, &flags);
+                        lib_msg_free(nal, msg); /* expects state_lock held */
+                        state_unlock(nal, &flags);
+
+                        lib_drop_message(nal, private, hdr);
+                }
         }
 }
 
-
-int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
+int 
+do_PtlPut(nal_cb_t *nal, void *private, void *v_args, void *v_ret)
 {
         /*
          * Incoming:
@@ -1075,16 +1097,15 @@ int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
          * Outgoing:
          */
 
-        PtlPut_in *args = v_args;
-        PtlPut_out *ret = v_ret;
-        ptl_hdr_t hdr;
-
-        lib_ni_t *ni = &nal->ni;
-        lib_md_t *md;
-        lib_msg_t *msg = NULL;
+        PtlPut_in        *args = v_args;
         ptl_process_id_t *id = &args->target_in;
-        unsigned long flags;
-        int           rc;
+        PtlPut_out       *ret = v_ret;
+        lib_ni_t         *ni = &nal->ni;
+        lib_msg_t        *msg;
+        ptl_hdr_t         hdr;
+        lib_md_t         *md;
+        unsigned long     flags;
+        int               rc;
         
         if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
             fail_peer (nal, id->nid, 1))           /* shall we now? */
@@ -1093,13 +1114,22 @@ int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
                        nal->ni.nid, id->nid);
                 return (ret->rc = PTL_INV_PROC);
         }
-        
-        ret->rc = PTL_OK;
+
+        msg = lib_msg_alloc(nal);
+        if (msg == NULL) {
+                CERROR(LPU64": Dropping PUT to "LPU64": ENOMEM on lib_msg_t\n",
+                       ni->nid, id->nid);
+                return (ret->rc = PTL_NOSPACE);
+        }
+
         state_lock(nal, &flags);
+
         md = ptl_handle2md(&args->md_in, nal);
-        if (md == NULL || !md->threshold) {
+        if (md == NULL || md->threshold == 0) {
+                lib_msg_free(nal, msg);
                 state_unlock(nal, &flags);
-                return ret->rc = PTL_INV_MD;
+        
+                return (ret->rc = PTL_INV_MD);
         }
 
         CDEBUG(D_NET, "PtlPut -> %Lu: %lu\n", (unsigned long long)id->nid,
@@ -1126,57 +1156,39 @@ int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
         hdr.msg.put.offset = HTON__u32 (args->offset_in);
         hdr.msg.put.hdr_data = args->hdr_data_in;
 
+        lib_commit_md(nal, md, msg);
+        
+        msg->ev.type = PTL_EVENT_SENT;
+        msg->ev.initiator.nid = ni->nid;
+        msg->ev.initiator.pid = ni->pid;
+        msg->ev.portal = args->portal_in;
+        msg->ev.match_bits = args->match_bits_in;
+        msg->ev.rlength = md->length;
+        msg->ev.mlength = md->length;
+        msg->ev.offset = args->offset_in;
+        msg->ev.hdr_data = args->hdr_data_in;
+
+        lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
+
         ni->counters.send_count++;
         ni->counters.send_length += md->length;
 
-        msg = get_new_msg (nal, md);
-        if (msg == NULL) {
-                CERROR("BAD: could not allocate msg!\n");
-                state_unlock(nal, &flags);
-                return ret->rc = PTL_NOSPACE;
-        }
-
-        /*
-         * If this memory descriptor has an event queue associated with
-         * it we need to allocate a message state object and record the
-         * information about this operation that will be recorded into
-         * event queue once the message has been completed.
-         *
-         * NB. We're now committed to the GET, since we just marked the MD
-         * busy.  Callers who observe this (by getting PTL_MD_INUSE from
-         * PtlMDUnlink()) expect a completion event to tell them when the
-         * MD becomes idle. 
-         */
-        if (md->eq) {
-                msg->ev.type = PTL_EVENT_SENT;
-                msg->ev.initiator.nid = ni->nid;
-                msg->ev.initiator.pid = ni->pid;
-                msg->ev.portal = args->portal_in;
-                msg->ev.match_bits = args->match_bits_in;
-                msg->ev.rlength = md->length;
-                msg->ev.mlength = md->length;
-                msg->ev.offset = args->offset_in;
-                msg->ev.hdr_data = args->hdr_data_in;
-
-                lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
-        }
-
         state_unlock(nal, &flags);
         
         rc = lib_send (nal, private, msg, &hdr, PTL_MSG_PUT,
                        id->nid, id->pid, md, 0, md->length);
         if (rc != PTL_OK) {
-                /* get_new_msg() committed us to sending by decrementing
-                 * md->threshold, so we have to act like we did send, but
-                 * the network dropped it. */
-                lib_finalize (nal, private, msg);
+                CERROR(LPU64": error sending PUT to "LPU64": %d\n",
+                       ni->nid, id->nid, rc);
+                lib_finalize (nal, private, msg, rc);
         }
         
+        /* completion will be signalled by an event */
         return ret->rc = PTL_OK;
 }
 
-lib_msg_t * lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, 
-                                lib_md_t *getmd)
+lib_msg_t * 
+lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, lib_md_t *getmd)
 {
         /* The NAL can DMA direct to the GET md (i.e. no REPLY msg).  This
          * returns a msg the NAL can pass to lib_finalize() so that a REPLY
@@ -1188,39 +1200,38 @@ lib_msg_t * lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid,
          * lib_finalize() of the original GET. */
 
         lib_ni_t        *ni = &nal->ni;
-        lib_msg_t       *msg;
+        lib_msg_t       *msg = lib_msg_alloc(nal);
         unsigned long    flags;
 
         state_lock(nal, &flags);
 
         LASSERT (getmd->pending > 0);
 
+        if (msg == NULL) {
+                CERROR ("Dropping REPLY from "LPU64": can't allocate msg\n",
+                        peer_nid);
+                goto drop;
+        }
+
         if (getmd->threshold == 0) {
                 CERROR ("Dropping REPLY from "LPU64" for inactive MD %p\n",
                         peer_nid, getmd);
-                goto drop;
+                goto drop_msg;
         }
 
         LASSERT (getmd->offset == 0);
 
         CDEBUG(D_NET, "Reply from "LPU64" md %p\n", peer_nid, getmd);
 
-        msg = get_new_msg (nal, getmd);
-        if (msg == NULL) {
-                CERROR("Dropping REPLY from "LPU64" md %p: can't allocate msg\n", 
-                       peer_nid, getmd);
-                goto drop;
-        }
+        lib_commit_md (nal, getmd, msg);
 
-        if (getmd->eq) {
-                msg->ev.type = PTL_EVENT_REPLY;
-                msg->ev.initiator.nid = peer_nid;
-                msg->ev.initiator.pid = 0;      /* XXX FIXME!!! */
-                msg->ev.rlength = msg->ev.mlength = getmd->length;
-                msg->ev.offset = 0;
+        msg->ev.type = PTL_EVENT_REPLY;
+        msg->ev.initiator.nid = peer_nid;
+        msg->ev.initiator.pid = 0;      /* XXX FIXME!!! */
+        msg->ev.rlength = msg->ev.mlength = getmd->length;
+        msg->ev.offset = 0;
 
-                lib_md_deconstruct(nal, getmd, &msg->ev.mem_desc);
-        }
+        lib_md_deconstruct(nal, getmd, &msg->ev.mem_desc);
 
         ni->counters.recv_count++;
         ni->counters.recv_length += getmd->length;
@@ -1228,7 +1239,9 @@ lib_msg_t * lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid,
         state_unlock(nal, &flags);
 
         return msg;
-        
+
+ drop_msg:
+        lib_msg_free(nal, msg);
  drop:
         nal->ni.counters.drop_count++;
         nal->ni.counters.drop_length += getmd->length;
@@ -1238,7 +1251,8 @@ lib_msg_t * lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid,
         return NULL;
 }
 
-int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
+int 
+do_PtlGet(nal_cb_t *nal, void *private, void *v_args, void *v_ret)
 {
         /*
          * Incoming:
@@ -1252,15 +1266,15 @@ int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
          * Outgoing:
          */
 
-        PtlGet_in *args = v_args;
-        PtlGet_out *ret = v_ret;
-        ptl_hdr_t hdr;
-        lib_msg_t *msg = NULL;
-        lib_ni_t *ni = &nal->ni;
+        PtlGet_in        *args = v_args;
         ptl_process_id_t *id = &args->target_in;
-        lib_md_t *md;
-        unsigned long flags;
-        int           rc;
+        PtlGet_out       *ret = v_ret;
+        lib_ni_t         *ni = &nal->ni;
+        lib_msg_t        *msg;
+        ptl_hdr_t         hdr;
+        lib_md_t         *md;
+        unsigned long     flags;
+        int               rc;
         
         if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
             fail_peer (nal, id->nid, 1))           /* shall we now? */
@@ -1269,16 +1283,24 @@ int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
                        nal->ni.nid, id->nid);
                 return (ret->rc = PTL_INV_PROC);
         }
-        
+
+        msg = lib_msg_alloc(nal);
+        if (msg == NULL) {
+                CERROR(LPU64": Dropping GET to "LPU64": ENOMEM on lib_msg_t\n",
+                       ni->nid, id->nid);
+                return (ret->rc = PTL_NOSPACE);
+        }
+
         state_lock(nal, &flags);
+
         md = ptl_handle2md(&args->md_in, nal);
         if (md == NULL || !md->threshold) {
+                lib_msg_free(nal, msg);
                 state_unlock(nal, &flags);
+
                 return ret->rc = PTL_INV_MD;
         }
 
-        LASSERT (md->offset == 0);
-
         CDEBUG(D_NET, "PtlGet -> %Lu: %lu\n", (unsigned long long)id->nid,
                (unsigned long)id->pid);
 
@@ -1299,51 +1321,33 @@ int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
         hdr.msg.get.src_offset = HTON__u32 (args->offset_in);
         hdr.msg.get.sink_length = HTON__u32 (md->length);
 
-        ni->counters.send_count++;
+        lib_commit_md(nal, md, msg);
 
-        msg = get_new_msg (nal, md);
-        if (msg == NULL) {
-                CERROR("do_PtlGet: BAD - could not allocate cookie!\n");
-                state_unlock(nal, &flags);
-                return ret->rc = PTL_NOSPACE;
-        }
+        msg->ev.type = PTL_EVENT_SENT;
+        msg->ev.initiator.nid = ni->nid;
+        msg->ev.initiator.pid = ni->pid;
+        msg->ev.portal = args->portal_in;
+        msg->ev.match_bits = args->match_bits_in;
+        msg->ev.rlength = md->length;
+        msg->ev.mlength = md->length;
+        msg->ev.offset = args->offset_in;
+        msg->ev.hdr_data = 0;
 
-        /*
-         * If this memory descriptor has an event queue associated with
-         * it we must allocate a message state object that will record
-         * the information to be filled in once the message has been
-         * completed.  More information is in the do_PtlPut() comments.
-         *
-         * NB. We're now committed to the GET, since we just marked the MD
-         * busy.  Callers who observe this (by getting PTL_MD_INUSE from
-         * PtlMDUnlink()) expect a completion event to tell them when the
-         * MD becomes idle. 
-         */
-        if (md->eq) {
-                msg->ev.type = PTL_EVENT_SENT;
-                msg->ev.initiator.nid = ni->nid;
-                msg->ev.initiator.pid = ni->pid;
-                msg->ev.portal = args->portal_in;
-                msg->ev.match_bits = args->match_bits_in;
-                msg->ev.rlength = md->length;
-                msg->ev.mlength = md->length;
-                msg->ev.offset = args->offset_in;
-                msg->ev.hdr_data = 0;
-
-                lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
-        }
+        lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
+
+        ni->counters.send_count++;
 
         state_unlock(nal, &flags);
 
         rc = lib_send (nal, private, msg, &hdr, PTL_MSG_GET,
                        id->nid, id->pid, NULL, 0, 0);
         if (rc != PTL_OK) {
-                /* get_new_msg() committed us to sending by decrementing
-                 * md->threshold, so we have to act like we did send, but
-                 * the network dropped it. */
-                lib_finalize (nal, private, msg);
+                CERROR(LPU64": error sending GET to "LPU64": %d\n",
+                       ni->nid, id->nid, rc);
+                lib_finalize (nal, private, msg, rc);
         }
         
+        /* completion will be signalled by an event */
         return ret->rc = PTL_OK;
 }
 
index 9840ff5..04c69b1 100644 (file)
 
 #include <portals/lib-p30.h>
 
-int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t *msg)
+void
+lib_enq_event_locked (nal_cb_t *nal, void *private, 
+                      lib_eq_t *eq, ptl_event_t *ev)
 {
-        lib_md_t     *md;
-        lib_eq_t     *eq;
+        ptl_event_t  *eq_slot;
         int           rc;
+        
+        ev->sequence = eq->sequence++; /* Allocate the next queue slot */
+
+        /* size must be a power of 2 to handle a wrapped sequence # */
+        LASSERT (eq->size != 0 &&
+                 eq->size == LOWEST_BIT_SET (eq->size));
+        eq_slot = eq->base + (ev->sequence & (eq->size - 1));
+
+        /* Copy the event into the allocated slot, ensuring all the rest of
+         * the event's contents have been copied _before_ the sequence
+         * number gets updated.  A processes 'getting' an event waits on
+         * the next queue slot's sequence to be 'new'.  When it is, _all_
+         * other event fields had better be consistent.  I assert
+         * 'sequence' is the last member, so I only need a 2 stage copy. */
+
+        LASSERT(sizeof (ptl_event_t) ==
+                offsetof(ptl_event_t, sequence) + sizeof(ev->sequence));
+
+        rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev,
+                            offsetof (ptl_event_t, sequence));
+        LASSERT (rc == PTL_OK);
+
+#ifdef __KERNEL__
+        barrier();
+#endif
+        /* Updating the sequence number is what makes the event 'new' NB if
+         * the cb_write below isn't atomic, this could cause a race with
+         * PtlEQGet */
+        rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence,
+                           (void *)&ev->sequence,sizeof (ev->sequence));
+        LASSERT (rc == PTL_OK);
+
+#ifdef __KERNEL__
+        barrier();
+#endif
+
+        if (nal->cb_callback != NULL)
+                nal->cb_callback(nal, private, eq, ev);
+        else if (eq->event_callback != NULL)
+                eq->event_callback(ev);
+}
+
+void 
+lib_finalize(nal_cb_t *nal, void *private, lib_msg_t *msg, ptl_err_t status)
+{
+        lib_md_t     *md;
+        int           unlink;
         unsigned long flags;
+        int           rc;
+        ptl_hdr_t     ack;
 
         /* ni went down while processing this message */
-        if (nal->ni.up == 0) {
-                return -1;
-        }
+        if (nal->ni.up == 0)
+                return;
 
         if (msg == NULL)
-                return 0;
+                return;
 
-        rc = 0;
-        if (msg->send_ack) {
-                ptl_hdr_t ack;
+        /* Only send an ACK if the PUT completed successfully */
+        if (status == PTL_OK &&
+            !ptl_is_wire_handle_none(&msg->ack_wmd)) {
 
-                LASSERT (!ptl_is_wire_handle_none (&msg->ack_wmd));
+                LASSERT(msg->ev.type == PTL_EVENT_PUT);
 
                 memset (&ack, 0, sizeof (ack));
                 ack.type     = HTON__u32 (PTL_MSG_ACK);
-                ack.dest_nid = HTON__u64 (msg->nid);
+                ack.dest_nid = HTON__u64 (msg->ev.initiator.nid);
                 ack.src_nid  = HTON__u64 (nal->ni.nid);
-                ack.dest_pid = HTON__u32 (msg->pid);
+                ack.dest_pid = HTON__u32 (msg->ev.initiator.pid);
                 ack.src_pid  = HTON__u32 (nal->ni.pid);
                 ack.payload_length = 0;
 
@@ -66,92 +115,35 @@ int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t *msg)
                 ack.msg.ack.mlength = HTON__u32 (msg->ev.mlength);
 
                 rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK,
-                               msg->nid, msg->pid, NULL, 0, 0);
-                /* If this send fails, there's nothing else to clean up */
+                               msg->ev.initiator.nid, msg->ev.initiator.pid, 
+                               NULL, 0, 0);
+                if (rc != PTL_OK) {
+                        /* send failed: there's nothing else to clean up. */
+                        CERROR("Error %d sending ACK to "LPX64"\n", 
+                               rc, msg->ev.initiator.nid);
+                }
         }
 
         md = msg->md;
-        LASSERT (md->pending > 0);  /* I've not dropped my ref yet */
-        eq = md->eq;
 
         state_lock(nal, &flags);
 
-        if (eq != NULL) {
-                ptl_event_t  *ev = &msg->ev;
-                ptl_event_t  *eq_slot;
-
-                /* I have to hold the lock while I bump the sequence number
-                 * and copy the event into the queue.  If not, and I was
-                 * interrupted after bumping the sequence number, other
-                 * events could fill the queue, including the slot I just
-                 * allocated to this event.  On resuming, I would overwrite
-                 * a more 'recent' event with old event state, and
-                 * processes taking events off the queue would not detect
-                 * overflow correctly.
-                 */
-
-                ev->sequence = eq->sequence++;/* Allocate the next queue slot */
-
-                /* size must be a power of 2 to handle a wrapped sequence # */
-                LASSERT (eq->size != 0 &&
-                         eq->size == LOWEST_BIT_SET (eq->size));
-                eq_slot = eq->base + (ev->sequence & (eq->size - 1));
-
-                /* Invalidate unlinked_me unless this is the last
-                 * event for an auto-unlinked MD.  Note that if md was
-                 * auto-unlinked, md->pending can only decrease
-                 */
-                if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 || /* not auto-unlinked */
-                    md->pending != 1)                       /* not last ref */
-                        ev->unlinked_me = PTL_HANDLE_NONE;
-
-                /* Copy the event into the allocated slot, ensuring all the
-                 * rest of the event's contents have been copied _before_
-                 * the sequence number gets updated.  A processes 'getting'
-                 * an event waits on the next queue slot's sequence to be
-                 * 'new'.  When it is, _all_ other event fields had better
-                 * be consistent.  I assert 'sequence' is the last member,
-                 * so I only need a 2 stage copy.
-                 */
-                LASSERT(sizeof (ptl_event_t) ==
-                        offsetof(ptl_event_t, sequence) + sizeof(ev->sequence));
-
-                rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev,
-                                    offsetof (ptl_event_t, sequence));
-                LASSERT (rc == 0);
-
-#ifdef __KERNEL__
-                barrier();
-#endif
-                /* Updating the sequence number is what makes the event 'new' */
-
-                /* cb_write is not necessarily atomic, so this could
-                   cause a race with PtlEQGet */
-                rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence,
-                                   (void *)&ev->sequence,sizeof (ev->sequence));
-                LASSERT (rc == 0);
+        /* Now it's safe to drop my caller's ref */
+        md->pending--;
+        LASSERT (md->pending >= 0);
 
-#ifdef __KERNEL__
-                barrier();
-#endif
+        /* Should I unlink this MD? */
+        unlink = (md->pending == 0 &&           /* No other refs */
+                  (md->threshold == 0 ||        /* All ops done */
+                   md->md_flags & PTL_MD_FLAG_UNLINK) != 0); /* black spot */
 
-                /* I must also ensure that (a) callbacks are made in the
-                 * same order as the events land in the queue, and (b) the
-                 * callback occurs before the event can be removed from the
-                 * queue, so I can't drop the lock during the callback. */
-                if (nal->cb_callback != NULL)
-                        nal->cb_callback(nal, private, eq, ev);
-                else  if (eq->event_callback != NULL)
-                        (void)((eq->event_callback) (ev));
-        }
+        msg->ev.status = status;
+        msg->ev.unlinked = unlink;
 
-        LASSERT ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 ||
-                 (md->md_flags & PTL_MD_FLAG_UNLINK) != 0);
+        if (md->eq != NULL)
+                lib_enq_event_locked(nal, private, md->eq, &msg->ev);
 
-        md->pending--;
-        if (md->pending == 0 && /* no more outstanding operations on this md */
-            (md->threshold == 0 ||              /* done its business */
-             (md->md_flags & PTL_MD_FLAG_UNLINK) != 0)) /* marked for death */
+        if (unlink)
                 lib_md_unlink(nal, md);
 
         list_del (&msg->msg_list);
@@ -159,6 +151,4 @@ int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t *msg)
         lib_msg_free(nal, msg);
 
         state_unlock(nal, &flags);
-
-        return rc;
 }
index dc427b0..6035ca1 100644 (file)
@@ -1,5 +1,9 @@
 CPPFLAGS=
 INCLUDES=-I$(top_srcdir)/portals/include -I$(top_srcdir)/include -I$(srcdir)
-lib_LIBRARIES = libtcpnal.a
+noinst_LIBRARIES = libtcpnal.a
 pkginclude_HEADERS =  pqtimer.h dispatch.h table.h timer.h connection.h ipmap.h bridge.h procbridge.h
 libtcpnal_a_SOURCES = debug.c pqtimer.c select.c table.c pqtimer.h dispatch.h table.h timer.h address.c procapi.c proclib.c connection.c tcpnal.c connection.h
+
+if LIBLUSTRE
+libtcpnal_a_CFLAGS = -fPIC
+endif
index 0b4940f..9a90ab8 100644 (file)
@@ -6,6 +6,9 @@
  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
  */
 
+#ifndef TCPNAL_PROCBRIDGE_H
+#define TCPNAL_PROCBRIDGE_H
+
 #include <portals/lib-p30.h>
 
 typedef struct bridge {
@@ -27,3 +30,5 @@ nal_t *bridge_init(ptl_interface_t nal,
 
 typedef int (*nal_initialize)(bridge);
 extern nal_initialize nal_table[PTL_IFACE_MAX];
+
+#endif
index 29e75be..ca6999a 100644 (file)
@@ -309,7 +309,8 @@ tcpnal_hello (int sockfd, ptl_nid_t *nid, int type, __u64 incarnation)
  */
 connection force_tcp_connection(manager m,
                                 unsigned int ip,
-                                unsigned short port)
+                                unsigned short port,
+                                procbridge pb)
 {
     connection conn;
     struct sockaddr_in addr;
@@ -357,6 +358,10 @@ connection force_tcp_connection(manager m,
             exit(-1);
 
         conn = allocate_connection(m, ip, port, fd);
+
+        /* let nal thread know this event right away */
+        if (conn)
+                procbridge_wakeup_nal(pb);
     }
 
     pthread_mutex_unlock(&m->conn_lock);
index fb1eaab..343ffa6 100644 (file)
@@ -7,6 +7,7 @@
  */
 
 #include <table.h>
+#include <procbridge.h>
 
 typedef struct manager {
     table connections;
@@ -26,7 +27,8 @@ typedef struct connection {
     manager m;
 } *connection;
 
-connection force_tcp_connection(manager m, unsigned int ip, unsigned int short);
+connection force_tcp_connection(manager m, unsigned int ip, unsigned int short,
+                                procbridge pb);
 manager init_connections(unsigned short, int (*f)(void *, void *), void *);
 void remove_connection(void *arg);
 void shutdown_connections(manager m);
index 2a3fbd8..bddfe9a 100644 (file)
 #include <stdlib.h>
 #include <unistd.h>
 #include <string.h>
+#ifndef __CYGWIN__
+#include <syscall.h>
+#endif
+#include <sys/socket.h>
 #include <procbridge.h>
 #include <pqtimer.h>
 #include <dispatch.h>
 #include <errno.h>
 
 
+/* XXX CFS workaround, to give a chance to let nal thread wake up
+ * from waiting in select
+ */
+static int procbridge_notifier_handler(void *arg)
+{
+    static char buf[8];
+    procbridge p = (procbridge) arg;
+
+    syscall(SYS_read, p->notifier[1], buf, sizeof(buf));
+    return 1;
+}
+
+void procbridge_wakeup_nal(procbridge p)
+{
+    static char buf[8];
+    syscall(SYS_write, p->notifier[0], buf, sizeof(buf));
+}
+
 /* Function: forward
  * Arguments: nal_t *nal: pointer to my top-side nal structure
  *            id: the command to pass to the lower layer
@@ -79,6 +101,7 @@ static int procbridge_shutdown(nal_t *n, int ni)
     procbridge p=(procbridge)b->local;
 
     p->nal_flags |= NAL_FLAG_STOPPING;
+    procbridge_wakeup_nal(p);
 
     do {
         pthread_mutex_lock(&p->mutex);
@@ -104,6 +127,12 @@ static int procbridge_validate(nal_t *nal, void *base, size_t extent)
 }
 
 
+/* FIXME cfs temporary workaround! FIXME
+ * global time out value
+ */
+int __tcpnal_eqwait_timeout_value = 0;
+int __tcpnal_eqwait_timedout = 0;
+
 /* Function: yield
  * Arguments:  pid:
  *
@@ -118,7 +147,19 @@ static void procbridge_yield(nal_t *n)
     procbridge p=(procbridge)b->local;
 
     pthread_mutex_lock(&p->mutex);
-    pthread_cond_wait(&p->cond,&p->mutex);
+    if (!__tcpnal_eqwait_timeout_value) {
+        pthread_cond_wait(&p->cond,&p->mutex);
+    } else {
+        struct timeval now;
+        struct timespec timeout;
+
+        gettimeofday(&now, NULL);
+        timeout.tv_sec = now.tv_sec + __tcpnal_eqwait_timeout_value;
+        timeout.tv_nsec = now.tv_usec * 1000;
+
+        __tcpnal_eqwait_timedout =
+                pthread_cond_timedwait(&p->cond, &p->mutex, &timeout);
+    }
     pthread_mutex_unlock(&p->mutex);
 }
 
@@ -194,6 +235,19 @@ nal_t *procbridge_interface(int num_interface,
     p->nal_flags = 0;
     pthread_mutex_init(&p->nal_cb_lock, 0);
 
+    /* initialize notifier */
+    if (socketpair(AF_UNIX, SOCK_STREAM, 0, p->notifier)) {
+        perror("socketpair failed");
+        return NULL;
+    }
+
+    if (!register_io_handler(p->notifier[1], READ_HANDLER,
+                procbridge_notifier_handler, p)) {
+        perror("fail to register notifier handler");
+        return NULL;
+    }
+
+    /* create nal thread */
     if (pthread_create(&p->t, NULL, nal_thread, &args)) {
         perror("nal_init: pthread_create");
         return(NULL);
index 317e22f..965f83d 100644 (file)
@@ -25,6 +25,9 @@ typedef struct procbridge {
     pthread_cond_t cond;
     pthread_mutex_t mutex;
 
+    /* socket pair used to notify nal thread */
+    int notifier[2];
+
     int nal_flags;
 
     pthread_mutex_t nal_cb_lock;
@@ -51,5 +54,6 @@ extern nal_t *procbridge_interface(int num_interface,
                                    ptl_pt_index_t ptl_size,
                                    ptl_ac_index_t acl_size,
                                    ptl_pid_t requested_pid);
+extern void procbridge_wakeup_nal(procbridge p);
 
 #endif
index 2627253..2a5ba0d 100644 (file)
 /* the following functions are stubs to satisfy the nal definition
    without doing anything particularily useful*/
 
-static int nal_write(nal_cb_t *nal,
-                     void *private,
-                     user_ptr dst_addr,
-                     void *src_addr,
-                     size_t len)
+static ptl_err_t nal_write(nal_cb_t *nal,
+                           void *private,
+                           user_ptr dst_addr,
+                           void *src_addr,
+                           size_t len)
 {
     memcpy(dst_addr, src_addr, len);
-    return 0;
+    return PTL_OK;
 }
 
-static int nal_read(nal_cb_t * nal,
-                    void *private,
-                   void *dst_addr,
-                   user_ptr src_addr,
-                   size_t len)
+static ptl_err_t nal_read(nal_cb_t * nal,
+                          void *private,
+                          void *dst_addr,
+                          user_ptr src_addr,
+                          size_t len)
 {
        memcpy(dst_addr, src_addr, len);
-       return 0;
+       return PTL_OK;
 }
 
 static void *nal_malloc(nal_cb_t *nal,
index fe24efc..c4ccae1 100644 (file)
@@ -126,15 +126,6 @@ void select_timer_block(when until)
         timeout_pointer=&timeout;
     } else timeout_pointer=0;
 
-
-    /* FIXME
-     * temporarily add timer for endless waiting problem.
-     * FIXME
-     */
-    timeout.tv_sec = 1;
-    timeout.tv_usec = 0;
-    timeout_pointer=&timeout;
-
     FD_ZERO(&fds[0]);
     FD_ZERO(&fds[1]);
     FD_ZERO(&fds[2]);
index dc427b0..6035ca1 100644 (file)
@@ -1,5 +1,9 @@
 CPPFLAGS=
 INCLUDES=-I$(top_srcdir)/portals/include -I$(top_srcdir)/include -I$(srcdir)
-lib_LIBRARIES = libtcpnal.a
+noinst_LIBRARIES = libtcpnal.a
 pkginclude_HEADERS =  pqtimer.h dispatch.h table.h timer.h connection.h ipmap.h bridge.h procbridge.h
 libtcpnal_a_SOURCES = debug.c pqtimer.c select.c table.c pqtimer.h dispatch.h table.h timer.h address.c procapi.c proclib.c connection.c tcpnal.c connection.h
+
+if LIBLUSTRE
+libtcpnal_a_CFLAGS = -fPIC
+endif
index 0b4940f..9a90ab8 100644 (file)
@@ -6,6 +6,9 @@
  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
  */
 
+#ifndef TCPNAL_PROCBRIDGE_H
+#define TCPNAL_PROCBRIDGE_H
+
 #include <portals/lib-p30.h>
 
 typedef struct bridge {
@@ -27,3 +30,5 @@ nal_t *bridge_init(ptl_interface_t nal,
 
 typedef int (*nal_initialize)(bridge);
 extern nal_initialize nal_table[PTL_IFACE_MAX];
+
+#endif
index 29e75be..ca6999a 100644 (file)
@@ -309,7 +309,8 @@ tcpnal_hello (int sockfd, ptl_nid_t *nid, int type, __u64 incarnation)
  */
 connection force_tcp_connection(manager m,
                                 unsigned int ip,
-                                unsigned short port)
+                                unsigned short port,
+                                procbridge pb)
 {
     connection conn;
     struct sockaddr_in addr;
@@ -357,6 +358,10 @@ connection force_tcp_connection(manager m,
             exit(-1);
 
         conn = allocate_connection(m, ip, port, fd);
+
+        /* let nal thread know this event right away */
+        if (conn)
+                procbridge_wakeup_nal(pb);
     }
 
     pthread_mutex_unlock(&m->conn_lock);
index fb1eaab..343ffa6 100644 (file)
@@ -7,6 +7,7 @@
  */
 
 #include <table.h>
+#include <procbridge.h>
 
 typedef struct manager {
     table connections;
@@ -26,7 +27,8 @@ typedef struct connection {
     manager m;
 } *connection;
 
-connection force_tcp_connection(manager m, unsigned int ip, unsigned int short);
+connection force_tcp_connection(manager m, unsigned int ip, unsigned int short,
+                                procbridge pb);
 manager init_connections(unsigned short, int (*f)(void *, void *), void *);
 void remove_connection(void *arg);
 void shutdown_connections(manager m);
index 2a3fbd8..bddfe9a 100644 (file)
 #include <stdlib.h>
 #include <unistd.h>
 #include <string.h>
+#ifndef __CYGWIN__
+#include <syscall.h>
+#endif
+#include <sys/socket.h>
 #include <procbridge.h>
 #include <pqtimer.h>
 #include <dispatch.h>
 #include <errno.h>
 
 
+/* XXX CFS workaround, to give a chance to let nal thread wake up
+ * from waiting in select
+ */
+static int procbridge_notifier_handler(void *arg)
+{
+    static char buf[8];
+    procbridge p = (procbridge) arg;
+
+    syscall(SYS_read, p->notifier[1], buf, sizeof(buf));
+    return 1;
+}
+
+void procbridge_wakeup_nal(procbridge p)
+{
+    static char buf[8];
+    syscall(SYS_write, p->notifier[0], buf, sizeof(buf));
+}
+
 /* Function: forward
  * Arguments: nal_t *nal: pointer to my top-side nal structure
  *            id: the command to pass to the lower layer
@@ -79,6 +101,7 @@ static int procbridge_shutdown(nal_t *n, int ni)
     procbridge p=(procbridge)b->local;
 
     p->nal_flags |= NAL_FLAG_STOPPING;
+    procbridge_wakeup_nal(p);
 
     do {
         pthread_mutex_lock(&p->mutex);
@@ -104,6 +127,12 @@ static int procbridge_validate(nal_t *nal, void *base, size_t extent)
 }
 
 
+/* FIXME cfs temporary workaround! FIXME
+ * global time out value
+ */
+int __tcpnal_eqwait_timeout_value = 0;
+int __tcpnal_eqwait_timedout = 0;
+
 /* Function: yield
  * Arguments:  pid:
  *
@@ -118,7 +147,19 @@ static void procbridge_yield(nal_t *n)
     procbridge p=(procbridge)b->local;
 
     pthread_mutex_lock(&p->mutex);
-    pthread_cond_wait(&p->cond,&p->mutex);
+    if (!__tcpnal_eqwait_timeout_value) {
+        pthread_cond_wait(&p->cond,&p->mutex);
+    } else {
+        struct timeval now;
+        struct timespec timeout;
+
+        gettimeofday(&now, NULL);
+        timeout.tv_sec = now.tv_sec + __tcpnal_eqwait_timeout_value;
+        timeout.tv_nsec = now.tv_usec * 1000;
+
+        __tcpnal_eqwait_timedout =
+                pthread_cond_timedwait(&p->cond, &p->mutex, &timeout);
+    }
     pthread_mutex_unlock(&p->mutex);
 }
 
@@ -194,6 +235,19 @@ nal_t *procbridge_interface(int num_interface,
     p->nal_flags = 0;
     pthread_mutex_init(&p->nal_cb_lock, 0);
 
+    /* initialize notifier */
+    if (socketpair(AF_UNIX, SOCK_STREAM, 0, p->notifier)) {
+        perror("socketpair failed");
+        return NULL;
+    }
+
+    if (!register_io_handler(p->notifier[1], READ_HANDLER,
+                procbridge_notifier_handler, p)) {
+        perror("fail to register notifier handler");
+        return NULL;
+    }
+
+    /* create nal thread */
     if (pthread_create(&p->t, NULL, nal_thread, &args)) {
         perror("nal_init: pthread_create");
         return(NULL);
index 317e22f..965f83d 100644 (file)
@@ -25,6 +25,9 @@ typedef struct procbridge {
     pthread_cond_t cond;
     pthread_mutex_t mutex;
 
+    /* socket pair used to notify nal thread */
+    int notifier[2];
+
     int nal_flags;
 
     pthread_mutex_t nal_cb_lock;
@@ -51,5 +54,6 @@ extern nal_t *procbridge_interface(int num_interface,
                                    ptl_pt_index_t ptl_size,
                                    ptl_ac_index_t acl_size,
                                    ptl_pid_t requested_pid);
+extern void procbridge_wakeup_nal(procbridge p);
 
 #endif
index 2627253..2a5ba0d 100644 (file)
 /* the following functions are stubs to satisfy the nal definition
    without doing anything particularily useful*/
 
-static int nal_write(nal_cb_t *nal,
-                     void *private,
-                     user_ptr dst_addr,
-                     void *src_addr,
-                     size_t len)
+static ptl_err_t nal_write(nal_cb_t *nal,
+                           void *private,
+                           user_ptr dst_addr,
+                           void *src_addr,
+                           size_t len)
 {
     memcpy(dst_addr, src_addr, len);
-    return 0;
+    return PTL_OK;
 }
 
-static int nal_read(nal_cb_t * nal,
-                    void *private,
-                   void *dst_addr,
-                   user_ptr src_addr,
-                   size_t len)
+static ptl_err_t nal_read(nal_cb_t * nal,
+                          void *private,
+                          void *dst_addr,
+                          user_ptr src_addr,
+                          size_t len)
 {
        memcpy(dst_addr, src_addr, len);
-       return 0;
+       return PTL_OK;
 }
 
 static void *nal_malloc(nal_cb_t *nal,
index fe24efc..c4ccae1 100644 (file)
@@ -126,15 +126,6 @@ void select_timer_block(when until)
         timeout_pointer=&timeout;
     } else timeout_pointer=0;
 
-
-    /* FIXME
-     * temporarily add timer for endless waiting problem.
-     * FIXME
-     */
-    timeout.tv_sec = 1;
-    timeout.tv_usec = 0;
-    timeout_pointer=&timeout;
-
     FD_ZERO(&fds[0]);
     FD_ZERO(&fds[1]);
     FD_ZERO(&fds[2]);
index 1041d1d..0c47f42 100644 (file)
  *
  * sends a packet to the peer, after insuring that a connection exists
  */
-int tcpnal_send(nal_cb_t *n,
-               void *private,
-               lib_msg_t *cookie,
-               ptl_hdr_t *hdr,
-               int type,
-               ptl_nid_t nid,
-               ptl_pid_t pid,
-                unsigned int niov,
-                struct iovec *iov,
-               size_t len)
+ptl_err_t tcpnal_send(nal_cb_t *n,
+                      void *private,
+                      lib_msg_t *cookie,
+                      ptl_hdr_t *hdr,
+                      int type,
+                      ptl_nid_t nid,
+                      ptl_pid_t pid,
+                      unsigned int niov,
+                      struct iovec *iov,
+                      size_t offset,
+                      size_t len)
 {
     connection c;
     bridge b=(bridge)n->nal_data;
     struct iovec tiov[257];
     static pthread_mutex_t send_lock = PTHREAD_MUTEX_INITIALIZER;
-    int   rc;
+    ptl_err_t rc = PTL_OK;
+    int   sysrc;
     int   total;
+    int   ntiov;
     int i;
 
     if (!(c=force_tcp_connection((manager)b->lower,
                                  PNAL_IP(nid,b),
-                                 PNAL_PORT(nid,pid)))) 
-        return(1);
+                                 PNAL_PORT(nid,pid),
+                                 b->local)))
+        return(PTL_FAIL);
 
-#if 0
     /* TODO: these results should be checked. furthermore, provision
        must be made for the SIGPIPE which is delivered when
        writing on a tcp socket which has closed underneath
        the application. there is a linux flag in the sendmsg
        call which turns off the signally behaviour, but its
        nonstandard */
-    syscall(SYS_write, c->fd,hdr,sizeof(ptl_hdr_t));
-    LASSERT (niov <= 1);
-    if (len) syscall(SYS_write, c->fd,iov[0].iov_base,len);
-#else
+
     LASSERT (niov <= 256);
 
     tiov[0].iov_base = hdr;
     tiov[0].iov_len = sizeof(ptl_hdr_t);
+    ntiov = 1 + lib_extract_iov(256, &tiov[1], niov, iov, offset, len);
 
-    if (niov > 0)
-            memcpy(&tiov[1], iov, niov * sizeof(struct iovec));
     pthread_mutex_lock(&send_lock);
 #if 1
-    for (i = total = 0; i <= niov; i++)
+    for (i = total = 0; i < ntiov; i++)
             total += tiov[i].iov_len;
     
-    rc = syscall(SYS_writev, c->fd, tiov, niov+1);
-    if (rc != total) {
+    sysrc = syscall(SYS_writev, c->fd, tiov, ntiov);
+    if (sysrc != total) {
             fprintf (stderr, "BAD SEND rc %d != %d, errno %d\n",
                      rc, total, errno);
-            abort();
+            rc = PTL_FAIL;
     }
 #else
-    for (i = total = 0; i <= niov; i++) {
+    for (i = total = 0; i <= ntiov; i++) {
             rc = send(c->fd, tiov[i].iov_base, tiov[i].iov_len, 0);
             
             if (rc != tiov[i].iov_len) {
                     fprintf (stderr, "BAD SEND rc %d != %d, errno %d\n",
                              rc, tiov[i].iov_len, errno);
-                    abort();
+                    rc = PTL_FAIL;
+                    break;
             }
-            total != rc;
+            total += rc;
     }
 #endif
 #if 0
@@ -130,10 +130,14 @@ int tcpnal_send(nal_cb_t *n,
              total, niov + 1);
 #endif
     pthread_mutex_unlock(&send_lock);
-#endif
-    lib_finalize(n, private, cookie);
-        
-    return(0);
+
+    if (rc == PTL_OK) {
+            /* NB the NAL only calls lib_finalize() if it returns PTL_OK
+             * from cb_send() */
+            lib_finalize(n, private, cookie, PTL_OK);
+    }
+
+    return(rc);
 }
 
 
@@ -150,15 +154,18 @@ int tcpnal_send(nal_cb_t *n,
  * blocking read of the requested data. must drain out the
  * difference of mainpulated and requested lengths from the network
  */
-int tcpnal_recv(nal_cb_t *n,
-               void *private,
-               lib_msg_t *cookie,
-                unsigned int niov,
-                struct iovec *iov,
-               size_t mlen,
-               size_t rlen)
+ptl_err_t tcpnal_recv(nal_cb_t *n,
+                      void *private,
+                      lib_msg_t *cookie,
+                      unsigned int niov,
+                      struct iovec *iov,
+                      size_t offset,
+                      size_t mlen,
+                      size_t rlen)
 
 {
+    struct iovec tiov[256];
+    int ntiov;
     int i;
 
     if (!niov)
@@ -168,16 +175,19 @@ int tcpnal_recv(nal_cb_t *n,
     LASSERT(rlen);
     LASSERT(rlen >= mlen);
 
+    ntiov = lib_extract_iov(256, tiov, niov, iov, offset, mlen);
+    
     /* FIXME
      * 1. Is this effecient enough? change to use readv() directly?
      * 2. need check return from read_connection()
      * - MeiJia
      */
-    for (i = 0; i < niov; i++)
-        read_connection(private, iov[i].iov_base, iov[i].iov_len);
+    for (i = 0; i < ntiov; i++)
+        read_connection(private, tiov[i].iov_base, tiov[i].iov_len);
 
 finalize:
-    lib_finalize(n, private, cookie);
+    /* FIXME; we always assume success here... */
+    lib_finalize(n, private, cookie, PTL_OK);
 
     if (mlen!=rlen){
         char *trash=malloc(rlen-mlen);
@@ -187,7 +197,7 @@ finalize:
         free(trash);
     }
 
-    return(rlen);
+    return(PTL_OK);
 }
 
 
index 1041d1d..0c47f42 100644 (file)
  *
  * sends a packet to the peer, after insuring that a connection exists
  */
-int tcpnal_send(nal_cb_t *n,
-               void *private,
-               lib_msg_t *cookie,
-               ptl_hdr_t *hdr,
-               int type,
-               ptl_nid_t nid,
-               ptl_pid_t pid,
-                unsigned int niov,
-                struct iovec *iov,
-               size_t len)
+ptl_err_t tcpnal_send(nal_cb_t *n,
+                      void *private,
+                      lib_msg_t *cookie,
+                      ptl_hdr_t *hdr,
+                      int type,
+                      ptl_nid_t nid,
+                      ptl_pid_t pid,
+                      unsigned int niov,
+                      struct iovec *iov,
+                      size_t offset,
+                      size_t len)
 {
     connection c;
     bridge b=(bridge)n->nal_data;
     struct iovec tiov[257];
     static pthread_mutex_t send_lock = PTHREAD_MUTEX_INITIALIZER;
-    int   rc;
+    ptl_err_t rc = PTL_OK;
+    int   sysrc;
     int   total;
+    int   ntiov;
     int i;
 
     if (!(c=force_tcp_connection((manager)b->lower,
                                  PNAL_IP(nid,b),
-                                 PNAL_PORT(nid,pid)))) 
-        return(1);
+                                 PNAL_PORT(nid,pid),
+                                 b->local)))
+        return(PTL_FAIL);
 
-#if 0
     /* TODO: these results should be checked. furthermore, provision
        must be made for the SIGPIPE which is delivered when
        writing on a tcp socket which has closed underneath
        the application. there is a linux flag in the sendmsg
        call which turns off the signally behaviour, but its
        nonstandard */
-    syscall(SYS_write, c->fd,hdr,sizeof(ptl_hdr_t));
-    LASSERT (niov <= 1);
-    if (len) syscall(SYS_write, c->fd,iov[0].iov_base,len);
-#else
+
     LASSERT (niov <= 256);
 
     tiov[0].iov_base = hdr;
     tiov[0].iov_len = sizeof(ptl_hdr_t);
+    ntiov = 1 + lib_extract_iov(256, &tiov[1], niov, iov, offset, len);
 
-    if (niov > 0)
-            memcpy(&tiov[1], iov, niov * sizeof(struct iovec));
     pthread_mutex_lock(&send_lock);
 #if 1
-    for (i = total = 0; i <= niov; i++)
+    for (i = total = 0; i < ntiov; i++)
             total += tiov[i].iov_len;
     
-    rc = syscall(SYS_writev, c->fd, tiov, niov+1);
-    if (rc != total) {
+    sysrc = syscall(SYS_writev, c->fd, tiov, ntiov);
+    if (sysrc != total) {
             fprintf (stderr, "BAD SEND rc %d != %d, errno %d\n",
                      rc, total, errno);
-            abort();
+            rc = PTL_FAIL;
     }
 #else
-    for (i = total = 0; i <= niov; i++) {
+    for (i = total = 0; i <= ntiov; i++) {
             rc = send(c->fd, tiov[i].iov_base, tiov[i].iov_len, 0);
             
             if (rc != tiov[i].iov_len) {
                     fprintf (stderr, "BAD SEND rc %d != %d, errno %d\n",
                              rc, tiov[i].iov_len, errno);
-                    abort();
+                    rc = PTL_FAIL;
+                    break;
             }
-            total != rc;
+            total += rc;
     }
 #endif
 #if 0
@@ -130,10 +130,14 @@ int tcpnal_send(nal_cb_t *n,
              total, niov + 1);
 #endif
     pthread_mutex_unlock(&send_lock);
-#endif
-    lib_finalize(n, private, cookie);
-        
-    return(0);
+
+    if (rc == PTL_OK) {
+            /* NB the NAL only calls lib_finalize() if it returns PTL_OK
+             * from cb_send() */
+            lib_finalize(n, private, cookie, PTL_OK);
+    }
+
+    return(rc);
 }
 
 
@@ -150,15 +154,18 @@ int tcpnal_send(nal_cb_t *n,
  * blocking read of the requested data. must drain out the
  * difference of mainpulated and requested lengths from the network
  */
-int tcpnal_recv(nal_cb_t *n,
-               void *private,
-               lib_msg_t *cookie,
-                unsigned int niov,
-                struct iovec *iov,
-               size_t mlen,
-               size_t rlen)
+ptl_err_t tcpnal_recv(nal_cb_t *n,
+                      void *private,
+                      lib_msg_t *cookie,
+                      unsigned int niov,
+                      struct iovec *iov,
+                      size_t offset,
+                      size_t mlen,
+                      size_t rlen)
 
 {
+    struct iovec tiov[256];
+    int ntiov;
     int i;
 
     if (!niov)
@@ -168,16 +175,19 @@ int tcpnal_recv(nal_cb_t *n,
     LASSERT(rlen);
     LASSERT(rlen >= mlen);
 
+    ntiov = lib_extract_iov(256, tiov, niov, iov, offset, mlen);
+    
     /* FIXME
      * 1. Is this effecient enough? change to use readv() directly?
      * 2. need check return from read_connection()
      * - MeiJia
      */
-    for (i = 0; i < niov; i++)
-        read_connection(private, iov[i].iov_base, iov[i].iov_len);
+    for (i = 0; i < ntiov; i++)
+        read_connection(private, tiov[i].iov_base, tiov[i].iov_len);
 
 finalize:
-    lib_finalize(n, private, cookie);
+    /* FIXME; we always assume success here... */
+    lib_finalize(n, private, cookie, PTL_OK);
 
     if (mlen!=rlen){
         char *trash=malloc(rlen-mlen);
@@ -187,7 +197,7 @@ finalize:
         free(trash);
     }
 
-    return(rlen);
+    return(PTL_OK);
 }
 
 
index f1878df..6c31b3d 100644 (file)
@@ -3,17 +3,18 @@
 # This code is issued under the GNU General Public License.
 # See the file COPYING in this distribution
 
-
 COMPILE = $(CC) -Wall -g -I$(srcdir)/../include
 LINK = $(CC) -o $@
 
 if LIBLUSTRE
-tmp=
+
+noinst_LIBRARIES = libuptlctl.a
+libuptlctl_a_SOURCES = portals.c debug.c l_ioctl.c parser.c parser.h
+libuptlctl_a_CFLAGS = -fPIC
+
 else
-tmp=gmnalnid
-endif
 
-sbin_PROGRAMS = acceptor ptlctl debugctl routerstat wirecheck $(tmp)
+sbin_PROGRAMS = acceptor ptlctl debugctl routerstat wirecheck gmnalnid
 lib_LIBRARIES = libptlctl.a
 
 acceptor_SOURCES = acceptor.c # -lefence
@@ -33,3 +34,4 @@ debugctl_LDADD = -L. -lptlctl -lncurses # -lefence
 debugctl_DEPENDENCIES = libptlctl.a
 
 routerstat_SOURCES = routerstat.c
+endif
index c6628ff..58a408a 100644 (file)
@@ -23,7 +23,6 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
-#include <syscall.h>
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <fcntl.h>
 #include <portals/api-support.h>
 #include <portals/ptlctl.h>
 
+#ifndef __CYGWIN__
+ #include <syscall.h>
+#else
+ #include <windows.h>
+ #include <windef.h>
+#endif
+
+static ioc_handler_t  do_ioctl;                 /* forward ref */
+static ioc_handler_t *current_ioc_handler = &do_ioctl;
+
 struct ioc_dev {
        const char * dev_name;
        int dev_fd;
@@ -48,7 +57,16 @@ struct dump_hdr {
        int opc;
 };
 
-char * dump_filename;
+char *dump_filename;
+
+void
+set_ioc_handler (ioc_handler_t *handler)
+{
+        if (handler == NULL)
+                current_ioc_handler = do_ioctl;
+        else
+                current_ioc_handler = handler;
+}
 
 static int
 open_ioc_dev(int dev_id) 
@@ -115,7 +133,7 @@ dump(int dev_id, int opc, void *buf)
 {
        FILE *fp;
        struct dump_hdr dump_hdr;
-       struct portal_ioctl_hdr * ioc_hdr = (struct  portal_ioctl_hdr *) buf;
+        struct portal_ioctl_hdr * ioc_hdr = (struct  portal_ioctl_hdr *) buf;
        int rc;
        
        printf("dumping opc %x to %s\n", opc, dump_filename);
@@ -132,17 +150,17 @@ dump(int dev_id, int opc, void *buf)
                return -EINVAL;
        }
        
-       rc = fwrite(&dump_hdr, sizeof(dump_hdr), 1, fp);
-       if (rc == 1)
-               rc = fwrite(buf, ioc_hdr->ioc_len, 1, fp);
-       fclose(fp);
-       if (rc != 1) {
-               fprintf(stderr, "%s: %s\n", dump_filename, 
-                       strerror(errno));
-               return -EINVAL;
-       }
-       
-       return 0;
+        rc = fwrite(&dump_hdr, sizeof(dump_hdr), 1, fp);
+        if (rc == 1)
+                rc = fwrite(buf, ioc_hdr->ioc_len, 1, fp);
+        fclose(fp);
+        if (rc != 1) {
+                fprintf(stderr, "%s: %s\n", dump_filename,
+                        strerror(errno));
+                return -EINVAL;
+        }
+
+        return 0;
 }
 
 /* register a device to send ioctls to.  */
@@ -184,16 +202,17 @@ set_ioctl_dump(char * file)
                free(dump_filename);
        
        dump_filename = strdup(file);
+        if (dump_filename == NULL)
+                abort();
+
+        set_ioc_handler(&dump);
        return 0;
 }
 
 int
 l_ioctl(int dev_id, int opc, void *buf)
 {
-       if (dump_filename) 
-               return dump(dev_id, opc, buf);
-       else 
-               return do_ioctl(dev_id, opc, buf);
+        return current_ioc_handler(dev_id, opc, buf);
 }
 
 /* Read an ioctl dump file, and call the ioc_func for each ioctl buffer
@@ -207,16 +226,28 @@ l_ioctl(int dev_id, int opc, void *buf)
 int 
 parse_dump(char * dump_file, int (*ioc_func)(int dev_id, int opc, void *))
 {
-       int fd, line =0;
+       int line =0;
        struct stat st;
-       char *buf, *end;
+       char *start, *buf, *end;
+#ifndef __CYGWIN__
+        int fd;
+#else
+        HANDLE fd, hmap;
+        DWORD size;
+#endif
        
+#ifndef __CYGWIN__
        fd = syscall(SYS_open, dump_file, O_RDONLY);
+        if (fd < 0) {
+                fprintf(stderr, "couldn't open %s: %s\n", dump_file, 
+                        strerror(errno));
+                exit(1);
+        }
 
 #ifndef SYS_fstat64
-#define __SYS_fstat__ SYS_fstat
+# define __SYS_fstat__ SYS_fstat
 #else
-#define __SYS_fstat__ SYS_fstat64
+# define __SYS_fstat__ SYS_fstat64
 #endif
        if (syscall(__SYS_fstat__, fd, &st)) { 
                perror("stat fails");
@@ -228,41 +259,72 @@ parse_dump(char * dump_file, int (*ioc_func)(int dev_id, int opc, void *))
                exit(1);
        }
 
-       buf = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE , fd, 0);
-       end = buf + st.st_size;
+       start = buf = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE , fd, 0);
+       end = start + st.st_size;
        close(fd);
-       while (buf < end) {
-               struct dump_hdr *dump_hdr = (struct dump_hdr *) buf;
-               struct portal_ioctl_hdr * data;
-               char tmp[8096];
-               int rc;
-               
-               line++;
+        if (start == MAP_FAILED) {
+               fprintf(stderr, "can't create file mapping\n");
+               exit(1);
+        }
+#else
+        fd = CreateFile(dump_file, GENERIC_READ, FILE_SHARE_READ, NULL,
+                        OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL);
+        size = GetFileSize(fd, NULL);
+        if (size < 1) {
+               fprintf(stderr, "KML is empty\n");
+               exit(1);
+       }
 
-               data = (struct portal_ioctl_hdr *) (buf + sizeof(*dump_hdr));
-               if (buf + data->ioc_len > end ) {
-                       fprintf(stderr, "dump file overflow, %p + %d > %p\n", buf,
-                               data->ioc_len, end);
-                       return -1;
-               }
+        hmap = CreateFileMapping(fd, NULL, PAGE_READONLY, 0,0, NULL);
+        start = buf = MapViewOfFile(hmap, FILE_MAP_READ, 0, 0, 0);
+        end = buf + size;
+        CloseHandle(fd);
+        if (start == NULL) {
+               fprintf(stderr, "can't create file mapping\n");
+               exit(1);
+        }
+#endif /* __CYGWIN__ */
+
+       while (buf < end) {
+                struct dump_hdr *dump_hdr = (struct dump_hdr *) buf;
+                struct portal_ioctl_hdr * data;
+                char tmp[8096];
+                int rc;
+
+                line++;
+
+                data = (struct portal_ioctl_hdr *) (buf + sizeof(*dump_hdr));
+                if (buf + data->ioc_len > end ) {
+                        fprintf(stderr, "dump file overflow, %p + %d > %p\n", buf,
+                                data->ioc_len, end);
+                        return -1;
+                }
 #if 0
-               printf ("dump_hdr: %lx data: %lx\n",
-                       (unsigned long)dump_hdr - (unsigned long)buf, (unsigned long)data - (unsigned long)buf);
-               
-               printf("%d: opcode %x len: %d  ver: %x ", line, dump_hdr->opc,
-                      data->ioc_len, data->ioc_version);
+                printf ("dump_hdr: %lx data: %lx\n",
+                        (unsigned long)dump_hdr - (unsigned long)buf, (unsigned long)data - (unsigned long)buf);
+
+                printf("%d: opcode %x len: %d  ver: %x ", line, dump_hdr->opc,
+                       data->ioc_len, data->ioc_version);
 #endif
 
-               memcpy(tmp, data, data->ioc_len);
+                memcpy(tmp, data, data->ioc_len);
 
-               rc = ioc_func(dump_hdr->dev_id, dump_hdr->opc, tmp);
-               if (rc) {
-                       printf("failed: %d\n", rc);
-                       exit(1);
-               }
+                rc = ioc_func(dump_hdr->dev_id, dump_hdr->opc, tmp);
+                if (rc) {
+                        printf("failed: %d\n", rc);
+                        exit(1);
+                }
 
-               buf += data->ioc_len + sizeof(*dump_hdr);
+                buf += data->ioc_len + sizeof(*dump_hdr);
        }
+
+#ifndef __CYGWIN__
+        munmap(start, end - start);
+#else
+        UnmapViewOfFile(start);
+        CloseHandle(hmap);
+#endif
+
        return 0;
 }
 
index 3c7ec20..fb031ae 100644 (file)
 #include <stdarg.h>
 #include <asm/byteorder.h>
 
+#ifdef __CYGWIN__
+
+#include <netinet/in.h>
+
+#warning assuming little endian
+
+#define __cpu_to_le64(x) ((__u64)(x))
+#define __le64_to_cpu(x) ((__u64)(x))
+#define __cpu_to_le32(x) ((__u32)(x))
+#define __le32_to_cpu(x) ((__u32)(x))
+#define __cpu_to_le16(x) ((__u16)(x))
+#define __le16_to_cpu(x) ((__u16)(x))
+
+#endif /* __CYGWIN__ */
 #include <portals/api-support.h>
 #include <portals/ptlctl.h>
 #include <portals/list.h>
@@ -94,6 +109,9 @@ pcfg_ioctl(struct portals_cfg *pcfg)
                 PORTAL_IOC_INIT (data);
                 data.ioc_pbuf1   = (char*)pcfg;
                 data.ioc_plen1   = sizeof(*pcfg);
+                /* XXX liblustre hack XXX */
+                data.ioc_nal_cmd = pcfg->pcfg_command;
+                data.ioc_nid = pcfg->pcfg_nid;
 
                 rc = l_ioctl (PORTALS_DEV_ID, IOC_PORTAL_NAL_CMD, &data);
         }
index cbdcb10..54abc71 100644 (file)
@@ -8,13 +8,23 @@ tbd         Cluster File Systems, Inc. <info@clusterfs.com>
        - ptlrpcd can be blocked, stopping ALL progress (2477)
        - recovery for initial connections (2355)
        - fixes for mds_cleanup_orphans (1934)
+       - abort_recovery crashes MDS in b_eq (mds_unlink_orphan) (2584)
        - block all file creations until orphan recovery completes (1901)
        - client remove rq_connection from request struct (2423)
        - conf-sanity test_5, proper cleanup in umount log not availale (2640)
        - recovery timer race (2670)
        - mdc_close recovey bug (2532)
+       - ptlrpc cleanup bug (2710)
+       - mds timeout on local locks (2588)
+       - namespace lock held during RPCs (2431)
        - don't try to handle a message that hasn't been replied to (2699)
-       - don't fail assertion if in recovery during cleanup (2701)
+       - client assert failure during cleanup after abort recovery (2701)
+       - leak mdc device after failed mount (2712)
+       - ptlrpc_check_set allows timedout requests to complete (2714)
+       - wait for inflight reqs when ptlrpcd finishes (2710)
+       - make sure unregistered services are removed from the srv_list
+       - reset bulk XID's when resending them (caught by 1138 test)
+       - unregister_bulk after timeout
        - fix lconf error (2694)
        * miscellania
        - return LL_SUPER_MAGIC from statfs for the filesystem type (1972)
index 1582666..045bace 100644 (file)
@@ -12,7 +12,7 @@ DIRS24 = ptlbd
 endif
 
 if LIBLUSTRE
-SUBDIRS = portals obdclass lov ptlrpc obdecho  osc utils mdc lvfs #liblustre
+SUBDIRS = portals obdclass lov ptlrpc obdecho osc utils mdc lvfs liblustre
 else
 SUBDIRS = lvfs portals obdclass include $(DIRS24) mds utils obdfilter mdc osc ost 
 SUBDIRS+= llite obdecho lov cobd tests doc scripts conf ptlrpc
index 4107a0c..2f023db 100644 (file)
@@ -73,7 +73,7 @@ AC_OUTPUT([Makefile lvfs/Makefile portals/Makefile portals/Kernelenv \
          portals/knals/scimacnal/Makefile \
          portals/knals/ibnal/Makefile \
           portals/utils/Makefile portals/tests/Makefile portals/doc/Makefile \
-          obdecho/Makefile ptlrpc/Makefile liblustre/Makefile \
+          obdecho/Makefile ptlrpc/Makefile liblustre/Makefile liblustre/tests/Makefile \
          lov/Makefile osc/Makefile mdc/Makefile mds/Makefile ost/Makefile \
          cobd/Makefile ptlbd/Makefile conf/Makefile  tests/Makefile \
          utils/Makefile utils/Lustre/Makefile obdfilter/Makefile \
index 6c6ac1d..0b6da9f 100644 (file)
@@ -111,9 +111,9 @@ static inline void *kmalloc(int size, int prot)
 #define GFP_HIGHUSER 1
 #define GFP_ATOMIC 1
 #define GFP_NOFS 1
-#define IS_ERR(a) (((a) && abs((int)(a)) < 500) ? 1 : 0)
-#define PTR_ERR(a) ((int)(a))
-#define ERR_PTR(a) ((void*)(a))
+#define IS_ERR(a) (((a) && abs((long)(a)) < 500) ? 1 : 0)
+#define PTR_ERR(a) ((long)(a))
+#define ERR_PTR(a) ((void*)((long)(a)))
 
 #define capable(foo) 1
 #define CAP_SYS_ADMIN 1
@@ -415,6 +415,11 @@ static inline int kmem_cache_destroy(kmem_cache_t *a)
 #define PAGE_CACHE_SHIFT 12
 #define PAGE_CACHE_MASK PAGE_MASK
 
+/* XXX
+ * for this moment, liblusre will not rely OST for non-page-aligned write
+ */
+#define LIBLUSTRE_HANDLE_UNALIGNED_PAGE
+
 struct page {
         void   *addr;
         unsigned long index;
@@ -424,6 +429,9 @@ struct page {
         /* internally used by liblustre file i/o */
         int     _offset;
         int     _count;
+#ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
+        int     _managed;
+#endif
 };
 
 #define kmap(page) (page)->addr
@@ -461,6 +469,7 @@ static inline void __free_pages(struct page *pg, int what)
 }
 
 #define __free_page(page) __free_pages((page), 0)
+#define free_page(page) __free_page(page)
 
 static inline struct page* __grab_cache_page(unsigned long index)
 {
@@ -706,6 +715,12 @@ static inline void del_timer(struct timer_list *l)
         free(l);
 }
 
+#define time_after(a, b)                                        \
+({                                                              \
+        printf("Error: inapproiate call time_after()\n");       \
+        1;                                                      \
+})
+
 typedef struct { volatile int counter; } atomic_t;
 
 #define atomic_read(a) ((a)->counter)
index 331e8f8..99c1785 100644 (file)
@@ -388,9 +388,6 @@ void ldlm_lock_remove_from_lru(struct ldlm_lock *);
 struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *,
                                       struct lustre_handle *);
 
-void *ldlm_put_lock_into_req(struct ptlrpc_request *,
-                                struct lustre_handle *, int);
-
 static inline struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *h)
 {
         return __ldlm_handle2lock(h, 0);
index b949fe1..218807c 100644 (file)
@@ -68,19 +68,21 @@ struct obd_export {
         struct obd_uuid           exp_client_uuid;
         struct list_head          exp_obd_chain;
         struct obd_device        *exp_obd;
-        struct obd_import        *exp_imp_reverse;  /* to make rpc's backwards */
+        struct obd_import        *exp_imp_reverse; /* to make RPCs backwards */
         struct ptlrpc_connection *exp_connection;
         __u32                     exp_conn_cnt;
         struct ldlm_export_data   exp_ldlm_data;
-        struct ptlrpc_request    *exp_outstanding_reply;
+        struct list_head          exp_outstanding_replies;
         time_t                    exp_last_request_time;
         spinlock_t                exp_lock; /* protects flags int below */
-        int                       exp_failed:1;
+        /* ^ protects exp_outstanding_replies too */
         int                       exp_flags;
+        int                       exp_failed:1;
+        int                       exp_libclient:1; /* liblustre client? */
         union {
                 struct mds_export_data    eu_mds_data;
                 struct filter_export_data eu_filter_data;
-                struct ec_export_data     eu_ec_data;         
+                struct ec_export_data     eu_ec_data;
                 struct osc_export_data    eu_osc_data;
         } u;
 };
index 9428296..3fa0a61 100644 (file)
@@ -226,6 +226,7 @@ static inline void lustre_msg_set_op_flags(struct lustre_msg *msg, int flags)
 #define MSG_CONNECT_RECONNECT   0x2
 #define MSG_CONNECT_REPLAYABLE  0x4
 //#define MSG_CONNECT_PEER        0x8
+#define MSG_CONNECT_LIBCLIENT   0x10
 
 /*
  *   OST requests: OBDO & OBD request records
index bb8900e..03a011a 100644 (file)
 #include <linux/lustre_import.h>
 #include <linux/lprocfs_status.h>
 
+/* Size over which to OBD_VMALLOC() rather than OBD_ALLOC() service request
+ * buffers */
+#define SVC_BUF_VMALLOC_THRESHOLD (2*PAGE_SIZE)
+
 /* The following constants determine how much memory is devoted to
  * buffering in the lustre services.
  *
  * total memory = ?_NBUFS * ?_BUFSIZE
  *
  * ?_MAXREQSIZE         # maximum request service will receive
- * larger messages will get dropped.
+ * messages larger than ?_MAXREQSIZE are dropped.
  * request buffers are auto-unlinked when less than ?_MAXREQSIZE
  * is left in them.
  */
 
 #define LDLM_NUM_THREADS        min(smp_num_cpus * smp_num_cpus * 8, 64)
-#define LDLM_NEVENT_MAX 8192UL
-#define LDLM_NEVENTS    min_t(unsigned long, num_physpages / 64,  \
-                              LDLM_NEVENT_MAX)
 #define LDLM_NBUF_MAX   256UL
-#define LDLM_NBUFS      min(LDLM_NEVENTS / 16, LDLM_NBUF_MAX)
 #define LDLM_BUFSIZE    (8 * 1024)
 #define LDLM_MAXREQSIZE (5 * 1024)
+#define LDLM_MAXMEM      (num_physpages*(PAGE_SIZE/1024))
+#define LDLM_NBUFS       min(LDLM_MAXMEM/LDLM_BUFSIZE, LDLM_NBUF_MAX)
 
 #define MDT_MAX_THREADS 32UL
 #define MDT_NUM_THREADS max(min_t(unsigned long, num_physpages / 8192, \
                                   MDT_MAX_THREADS), 2UL)
-#define MDS_NEVENT_MAX  8192UL
-#define MDS_NEVENTS     min_t(unsigned long, num_physpages / 64, \
-                              MDS_NEVENT_MAX)
 #define MDS_NBUF_MAX    512UL
-#define MDS_NBUFS       min(MDS_NEVENTS / 16, MDS_NBUF_MAX)
 #define MDS_BUFSIZE     (8 * 1024)
 /* Assume file name length = FNAME_MAX = 256 (true for extN).
  *        path name length = PATH_MAX = 4096
  * except in the open case where there are a large number of OSTs in a LOV.
  */
 #define MDS_MAXREQSIZE  (5 * 1024)
+#define MDS_MAXMEM      (num_physpages*(PAGE_SIZE/512))
+#define MDS_NBUFS       min(MDS_MAXMEM/MDS_BUFSIZE, MDS_NBUF_MAX)
 
 #define OST_MAX_THREADS 36UL
 #define OST_NUM_THREADS max(min_t(unsigned long, num_physpages / 8192, \
                                   OST_MAX_THREADS), 2UL)
-#define OST_NEVENT_MAX  16384UL
-#define OST_NEVENTS     min_t(unsigned long, num_physpages / 16, \
-                              OST_NEVENT_MAX)
 #define OST_NBUF_MAX    5000UL
-#define OST_NBUFS       min(OST_NEVENTS / 2, OST_NBUF_MAX)
 #define OST_BUFSIZE     (8 * 1024)
 /* OST_MAXREQSIZE ~= 1640 bytes =
  * lustre_msg + obdo + 16 * obd_ioobj + 64 * niobuf_remote
  * - OST_MAXREQSIZE must be at least 1 page of cookies plus some spillover
  */
 #define OST_MAXREQSIZE  (5 * 1024)
+#define OST_MAXMEM      (num_physpages*(PAGE_SIZE/512))
+#define OST_NBUFS       min(OST_MAXMEM/OST_BUFSIZE, OST_NBUF_MAX)
 
 #define PTLBD_NUM_THREADS        4
-#define PTLBD_NEVENTS    1024
 #define PTLBD_NBUFS      20
 #define PTLBD_BUFSIZE    (32 * 1024)
 #define PTLBD_MAXREQSIZE 1024
@@ -198,21 +195,66 @@ struct ptlrpc_request_set {
 
 struct ptlrpc_bulk_desc;
 
+/*
+ * ptlrpc callback & work item stuff
+ */
+struct ptlrpc_cb_id {
+        void   (*cbid_fn)(ptl_event_t *ev);     /* specific callback fn */
+        void    *cbid_arg;                      /* additional arg */
+};
+
+#define RS_MAX_LOCKS 4
+#define RS_DEBUG     1
+
+struct ptlrpc_reply_state {
+        struct ptlrpc_cb_id   rs_cb_id;
+        struct list_head      rs_list;
+        struct list_head      rs_exp_list;
+        struct list_head      rs_obd_list;
+#if RS_DEBUG
+        struct list_head      rs_debug_list;
+#endif
+        /* updates to following flag serialised by srv_request_lock */
+        unsigned int          rs_difficult:1;   /* ACK/commit stuff */
+        unsigned int          rs_scheduled:1;   /* being handled? */
+        unsigned int          rs_scheduled_ever:1; /* any schedule attempts? */
+        unsigned int          rs_handled:1;     /* been handled yet? */
+        unsigned int          rs_on_net:1;      /* reply_out_callback pending? */
+
+        int                   rs_size;
+        __u64                 rs_transno;
+        __u64                 rs_xid;
+        struct obd_export    *rs_export;
+        struct ptlrpc_srv_ni *rs_srv_ni;
+        ptl_handle_md_t       rs_md_h;
+
+        /* locks awaiting client reply ACK */
+        int                   rs_nlocks;
+        struct lustre_handle  rs_locks[RS_MAX_LOCKS];
+        ldlm_mode_t           rs_modes[RS_MAX_LOCKS];
+        /* last member: variable sized reply message */
+        struct lustre_msg     rs_msg;
+};
+
 struct ptlrpc_request {
         int rq_type; /* one of PTL_RPC_MSG_* */
         struct list_head rq_list;
         int rq_status;
         spinlock_t rq_lock;
-        unsigned int rq_intr:1, rq_replied:1, rq_want_ack:1, rq_err:1,
+        /* client-side flags */
+        unsigned int rq_intr:1, rq_replied:1, rq_err:1,
             rq_timedout:1, rq_resend:1, rq_restart:1, rq_replay:1,
-            rq_no_resend:1, rq_resent:1, rq_waiting:1, rq_receiving_reply:1;
+            rq_no_resend:1, rq_waiting:1, rq_receiving_reply:1;
         int rq_phase;
-                
+        /* client-side refcount for SENT race */
         atomic_t rq_refcount;
 
         int rq_request_portal; /* XXX FIXME bug 249 */
         int rq_reply_portal; /* XXX FIXME bug 249 */
 
+        /* client-side # reply bytes actually received  */
+        int rq_nob_received;
+
         int rq_reqlen;
         struct lustre_msg *rq_reqmsg;
 
@@ -230,20 +272,25 @@ struct ptlrpc_request {
 
         int rq_import_generation;
         enum lustre_imp_state rq_send_state;
-        wait_queue_head_t rq_reply_waitq; /* XXX also _for_ack */
 
-        /* incoming reply */
-        ptl_md_t rq_reply_md;
-        ptl_handle_md_t rq_reply_md_h;
-
-        /* outgoing req/rep */
-        ptl_md_t rq_req_md;
+        /* client+server request */
+        ptl_handle_md_t      rq_req_md_h;
+        struct ptlrpc_cb_id  rq_req_cbid;
 
+        /* server-side... */
+        struct timeval                     rq_arrival_time; /* request arrival time */
+        struct ptlrpc_reply_state         *rq_reply_state; /* separated reply state */
+        struct ptlrpc_request_buffer_desc *rq_rqbd; /* incoming request buffer */
+        
+        /* client-only incoming reply */
+        ptl_handle_md_t      rq_reply_md_h;
+        wait_queue_head_t    rq_reply_waitq;
+        struct ptlrpc_cb_id  rq_reply_cbid;
+        
         struct ptlrpc_peer rq_peer; /* XXX see service.c can this be factored away? */
         struct obd_export *rq_export;
         struct obd_import *rq_import;
-        struct ptlrpc_service *rq_svc;
-
+        
         void (*rq_replay_cb)(struct ptlrpc_request *);
         void (*rq_commit_cb)(struct ptlrpc_request *);
         void  *rq_cb_data;
@@ -256,17 +303,11 @@ struct ptlrpc_request {
         struct ptlrpc_request_set *rq_set;
         void *rq_interpret_reply;               /* Async completion handler */
         union ptlrpc_async_args rq_async_args;  /* Async completion context */
-
-        /* Only used on the server side for tracking acks. */
-        struct ptlrpc_req_ack_lock {
-                struct lustre_handle lock;
-                __u32                mode;
-        } rq_ack_locks[REQ_MAX_ACK_LOCKS];
 };
 
 
 #define RQ_PHASE_NEW           0xebc0de00
-#define RQ_PHASE_RPC          0xebc0de01
+#define RQ_PHASE_RPC           0xebc0de01
 #define RQ_PHASE_BULK          0xebc0de02
 #define RQ_PHASE_INTERPRET     0xebc0de03
 #define RQ_PHASE_COMPLETE      0xebc0de04
@@ -276,20 +317,19 @@ struct ptlrpc_request {
 
 #define PTLRPC_REQUEST_COMPLETE(req) ((req)->rq_phase > RQ_PHASE_RPC)
 
-#define DEBUG_REQ_FLAGS(req)                                                   \
-        ((req->rq_phase == RQ_PHASE_NEW) ? "New" :                             \
-         (req->rq_phase == RQ_PHASE_RPC) ? "RPC" :                             \
-         (req->rq_phase == RQ_PHASE_INTERPRET) ? "Interpret" :                 \
-         (req->rq_phase == RQ_PHASE_COMPLETE) ? "Complete" :                   \
-         (req->rq_phase == RQ_PHASE_BULK) ? "Bulk" : "?phase?"),               \
-        FLAG(req->rq_intr, "I"), FLAG(req->rq_replied, "R"),                   \
-        FLAG(req->rq_want_ack, "A"), FLAG(req->rq_err, "E"),                   \
-        FLAG(req->rq_timedout, "X") /* eXpired */, FLAG(req->rq_resend, "S"),  \
-        FLAG(req->rq_restart, "T"), FLAG(req->rq_replay, "P"),                 \
-        FLAG(req->rq_no_resend, "N"), FLAG(req->rq_resent, "s"),               \
+#define DEBUG_REQ_FLAGS(req)                                                    \
+        ((req->rq_phase == RQ_PHASE_NEW) ? "New" :                              \
+         (req->rq_phase == RQ_PHASE_RPC) ? "Rpc" :                              \
+         (req->rq_phase == RQ_PHASE_INTERPRET) ? "Interpret" :                  \
+         (req->rq_phase == RQ_PHASE_COMPLETE) ? "Complete" : "?phase?"),        \
+        FLAG(req->rq_intr, "I"), FLAG(req->rq_replied, "R"),                    \
+        FLAG(req->rq_err, "E"),                                                 \
+        FLAG(req->rq_timedout, "X") /* eXpired */, FLAG(req->rq_resend, "S"),   \
+        FLAG(req->rq_restart, "T"), FLAG(req->rq_replay, "P"),                  \
+        FLAG(req->rq_no_resend, "N"),                                           \
         FLAG(req->rq_waiting, "W")
 
-#define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s%s%s"
+#define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s"
 
 #define DEBUG_REQ(level, req, fmt, args...)                                    \
 do {                                                                           \
@@ -312,20 +352,19 @@ CDEBUG(level, "@@@ " fmt                                                       \
 } while (0)
 
 struct ptlrpc_bulk_page {
-        struct ptlrpc_bulk_desc *bp_desc;
         struct list_head bp_link;
         int bp_buflen;
         int bp_pageoffset;                      /* offset within a page */
         struct page *bp_page;
 };
 
-#define BULK_GET_SOURCE          0
+#define BULK_GET_SOURCE   0
 #define BULK_PUT_SINK     1
 #define BULK_GET_SINK     2
 #define BULK_PUT_SOURCE   3
 
 struct ptlrpc_bulk_desc {
-        unsigned int bd_complete:1;
+        unsigned int bd_success:1;              /* completed successfully */
         unsigned int bd_network_rw:1;           /* accessible to the network */
         unsigned int bd_type:2;                 /* {put,get}{source,sink} */
         unsigned int bd_registered:1;           /* client side */
@@ -335,17 +374,17 @@ struct ptlrpc_bulk_desc {
         struct obd_import *bd_import;
         __u32 bd_portal;
         struct ptlrpc_request *bd_req;          /* associated request */
-        wait_queue_head_t bd_waitq;             /* server side only WQ */
-        struct list_head bd_page_list;
-        __u32 bd_page_count;
-        __u32 bd_last_xid;
-        
-        ptl_md_t bd_md;
-        ptl_handle_md_t bd_md_h;
-        ptl_handle_me_t bd_me_h;
+        wait_queue_head_t      bd_waitq;        /* server side only WQ */
+        int                    bd_page_count;   /* # pages (== entries in bd_iov) */
+        int                    bd_max_pages;    /* allocated size of bd_iov */
+        int                    bd_nob;          /* # bytes covered */
+        int                    bd_nob_transferred; /* # bytes GOT/PUT */
 
-        int bd_callback_count;                  /* server side callbacks */
+        __u64                  bd_last_xid;
 
+        struct ptlrpc_cb_id    bd_cbid;         /* network callback info */
+        ptl_handle_md_t        bd_md_h;         /* associated MD */
+        
 #ifdef __KERNEL__
         ptl_kiov_t bd_iov[PTL_MD_MAX_IOV];
 #else
@@ -363,9 +402,12 @@ struct ptlrpc_thread {
 struct ptlrpc_request_buffer_desc {
         struct list_head       rqbd_list;
         struct ptlrpc_srv_ni  *rqbd_srv_ni;
-        ptl_handle_me_t        rqbd_me_h;
-        atomic_t               rqbd_refcount;
+        ptl_handle_md_t        rqbd_md_h;
+        int                    rqbd_refcount;
+        int                    rqbd_eventcount;
         char                  *rqbd_buffer;
+        struct ptlrpc_cb_id    rqbd_cbid;
+        struct ptlrpc_request  rqbd_req;
 };
 
 /* event queues are per-ni, because one day we may get a hardware
@@ -376,57 +418,64 @@ struct ptlrpc_ni { /* Generic interface state */
         char                   *pni_name;
         int                     pni_number;
         ptl_handle_ni_t         pni_ni_h;
-        ptl_handle_eq_t         pni_request_out_eq_h;
-        ptl_handle_eq_t         pni_reply_in_eq_h;
-        ptl_handle_eq_t         pni_reply_out_eq_h;
-        ptl_handle_eq_t         pni_bulk_put_source_eq_h;
-        ptl_handle_eq_t         pni_bulk_put_sink_eq_h;
-        ptl_handle_eq_t         pni_bulk_get_source_eq_h;
-        ptl_handle_eq_t         pni_bulk_get_sink_eq_h;
+        ptl_handle_eq_t         pni_eq_h;
 };
 
 struct ptlrpc_srv_ni {
         /* Interface-specific service state */
         struct ptlrpc_service  *sni_service;    /* owning service */
         struct ptlrpc_ni       *sni_ni;         /* network interface */
-        ptl_handle_eq_t         sni_eq_h;       /* event queue handle */
-        struct list_head        sni_rqbds;      /* all the request buffer descriptors */
-        __u32                   sni_nrqbds;     /* # request buffers */
-        atomic_t                sni_nrqbds_receiving; /* # request buffers posted */
+        struct list_head        sni_rqbds;      /* all the request buffers */
+        struct list_head        sni_active_replies; /* all the active replies */
+        int                     sni_nrqbd_receiving; /* # posted request buffers */
 };
 
-struct ptlrpc_service {
-        time_t srv_time;
-        time_t srv_timeout;
-
-        struct list_head srv_ni_list;          /* list of interfaces */
-        __u32            srv_max_req_size;     /* biggest request to receive */
-        __u32            srv_buf_size;         /* # bytes in a request buffer */
+typedef int (*svc_handler_t)(struct ptlrpc_request *req);
 
+struct ptlrpc_service {
+        struct list_head srv_list;              /* chain thru all services */
+        int              srv_max_req_size;      /* biggest request to receive */
+        int              srv_buf_size;          /* size of individual buffers */
+        int              srv_nbufs;             /* total # req buffer descs allocated */
+        int              srv_nthreads;          /* # running threads */
+        int              srv_n_difficult_replies; /* # 'difficult' replies */
+        int              srv_n_active_reqs;     /* # reqs being served */
+        
         __u32 srv_req_portal;
         __u32 srv_rep_portal;
 
-        __u32 srv_xid;
+        int               srv_n_queued_reqs;    /* # reqs waiting to be served */
+        struct list_head  srv_request_queue;    /* reqs waiting for service */
+
+        atomic_t          srv_outstanding_replies;
+        struct list_head  srv_reply_queue;      /* replies waiting for service */
 
         wait_queue_head_t srv_waitq; /* all threads sleep on this */
 
-        spinlock_t srv_lock;
-        struct list_head srv_threads;
-        int (*srv_handler)(struct ptlrpc_request *req);
+        struct list_head   srv_threads;
+        struct obd_device *srv_obddev;
+        svc_handler_t      srv_handler;
+        
         char *srv_name;  /* only statically allocated strings here; we don't clean them */
-        struct proc_dir_entry *srv_procroot;
-        struct lprocfs_stats  *srv_stats;
 
-        int                  srv_interface_rover;
+        spinlock_t               srv_lock;
+
+        struct proc_dir_entry   *srv_procroot;
+        struct lprocfs_stats    *srv_stats;
+        
         struct ptlrpc_srv_ni srv_interfaces[0];
 };
 
-typedef int (*svc_handler_t)(struct ptlrpc_request *req);
-
 /* ptlrpc/events.c */
 extern struct ptlrpc_ni ptlrpc_interfaces[];
 extern int              ptlrpc_ninterfaces;
 extern int ptlrpc_uuid_to_peer(struct obd_uuid *uuid, struct ptlrpc_peer *peer);
+extern void request_out_callback (ptl_event_t *ev);
+extern void reply_in_callback(ptl_event_t *ev);
+extern void client_bulk_callback (ptl_event_t *ev);
+extern void request_in_callback(ptl_event_t *ev);
+extern void reply_out_callback(ptl_event_t *ev);
+extern void server_bulk_callback (ptl_event_t *ev);
 
 /* ptlrpc/connection.c */
 void ptlrpc_dump_connections(void);
@@ -439,28 +488,28 @@ void ptlrpc_init_connection(void);
 void ptlrpc_cleanup_connection(void);
 
 /* ptlrpc/niobuf.c */
-int ptlrpc_bulk_put(struct ptlrpc_bulk_desc *);
-int ptlrpc_bulk_get(struct ptlrpc_bulk_desc *);
-void ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk);
+int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc);
+void ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc);
 int ptlrpc_register_bulk(struct ptlrpc_request *req);
 void ptlrpc_unregister_bulk (struct ptlrpc_request *req);
 
-static inline int ptlrpc_bulk_complete (struct ptlrpc_bulk_desc *desc) 
+static inline int ptlrpc_bulk_active (struct ptlrpc_bulk_desc *desc) 
 {
         unsigned long flags;
         int           rc;
 
         spin_lock_irqsave (&desc->bd_lock, flags);
-        rc = desc->bd_complete;
+        rc = desc->bd_network_rw;
         spin_unlock_irqrestore (&desc->bd_lock, flags);
         return (rc);
 }
 
+int ptlrpc_send_reply(struct ptlrpc_request *req, int);
 int ptlrpc_reply(struct ptlrpc_request *req);
 int ptlrpc_error(struct ptlrpc_request *req);
 void ptlrpc_resend_req(struct ptlrpc_request *request);
 int ptl_send_rpc(struct ptlrpc_request *request);
-void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd);
+void ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd);
 
 /* ptlrpc/client.c */
 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
@@ -468,6 +517,39 @@ void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
 void ptlrpc_cleanup_client(struct obd_import *imp);
 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid);
 
+static inline int
+ptlrpc_client_receiving_reply (struct ptlrpc_request *req)
+{
+        unsigned long flags;
+        int           rc;
+        
+        spin_lock_irqsave(&req->rq_lock, flags);
+        rc = req->rq_receiving_reply;
+        spin_unlock_irqrestore(&req->rq_lock, flags);
+        return (rc);
+}
+
+static inline int
+ptlrpc_client_replied (struct ptlrpc_request *req)
+{
+        unsigned long flags;
+        int           rc;
+        
+        spin_lock_irqsave(&req->rq_lock, flags);
+        rc = req->rq_replied;
+        spin_unlock_irqrestore(&req->rq_lock, flags);
+        return (rc);
+}
+
+static inline void
+ptlrpc_wake_client_req (struct ptlrpc_request *req)
+{
+        if (req->rq_set == NULL)
+                wake_up(&req->rq_reply_waitq);
+        else
+                wake_up(&req->rq_set->set_waitq);
+}
+
 int ptlrpc_queue_wait(struct ptlrpc_request *req);
 int ptlrpc_replay_req(struct ptlrpc_request *req);
 void ptlrpc_unregister_reply(struct ptlrpc_request *req);
@@ -493,28 +575,32 @@ void ptlrpc_req_finished(struct ptlrpc_request *request);
 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request);
 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req);
 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
-                                               int type, int portal);
+                                               int npages, int type, int portal);
 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp(struct ptlrpc_request *req,
-                                              int type, int portal);
+                                              int npages, int type, int portal);
 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk);
-int ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
-                          struct page *page, int pageoffset, int len);
-void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *page);
+void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
+                           struct page *page, int pageoffset, int len);
 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
                                       struct obd_import *imp);
 __u64 ptlrpc_next_xid(void);
 
 /* ptlrpc/service.c */
-struct ptlrpc_service *
-ptlrpc_init_svc(__u32 nevents, __u32 nbufs, __u32 bufsize, __u32 max_req_size,
-                int req_portal, int rep_portal, svc_handler_t, char *name,
-                struct proc_dir_entry *proc_entry);
+void ptlrpc_save_lock (struct ptlrpc_request *req, 
+                       struct lustre_handle *lock, int mode);
+void ptlrpc_commit_replies (struct obd_device *obd);
+void ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs);
+struct ptlrpc_service *ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
+                                       int req_portal, int rep_portal, 
+                                       svc_handler_t, char *name,
+                                       struct proc_dir_entry *proc_entry);
 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc);
 int ptlrpc_start_n_threads(struct obd_device *dev, struct ptlrpc_service *svc,
                            int cnt, char *base_name);
 int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
                         char *name);
 int ptlrpc_unregister_service(struct ptlrpc_service *service);
+int liblustre_check_services (void *arg);
 
 struct ptlrpc_svc_data {
         char *name;
@@ -535,6 +621,7 @@ int lustre_pack_request(struct ptlrpc_request *, int count, int *lens,
                         char **bufs);
 int lustre_pack_reply(struct ptlrpc_request *, int count, int *lens,
                       char **bufs);
+void lustre_free_reply_state(struct ptlrpc_reply_state *rs);
 int lustre_msg_size(int count, int *lengths);
 int lustre_unpack_msg(struct lustre_msg *m, int len);
 void *lustre_msg_buf(struct lustre_msg *m, int n, int minlen);
@@ -571,7 +658,6 @@ void ptlrpc_lprocfs_unregister_obd(struct obd_device *obddev);
 #endif
 
 /* ptlrpc/llog_server.c */
-struct llog_obd_ctxt;
 int llog_origin_handle_create(struct ptlrpc_request *req);
 int llog_origin_handle_next_block(struct ptlrpc_request *req);
 int llog_origin_handle_read_header(struct ptlrpc_request *req);
index 619010b..ec90c84 100644 (file)
@@ -480,7 +480,8 @@ struct obd_device {
         int                              obd_replayed_requests;
         int                              obd_requests_queued_for_recovery;
         wait_queue_head_t                obd_next_transno_waitq;
-        wait_queue_head_t                obd_commit_waitq;
+        struct list_head                 obd_uncommitted_replies;
+        spinlock_t                       obd_uncommitted_replies_lock;
         struct timer_list                obd_recovery_timer;
         struct list_head                 obd_recovery_queue;
         struct list_head                 obd_delayed_reply_queue;
@@ -666,7 +667,7 @@ static inline void obd_transno_commit_cb(struct obd_device *obd, __u64 transno,
                obd->obd_name, transno);
         if (transno > obd->obd_last_committed) {
                 obd->obd_last_committed = transno;
-                wake_up(&obd->obd_commit_waitq);
+                ptlrpc_commit_replies (obd);
         }
 }
 
index 6293972..ff8d63b 100644 (file)
 +++ 25/arch/parisc/lib/checksum.c      2003-10-05 00:33:23.000000000 -0700
 @@ -16,8 +16,10 @@
   *
-  * $Id: 2.6.0-test6-mm4.patch,v 1.3 2003/12/03 05:12:20 phil Exp $
+  * $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
   */
 -#include <net/checksum.h>
 +#include <linux/module.h>
 --- linux-2.6.0-test6/drivers/char/ftape/compressor/zftape-compress.c  2003-06-14 12:18:32.000000000 -0700
 +++ 25/drivers/char/ftape/compressor/zftape-compress.c 2003-10-05 00:33:24.000000000 -0700
 @@ -31,6 +31,7 @@
-  char zftc_rev[] = "$Revision: 1.3 $";
-  char zftc_dat[] = "$Date: 2003/12/03 05:12:20 $";
+  char zftc_rev[] = "$Revision: 1.4 $";
+  char zftc_dat[] = "$Date: 2004/02/14 03:14:33 $";
  
 +#include <linux/version.h>
  #include <linux/errno.h>
 --- linux-2.6.0-test6/drivers/isdn/hardware/eicon/divamnt.c    2003-09-27 18:57:44.000000000 -0700
 +++ 25/drivers/isdn/hardware/eicon/divamnt.c   2003-10-05 00:33:24.000000000 -0700
 @@ -1,4 +1,4 @@
--/* $Id: 2.6.0-test6-mm4.patch,v 1.3 2003/12/03 05:12:20 phil Exp $
-+/* $Id: 2.6.0-test6-mm4.patch,v 1.3 2003/12/03 05:12:20 phil Exp $
+-/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
++/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
   *
   * Driver for Eicon DIVA Server ISDN cards.
   * Maint module
 -#include "di_defs.h"
  #include "debug_if.h"
  
--static char *main_revision = "$Revision: 1.3 $";
-+static char *main_revision = "$Revision: 1.3 $";
+-static char *main_revision = "$Revision: 1.4 $";
++static char *main_revision = "$Revision: 1.4 $";
  
  static int major;
  
 --- linux-2.6.0-test6/drivers/isdn/hardware/eicon/divasmain.c  2003-09-27 18:57:44.000000000 -0700
 +++ 25/drivers/isdn/hardware/eicon/divasmain.c 2003-10-05 00:33:24.000000000 -0700
 @@ -1,4 +1,4 @@
--/* $Id: 2.6.0-test6-mm4.patch,v 1.3 2003/12/03 05:12:20 phil Exp $
-+/* $Id: 2.6.0-test6-mm4.patch,v 1.3 2003/12/03 05:12:20 phil Exp $
+-/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
++/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
   *
   * Low level driver for Eicon DIVA Server ISDN cards.
   *
  #include "diva_dma.h"
  #include "diva_pci.h"
  
--static char *main_revision = "$Revision: 1.3 $";
-+static char *main_revision = "$Revision: 1.3 $";
+-static char *main_revision = "$Revision: 1.4 $";
++static char *main_revision = "$Revision: 1.4 $";
  
  static int major;
  
 --- linux-2.6.0-test6/drivers/isdn/hardware/eicon/dqueue.c     2003-06-14 12:18:22.000000000 -0700
 +++ 25/drivers/isdn/hardware/eicon/dqueue.c    2003-10-05 00:33:24.000000000 -0700
 @@ -1,10 +1,10 @@
--/* $Id: 2.6.0-test6-mm4.patch,v 1.3 2003/12/03 05:12:20 phil Exp $
-+/* $Id: 2.6.0-test6-mm4.patch,v 1.3 2003/12/03 05:12:20 phil Exp $
+-/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
++/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
   *
   * Driver for Eicon DIVA Server ISDN cards.
   * User Mode IDI Interface
 --- linux-2.6.0-test6/drivers/isdn/hardware/eicon/mntfunc.c    2003-09-27 18:57:44.000000000 -0700
 +++ 25/drivers/isdn/hardware/eicon/mntfunc.c   2003-10-05 00:33:24.000000000 -0700
 @@ -1,4 +1,4 @@
--/* $Id: 2.6.0-test6-mm4.patch,v 1.3 2003/12/03 05:12:20 phil Exp $
-+/* $Id: 2.6.0-test6-mm4.patch,v 1.3 2003/12/03 05:12:20 phil Exp $
+-/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
++/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
   *
   * Driver for Eicon DIVA Server ISDN cards.
   * Maint module
 --- linux-2.6.0-test6/drivers/isdn/hardware/eicon/os_capi.h    2003-06-14 12:18:25.000000000 -0700
 +++ 25/drivers/isdn/hardware/eicon/os_capi.h   2003-10-05 00:33:24.000000000 -0700
 @@ -1,10 +1,10 @@
--/* $Id: 2.6.0-test6-mm4.patch,v 1.3 2003/12/03 05:12:20 phil Exp $
-+/* $Id: 2.6.0-test6-mm4.patch,v 1.3 2003/12/03 05:12:20 phil Exp $
+-/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
++/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
   *
   * ISDN interface module for Eicon active cards DIVA.
   * CAPI Interface OS include files 
 --- linux-2.6.0-test6/drivers/isdn/hardware/eicon/platform.h   2003-09-27 18:57:44.000000000 -0700
 +++ 25/drivers/isdn/hardware/eicon/platform.h  2003-10-05 00:33:24.000000000 -0700
 @@ -1,4 +1,4 @@
--/* $Id: 2.6.0-test6-mm4.patch,v 1.3 2003/12/03 05:12:20 phil Exp $
-+/* $Id: 2.6.0-test6-mm4.patch,v 1.3 2003/12/03 05:12:20 phil Exp $
+-/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
++/* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
   *
   * platform.h
   * 
 +++ 25/drivers/media/video/planb.c     2003-10-05 00:33:24.000000000 -0700
 @@ -27,7 +27,6 @@
  
- /* $Id: 2.6.0-test6-mm4.patch,v 1.3 2003/12/03 05:12:20 phil Exp $ */
+ /* $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ */
  
 -#include <linux/version.h>
  #include <linux/init.h>
 --- linux-2.6.0-test6/drivers/mtd/chips/map_rom.c      2003-06-14 12:18:24.000000000 -0700
 +++ 25/drivers/mtd/chips/map_rom.c     2003-10-05 00:33:24.000000000 -0700
 @@ -4,7 +4,6 @@
-  * $Id: 2.6.0-test6-mm4.patch,v 1.3 2003/12/03 05:12:20 phil Exp $
+  * $Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $
   */
  
 -#include <linux/version.h>
  #include <linux/hdlc.h>
  
  /* Version */
--static const char version[] = "$Id: 2.6.0-test6-mm4.patch,v 1.3 2003/12/03 05:12:20 phil Exp $ for Linux\n";
-+static const char version[] = "$Id: 2.6.0-test6-mm4.patch,v 1.3 2003/12/03 05:12:20 phil Exp $ for Linux\n";
+-static const char version[] = "$Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ for Linux\n";
++static const char version[] = "$Id: 2.6.0-test6-mm4.patch,v 1.4 2004/02/14 03:14:33 rread Exp $ for Linux\n";
  static int debug;
  static int quartz;
  
index 5411d9c..54d1f68 100644 (file)
@@ -1,4 +1,4 @@
-$Id: bproc-patch-2.4.20,v 1.3 2003/12/03 05:12:25 phil Exp $
+$Id: bproc-patch-2.4.20,v 1.4 2004/02/14 03:14:37 rread Exp $
 
 Index: linux/fs/exec.c
 ===================================================================
@@ -764,7 +764,7 @@ Index: linux/kernel/bproc_hook.c
 + *  along with this program; if not, write to the Free Software
 + *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 + *
-+ * $Id: bproc-patch-2.4.20,v 1.3 2003/12/03 05:12:25 phil Exp $
++ * $Id: bproc-patch-2.4.20,v 1.4 2004/02/14 03:14:37 rread Exp $
 + *-----------------------------------------------------------------------*/
 +#include <linux/kernel.h>
 +#include <linux/sched.h>
@@ -832,7 +832,7 @@ Index: linux/include/linux/bproc.h
 + *  along with this program; if not, write to the Free Software
 + *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 + *
-+ * $Id: bproc-patch-2.4.20,v 1.3 2003/12/03 05:12:25 phil Exp $
++ * $Id: bproc-patch-2.4.20,v 1.4 2004/02/14 03:14:37 rread Exp $
 + *-----------------------------------------------------------------------*/
 +#ifndef _LINUX_BPROC_H
 +#define _LINUX_BPROC_H
index 05fcf61..818596c 100644 (file)
@@ -1,7 +1,7 @@
 Index: linux-2.4.20/fs/ext3/xattr.c
 ===================================================================
---- linux-2.4.20.orig/fs/ext3/xattr.c  2003-11-13 17:14:52.000000000 +0300
-+++ linux-2.4.20/fs/ext3/xattr.c       2003-11-21 16:43:48.000000000 +0300
+--- linux-2.4.20.orig/fs/ext3/xattr.c  2003-11-13 10:59:33.000000000 +0800
++++ linux-2.4.20/fs/ext3/xattr.c       2003-11-25 21:16:51.000000000 +0800
 @@ -1293,9 +1293,10 @@
                                goto cleanup;
                        memcpy(header, HDR(bh), bh->b_size);
index 768f778..0da12fc 100644 (file)
@@ -13,6 +13,7 @@ if LIBLUSTRE
 lib_LIBRARIES = libldlm.a
 libldlm_a_SOURCES = l_lock.c ldlm_lock.c ldlm_resource.c ldlm_lib.c \
   ldlm_plain.c ldlm_extent.c ldlm_request.c ldlm_lockd.c ldlm_internal.h
+libldlm_a_CFLAGS = -fPIC
 endif
 
 include $(top_srcdir)/Rules
index 4b7eb3b..d1e2b49 100644 (file)
@@ -497,6 +497,10 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         export = req->rq_export = class_conn2export(&conn);
         LASSERT(export != NULL);
 
+        /* request from liblustre? */
+        if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_LIBCLIENT)
+                export->exp_libclient = 1;
+
         if (export->exp_connection != NULL)
                 ptlrpc_put_connection(export->exp_connection);
         export->exp_connection = ptlrpc_get_connection(&req->rq_peer,
@@ -888,6 +892,8 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
         int recovery_done = 0;
         int rc2;
 
+        LASSERT ((rc == 0) == (req->rq_reply_state != NULL));
+
         if (rc) {
                 /* Just like ptlrpc_error, but without the sending. */
                 rc = lustre_pack_reply(req, 0, NULL, NULL);
@@ -895,6 +901,7 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
                 req->rq_type = PTL_RPC_MSG_ERR;
         }
 
+        LASSERT (!req->rq_reply_state->rs_difficult);
         LASSERT(list_empty(&req->rq_list));
         /* XXX a bit like the request-dup code in queue_recovery_request */
         OBD_ALLOC(saved_req, sizeof *saved_req);
@@ -905,6 +912,8 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
                 LBUG();
         memcpy(saved_req, req, sizeof *saved_req);
         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
+        /* the copied req takes over the reply state */
+        req->rq_reply_state = NULL;
         req = saved_req;
         req->rq_reqmsg = reqmsg;
         class_export_get(req->rq_export);
@@ -954,180 +963,131 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
         return 1;
 }
 
-static void ptlrpc_abort_reply (struct ptlrpc_request *req)
-{
-        /* On return, we must be sure that the ACK callback has either
-         * happened or will not happen.  Note that the SENT callback will
-         * happen come what may since we successfully posted the PUT. */
-        int rc;
-        struct l_wait_info lwi;
-        unsigned long flags;
-
- again:
-        /* serialise with ACK callback */
-        spin_lock_irqsave (&req->rq_lock, flags);
-        if (!req->rq_want_ack) {
-                spin_unlock_irqrestore (&req->rq_lock, flags);
-                /* The ACK callback has happened already.  Although the
-                 * SENT callback might still be outstanding (yes really) we
-                 * don't care; this is just like normal completion. */
-                return;
-        }
-        spin_unlock_irqrestore (&req->rq_lock, flags);
-
-        /* Have a bash at unlinking the MD.  This will fail until the SENT
-         * callback has happened since the MD is busy from the PUT.  If the
-         * ACK still hasn't arrived after then, a successful unlink will
-         * ensure the ACK callback never happens. */
-        rc = PtlMDUnlink (req->rq_reply_md_h);
-        switch (rc) {
-        default:
-                LBUG ();
-        case PTL_OK:
-                /* SENT callback happened; ACK callback preempted */
-                LASSERT (req->rq_want_ack);
-                spin_lock_irqsave (&req->rq_lock, flags);
-                req->rq_want_ack = 0;
-                spin_unlock_irqrestore (&req->rq_lock, flags);
-                return;
-        case PTL_INV_MD:
-                return;
-        case PTL_MD_INUSE:
-                /* Still sending or ACK callback in progress: wait until
-                 * either callback has completed and try again.
-                 * Actually we can't wait for the SENT callback because
-                 * there's no state the SENT callback can touch that will
-                 * allow it to communicate with us!  So we just wait here
-                 * for a short time, effectively polling for the SENT
-                 * callback by calling PtlMDUnlink() again, to see if it
-                 * has finished.  Note that if the ACK does arrive, its
-                 * callback wakes us in short order. --eeb */
-                lwi = LWI_TIMEOUT (HZ/4, NULL, NULL);
-                rc = l_wait_event(req->rq_reply_waitq, !req->rq_want_ack,
-                                  &lwi);
-                CDEBUG (D_HA, "Retrying req %p: %d\n", req, rc);
-                /* NB go back and test rq_want_ack with locking, to ensure
-                 * if ACK callback happened, it has completed stopped
-                 * referencing this req. */
-                goto again;
-        }
-}
-
-void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
+int
+target_send_reply_msg (struct ptlrpc_request *req, int rc, int fail_id)
 {
-        int i;
-        int netrc;
-        unsigned long flags;
-        struct ptlrpc_req_ack_lock *ack_lock;
-        struct l_wait_info lwi = { 0 };
-        wait_queue_t commit_wait;
-        struct obd_device *obd =
-                req->rq_export ? req->rq_export->exp_obd : NULL;
-        struct obd_export *exp = NULL;
-
-        if (req->rq_export) {
-                for (i = 0; i < REQ_MAX_ACK_LOCKS; i++) {
-                        if (req->rq_ack_locks[i].mode) {
-                                exp = req->rq_export;
-                                break;
+        if (OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) {
+                obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
+                DEBUG_REQ(D_ERROR, req, "dropping reply");
+                /* NB this does _not_ send with ACK disabled, to simulate
+                 * sending OK, but timing out for the ACK */
+                if (req->rq_reply_state != NULL) {
+                        if (!req->rq_reply_state->rs_difficult) {
+                                lustre_free_reply_state (req->rq_reply_state);
+                                req->rq_reply_state = NULL;
+                        } else {
+                                struct ptlrpc_service *svc =
+                                        req->rq_rqbd->rqbd_srv_ni->sni_service;
+                                atomic_inc(&svc->srv_outstanding_replies);
                         }
                 }
+              &