Whamcloud - gitweb
- merge 2 weeks of b1_4 fixes onto HEAD
author phil <phil>
Fri, 4 Jun 2004 15:14:58 +0000 (15:14 +0000)
committer phil <phil>
Fri, 4 Jun 2004 15:14:58 +0000 (15:14 +0000)
- b1_4 is basically the parent of HEAD, because that's the direction
  in which changes flow, as strange as that sounds.  So there's a
  HEAD_BASE tag which sits on b1_4.

177 files changed:
ldiskfs/kernel_patches/patches/iopen-2.6-suse.patch
ldiskfs/kernel_patches/series/ldiskfs-2.6-suse.series
ldiskfs/ldiskfs/autoMakefile.am
lnet/archdep.m4
lnet/include/linux/kp30.h
lnet/include/linux/kpr.h
lnet/include/linux/libcfs.h
lnet/include/linux/portals_lib.h
lnet/include/lnet/api-support.h
lnet/include/lnet/api.h
lnet/include/lnet/arg-blocks.h [deleted file]
lnet/include/lnet/errno.h
lnet/include/lnet/internal.h
lnet/include/lnet/lib-dispatch.h [deleted file]
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-nal.h [deleted file]
lnet/include/lnet/lib-p30.h
lnet/include/lnet/lib-types.h
lnet/include/lnet/nal.h
lnet/include/lnet/types.h
lnet/klnds/gmlnd/gmlnd.h
lnet/klnds/gmlnd/gmlnd_api.c
lnet/klnds/gmlnd/gmlnd_cb.c
lnet/klnds/gmlnd/gmlnd_comm.c
lnet/klnds/qswlnd/qswlnd.c
lnet/klnds/qswlnd/qswlnd.h
lnet/klnds/qswlnd/qswlnd_cb.c
lnet/klnds/socklnd/socklnd.c
lnet/klnds/socklnd/socklnd.h
lnet/klnds/socklnd/socklnd_cb.c
lnet/libcfs/module.c
lnet/lnet/Makefile.in
lnet/lnet/Makefile.mk
lnet/lnet/api-eq.c [deleted file]
lnet/lnet/api-errno.c
lnet/lnet/api-init.c [deleted file]
lnet/lnet/api-me.c [deleted file]
lnet/lnet/api-ni.c
lnet/lnet/api-wrap.c
lnet/lnet/autoMakefile.am
lnet/lnet/lib-dispatch.c [deleted file]
lnet/lnet/lib-eq.c
lnet/lnet/lib-init.c
lnet/lnet/lib-md.c
lnet/lnet/lib-me.c
lnet/lnet/lib-move.c
lnet/lnet/lib-msg.c
lnet/lnet/lib-ni.c
lnet/lnet/lib-pid.c
lnet/lnet/module.c
lnet/ulnds/address.c
lnet/ulnds/bridge.h
lnet/ulnds/procapi.c
lnet/ulnds/procbridge.h
lnet/ulnds/proclib.c
lnet/ulnds/socklnd/address.c
lnet/ulnds/socklnd/bridge.h
lnet/ulnds/socklnd/procapi.c
lnet/ulnds/socklnd/procbridge.h
lnet/ulnds/socklnd/proclib.c
lnet/ulnds/socklnd/tcplnd.c
lnet/ulnds/tcplnd.c
lustre/ChangeLog
lustre/autoMakefile.am
lustre/autogen.sh
lustre/configure.in
lustre/include/linux/lustre_compat25.h
lustre/include/linux/lustre_net.h
lustre/kernel_patches/patches/ext-2.4-patch-1-chaos.patch
lustre/kernel_patches/patches/ext-2.4-patch-1-suse-2.4.19.patch
lustre/kernel_patches/patches/ext-2.4-patch-1-suse.patch
lustre/kernel_patches/patches/ext-2.4-patch-1.patch
lustre/kernel_patches/patches/ext-2.4-patch-4.patch
lustre/kernel_patches/patches/ext3-htree-2.4.19-pre1.patch
lustre/kernel_patches/patches/ext3-htree-2.4.21-chaos.patch
lustre/kernel_patches/patches/ext3-htree-2.4.22-rh.patch
lustre/kernel_patches/patches/ext3-htree-rename_fix.patch [new file with mode: 0644]
lustre/kernel_patches/patches/ext3-htree-suse.patch
lustre/kernel_patches/patches/ext3-htree.patch
lustre/kernel_patches/patches/ext3-pdirops-2.4.24-chaos.patch
lustre/kernel_patches/patches/htree-ext3-2.4.18.patch
lustre/kernel_patches/patches/iopen-2.6-suse.patch
lustre/kernel_patches/patches/loop-sync-2.4.21-suse.patch [new file with mode: 0644]
lustre/kernel_patches/patches/lustre_version.patch
lustre/kernel_patches/patches/md_path_lookup-2.6-suse.patch [new file with mode: 0644]
lustre/kernel_patches/patches/vfs_intent-2.6-suse.patch
lustre/kernel_patches/patches/vfs_nointent-2.6-suse.patch
lustre/kernel_patches/series/2.6-suse.series
lustre/kernel_patches/series/ldiskfs-2.6-suse.series
lustre/kernel_patches/series/suse-2.4.21-2
lustre/kernel_patches/targets/2.6-suse.target
lustre/ldiskfs/autoMakefile.am
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_resource.c
lustre/liblustre/tests/Makefile.am
lustre/llite/file.c
lustre/lov/lov_obd.c
lustre/mds/handler.c
lustre/mds/mds_internal.h
lustre/mds/mds_lov.c
lustre/mds/mds_open.c
lustre/mds/mds_unlink_open.c
lustre/obdclass/class_obd.c
lustre/obdclass/simple.c [deleted file]
lustre/obdfilter/filter_io_26.c
lustre/osc/osc_create.c
lustre/osc/osc_request.c
lustre/portals/archdep.m4
lustre/portals/include/linux/kp30.h
lustre/portals/include/linux/kpr.h
lustre/portals/include/linux/libcfs.h
lustre/portals/include/linux/portals_lib.h
lustre/portals/include/portals/api-support.h
lustre/portals/include/portals/api.h
lustre/portals/include/portals/arg-blocks.h [deleted file]
lustre/portals/include/portals/errno.h
lustre/portals/include/portals/lib-dispatch.h [deleted file]
lustre/portals/include/portals/lib-nal.h [deleted file]
lustre/portals/include/portals/lib-p30.h
lustre/portals/include/portals/lib-types.h
lustre/portals/include/portals/nal.h
lustre/portals/include/portals/types.h
lustre/portals/knals/gmnal/gmnal.h
lustre/portals/knals/gmnal/gmnal_api.c
lustre/portals/knals/gmnal/gmnal_cb.c
lustre/portals/knals/gmnal/gmnal_comm.c
lustre/portals/knals/qswnal/qswnal.c
lustre/portals/knals/qswnal/qswnal.h
lustre/portals/knals/qswnal/qswnal_cb.c
lustre/portals/knals/socknal/socknal.c
lustre/portals/knals/socknal/socknal.h
lustre/portals/knals/socknal/socknal_cb.c
lustre/portals/libcfs/module.c
lustre/portals/portals/Makefile.in
lustre/portals/portals/Makefile.mk
lustre/portals/portals/api-eq.c [deleted file]
lustre/portals/portals/api-errno.c
lustre/portals/portals/api-init.c [deleted file]
lustre/portals/portals/api-me.c [deleted file]
lustre/portals/portals/api-ni.c
lustre/portals/portals/api-wrap.c
lustre/portals/portals/autoMakefile.am
lustre/portals/portals/lib-dispatch.c [deleted file]
lustre/portals/portals/lib-eq.c
lustre/portals/portals/lib-init.c
lustre/portals/portals/lib-md.c
lustre/portals/portals/lib-me.c
lustre/portals/portals/lib-move.c
lustre/portals/portals/lib-msg.c
lustre/portals/portals/lib-ni.c
lustre/portals/portals/lib-pid.c
lustre/portals/portals/module.c
lustre/portals/unals/address.c
lustre/portals/unals/bridge.h
lustre/portals/unals/procapi.c
lustre/portals/unals/procbridge.h
lustre/portals/unals/proclib.c
lustre/portals/unals/tcpnal.c
lustre/ptlbd/autoMakefile.am
lustre/ptlrpc/events.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/ptlrpc/ptlrpc_module.c
lustre/scripts/cvsdiffclient
lustre/scripts/land1.sh
lustre/scripts/lmake
lustre/scripts/lustre-kernel-2.4.spec.in
lustre/scripts/merge1.sh
lustre/tests/.cvsignore
lustre/tests/Makefile.am
lustre/tests/cfg/local.sh
lustre/tests/recovery-small.sh
lustre/tests/rename_many.c [new file with mode: 0644]
lustre/tests/replay-dual.sh
lustre/tests/replay-single.sh
lustre/tests/sanity.sh
lustre/utils/lconf
lustre/utils/llmount.c

index 2133355..8a8d115 100644 (file)
@@ -8,8 +8,8 @@
 
 Index: linux-stage/fs/ext3/Makefile
 ===================================================================
---- linux-stage.orig/fs/ext3/Makefile  2004-05-07 16:00:16.000000000 -0400
-+++ linux-stage/fs/ext3/Makefile       2004-05-07 16:00:17.000000000 -0400
+--- linux-stage.orig/fs/ext3/Makefile  2004-05-11 17:21:20.000000000 -0400
++++ linux-stage/fs/ext3/Makefile       2004-05-11 17:21:21.000000000 -0400
 @@ -4,7 +4,7 @@
  
  obj-$(CONFIG_EXT3_FS) += ext3.o
@@ -21,8 +21,8 @@ Index: linux-stage/fs/ext3/Makefile
  ext3-$(CONFIG_EXT3_FS_XATTR)   += xattr.o xattr_user.o xattr_trusted.o
 Index: linux-stage/fs/ext3/inode.c
 ===================================================================
---- linux-stage.orig/fs/ext3/inode.c   2004-05-07 16:00:16.000000000 -0400
-+++ linux-stage/fs/ext3/inode.c        2004-05-07 17:21:59.000000000 -0400
+--- linux-stage.orig/fs/ext3/inode.c   2004-05-11 17:21:21.000000000 -0400
++++ linux-stage/fs/ext3/inode.c        2004-05-11 17:21:21.000000000 -0400
 @@ -37,6 +37,7 @@
  #include <linux/mpage.h>
  #include <linux/uio.h>
@@ -43,8 +43,8 @@ Index: linux-stage/fs/ext3/inode.c
        bh = iloc.bh;
 Index: linux-stage/fs/ext3/iopen.c
 ===================================================================
---- linux-stage.orig/fs/ext3/iopen.c   2004-05-07 16:00:17.000000000 -0400
-+++ linux-stage/fs/ext3/iopen.c        2004-05-07 17:22:37.000000000 -0400
+--- linux-stage.orig/fs/ext3/iopen.c   1969-12-31 19:00:00.000000000 -0500
++++ linux-stage/fs/ext3/iopen.c        2004-05-11 17:21:21.000000000 -0400
 @@ -0,0 +1,272 @@
 +/*
 + * linux/fs/ext3/iopen.c
@@ -320,8 +320,8 @@ Index: linux-stage/fs/ext3/iopen.c
 +}
 Index: linux-stage/fs/ext3/iopen.h
 ===================================================================
---- linux-stage.orig/fs/ext3/iopen.h   2004-05-07 16:00:17.000000000 -0400
-+++ linux-stage/fs/ext3/iopen.h        2004-05-07 16:00:17.000000000 -0400
+--- linux-stage.orig/fs/ext3/iopen.h   1969-12-31 19:00:00.000000000 -0500
++++ linux-stage/fs/ext3/iopen.h        2004-05-11 17:21:21.000000000 -0400
 @@ -0,0 +1,15 @@
 +/*
 + * iopen.h
@@ -340,8 +340,8 @@ Index: linux-stage/fs/ext3/iopen.h
 +                                         struct inode *inode, int rehash);
 Index: linux-stage/fs/ext3/namei.c
 ===================================================================
---- linux-stage.orig/fs/ext3/namei.c   2004-05-07 16:00:16.000000000 -0400
-+++ linux-stage/fs/ext3/namei.c        2004-05-07 16:00:17.000000000 -0400
+--- linux-stage.orig/fs/ext3/namei.c   2004-05-11 17:21:20.000000000 -0400
++++ linux-stage/fs/ext3/namei.c        2004-05-11 17:21:21.000000000 -0400
 @@ -37,6 +37,7 @@
  #include <linux/buffer_head.h>
  #include <linux/smp_lock.h>
@@ -420,30 +420,30 @@ Index: linux-stage/fs/ext3/namei.c
  }
 Index: linux-stage/fs/ext3/super.c
 ===================================================================
---- linux-stage.orig/fs/ext3/super.c   2004-05-07 16:00:16.000000000 -0400
-+++ linux-stage/fs/ext3/super.c        2004-05-07 17:21:59.000000000 -0400
+--- linux-stage.orig/fs/ext3/super.c   2004-05-11 17:21:21.000000000 -0400
++++ linux-stage/fs/ext3/super.c        2004-05-11 17:44:53.000000000 -0400
 @@ -536,7 +536,7 @@
        Opt_user_xattr, Opt_nouser_xattr, Opt_acl, Opt_noacl, Opt_noload,
        Opt_commit, Opt_journal_update, Opt_journal_inum,
        Opt_abort, Opt_data_journal, Opt_data_ordered, Opt_data_writeback,
--      Opt_ignore, Opt_err,
-+      Opt_ignore, Opt_err, Opt_iopen, Opt_noiopen, Opt_iopen_nopriv,
+-      Opt_ignore, Opt_barrier,
++      Opt_ignore, Opt_barrier, Opt_iopen, Opt_noiopen, Opt_iopen_nopriv,
+       Opt_err,
  };
  
- static match_table_t tokens = {
-@@ -575,6 +575,9 @@
-       {Opt_ignore, "noquota"},
+@@ -577,6 +577,9 @@
        {Opt_ignore, "quota"},
        {Opt_ignore, "usrquota"},
-+      {Opt_iopen,  "iopen"},
-+      {Opt_noiopen,  "noiopen"},
-+      {Opt_iopen_nopriv,  "iopen_nopriv"},
+       {Opt_barrier, "barrier=%u"},
++      {Opt_iopen, "iopen"},
++      {Opt_noiopen, "noiopen"},
++      {Opt_iopen_nopriv, "iopen_nopriv"},
        {Opt_err, NULL}
  };
  
-@@ -762,6 +765,18 @@
-               case Opt_abort:
-                       set_opt(sbi->s_mount_opt, ABORT);
+@@ -772,6 +775,18 @@
+                       else
+                               clear_opt(sbi->s_mount_opt, BARRIER);
                        break;
 +              case Opt_iopen:
 +                      set_opt (sbi->s_mount_opt, IOPEN);
@@ -462,14 +462,14 @@ Index: linux-stage/fs/ext3/super.c
                default:
 Index: linux-stage/include/linux/ext3_fs.h
 ===================================================================
---- linux-stage.orig/include/linux/ext3_fs.h   2004-05-07 16:00:16.000000000 -0400
-+++ linux-stage/include/linux/ext3_fs.h        2004-05-07 16:00:17.000000000 -0400
-@@ -325,6 +325,8 @@
- #define EXT3_MOUNT_NO_UID32           0x2000  /* Disable 32-bit UIDs */
+--- linux-stage.orig/include/linux/ext3_fs.h   2004-05-11 17:21:20.000000000 -0400
++++ linux-stage/include/linux/ext3_fs.h        2004-05-11 17:21:21.000000000 -0400
+@@ -326,6 +326,8 @@
  #define EXT3_MOUNT_XATTR_USER         0x4000  /* Extended user attributes */
  #define EXT3_MOUNT_POSIX_ACL          0x8000  /* POSIX Access Control Lists */
-+#define EXT3_MOUNT_IOPEN             0x10000  /* Allow access via iopen */
-+#define EXT3_MOUNT_IOPEN_NOPRIV              0x20000  /* Make iopen world-readable */
+ #define EXT3_MOUNT_BARRIER            0x10000 /* Use block barriers */
++#define EXT3_MOUNT_IOPEN                0x20000 /* Allow access via iopen */
++#define EXT3_MOUNT_IOPEN_NOPRIV         0x40000 /* Make iopen world-readable */
  
  /* Compatibility, for having both ext2_fs.h and ext3_fs.h included at once */
  #ifndef _LINUX_EXT2_FS_H
index cff99dd..d27088e 100644 (file)
@@ -7,3 +7,4 @@ ext3-init-generation-2.6-suse.patch
 ext3-ea-in-inode-2.6-suse.patch
 export-ext3-2.6-suse.patch
 ext3-include-fixes-2.6-suse.patch
+ext3-htree-rename_fix.patch 
index b24081e..11838d6 100644 (file)
@@ -1,6 +1,8 @@
+if MODULES
 if LDISKFS
 modulefs_DATA = ldiskfs$(KMODEXT)
 endif
+endif
 
 ldiskfs_linux_headers := $(addprefix linux/,$(subst ext3,ldiskfs,$(notdir $(linux_headers))))
 
index 636ee1d..cb6e0a2 100644 (file)
@@ -218,12 +218,13 @@ if test x$enable_modules != xno ; then
        fi
        LUSTRE_MODULE_TRY_MAKE(
                [#include <linux/version.h>],
-               [LINUXRELEASE=UTS_RELEASE],
+               [char *LINUXRELEASE;
+                LINUXRELEASE=UTS_RELEASE;],
                [$makerule LUSTRE_KERNEL_TEST=conftest.i],
                [test -s kernel-tests/conftest.i],
                [
                        # LINUXRELEASE="UTS_RELEASE"
-                       eval $(grep LINUXRELEASE kernel-tests/conftest.i)
+                       eval $(grep "LINUXRELEASE=" kernel-tests/conftest.i)
                ],[
                        AC_MSG_RESULT([unknown])
                        AC_MSG_ERROR([Could not preprocess test program.  Consult config.log for details.])
index c55dd37..6ef28a8 100644 (file)
@@ -7,12 +7,6 @@
 #include <linux/libcfs.h>
 #define PORTAL_DEBUG
 
-#ifndef offsetof
-# define offsetof(typ,memb)     ((unsigned long)((char *)&(((typ *)0)->memb)))
-#endif
-
-#define LOWEST_BIT_SET(x)       ((x) & ~((x) - 1))
-
 #ifdef __KERNEL__
 # include <linux/vmalloc.h>
 # include <linux/time.h>
@@ -647,7 +641,6 @@ enum {
         TCPNAL    = 5,
         ROUTER    = 6,
         IBNAL     = 7,
-        CRAY_KB_ERNAL = 8,
         NAL_ENUM_END_MARKER
 };
 
index 51d2d2f..1127698 100644 (file)
@@ -4,7 +4,7 @@
 #ifndef _KPR_H
 #define _KPR_H
 
-# include <portals/lib-nal.h> /* for ptl_hdr_t */
+# include <portals/lib-types.h> /* for ptl_hdr_t */
 
 /******************************************************************************/
 /* Kernel Portals Router interface */
index c2a15f4..a205163 100644 (file)
@@ -79,9 +79,11 @@ extern unsigned int portal_cerror;
 #define S_PTLROUTER   0x00100000
 #define S_COBD        0x00200000
 #define S_IBNAL       0x00400000
-#define S_LMV         0x00800000
-#define S_SM          0x01000000
-#define S_CMOBD       0x02000000
+#define S_SM          0x00800000
+#define S_ASOBD       0x01000000
+#define S_LMV         0x02000000
+#define S_CMOBD       0x04000000
+
 /* If you change these values, please keep portals/utils/debug.c
  * up to date! */
 
index 609290d..b4741cc 100644 (file)
@@ -77,8 +77,10 @@ static inline char *strdup(const char *str)
 #endif
 
 #ifdef __KERNEL__
+# define NTOH__u16(var) le16_to_cpu(var)
 # define NTOH__u32(var) le32_to_cpu(var)
 # define NTOH__u64(var) le64_to_cpu(var)
+# define HTON__u16(var) cpu_to_le16(var)
 # define HTON__u32(var) cpu_to_le32(var)
 # define HTON__u64(var) cpu_to_le64(var)
 #else
@@ -92,8 +94,10 @@ static inline char *strdup(const char *str)
        };       \
        (ret);     \
     })
+# define NTOH__u16(var) (var)
 # define NTOH__u32(var) (var)
 # define NTOH__u64(var) (expansion_u64(var))
+# define HTON__u16(var) (var)
 # define HTON__u32(var) (var)
 # define HTON__u64(var) (expansion_u64(var))
 #endif
index cfae78c..c5994c6 100644 (file)
@@ -19,9 +19,4 @@
 
 #include <portals/internal.h>
 #include <portals/nal.h>
-#include <portals/arg-blocks.h>
 
-/* Hack for 2.4.18 macro name collision */
-#ifdef yield
-#undef yield
-#endif
index 6d382bb..c7aaced 100644 (file)
@@ -5,7 +5,6 @@
 
 #include <portals/types.h>
 
-#ifndef PTL_NO_WRAP
 int PtlInit(int *);
 void PtlFini(void);
 
@@ -17,8 +16,6 @@ int PtlNIInitialized(ptl_interface_t);
 
 int PtlNIFini(ptl_handle_ni_t interface_in);
 
-#endif
-
 int PtlGetId(ptl_handle_ni_t ni_handle, ptl_process_id_t *id);
 
 
@@ -32,9 +29,7 @@ int PtlNIStatus(ptl_handle_ni_t interface_in, ptl_sr_index_t register_in,
 int PtlNIDist(ptl_handle_ni_t interface_in, ptl_process_id_t process_in,
               unsigned long *distance_out);
 
-#ifndef PTL_NO_WRAP
 int PtlNIHandle(ptl_handle_any_t handle_in, ptl_handle_ni_t * interface_out);
-#endif
 
 
 /* 
@@ -74,16 +69,12 @@ int PtlMEUnlink(ptl_handle_me_t current_in);
 
 int PtlMEUnlinkList(ptl_handle_me_t current_in);
 
-int PtlTblDump(ptl_handle_ni_t ni, int index_in);
-int PtlMEDump(ptl_handle_me_t current_in);
-
 
 
 /*
  * Memory descriptors
  */
 
-#ifndef PTL_NO_WRAP
 int PtlMDAttach(ptl_handle_me_t current_in, ptl_md_t md_in,
                 ptl_unlink_t unlink_in, ptl_handle_md_t * handle_out);
 
@@ -95,7 +86,6 @@ int PtlMDUnlink(ptl_handle_md_t md_in);
 int PtlMDUpdate(ptl_handle_md_t md_in, ptl_md_t * old_inout,
                 ptl_md_t * new_inout, ptl_handle_eq_t testq_in);
 
-#endif
 
 /* These should not be called by users */
 int PtlMDUpdate_internal(ptl_handle_md_t md_in, ptl_md_t * old_inout,
@@ -108,16 +98,11 @@ int PtlMDUpdate_internal(ptl_handle_md_t md_in, ptl_md_t * old_inout,
 /*
  * Event queues
  */
-#ifndef PTL_NO_WRAP
-
-/* These should be called by users */
 int PtlEQAlloc(ptl_handle_ni_t ni_in, ptl_size_t count_in,
                ptl_eq_handler_t handler,
                ptl_handle_eq_t *handle_out);
 int PtlEQFree(ptl_handle_eq_t eventq_in);
 
-int PtlEQCount(ptl_handle_eq_t eventq_in, ptl_size_t * count_out);
-
 int PtlEQGet(ptl_handle_eq_t eventq_in, ptl_event_t * event_out);
 
 
@@ -125,7 +110,6 @@ int PtlEQWait(ptl_handle_eq_t eventq_in, ptl_event_t * event_out);
 
 int PtlEQPoll(ptl_handle_eq_t *eventqs_in, int neq_in, int timeout,
              ptl_event_t *event_out, int *which_out);
-#endif
 
 /*
  * Access Control Table
diff --git a/lnet/include/lnet/arg-blocks.h b/lnet/include/lnet/arg-blocks.h
deleted file mode 100644 (file)
index 21e30d5..0000000
+++ /dev/null
@@ -1,268 +0,0 @@
-#ifndef PTL_BLOCKS_H
-#define PTL_BLOCKS_H
-
-#include "build_check.h"
-
-/*
- * blocks.h
- *
- * Argument block types for the Portals 3.0 library
- * Generated by idl
- *
- */
-
-#include <portals/types.h>
-
-/* put LIB_MAX_DISPATCH last here  -- these must match the
-   assignements to the dispatch table in lib-p30/dispatch.c */
-#define PTL_GETID     1
-#define PTL_NISTATUS  2
-#define PTL_NIDIST    3
-// #define PTL_NIDEBUG   4
-#define PTL_MEATTACH  5
-#define PTL_MEINSERT  6
-// #define PTL_MEPREPEND 7
-#define PTL_MEUNLINK  8
-#define PTL_TBLDUMP   9 
-#define PTL_MEDUMP   10
-#define PTL_MDATTACH 11
-// #define PTL_MDINSERT 12
-#define PTL_MDBIND   13
-#define PTL_MDUPDATE 14
-#define PTL_MDUNLINK 15
-#define PTL_EQALLOC  16
-#define PTL_EQFREE   17
-#define PTL_ACENTRY  18
-#define PTL_PUT      19 
-#define PTL_GET      20
-#define PTL_FAILNID  21
-#define LIB_MAX_DISPATCH 21
-
-typedef struct PtlFailNid_in {
-       ptl_handle_ni_t interface;
-       ptl_nid_t       nid;
-       unsigned int    threshold;
-} PtlFailNid_in;
-
-typedef struct PtlFailNid_out {
-       int             rc;
-} PtlFailNid_out;
-
-typedef struct PtlGetId_in {
-        ptl_handle_ni_t handle_in;
-} PtlGetId_in;
-
-typedef struct PtlGetId_out {
-        int rc;
-        ptl_process_id_t id_out;
-} PtlGetId_out;
-
-typedef struct PtlNIStatus_in {
-        ptl_handle_ni_t interface_in;
-        ptl_sr_index_t register_in;
-} PtlNIStatus_in;
-
-typedef struct PtlNIStatus_out {
-        int rc;
-        ptl_sr_value_t status_out;
-} PtlNIStatus_out;
-
-
-typedef struct PtlNIDist_in {
-        ptl_handle_ni_t interface_in;
-        ptl_process_id_t process_in;
-} PtlNIDist_in;
-
-typedef struct PtlNIDist_out {
-        int rc;
-        unsigned long distance_out;
-} PtlNIDist_out;
-
-
-typedef struct PtlNIDebug_in {
-        unsigned int mask_in;
-} PtlNIDebug_in;
-
-typedef struct PtlNIDebug_out {
-        unsigned int rc;
-} PtlNIDebug_out;
-
-
-typedef struct PtlMEAttach_in {
-        ptl_handle_ni_t interface_in;
-        ptl_pt_index_t index_in;
-        ptl_ins_pos_t position_in;
-        ptl_process_id_t match_id_in;
-        ptl_match_bits_t match_bits_in;
-        ptl_match_bits_t ignore_bits_in;
-        ptl_unlink_t unlink_in;
-} PtlMEAttach_in;
-
-typedef struct PtlMEAttach_out {
-        int rc;
-        ptl_handle_me_t handle_out;
-} PtlMEAttach_out;
-
-
-typedef struct PtlMEInsert_in {
-        ptl_handle_me_t current_in;
-        ptl_process_id_t match_id_in;
-        ptl_match_bits_t match_bits_in;
-        ptl_match_bits_t ignore_bits_in;
-        ptl_unlink_t unlink_in;
-        ptl_ins_pos_t position_in;
-} PtlMEInsert_in;
-
-typedef struct PtlMEInsert_out {
-        int rc;
-        ptl_handle_me_t handle_out;
-} PtlMEInsert_out;
-
-typedef struct PtlMEUnlink_in {
-        ptl_handle_me_t current_in;
-        ptl_unlink_t unlink_in;
-} PtlMEUnlink_in;
-
-typedef struct PtlMEUnlink_out {
-        int rc;
-} PtlMEUnlink_out;
-
-
-typedef struct PtlTblDump_in {
-        int index_in;
-} PtlTblDump_in;
-
-typedef struct PtlTblDump_out {
-        int rc;
-} PtlTblDump_out;
-
-
-typedef struct PtlMEDump_in {
-        ptl_handle_me_t current_in;
-} PtlMEDump_in;
-
-typedef struct PtlMEDump_out {
-        int rc;
-} PtlMEDump_out;
-
-
-typedef struct PtlMDAttach_in {
-        ptl_handle_me_t me_in;
-        ptl_handle_eq_t eq_in;
-        ptl_md_t md_in;
-        ptl_unlink_t unlink_in;
-} PtlMDAttach_in;
-
-typedef struct PtlMDAttach_out {
-        int rc;
-        ptl_handle_md_t handle_out;
-} PtlMDAttach_out;
-
-
-typedef struct PtlMDBind_in {
-        ptl_handle_ni_t ni_in;
-        ptl_handle_eq_t eq_in;
-        ptl_md_t md_in;
-       ptl_unlink_t unlink_in;
-} PtlMDBind_in;
-
-typedef struct PtlMDBind_out {
-        int rc;
-        ptl_handle_md_t handle_out;
-} PtlMDBind_out;
-
-
-typedef struct PtlMDUpdate_internal_in {
-        ptl_handle_md_t md_in;
-        ptl_handle_eq_t testq_in;
-        ptl_seq_t sequence_in;
-
-        ptl_md_t old_inout;
-        int old_inout_valid;
-        ptl_md_t new_inout;
-        int new_inout_valid;
-} PtlMDUpdate_internal_in;
-
-typedef struct PtlMDUpdate_internal_out {
-        int rc;
-        ptl_md_t old_inout;
-        ptl_md_t new_inout;
-} PtlMDUpdate_internal_out;
-
-
-typedef struct PtlMDUnlink_in {
-        ptl_handle_md_t md_in;
-} PtlMDUnlink_in;
-
-typedef struct PtlMDUnlink_out {
-        int rc;
-        ptl_md_t status_out;
-} PtlMDUnlink_out;
-
-
-typedef struct PtlEQAlloc_in {
-        ptl_handle_ni_t ni_in;
-        ptl_size_t count_in;
-        void *base_in;
-        int len_in;
-        ptl_eq_handler_t callback_in;
-} PtlEQAlloc_in;
-
-typedef struct PtlEQAlloc_out {
-        int rc;
-        ptl_handle_eq_t handle_out;
-} PtlEQAlloc_out;
-
-
-typedef struct PtlEQFree_in {
-        ptl_handle_eq_t eventq_in;
-} PtlEQFree_in;
-
-typedef struct PtlEQFree_out {
-        int rc;
-} PtlEQFree_out;
-
-
-typedef struct PtlACEntry_in {
-        ptl_handle_ni_t ni_in;
-        ptl_ac_index_t index_in;
-        ptl_process_id_t match_id_in;
-        ptl_pt_index_t portal_in;
-} PtlACEntry_in;
-
-typedef struct PtlACEntry_out {
-        int rc;
-} PtlACEntry_out;
-
-
-typedef struct PtlPut_in {
-        ptl_handle_md_t md_in;
-        ptl_ack_req_t ack_req_in;
-        ptl_process_id_t target_in;
-        ptl_pt_index_t portal_in;
-        ptl_ac_index_t cookie_in;
-        ptl_match_bits_t match_bits_in;
-        ptl_size_t offset_in;
-        ptl_hdr_data_t hdr_data_in;
-} PtlPut_in;
-
-typedef struct PtlPut_out {
-        int rc;
-} PtlPut_out;
-
-
-typedef struct PtlGet_in {
-        ptl_handle_md_t md_in;
-        ptl_process_id_t target_in;
-        ptl_pt_index_t portal_in;
-        ptl_ac_index_t cookie_in;
-        ptl_match_bits_t match_bits_in;
-        ptl_size_t offset_in;
-} PtlGet_in;
-
-typedef struct PtlGet_out {
-        int rc;
-} PtlGet_out;
-
-
-#endif
index a98bfd9..42f2626 100644 (file)
@@ -41,7 +41,10 @@ typedef enum {
 
        PTL_EQ_IN_USE           = 21,
 
-        PTL_MAX_ERRNO          = 22
+       PTL_NI_INVALID          = 22,
+       PTL_MD_ILLEGAL          = 23,
+       
+        PTL_MAX_ERRNO          = 24
 } ptl_err_t;
 /* If you change these, you must update the string table in api-errno.c */
 
index 25778e4..eae00a0 100644 (file)
 
 extern int ptl_init;           /* Has the library been initialized */
 
-extern int ptl_ni_init(void);
-extern void ptl_ni_fini(void);
-
-static inline ptl_eq_t *
-ptl_handle2usereq (ptl_handle_eq_t *handle)
-{
-        /* EQ handles are a little wierd.  On the "user" side, the cookie
-         * is just a pointer to a queue of events in shared memory.  It's
-         * cb_eq_handle is the "real" handle which we pass when we
-         * call do_forward(). */
-        return (ptl_eq_t *)((unsigned long)handle->cookie);
-}
-
 #endif
diff --git a/lnet/include/lnet/lib-dispatch.h b/lnet/include/lnet/lib-dispatch.h
deleted file mode 100644 (file)
index 610c776..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-#ifndef PTL_DISPATCH_H
-#define PTL_DISPATCH_H
-
-#include "build_check.h"
-/*
- * include/dispatch.h
- *
- * Dispatch table header and externs for remote side
- * operations
- *
- * Generated by idl
- *
- */
-
-#include <portals/lib-p30.h>
-#include <portals/arg-blocks.h>
-
-extern int do_PtlGetId(nal_cb_t * nal, void *private, void *args, void *ret);
-extern int do_PtlNIStatus(nal_cb_t * nal, void *private, void *args, void *ret);
-extern int do_PtlNIDist(nal_cb_t * nal, void *private, void *args, void *ret);
-extern int do_PtlMEAttach(nal_cb_t * nal, void *private, void *args, void *ret);
-extern int do_PtlMEInsert(nal_cb_t * nal, void *private, void *args, void *ret);
-extern int do_PtlMEPrepend(nal_cb_t * nal, void *private, void *args,
-                           void *ret);
-extern int do_PtlMEUnlink(nal_cb_t * nal, void *private, void *args, void *ret);
-extern int do_PtlTblDump(nal_cb_t * nal, void *private, void *args, void *ret);
-extern int do_PtlMEDump(nal_cb_t * nal, void *private, void *args, void *ret);
-extern int do_PtlMDAttach(nal_cb_t * nal, void *private, void *args,
-                                   void *ret);
-extern int do_PtlMDBind(nal_cb_t * nal, void *private, void *args,
-                                 void *ret);
-extern int do_PtlMDUpdate_internal(nal_cb_t * nal, void *private, void *args,
-                                   void *ret);
-extern int do_PtlMDUnlink(nal_cb_t * nal, void *private, void *args,
-                                   void *ret);
-extern int do_PtlEQAlloc_internal(nal_cb_t * nal, void *private, void *args,
-                                  void *ret);
-extern int do_PtlEQFree_internal(nal_cb_t * nal, void *private, void *args,
-                                 void *ret);
-extern int do_PtlPut(nal_cb_t * nal, void *private, void *args, void *ret);
-extern int do_PtlGet(nal_cb_t * nal, void *private, void *args, void *ret);
-extern int do_PtlFailNid (nal_cb_t *nal, void *private, void *args, void *ret);
-
-extern char *dispatch_name(int index);
-#endif
index efa929c..4daf219 100644 (file)
 #else
 # include <portals/list.h>
 # include <string.h>
+# include <pthread.h>
 #endif
 #include <portals/types.h>
 #include <linux/kp30.h>
 #include <portals/p30.h>
+#include <portals/nal.h>
 #include <portals/lib-types.h>
-#include <portals/lib-nal.h>
-#include <portals/lib-dispatch.h>
 
 static inline int ptl_is_wire_handle_none (ptl_handle_wire_t *wh)
 {
@@ -31,17 +31,18 @@ static inline int ptl_is_wire_handle_none (ptl_handle_wire_t *wh)
                 wh->wh_object_cookie == PTL_WIRE_HANDLE_NONE.wh_object_cookie);
 }
 
-#define state_lock(nal,flagsp)                          \
-do {                                                    \
-        CDEBUG(D_PORTALS, "taking state lock\n");       \
-        nal->cb_cli(nal, flagsp);                       \
-} while (0)
+#ifdef __KERNEL__
+#define LIB_LOCK(nal,flags)                                     \
+        spin_lock_irqsave(&(nal)->libnal_ni.ni_lock, flags)
+#define LIB_UNLOCK(nal,flags)                                   \
+        spin_unlock_irqrestore(&(nal)->libnal_ni.ni_lock, flags)
+#else
+#define LIB_LOCK(nal,flags)                                             \
+        (pthread_mutex_lock(&(nal)->libnal_ni.ni_mutex), (flags) = 0)
+#define LIB_UNLOCK(nal,flags)                                   \
+        pthread_mutex_unlock(&(nal)->libnal_ni.ni_mutex)
+#endif
 
-#define state_unlock(nal,flagsp)                        \
-{                                                       \
-        CDEBUG(D_PORTALS, "releasing state lock\n");    \
-        nal->cb_sti(nal, flagsp);                       \
-}
 
 #ifdef PTL_USE_LIB_FREELIST
 
@@ -50,13 +51,13 @@ do {                                                    \
 #define MAX_MSGS        2048    /* Outstanding messages */
 #define MAX_EQS         512
 
-extern int lib_freelist_init (nal_cb_t *nal, lib_freelist_t *fl, int nobj, int objsize);
-extern void lib_freelist_fini (nal_cb_t *nal, lib_freelist_t *fl);
+extern int lib_freelist_init (lib_nal_t *nal, lib_freelist_t *fl, int nobj, int objsize);
+extern void lib_freelist_fini (lib_nal_t *nal, lib_freelist_t *fl);
 
 static inline void *
 lib_freelist_alloc (lib_freelist_t *fl)
 {
-        /* ALWAYS called with statelock held */
+        /* ALWAYS called with liblock held */
         lib_freeobj_t *o;
 
         if (list_empty (&fl->fl_list))
@@ -70,7 +71,7 @@ lib_freelist_alloc (lib_freelist_t *fl)
 static inline void
 lib_freelist_free (lib_freelist_t *fl, void *obj)
 {
-        /* ALWAYS called with statelock held */
+        /* ALWAYS called with liblock held */
         lib_freeobj_t *o = list_entry (obj, lib_freeobj_t, fo_contents);
         
         list_add (&o->fo_list, &fl->fl_list);
@@ -78,78 +79,78 @@ lib_freelist_free (lib_freelist_t *fl, void *obj)
 
 
 static inline lib_eq_t *
-lib_eq_alloc (nal_cb_t *nal)
+lib_eq_alloc (lib_nal_t *nal)
 {
-        /* NEVER called with statelock held */
+        /* NEVER called with liblock held */
         unsigned long  flags;
         lib_eq_t      *eq;
         
-        state_lock (nal, &flags);
-        eq = (lib_eq_t *)lib_freelist_alloc (&nal->ni.ni_free_eqs);
-        state_unlock (nal, &flags);
+        LIB_LOCK (nal, flags);
+        eq = (lib_eq_t *)lib_freelist_alloc (&nal->libnal_ni.ni_free_eqs);
+        LIB_UNLOCK (nal, flags);
 
         return (eq);
 }
 
 static inline void
-lib_eq_free (nal_cb_t *nal, lib_eq_t *eq)
+lib_eq_free (lib_nal_t *nal, lib_eq_t *eq)
 {
-        /* ALWAYS called with statelock held */
-        lib_freelist_free (&nal->ni.ni_free_eqs, eq);
+        /* ALWAYS called with liblock held */
+        lib_freelist_free (&nal->libnal_ni.ni_free_eqs, eq);
 }
 
 static inline lib_md_t *
-lib_md_alloc (nal_cb_t *nal, ptl_md_t *umd)
+lib_md_alloc (lib_nal_t *nal, ptl_md_t *umd)
 {
-        /* NEVER called with statelock held */
+        /* NEVER called with liblock held */
         unsigned long  flags;
         lib_md_t      *md;
         
-        state_lock (nal, &flags);
-        md = (lib_md_t *)lib_freelist_alloc (&nal->ni.ni_free_mds);
-        state_unlock (nal, &flags);
+        LIB_LOCK (nal, flags);
+        md = (lib_md_t *)lib_freelist_alloc (&nal->libnal_ni.ni_free_mds);
+        LIB_UNLOCK (nal, flags);
 
         return (md);
 }
 
 static inline void
-lib_md_free (nal_cb_t *nal, lib_md_t *md)
+lib_md_free (lib_nal_t *nal, lib_md_t *md)
 {
-        /* ALWAYS called with statelock held */
-        lib_freelist_free (&nal->ni.ni_free_mds, md);
+        /* ALWAYS called with liblock held */
+        lib_freelist_free (&nal->libnal_ni.ni_free_mds, md);
 }
 
 static inline lib_me_t *
-lib_me_alloc (nal_cb_t *nal)
+lib_me_alloc (lib_nal_t *nal)
 {
-        /* NEVER called with statelock held */
+        /* NEVER called with liblock held */
         unsigned long  flags;
         lib_me_t      *me;
         
-        state_lock (nal, &flags);
-        me = (lib_me_t *)lib_freelist_alloc (&nal->ni.ni_free_mes);
-        state_unlock (nal, &flags);
+        LIB_LOCK (nal, flags);
+        me = (lib_me_t *)lib_freelist_alloc (&nal->libnal_ni.ni_free_mes);
+        LIB_UNLOCK (nal, flags);
         
         return (me);
 }
 
 static inline void
-lib_me_free (nal_cb_t *nal, lib_me_t *me)
+lib_me_free (lib_nal_t *nal, lib_me_t *me)
 {
-        /* ALWAYS called with statelock held */
-        lib_freelist_free (&nal->ni.ni_free_mes, me);
+        /* ALWAYS called with liblock held */
+        lib_freelist_free (&nal->libnal_ni.ni_free_mes, me);
 }
 
 static inline lib_msg_t *
-lib_msg_alloc (nal_cb_t *nal)
+lib_msg_alloc (lib_nal_t *nal)
 {
-        /* NEVER called with statelock held */
+        /* NEVER called with liblock held */
         unsigned long  flags;
         lib_msg_t     *msg;
         
-        state_lock (nal, &flags);
-        msg = (lib_msg_t *)lib_freelist_alloc (&nal->ni.ni_free_msgs);
-        state_unlock (nal, &flags);
+        LIB_LOCK (nal, flags);
+        msg = (lib_msg_t *)lib_freelist_alloc (&nal->libnal_ni.ni_free_msgs);
+        LIB_UNLOCK (nal, flags);
 
         if (msg != NULL) {
                 /* NULL pointers, clear flags etc */
@@ -160,18 +161,18 @@ lib_msg_alloc (nal_cb_t *nal)
 }
 
 static inline void
-lib_msg_free (nal_cb_t *nal, lib_msg_t *msg)
+lib_msg_free (lib_nal_t *nal, lib_msg_t *msg)
 {
-        /* ALWAYS called with statelock held */
-        lib_freelist_free (&nal->ni.ni_free_msgs, msg);
+        /* ALWAYS called with liblock held */
+        lib_freelist_free (&nal->libnal_ni.ni_free_msgs, msg);
 }
 
 #else
 
 static inline lib_eq_t *
-lib_eq_alloc (nal_cb_t *nal)
+lib_eq_alloc (lib_nal_t *nal)
 {
-        /* NEVER called with statelock held */
+        /* NEVER called with liblock held */
         lib_eq_t *eq;
 
         PORTAL_ALLOC(eq, sizeof(*eq));
@@ -179,16 +180,16 @@ lib_eq_alloc (nal_cb_t *nal)
 }
 
 static inline void
-lib_eq_free (nal_cb_t *nal, lib_eq_t *eq)
+lib_eq_free (lib_nal_t *nal, lib_eq_t *eq)
 {
-        /* ALWAYS called with statelock held */
+        /* ALWAYS called with liblock held */
         PORTAL_FREE(eq, sizeof(*eq));
 }
 
 static inline lib_md_t *
-lib_md_alloc (nal_cb_t *nal, ptl_md_t *umd)
+lib_md_alloc (lib_nal_t *nal, ptl_md_t *umd)
 {
-        /* NEVER called with statelock held */
+        /* NEVER called with liblock held */
         lib_md_t *md;
         int       size;
         int       niov;
@@ -214,9 +215,9 @@ lib_md_alloc (nal_cb_t *nal, ptl_md_t *umd)
 }
 
 static inline void 
-lib_md_free (nal_cb_t *nal, lib_md_t *md)
+lib_md_free (lib_nal_t *nal, lib_md_t *md)
 {
-        /* ALWAYS called with statelock held */
+        /* ALWAYS called with liblock held */
         int       size;
 
         if ((md->options & PTL_MD_KIOV) != 0)
@@ -228,9 +229,9 @@ lib_md_free (nal_cb_t *nal, lib_md_t *md)
 }
 
 static inline lib_me_t *
-lib_me_alloc (nal_cb_t *nal)
+lib_me_alloc (lib_nal_t *nal)
 {
-        /* NEVER called with statelock held */
+        /* NEVER called with liblock held */
         lib_me_t *me;
 
         PORTAL_ALLOC(me, sizeof(*me));
@@ -238,16 +239,16 @@ lib_me_alloc (nal_cb_t *nal)
 }
 
 static inline void 
-lib_me_free(nal_cb_t *nal, lib_me_t *me)
+lib_me_free(lib_nal_t *nal, lib_me_t *me)
 {
-        /* ALWAYS called with statelock held */
+        /* ALWAYS called with liblock held */
         PORTAL_FREE(me, sizeof(*me));
 }
 
 static inline lib_msg_t *
-lib_msg_alloc(nal_cb_t *nal)
+lib_msg_alloc(lib_nal_t *nal)
 {
-        /* NEVER called with statelock held; may be in interrupt... */
+        /* NEVER called with liblock held; may be in interrupt... */
         lib_msg_t *msg;
 
         if (in_interrupt())
@@ -264,27 +265,28 @@ lib_msg_alloc(nal_cb_t *nal)
 }
 
 static inline void 
-lib_msg_free(nal_cb_t *nal, lib_msg_t *msg)
+lib_msg_free(lib_nal_t *nal, lib_msg_t *msg)
 {
-        /* ALWAYS called with statelock held */
+        /* ALWAYS called with liblock held */
         PORTAL_FREE(msg, sizeof(*msg));
 }
 #endif
 
-extern lib_handle_t *lib_lookup_cookie (nal_cb_t *nal, __u64 cookie, int type);
-extern void lib_initialise_handle (nal_cb_t *nal, lib_handle_t *lh, int type);
-extern void lib_invalidate_handle (nal_cb_t *nal, lib_handle_t *lh);
+extern lib_handle_t *lib_lookup_cookie (lib_nal_t *nal, __u64 cookie, int type);
+extern void lib_initialise_handle (lib_nal_t *nal, lib_handle_t *lh, int type);
+extern void lib_invalidate_handle (lib_nal_t *nal, lib_handle_t *lh);
 
 static inline void
-ptl_eq2handle (ptl_handle_eq_t *handle, lib_eq_t *eq)
+ptl_eq2handle (ptl_handle_eq_t *handle, lib_nal_t *nal, lib_eq_t *eq)
 {
+        handle->nal_idx = nal->libnal_ni.ni_api->nal_handle.nal_idx;
         handle->cookie = eq->eq_lh.lh_cookie;
 }
 
 static inline lib_eq_t *
-ptl_handle2eq (ptl_handle_eq_t *handle, nal_cb_t *nal)
+ptl_handle2eq (ptl_handle_eq_t *handle, lib_nal_t *nal)
 {
-        /* ALWAYS called with statelock held */
+        /* ALWAYS called with liblock held */
         lib_handle_t *lh = lib_lookup_cookie (nal, handle->cookie, 
                                               PTL_COOKIE_TYPE_EQ);
         if (lh == NULL)
@@ -294,15 +296,16 @@ ptl_handle2eq (ptl_handle_eq_t *handle, nal_cb_t *nal)
 }
 
 static inline void
-ptl_md2handle (ptl_handle_md_t *handle, lib_md_t *md)
+ptl_md2handle (ptl_handle_md_t *handle, lib_nal_t *nal, lib_md_t *md)
 {
+        handle->nal_idx = nal->libnal_ni.ni_api->nal_handle.nal_idx;
         handle->cookie = md->md_lh.lh_cookie;
 }
 
 static inline lib_md_t *
-ptl_handle2md (ptl_handle_md_t *handle, nal_cb_t *nal)
+ptl_handle2md (ptl_handle_md_t *handle, lib_nal_t *nal)
 {
-        /* ALWAYS called with statelock held */
+        /* ALWAYS called with liblock held */
         lib_handle_t *lh = lib_lookup_cookie (nal, handle->cookie,
                                               PTL_COOKIE_TYPE_MD);
         if (lh == NULL)
@@ -312,12 +315,12 @@ ptl_handle2md (ptl_handle_md_t *handle, nal_cb_t *nal)
 }
 
 static inline lib_md_t *
-ptl_wire_handle2md (ptl_handle_wire_t *wh, nal_cb_t *nal)
+ptl_wire_handle2md (ptl_handle_wire_t *wh, lib_nal_t *nal)
 {
-        /* ALWAYS called with statelock held */
+        /* ALWAYS called with liblock held */
         lib_handle_t *lh;
         
-        if (wh->wh_interface_cookie != nal->ni.ni_interface_cookie)
+        if (wh->wh_interface_cookie != nal->libnal_ni.ni_interface_cookie)
                 return (NULL);
         
         lh = lib_lookup_cookie (nal, wh->wh_object_cookie,
@@ -329,15 +332,16 @@ ptl_wire_handle2md (ptl_handle_wire_t *wh, nal_cb_t *nal)
 }
 
 static inline void
-ptl_me2handle (ptl_handle_me_t *handle, lib_me_t *me)
+ptl_me2handle (ptl_handle_me_t *handle, lib_nal_t *nal, lib_me_t *me)
 {
+        handle->nal_idx = nal->libnal_ni.ni_api->nal_handle.nal_idx;
         handle->cookie = me->me_lh.lh_cookie;
 }
 
 static inline lib_me_t *
-ptl_handle2me (ptl_handle_me_t *handle, nal_cb_t *nal)
+ptl_handle2me (ptl_handle_me_t *handle, lib_nal_t *nal)
 {
-        /* ALWAYS called with statelock held */
+        /* ALWAYS called with liblock held */
         lib_handle_t *lh = lib_lookup_cookie (nal, handle->cookie,
                                               PTL_COOKIE_TYPE_ME);
         if (lh == NULL)
@@ -346,35 +350,30 @@ ptl_handle2me (ptl_handle_me_t *handle, nal_cb_t *nal)
         return (lh_entry (lh, lib_me_t, me_lh));
 }
 
-extern int lib_init(nal_cb_t *cb, ptl_process_id_t pid,
+extern int lib_init(lib_nal_t *libnal, nal_t *apinal,
+                    ptl_process_id_t pid,
                     ptl_ni_limits_t *desired_limits, 
                     ptl_ni_limits_t *actual_limits);
-extern int lib_fini(nal_cb_t * cb);
-extern void lib_dispatch(nal_cb_t * cb, void *private, int index,
-                         void *arg_block, void *ret_block);
-extern char *dispatch_name(int index);
+extern int lib_fini(lib_nal_t *libnal);
 
 /*
- * When the NAL detects an incoming message, it should call
- * lib_parse() decode it.  The NAL callbacks will be handed
- * the private cookie as a way for the NAL to maintain state
- * about which transaction is being processed.  An extra parameter,
- * lib_cookie will contain the necessary information for
- * finalizing the message.
- *
- * After it has finished the handling the message, it should
- * call lib_finalize() with the lib_cookie parameter.
- * Call backs will be made to write events, send acks or
- * replies and so on.
+ * When the NAL detects an incoming message header, it should call
+ * lib_parse() to decode it.  If the message header is garbage, lib_parse()
+ * returns immediately with failure; otherwise the NAL callbacks will be
+ * called to receive the message body.  They are handed the private cookie
+ * as a way for the NAL to maintain state about which transaction is being
+ * processed.  An extra parameter, lib_msg, contains the lib-level message
+ * state for passing to lib_finalize() when the message body has been
+ * received.
  */
-extern void lib_enq_event_locked (nal_cb_t *nal, void *private,
+extern void lib_enq_event_locked (lib_nal_t *nal, void *private,
                                   lib_eq_t *eq, ptl_event_t *ev);
-extern void lib_finalize (nal_cb_t *nal, void *private, lib_msg_t *msg, 
+extern void lib_finalize (lib_nal_t *nal, void *private, lib_msg_t *msg, 
                           ptl_ni_fail_t ni_fail_type);
-extern void lib_parse (nal_cb_t *nal, ptl_hdr_t *hdr, void *private);
-extern lib_msg_t *lib_create_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, 
+extern ptl_err_t lib_parse (lib_nal_t *nal, ptl_hdr_t *hdr, void *private);
+extern lib_msg_t *lib_create_reply_msg (lib_nal_t *nal, ptl_nid_t peer_nid, 
                                         lib_msg_t *get_msg);
-extern void print_hdr (nal_cb_t * nal, ptl_hdr_t * hdr);
+extern void print_hdr (lib_nal_t * nal, ptl_hdr_t * hdr);
 
 
 extern ptl_size_t lib_iov_nob (int niov, struct iovec *iov);
@@ -397,14 +396,65 @@ extern int lib_extract_kiov (int dst_niov, ptl_kiov_t *dst,
 
 extern void lib_assert_wire_constants (void);
 
-extern ptl_err_t lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
+extern ptl_err_t lib_recv (lib_nal_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
                            ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen);
-extern ptl_err_t lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg,
+extern ptl_err_t lib_send (lib_nal_t *nal, void *private, lib_msg_t *msg,
                            ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                            lib_md_t *md, ptl_size_t offset, ptl_size_t len);
 
-extern void lib_md_deconstruct(nal_cb_t * nal, lib_md_t * md_in,
-                               ptl_md_t * md_out);
-extern void lib_md_unlink(nal_cb_t * nal, lib_md_t * md_in);
-extern void lib_me_unlink(nal_cb_t * nal, lib_me_t * me_in);
+extern int lib_api_ni_status (nal_t *nal, ptl_sr_index_t sr_idx,
+                              ptl_sr_value_t *status);
+extern int lib_api_ni_dist (nal_t *nal, ptl_process_id_t *pid, 
+                            unsigned long *dist);
+
+extern int lib_api_eq_alloc (nal_t *nal, ptl_size_t count,
+                             ptl_eq_handler_t callback, 
+                             ptl_handle_eq_t *handle);
+extern int lib_api_eq_free(nal_t *nal, ptl_handle_eq_t *eqh);
+extern int lib_api_eq_poll (nal_t *nal, 
+                            ptl_handle_eq_t *eventqs, int neq, int timeout_ms,
+                            ptl_event_t *event, int *which);
+
+extern int lib_api_me_attach(nal_t *nal,
+                             ptl_pt_index_t portal,
+                             ptl_process_id_t match_id, 
+                             ptl_match_bits_t match_bits, 
+                             ptl_match_bits_t ignore_bits,
+                             ptl_unlink_t unlink, ptl_ins_pos_t pos,
+                             ptl_handle_me_t *handle);
+extern int lib_api_me_insert(nal_t *nal,
+                             ptl_handle_me_t *current_meh,
+                             ptl_process_id_t match_id, 
+                             ptl_match_bits_t match_bits, 
+                             ptl_match_bits_t ignore_bits,
+                             ptl_unlink_t unlink, ptl_ins_pos_t pos,
+                             ptl_handle_me_t *handle);
+extern int lib_api_me_unlink (nal_t *nal, ptl_handle_me_t *meh);
+extern void lib_me_unlink(lib_nal_t *nal, lib_me_t *me);
+
+extern int lib_api_get_id(nal_t *nal, ptl_process_id_t *pid);
+
+extern void lib_md_unlink(lib_nal_t *nal, lib_md_t *md);
+extern void lib_md_deconstruct(lib_nal_t *nal, lib_md_t *lmd, ptl_md_t *umd);
+extern int lib_api_md_attach(nal_t *nal, ptl_handle_me_t *meh,
+                             ptl_md_t *umd, ptl_unlink_t unlink, 
+                             ptl_handle_md_t *handle);
+extern int lib_api_md_bind(nal_t *nal, ptl_md_t *umd, ptl_unlink_t unlink,
+                           ptl_handle_md_t *handle);
+extern int lib_api_md_unlink (nal_t *nal, ptl_handle_md_t *mdh);
+extern int lib_api_md_update (nal_t *nal, ptl_handle_md_t *mdh,
+                              ptl_md_t *oldumd, ptl_md_t *newumd,
+                              ptl_handle_eq_t *testqh);
+
+extern int lib_api_get(nal_t *apinal, ptl_handle_md_t *mdh, 
+                       ptl_process_id_t *id,
+                       ptl_pt_index_t portal, ptl_ac_index_t ac,
+                       ptl_match_bits_t match_bits, ptl_size_t offset);
+extern int lib_api_put(nal_t *apinal, ptl_handle_md_t *mdh, 
+                       ptl_ack_req_t ack, ptl_process_id_t *id,
+                       ptl_pt_index_t portal, ptl_ac_index_t ac,
+                       ptl_match_bits_t match_bits, 
+                       ptl_size_t offset, ptl_hdr_data_t hdr_data);
+extern int lib_api_fail_nid(nal_t *apinal, ptl_nid_t nid, unsigned int threshold);
+
 #endif
diff --git a/lnet/include/lnet/lib-nal.h b/lnet/include/lnet/lib-nal.h
deleted file mode 100644 (file)
index d1d0495..0000000
+++ /dev/null
@@ -1,116 +0,0 @@
-#ifndef _LIB_NAL_H_
-#define _LIB_NAL_H_
-
-#include "build_check.h"
-/*
- * nal.h
- *
- * Library side headers that define the abstraction layer's
- * responsibilities and interfaces
- */
-
-#include <portals/lib-types.h>
-
-struct nal_cb_t {
-       /*
-        * Per interface portal table, access control table
-        * and NAL private data field;
-        */
-       lib_ni_t ni;
-       void *nal_data;
-       /*
-        * send: Sends a preformatted header and payload data to a
-        * specified remote process. The payload is scattered over 'niov'
-        * fragments described by iov, starting at 'offset' for 'mlen'
-        * bytes.  
-        * NB the NAL may NOT overwrite iov.  
-        * PTL_OK on success => NAL has committed to send and will call
-        * lib_finalize on completion
-        */
-       ptl_err_t (*cb_send) (nal_cb_t * nal, void *private, lib_msg_t * cookie, 
-                             ptl_hdr_t * hdr, int type, ptl_nid_t nid, ptl_pid_t pid, 
-                             unsigned int niov, struct iovec *iov, 
-                             size_t offset, size_t mlen);
-
-       /* as send, but with a set of page fragments (NULL if not supported) */
-       ptl_err_t (*cb_send_pages) (nal_cb_t * nal, void *private, lib_msg_t * cookie, 
-                                   ptl_hdr_t * hdr, int type, ptl_nid_t nid, ptl_pid_t pid, 
-                                   unsigned int niov, ptl_kiov_t *iov, 
-                                   size_t offset, size_t mlen);
-       /*
-        * recv: Receives an incoming message from a remote process.  The
-        * payload is to be received into the scattered buffer of 'niov'
-        * fragments described by iov, starting at 'offset' for 'mlen'
-        * bytes.  Payload bytes after 'mlen' up to 'rlen' are to be
-        * discarded.  
-        * NB the NAL may NOT overwrite iov.
-        * PTL_OK on success => NAL has committed to receive and will call
-        * lib_finalize on completion
-        */
-       ptl_err_t (*cb_recv) (nal_cb_t * nal, void *private, lib_msg_t * cookie,
-                             unsigned int niov, struct iovec *iov, 
-                             size_t offset, size_t mlen, size_t rlen);
-
-       /* as recv, but with a set of page fragments (NULL if not supported) */
-       ptl_err_t (*cb_recv_pages) (nal_cb_t * nal, void *private, lib_msg_t * cookie,
-                                   unsigned int niov, ptl_kiov_t *iov, 
-                                   size_t offset, size_t mlen, size_t rlen);
-       /*
-        * read: Reads a block of data from a specified user address
-        */
-       ptl_err_t (*cb_read) (nal_cb_t * nal, void *private, void *dst_addr,
-                             user_ptr src_addr, size_t len);
-
-       /*
-        * write: Writes a block of data into a specified user address
-        */
-       ptl_err_t (*cb_write) (nal_cb_t * nal, void *private, user_ptr dsr_addr,
-                              void *src_addr, size_t len);
-
-       /*
-        * callback: Calls an event callback
-        * NULL => lib calls eq's callback (if any) directly.
-        */
-       void (*cb_callback) (nal_cb_t * nal, void *private, lib_eq_t *eq,
-                            ptl_event_t *ev);
-
-       /*
-        *  malloc: Acquire a block of memory in a system independent
-        * fashion.
-        */
-       void *(*cb_malloc) (nal_cb_t * nal, size_t len);
-
-       void (*cb_free) (nal_cb_t * nal, void *buf, size_t len);
-
-       /*
-        * (un)map: Tell the NAL about some memory it will access.
-        * *addrkey passed to cb_unmap() is what cb_map() set it to.
-        * type of *iov depends on options.
-        * Set to NULL if not required.
-        */
-       ptl_err_t (*cb_map) (nal_cb_t * nal, unsigned int niov, struct iovec *iov, 
-                            void **addrkey);
-       void (*cb_unmap) (nal_cb_t * nal, unsigned int niov, struct iovec *iov, 
-                         void **addrkey);
-
-       /* as (un)map, but with a set of page fragments */
-       ptl_err_t (*cb_map_pages) (nal_cb_t * nal, unsigned int niov, ptl_kiov_t *iov, 
-                                  void **addrkey);
-       void (*cb_unmap_pages) (nal_cb_t * nal, unsigned int niov, ptl_kiov_t *iov, 
-                         void **addrkey);
-
-       void (*cb_printf) (nal_cb_t * nal, const char *fmt, ...);
-
-       /* Turn interrupts off (begin of protected area) */
-       void (*cb_cli) (nal_cb_t * nal, unsigned long *flags);
-
-       /* Turn interrupts on (end of protected area) */
-       void (*cb_sti) (nal_cb_t * nal, unsigned long *flags);
-
-       /*
-        * Calculate a network "distance" to given node
-        */
-       int (*cb_dist) (nal_cb_t * nal, ptl_nid_t nid, unsigned long *dist);
-};
-
-#endif
index efa929c..4daf219 100644 (file)
 #else
 # include <portals/list.h>
 # include <string.h>
+# include <pthread.h>
 #endif
 #include <portals/types.h>
 #include <linux/kp30.h>
 #include <portals/p30.h>
+#include <portals/nal.h>
 #include <portals/lib-types.h>
-#include <portals/lib-nal.h>
-#include <portals/lib-dispatch.h>
 
 static inline int ptl_is_wire_handle_none (ptl_handle_wire_t *wh)
 {
@@ -31,17 +31,18 @@ static inline int ptl_is_wire_handle_none (ptl_handle_wire_t *wh)
                 wh->wh_object_cookie == PTL_WIRE_HANDLE_NONE.wh_object_cookie);
 }
 
-#define state_lock(nal,flagsp)                          \
-do {                                                    \
-        CDEBUG(D_PORTALS, "taking state lock\n");       \
-        nal->cb_cli(nal, flagsp);                       \
-} while (0)
+#ifdef __KERNEL__
+#define LIB_LOCK(nal,flags)                                     \
+        spin_lock_irqsave(&(nal)->libnal_ni.ni_lock, flags)
+#define LIB_UNLOCK(nal,flags)                                   \
+        spin_unlock_irqrestore(&(nal)->libnal_ni.ni_lock, flags)
+#else
+#define LIB_LOCK(nal,flags)                                             \
+        (pthread_mutex_lock(&(nal)->libnal_ni.ni_mutex), (flags) = 0)
+#define LIB_UNLOCK(nal,flags)                                   \
+        pthread_mutex_unlock(&(nal)->libnal_ni.ni_mutex)
+#endif
 
-#define state_unlock(nal,flagsp)                        \
-{                                                       \
-        CDEBUG(D_PORTALS, "releasing state lock\n");    \
-        nal->cb_sti(nal, flagsp);                       \
-}
 
 #ifdef PTL_USE_LIB_FREELIST
 
@@ -50,13 +51,13 @@ do {                                                    \
 #define MAX_MSGS        2048    /* Outstanding messages */
 #define MAX_EQS         512
 
-extern int lib_freelist_init (nal_cb_t *nal, lib_freelist_t *fl, int nobj, int objsize);
-extern void lib_freelist_fini (nal_cb_t *nal, lib_freelist_t *fl);
+extern int lib_freelist_init (lib_nal_t *nal, lib_freelist_t *fl, int nobj, int objsize);
+extern void lib_freelist_fini (lib_nal_t *nal, lib_freelist_t *fl);
 
 static inline void *
 lib_freelist_alloc (lib_freelist_t *fl)
 {
-        /* ALWAYS called with statelock held */
+        /* ALWAYS called with liblock held */
         lib_freeobj_t *o;
 
         if (list_empty (&fl->fl_list))
@@ -70,7 +71,7 @@ lib_freelist_alloc (lib_freelist_t *fl)
 static inline void
 lib_freelist_free (lib_freelist_t *fl, void *obj)
 {
-        /* ALWAYS called with statelock held */
+        /* ALWAYS called with liblock held */
         lib_freeobj_t *o = list_entry (obj, lib_freeobj_t, fo_contents);
         
         list_add (&o->fo_list, &fl->fl_list);
@@ -78,78 +79,78 @@ lib_freelist_free (lib_freelist_t *fl, void *obj)
 
 
 static inline lib_eq_t *
-lib_eq_alloc (nal_cb_t *nal)
+lib_eq_alloc (lib_nal_t *nal)
 {
-        /* NEVER called with statelock held */
+        /* NEVER called with liblock held */
         unsigned long  flags;
         lib_eq_t      *eq;
         
-        state_lock (nal, &flags);
-        eq = (lib_eq_t *)lib_freelist_alloc (&nal->ni.ni_free_eqs);
-        state_unlock (nal, &flags);
+        LIB_LOCK (nal, flags);
+        eq = (lib_eq_t *)lib_freelist_alloc (&nal->libnal_ni.ni_free_eqs);
+        LIB_UNLOCK (nal, flags);
 
         return (eq);
 }
 
 static inline void
-lib_eq_free (nal_cb_t *nal, lib_eq_t *eq)
+lib_eq_free (lib_nal_t *nal, lib_eq_t *eq)
 {
-        /* ALWAYS called with statelock held */
-        lib_freelist_free (&nal->ni.ni_free_eqs, eq);
+        /* ALWAYS called with liblock held */
+        lib_freelist_free (&nal->libnal_ni.ni_free_eqs, eq);
 }
 
 static inline lib_md_t *
-lib_md_alloc (nal_cb_t *nal, ptl_md_t *umd)
+lib_md_alloc (lib_nal_t *nal, ptl_md_t *umd)
 {
-        /* NEVER called with statelock held */
+        /* NEVER called with liblock held */
         unsigned long  flags;
         lib_md_t      *md;
         
-        state_lock (nal, &flags);
-        md = (lib_md_t *)lib_freelist_alloc (&nal->ni.ni_free_mds);
-        state_unlock (nal, &flags);
+        LIB_LOCK (nal, flags);
+        md = (lib_md_t *)lib_freelist_alloc (&nal->libnal_ni.ni_free_mds);
+        LIB_UNLOCK (nal, flags);
 
         return (md);
 }
 
 static inline void
-lib_md_free (nal_cb_t *nal, lib_md_t *md)
+lib_md_free (lib_nal_t *nal, lib_md_t *md)
 {
-        /* ALWAYS called with statelock held */
-        lib_freelist_free (&nal->ni.ni_free_mds, md);
+        /* ALWAYS called with liblock held */
+        lib_freelist_free (&nal->libnal_ni.ni_free_mds, md);
 }
 
 static inline lib_me_t *
-lib_me_alloc (nal_cb_t *nal)
+lib_me_alloc (lib_nal_t *nal)
 {
-        /* NEVER called with statelock held */
+        /* NEVER called with liblock held */
         unsigned long  flags;
         lib_me_t      *me;
         
-        state_lock (nal, &flags);
-        me = (lib_me_t *)lib_freelist_alloc (&nal->ni.ni_free_mes);
-        state_unlock (nal, &flags);
+        LIB_LOCK (nal, flags);
+        me = (lib_me_t *)lib_freelist_alloc (&nal->libnal_ni.ni_free_mes);
+        LIB_UNLOCK (nal, flags);
         
         return (me);
 }
 
 static inline void
-lib_me_free (nal_cb_t *nal, lib_me_t *me)
+lib_me_free (lib_nal_t *nal, lib_me_t *me)
 {
-        /* ALWAYS called with statelock held */
-        lib_freelist_free (&nal->ni.ni_free_mes, me);
+        /* ALWAYS called with liblock held */
+        lib_freelist_free (&nal->libnal_ni.ni_free_mes, me);
 }
 
 static inline lib_msg_t *
-lib_msg_alloc (nal_cb_t *nal)
+lib_msg_alloc (lib_nal_t *nal)
 {
-        /* NEVER called with statelock held */
+        /* NEVER called with liblock held */
         unsigned long  flags;
         lib_msg_t     *msg;
         
-        state_lock (nal, &flags);
-        msg = (lib_msg_t *)lib_freelist_alloc (&nal->ni.ni_free_msgs);
-        state_unlock (nal, &flags);
+        LIB_LOCK (nal, flags);
+        msg = (lib_msg_t *)lib_freelist_alloc (&nal->libnal_ni.ni_free_msgs);
+        LIB_UNLOCK (nal, flags);
 
         if (msg != NULL) {
                 /* NULL pointers, clear flags etc */
@@ -160,18 +161,18 @@ lib_msg_alloc (nal_cb_t *nal)
 }
 
 static inline void
-lib_msg_free (nal_cb_t *nal, lib_msg_t *msg)
+lib_msg_free (lib_nal_t *nal, lib_msg_t *msg)
 {
-        /* ALWAYS called with statelock held */
-        lib_freelist_free (&nal->ni.ni_free_msgs, msg);
+        /* ALWAYS called with liblock held */
+        lib_freelist_free (&nal->libnal_ni.ni_free_msgs, msg);
 }
 
 #else
 
 static inline lib_eq_t *
-lib_eq_alloc (nal_cb_t *nal)
+lib_eq_alloc (lib_nal_t *nal)
 {
-        /* NEVER called with statelock held */
+        /* NEVER called with liblock held */
         lib_eq_t *eq;
 
         PORTAL_ALLOC(eq, sizeof(*eq));
@@ -179,16 +180,16 @@ lib_eq_alloc (nal_cb_t *nal)
 }
 
 static inline void
-lib_eq_free (nal_cb_t *nal, lib_eq_t *eq)
+lib_eq_free (lib_nal_t *nal, lib_eq_t *eq)
 {
-        /* ALWAYS called with statelock held */
+        /* ALWAYS called with liblock held */
         PORTAL_FREE(eq, sizeof(*eq));
 }
 
 static inline lib_md_t *
-lib_md_alloc (nal_cb_t *nal, ptl_md_t *umd)
+lib_md_alloc (lib_nal_t *nal, ptl_md_t *umd)
 {
-        /* NEVER called with statelock held */
+        /* NEVER called with liblock held */
         lib_md_t *md;
         int       size;
         int       niov;
@@ -214,9 +215,9 @@ lib_md_alloc (nal_cb_t *nal, ptl_md_t *umd)
 }
 
 static inline void 
-lib_md_free (nal_cb_t *nal, lib_md_t *md)
+lib_md_free (lib_nal_t *nal, lib_md_t *md)
 {
-        /* ALWAYS called with statelock held */
+        /* ALWAYS called with liblock held */
         int       size;
 
         if ((md->options & PTL_MD_KIOV) != 0)
@@ -228,9 +229,9 @@ lib_md_free (nal_cb_t *nal, lib_md_t *md)
 }
 
 static inline lib_me_t *
-lib_me_alloc (nal_cb_t *nal)
+lib_me_alloc (lib_nal_t *nal)
 {
-        /* NEVER called with statelock held */
+        /* NEVER called with liblock held */
         lib_me_t *me;
 
         PORTAL_ALLOC(me, sizeof(*me));
@@ -238,16 +239,16 @@ lib_me_alloc (nal_cb_t *nal)
 }
 
 static inline void 
-lib_me_free(nal_cb_t *nal, lib_me_t *me)
+lib_me_free(lib_nal_t *nal, lib_me_t *me)
 {
-        /* ALWAYS called with statelock held */
+        /* ALWAYS called with liblock held */
         PORTAL_FREE(me, sizeof(*me));
 }
 
 static inline lib_msg_t *
-lib_msg_alloc(nal_cb_t *nal)
+lib_msg_alloc(lib_nal_t *nal)
 {
-        /* NEVER called with statelock held; may be in interrupt... */
+        /* NEVER called with liblock held; may be in interrupt... */
         lib_msg_t *msg;
 
         if (in_interrupt())
@@ -264,27 +265,28 @@ lib_msg_alloc(nal_cb_t *nal)
 }
 
 static inline void 
-lib_msg_free(nal_cb_t *nal, lib_msg_t *msg)
+lib_msg_free(lib_nal_t *nal, lib_msg_t *msg)
 {
-        /* ALWAYS called with statelock held */
+        /* ALWAYS called with liblock held */
         PORTAL_FREE(msg, sizeof(*msg));
 }
 #endif
 
-extern lib_handle_t *lib_lookup_cookie (nal_cb_t *nal, __u64 cookie, int type);
-extern void lib_initialise_handle (nal_cb_t *nal, lib_handle_t *lh, int type);
-extern void lib_invalidate_handle (nal_cb_t *nal, lib_handle_t *lh);
+extern lib_handle_t *lib_lookup_cookie (lib_nal_t *nal, __u64 cookie, int type);
+extern void lib_initialise_handle (lib_nal_t *nal, lib_handle_t *lh, int type);
+extern void lib_invalidate_handle (lib_nal_t *nal, lib_handle_t *lh);
 
 static inline void
-ptl_eq2handle (ptl_handle_eq_t *handle, lib_eq_t *eq)
+ptl_eq2handle (ptl_handle_eq_t *handle, lib_nal_t *nal, lib_eq_t *eq)
 {
+        handle->nal_idx = nal->libnal_ni.ni_api->nal_handle.nal_idx;
         handle->cookie = eq->eq_lh.lh_cookie;
 }
 
 static inline lib_eq_t *
-ptl_handle2eq (ptl_handle_eq_t *handle, nal_cb_t *nal)
+ptl_handle2eq (ptl_handle_eq_t *handle, lib_nal_t *nal)
 {
-        /* ALWAYS called with statelock held */
+        /* ALWAYS called with liblock held */
         lib_handle_t *lh = lib_lookup_cookie (nal, handle->cookie, 
                                               PTL_COOKIE_TYPE_EQ);
         if (lh == NULL)
@@ -294,15 +296,16 @@ ptl_handle2eq (ptl_handle_eq_t *handle, nal_cb_t *nal)
 }
 
 static inline void
-ptl_md2handle (ptl_handle_md_t *handle, lib_md_t *md)
+ptl_md2handle (ptl_handle_md_t *handle, lib_nal_t *nal, lib_md_t *md)
 {
+        handle->nal_idx = nal->libnal_ni.ni_api->nal_handle.nal_idx;
         handle->cookie = md->md_lh.lh_cookie;
 }
 
 static inline lib_md_t *
-ptl_handle2md (ptl_handle_md_t *handle, nal_cb_t *nal)
+ptl_handle2md (ptl_handle_md_t *handle, lib_nal_t *nal)
 {
-        /* ALWAYS called with statelock held */
+        /* ALWAYS called with liblock held */
         lib_handle_t *lh = lib_lookup_cookie (nal, handle->cookie,
                                               PTL_COOKIE_TYPE_MD);
         if (lh == NULL)
@@ -312,12 +315,12 @@ ptl_handle2md (ptl_handle_md_t *handle, nal_cb_t *nal)
 }
 
 static inline lib_md_t *
-ptl_wire_handle2md (ptl_handle_wire_t *wh, nal_cb_t *nal)
+ptl_wire_handle2md (ptl_handle_wire_t *wh, lib_nal_t *nal)
 {
-        /* ALWAYS called with statelock held */
+        /* ALWAYS called with liblock held */
         lib_handle_t *lh;
         
-        if (wh->wh_interface_cookie != nal->ni.ni_interface_cookie)
+        if (wh->wh_interface_cookie != nal->libnal_ni.ni_interface_cookie)
                 return (NULL);
         
         lh = lib_lookup_cookie (nal, wh->wh_object_cookie,
@@ -329,15 +332,16 @@ ptl_wire_handle2md (ptl_handle_wire_t *wh, nal_cb_t *nal)
 }
 
 static inline void
-ptl_me2handle (ptl_handle_me_t *handle, lib_me_t *me)
+ptl_me2handle (ptl_handle_me_t *handle, lib_nal_t *nal, lib_me_t *me)
 {
+        handle->nal_idx = nal->libnal_ni.ni_api->nal_handle.nal_idx;
         handle->cookie = me->me_lh.lh_cookie;
 }
 
 static inline lib_me_t *
-ptl_handle2me (ptl_handle_me_t *handle, nal_cb_t *nal)
+ptl_handle2me (ptl_handle_me_t *handle, lib_nal_t *nal)
 {
-        /* ALWAYS called with statelock held */
+        /* ALWAYS called with liblock held */
         lib_handle_t *lh = lib_lookup_cookie (nal, handle->cookie,
                                               PTL_COOKIE_TYPE_ME);
         if (lh == NULL)
@@ -346,35 +350,30 @@ ptl_handle2me (ptl_handle_me_t *handle, nal_cb_t *nal)
         return (lh_entry (lh, lib_me_t, me_lh));
 }
 
-extern int lib_init(nal_cb_t *cb, ptl_process_id_t pid,
+extern int lib_init(lib_nal_t *libnal, nal_t *apinal,
+                    ptl_process_id_t pid,
                     ptl_ni_limits_t *desired_limits, 
                     ptl_ni_limits_t *actual_limits);
-extern int lib_fini(nal_cb_t * cb);
-extern void lib_dispatch(nal_cb_t * cb, void *private, int index,
-                         void *arg_block, void *ret_block);
-extern char *dispatch_name(int index);
+extern int lib_fini(lib_nal_t *libnal);
 
 /*
- * When the NAL detects an incoming message, it should call
- * lib_parse() decode it.  The NAL callbacks will be handed
- * the private cookie as a way for the NAL to maintain state
- * about which transaction is being processed.  An extra parameter,
- * lib_cookie will contain the necessary information for
- * finalizing the message.
- *
- * After it has finished the handling the message, it should
- * call lib_finalize() with the lib_cookie parameter.
- * Call backs will be made to write events, send acks or
- * replies and so on.
+ * When the NAL detects an incoming message header, it should call
+ * lib_parse() to decode it.  If the message header is garbage, lib_parse()
+ * returns immediately with failure; otherwise the NAL callbacks will be
+ * called to receive the message body.  They are handed the private cookie
+ * as a way for the NAL to maintain state about which transaction is being
+ * processed.  An extra parameter, lib_msg, contains the lib-level message
+ * state for passing to lib_finalize() when the message body has been
+ * received.
  */
-extern void lib_enq_event_locked (nal_cb_t *nal, void *private,
+extern void lib_enq_event_locked (lib_nal_t *nal, void *private,
                                   lib_eq_t *eq, ptl_event_t *ev);
-extern void lib_finalize (nal_cb_t *nal, void *private, lib_msg_t *msg, 
+extern void lib_finalize (lib_nal_t *nal, void *private, lib_msg_t *msg, 
                           ptl_ni_fail_t ni_fail_type);
-extern void lib_parse (nal_cb_t *nal, ptl_hdr_t *hdr, void *private);
-extern lib_msg_t *lib_create_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, 
+extern ptl_err_t lib_parse (lib_nal_t *nal, ptl_hdr_t *hdr, void *private);
+extern lib_msg_t *lib_create_reply_msg (lib_nal_t *nal, ptl_nid_t peer_nid, 
                                         lib_msg_t *get_msg);
-extern void print_hdr (nal_cb_t * nal, ptl_hdr_t * hdr);
+extern void print_hdr (lib_nal_t * nal, ptl_hdr_t * hdr);
 
 
 extern ptl_size_t lib_iov_nob (int niov, struct iovec *iov);
@@ -397,14 +396,65 @@ extern int lib_extract_kiov (int dst_niov, ptl_kiov_t *dst,
 
 extern void lib_assert_wire_constants (void);
 
-extern ptl_err_t lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
+extern ptl_err_t lib_recv (lib_nal_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
                            ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen);
-extern ptl_err_t lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg,
+extern ptl_err_t lib_send (lib_nal_t *nal, void *private, lib_msg_t *msg,
                            ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                            lib_md_t *md, ptl_size_t offset, ptl_size_t len);
 
-extern void lib_md_deconstruct(nal_cb_t * nal, lib_md_t * md_in,
-                               ptl_md_t * md_out);
-extern void lib_md_unlink(nal_cb_t * nal, lib_md_t * md_in);
-extern void lib_me_unlink(nal_cb_t * nal, lib_me_t * me_in);
+extern int lib_api_ni_status (nal_t *nal, ptl_sr_index_t sr_idx,
+                              ptl_sr_value_t *status);
+extern int lib_api_ni_dist (nal_t *nal, ptl_process_id_t *pid, 
+                            unsigned long *dist);
+
+extern int lib_api_eq_alloc (nal_t *nal, ptl_size_t count,
+                             ptl_eq_handler_t callback, 
+                             ptl_handle_eq_t *handle);
+extern int lib_api_eq_free(nal_t *nal, ptl_handle_eq_t *eqh);
+extern int lib_api_eq_poll (nal_t *nal, 
+                            ptl_handle_eq_t *eventqs, int neq, int timeout_ms,
+                            ptl_event_t *event, int *which);
+
+extern int lib_api_me_attach(nal_t *nal,
+                             ptl_pt_index_t portal,
+                             ptl_process_id_t match_id, 
+                             ptl_match_bits_t match_bits, 
+                             ptl_match_bits_t ignore_bits,
+                             ptl_unlink_t unlink, ptl_ins_pos_t pos,
+                             ptl_handle_me_t *handle);
+extern int lib_api_me_insert(nal_t *nal,
+                             ptl_handle_me_t *current_meh,
+                             ptl_process_id_t match_id, 
+                             ptl_match_bits_t match_bits, 
+                             ptl_match_bits_t ignore_bits,
+                             ptl_unlink_t unlink, ptl_ins_pos_t pos,
+                             ptl_handle_me_t *handle);
+extern int lib_api_me_unlink (nal_t *nal, ptl_handle_me_t *meh);
+extern void lib_me_unlink(lib_nal_t *nal, lib_me_t *me);
+
+extern int lib_api_get_id(nal_t *nal, ptl_process_id_t *pid);
+
+extern void lib_md_unlink(lib_nal_t *nal, lib_md_t *md);
+extern void lib_md_deconstruct(lib_nal_t *nal, lib_md_t *lmd, ptl_md_t *umd);
+extern int lib_api_md_attach(nal_t *nal, ptl_handle_me_t *meh,
+                             ptl_md_t *umd, ptl_unlink_t unlink, 
+                             ptl_handle_md_t *handle);
+extern int lib_api_md_bind(nal_t *nal, ptl_md_t *umd, ptl_unlink_t unlink,
+                           ptl_handle_md_t *handle);
+extern int lib_api_md_unlink (nal_t *nal, ptl_handle_md_t *mdh);
+extern int lib_api_md_update (nal_t *nal, ptl_handle_md_t *mdh,
+                              ptl_md_t *oldumd, ptl_md_t *newumd,
+                              ptl_handle_eq_t *testqh);
+
+extern int lib_api_get(nal_t *apinal, ptl_handle_md_t *mdh, 
+                       ptl_process_id_t *id,
+                       ptl_pt_index_t portal, ptl_ac_index_t ac,
+                       ptl_match_bits_t match_bits, ptl_size_t offset);
+extern int lib_api_put(nal_t *apinal, ptl_handle_md_t *mdh, 
+                       ptl_ack_req_t ack, ptl_process_id_t *id,
+                       ptl_pt_index_t portal, ptl_ac_index_t ac,
+                       ptl_match_bits_t match_bits, 
+                       ptl_size_t offset, ptl_hdr_data_t hdr_data);
+extern int lib_api_fail_nid(nal_t *apinal, ptl_nid_t nid, unsigned int threshold);
+
 #endif
index ef618c7..6549988 100644 (file)
@@ -13,6 +13,7 @@
 #include "build_check.h"
 
 #include <portals/types.h>
+#include <portals/nal.h>
 #ifdef __KERNEL__
 # include <linux/uio.h>
 # include <linux/smp_lock.h>
@@ -22,9 +23,6 @@
 # include <sys/types.h>
 #endif
 
-/* struct nal_cb_t is defined in lib-nal.h */
-typedef struct nal_cb_t nal_cb_t;
-
 typedef char *user_ptr;
 typedef struct lib_msg_t lib_msg_t;
 typedef struct lib_ptl_t lib_ptl_t;
@@ -165,11 +163,12 @@ typedef struct {
 struct lib_eq_t {
         struct list_head  eq_list;
         lib_handle_t      eq_lh;
-        ptl_seq_t         sequence;
-        ptl_size_t        size;
-        ptl_event_t      *base;
+        ptl_seq_t         eq_enq_seq;
+        ptl_seq_t         eq_deq_seq;
+        ptl_size_t        eq_size;
+        ptl_event_t      *eq_events;
         int               eq_refcount;
-        ptl_eq_handler_t  event_callback;
+        ptl_eq_handler_t  eq_callback;
         void             *eq_addrkey;
 };
 
@@ -244,29 +243,117 @@ typedef struct {
 /* PTL_COOKIE_TYPES must be a power of 2, so the cookie type can be
  * extracted by masking with (PTL_COOKIE_TYPES - 1) */
 
-typedef struct {
-        ptl_nid_t nid;
-        ptl_pid_t pid;
-        lib_ptl_t tbl;
-        lib_counters_t counters;
-        ptl_ni_limits_t actual_limits;
+typedef struct lib_ni 
+{
+        nal_t            *ni_api;
+        ptl_process_id_t  ni_pid;
+        lib_ptl_t         ni_portals;
+        lib_counters_t    ni_counters;
+        ptl_ni_limits_t   ni_actual_limits;
 
         int               ni_lh_hash_size;      /* size of lib handle hash table */
         struct list_head *ni_lh_hash_table;     /* all extant lib handles, this interface */
         __u64             ni_next_object_cookie; /* cookie generator */
         __u64             ni_interface_cookie;  /* uniquely identifies this ni in this epoch */
         
-        struct list_head ni_test_peers;
+        struct list_head  ni_test_peers;
         
 #ifdef PTL_USE_LIB_FREELIST
-        lib_freelist_t   ni_free_mes;
-        lib_freelist_t   ni_free_msgs;
-        lib_freelist_t   ni_free_mds;
-        lib_freelist_t   ni_free_eqs;
+        lib_freelist_t    ni_free_mes;
+        lib_freelist_t    ni_free_msgs;
+        lib_freelist_t    ni_free_mds;
+        lib_freelist_t    ni_free_eqs;
+#endif
+
+        struct list_head  ni_active_msgs;
+        struct list_head  ni_active_mds;
+        struct list_head  ni_active_eqs;
+
+#ifdef __KERNEL__
+        spinlock_t        ni_lock;
+        wait_queue_head_t ni_waitq;
+#else
+        pthread_mutex_t   ni_mutex;
+        pthread_cond_t    ni_cond;
 #endif
-        struct list_head ni_active_msgs;
-        struct list_head ni_active_mds;
-        struct list_head ni_active_eqs;
 } lib_ni_t;
 
+
+typedef struct lib_nal
+{
+       /* lib-level interface state (embedded, not a pointer) */
+       lib_ni_t libnal_ni;
+
+       /* NAL-private data; opaque to everything but the owning NAL */
+       void *libnal_data;
+
+       /*
+        * send: Sends a preformatted header and payload data to a
+        * specified remote process. The payload is scattered over 'niov'
+        * fragments described by iov, starting at 'offset' for 'mlen'
+        * bytes.  
+        * NB the NAL may NOT overwrite iov.  
+        * PTL_OK on success => NAL has committed to send and will call
+        * lib_finalize on completion (presumably handing 'cookie' back as msg)
+        */
+       ptl_err_t (*libnal_send) 
+                (struct lib_nal *nal, void *private, lib_msg_t *cookie, 
+                 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, 
+                 unsigned int niov, struct iovec *iov, 
+                 size_t offset, size_t mlen);
+        
+       /* as send, but with a set of page fragments (NULL if not supported) */
+       ptl_err_t (*libnal_send_pages)
+                (struct lib_nal *nal, void *private, lib_msg_t * cookie, 
+                 ptl_hdr_t * hdr, int type, ptl_nid_t nid, ptl_pid_t pid, 
+                 unsigned int niov, ptl_kiov_t *iov, 
+                 size_t offset, size_t mlen);
+       /*
+        * recv: Receives an incoming message from a remote process.  The
+        * payload is to be received into the scattered buffer of 'niov'
+        * fragments described by iov, starting at 'offset' for 'mlen'
+        * bytes.  Payload bytes after 'mlen' up to 'rlen' are to be
+        * discarded.  
+        * NB the NAL may NOT overwrite iov.
+        * PTL_OK on success => NAL has committed to receive and will call
+        * lib_finalize on completion (presumably handing 'cookie' back as msg)
+        */
+       ptl_err_t (*libnal_recv) 
+                (struct lib_nal *nal, void *private, lib_msg_t * cookie,
+                 unsigned int niov, struct iovec *iov, 
+                 size_t offset, size_t mlen, size_t rlen);
+
+       /* as recv, but with a set of page fragments (NULL if not supported) */
+       ptl_err_t (*libnal_recv_pages) 
+                (struct lib_nal *nal, void *private, lib_msg_t * cookie,
+                 unsigned int niov, ptl_kiov_t *iov, 
+                 size_t offset, size_t mlen, size_t rlen);
+
+       /*
+        * (un)map: Tell the NAL about some memory it will access.
+        * *addrkey passed to libnal_unmap() is what libnal_map() set it to.
+        * type of *iov depends on options.
+        * Set to NULL if not required.
+        */
+       ptl_err_t (*libnal_map)
+                (struct lib_nal *nal, unsigned int niov, struct iovec *iov, 
+                 void **addrkey);
+       void (*libnal_unmap)
+                (struct lib_nal *nal, unsigned int niov, struct iovec *iov, 
+                 void **addrkey);
+
+       /* as (un)map, but with a set of page fragments (ptl_kiov_t) */
+       ptl_err_t (*libnal_map_pages)
+                (struct lib_nal *nal, unsigned int niov, ptl_kiov_t *iov, 
+                 void **addrkey);
+       void (*libnal_unmap_pages)
+                (struct lib_nal *nal, unsigned int niov, ptl_kiov_t *iov, 
+                 void **addrkey);
+
+       void (*libnal_printf)(struct lib_nal *nal, const char *fmt, ...); /* printf-style fmt */
+
+       /* Calculate a network "distance" to given node; result goes to *dist */
+       int (*libnal_dist) (struct lib_nal *nal, ptl_nid_t nid, unsigned long *dist);
+} lib_nal_t;
+
 #endif
index 1f925c1..bf86569 100644 (file)
 
 #include <portals/types.h>
 
-#ifdef yield
-#undef yield
-#endif
-
 typedef struct nal_t nal_t;
 
 struct nal_t {
+       /* common interface state */
        int              nal_refct;
+        ptl_handle_ni_t  nal_handle;
+
+       /* NAL-private data */
        void            *nal_data;
 
-       int (*startup) (nal_t *nal, ptl_pid_t requested_pid,
-                       ptl_ni_limits_t *req, ptl_ni_limits_t *actual);
+       /* NAL API implementation 
+        * NB only nal_ni_init needs to be set when the NAL registers itself */
+       int (*nal_ni_init) (nal_t *nal, ptl_pid_t requested_pid,
+                           ptl_ni_limits_t *req, ptl_ni_limits_t *actual);
        
-       void (*shutdown) (nal_t *nal);
+       void (*nal_ni_fini) (nal_t *nal);
 
-       int (*forward) (nal_t *nal, int index,  /* Function ID */
-                       void *args, size_t arg_len, void *ret, size_t ret_len);
+       int (*nal_get_id) (nal_t *nal, ptl_process_id_t *id);
+       int (*nal_ni_status) (nal_t *nal, ptl_sr_index_t register, ptl_sr_value_t *status);
+       int (*nal_ni_dist) (nal_t *nal, ptl_process_id_t *id, unsigned long *distance);
+       int (*nal_fail_nid) (nal_t *nal, ptl_nid_t nid, unsigned int threshold);
 
-       int (*yield) (nal_t *nal, unsigned long *flags, int milliseconds);
+       int (*nal_me_attach) (nal_t *nal, ptl_pt_index_t portal,
+                             ptl_process_id_t match_id, 
+                             ptl_match_bits_t match_bits, ptl_match_bits_t ignore_bits,
+                             ptl_unlink_t unlink, ptl_ins_pos_t pos, 
+                             ptl_handle_me_t *handle);
+       int (*nal_me_insert) (nal_t *nal, ptl_handle_me_t *me,
+                             ptl_process_id_t match_id, 
+                             ptl_match_bits_t match_bits, ptl_match_bits_t ignore_bits,
+                             ptl_unlink_t unlink, ptl_ins_pos_t pos, 
+                             ptl_handle_me_t *handle);
+       int (*nal_me_unlink) (nal_t *nal, ptl_handle_me_t *me);
+       
+       int (*nal_md_attach) (nal_t *nal, ptl_handle_me_t *me,
+                             ptl_md_t *md, ptl_unlink_t unlink, 
+                             ptl_handle_md_t *handle);
+       int (*nal_md_bind) (nal_t *nal, 
+                           ptl_md_t *md, ptl_unlink_t unlink, 
+                           ptl_handle_md_t *handle);
+       int (*nal_md_unlink) (nal_t *nal, ptl_handle_md_t *md);
+       int (*nal_md_update) (nal_t *nal, ptl_handle_md_t *md,
+                             ptl_md_t *old_md, ptl_md_t *new_md,
+                             ptl_handle_eq_t *testq);
 
-       void (*lock) (nal_t *nal, unsigned long *flags);
+       int (*nal_eq_alloc) (nal_t *nal, ptl_size_t count,
+                            ptl_eq_handler_t handler,
+                            ptl_handle_eq_t *handle);
+       int (*nal_eq_free) (nal_t *nal, ptl_handle_eq_t *eq);
+       int (*nal_eq_poll) (nal_t *nal, 
+                           ptl_handle_eq_t *eqs, int neqs, int timeout,
+                           ptl_event_t *event, int *which);
 
-       void (*unlock) (nal_t *nal, unsigned long *flags);
+       int (*nal_ace_entry) (nal_t *nal, ptl_ac_index_t index,
+                             ptl_process_id_t match_id, ptl_pt_index_t portal);
+       
+       int (*nal_put) (nal_t *nal, ptl_handle_md_t *md, ptl_ack_req_t ack,
+                       ptl_process_id_t *target, ptl_pt_index_t portal,
+                       ptl_ac_index_t ac, ptl_match_bits_t match,
+                       ptl_size_t offset, ptl_hdr_data_t hdr_data);
+       int (*nal_get) (nal_t *nal, ptl_handle_md_t *md,
+                       ptl_process_id_t *target, ptl_pt_index_t portal,
+                       ptl_ac_index_t ac, ptl_match_bits_t match,
+                       ptl_size_t offset);
 };
 
-extern nal_t *ptl_hndl2nal(ptl_handle_any_t * any);
+extern nal_t *ptl_hndl2nal(ptl_handle_any_t *any);
 
 #ifdef __KERNEL__
 extern int ptl_register_nal(ptl_interface_t interface, nal_t *nal);
index ef2712b..250b954 100644 (file)
@@ -153,17 +153,6 @@ typedef void (*ptl_eq_handler_t)(ptl_event_t *event);
 #define PTL_EQ_HANDLER_NONE NULL
 
 typedef struct {
-        volatile ptl_seq_t sequence;
-        ptl_size_t size;
-        ptl_event_t *base;
-        ptl_handle_any_t cb_eq_handle;
-} ptl_eq_t;
-
-typedef struct {
-        ptl_eq_t *eq;
-} ptl_ni_t;
-
-typedef struct {
        int max_mes;
        int max_mds;
        int max_eqs;
index e48552e..ca98f84 100644 (file)
@@ -190,7 +190,6 @@ typedef struct _gmnal_rxtwe {
 #define NRXTHREADS 10 /* max number of receiver threads */
 
 typedef struct _gmnal_data_t {
-       spinlock_t      cb_lock;
        spinlock_t      stxd_lock;
        struct semaphore stxd_token;
        gmnal_stxd_t    *stxd;
@@ -205,7 +204,7 @@ typedef struct _gmnal_data_t {
        gmnal_srxd_t    *srxd;
        struct gm_hash  *srxd_hash;
        nal_t           *nal;   
-       nal_cb_t        *nal_cb;
+       lib_nal_t       *libnal;
        struct gm_port  *gm_port;
        unsigned int    gm_local_nid;
        unsigned int    gm_global_nid;
@@ -298,7 +297,6 @@ extern gmnal_data_t *global_nal_data;
 #define GMNAL_GM_LOCK_INIT(a)          spin_lock_init(&a->gm_lock);
 #define GMNAL_GM_LOCK(a)               spin_lock(&a->gm_lock);
 #define GMNAL_GM_UNLOCK(a)             spin_unlock(&a->gm_lock);
-#define GMNAL_CB_LOCK_INIT(a)          spin_lock_init(&a->cb_lock);
 
 
 /*
@@ -340,39 +338,19 @@ void gmnal_api_unlock(nal_t *, unsigned long *);
  *     CB NAL
  */
 
-int gmnal_cb_send(nal_cb_t *, void *, lib_msg_t *, ptl_hdr_t *,
+int gmnal_cb_send(lib_nal_t *, void *, lib_msg_t *, ptl_hdr_t *,
        int, ptl_nid_t, ptl_pid_t, unsigned int, struct iovec *, size_t);
 
-int gmnal_cb_send_pages(nal_cb_t *, void *, lib_msg_t *, ptl_hdr_t *,
+int gmnal_cb_send_pages(lib_nal_t *, void *, lib_msg_t *, ptl_hdr_t *,
        int, ptl_nid_t, ptl_pid_t, unsigned int, ptl_kiov_t *, size_t);
 
-int gmnal_cb_recv(nal_cb_t *, void *, lib_msg_t *, 
+int gmnal_cb_recv(lib_nal_t *, void *, lib_msg_t *, 
        unsigned int, struct iovec *, size_t, size_t);
 
-int gmnal_cb_recv_pages(nal_cb_t *, void *, lib_msg_t *, 
+int gmnal_cb_recv_pages(lib_nal_t *, void *, lib_msg_t *, 
        unsigned int, ptl_kiov_t *, size_t, size_t);
 
-int gmnal_cb_read(nal_cb_t *, void *private, void *, user_ptr, size_t);
-
-int gmnal_cb_write(nal_cb_t *, void *private, user_ptr, void *, size_t);
-
-int gmnal_cb_callback(nal_cb_t *, void *, lib_eq_t *, ptl_event_t *);
-
-void *gmnal_cb_malloc(nal_cb_t *, size_t);
-
-void gmnal_cb_free(nal_cb_t *, void *, size_t);
-
-void gmnal_cb_unmap(nal_cb_t *, unsigned int, struct iovec*, void **);
-
-int  gmnal_cb_map(nal_cb_t *, unsigned int, struct iovec*, void **); 
-
-void gmnal_cb_printf(nal_cb_t *, const char *fmt, ...);
-
-void gmnal_cb_cli(nal_cb_t *, unsigned long *);
-
-void gmnal_cb_sti(nal_cb_t *, unsigned long *);
-
-int gmnal_cb_dist(nal_cb_t *, ptl_nid_t, unsigned long *);
+int gmnal_cb_dist(lib_nal_t *, ptl_nid_t, unsigned long *);
 
 int gmnal_init(void);
 
@@ -381,22 +359,14 @@ void  gmnal_fini(void);
 
 
 #define GMNAL_INIT_NAL_CB(a)   do {    \
-                               a->cb_send = gmnal_cb_send; \
-                               a->cb_send_pages = gmnal_cb_send_pages; \
-                               a->cb_recv = gmnal_cb_recv; \
-                               a->cb_recv_pages = gmnal_cb_recv_pages; \
-                               a->cb_read = gmnal_cb_read; \
-                               a->cb_write = gmnal_cb_write; \
-                               a->cb_callback = gmnal_cb_callback; \
-                               a->cb_malloc = gmnal_cb_malloc; \
-                               a->cb_free = gmnal_cb_free; \
-                               a->cb_map = NULL; \
-                               a->cb_unmap = NULL; \
-                               a->cb_printf = gmnal_cb_printf; \
-                               a->cb_cli = gmnal_cb_cli; \
-                               a->cb_sti = gmnal_cb_sti; \
-                               a->cb_dist = gmnal_cb_dist; \
-                               a->nal_data = NULL; \
+                               a->libnal_send = gmnal_cb_send; \
+                               a->libnal_send_pages = gmnal_cb_send_pages; \
+                               a->libnal_recv = gmnal_cb_recv; \
+                               a->libnal_recv_pages = gmnal_cb_recv_pages; \
+                               a->libnal_map = NULL; \
+                               a->libnal_unmap = NULL; \
+                               a->libnal_dist = gmnal_cb_dist; \
+                               a->libnal_data = NULL; \
                                } while (0)
 
 
@@ -451,9 +421,9 @@ void                gmnal_remove_rxtwe(gmnal_data_t *);
 /*
  *     Small messages
  */
-int            gmnal_small_rx(nal_cb_t *, void *, lib_msg_t *, unsigned int, 
+int            gmnal_small_rx(lib_nal_t *, void *, lib_msg_t *, unsigned int, 
                                struct iovec *, size_t, size_t);
-int            gmnal_small_tx(nal_cb_t *, void *, lib_msg_t *, ptl_hdr_t *, 
+int            gmnal_small_tx(lib_nal_t *, void *, lib_msg_t *, ptl_hdr_t *, 
                                int, ptl_nid_t, ptl_pid_t, 
                                unsigned int, struct iovec*, int);
 void           gmnal_small_tx_callback(gm_port_t *, void *, gm_status_t);
@@ -463,10 +433,10 @@ void              gmnal_small_tx_callback(gm_port_t *, void *, gm_status_t);
 /*
  *     Large messages
  */
-int            gmnal_large_rx(nal_cb_t *, void *, lib_msg_t *, unsigned int, 
+int            gmnal_large_rx(lib_nal_t *, void *, lib_msg_t *, unsigned int, 
                                struct iovec *, size_t, size_t);
 
-int            gmnal_large_tx(nal_cb_t *, void *, lib_msg_t *, ptl_hdr_t *, 
+int            gmnal_large_tx(lib_nal_t *, void *, lib_msg_t *, ptl_hdr_t *, 
                                int, ptl_nid_t, ptl_pid_t, unsigned int, 
                                struct iovec*, int);
 
index 7c94f93..002587d 100644 (file)
@@ -50,77 +50,6 @@ static ctl_table gmnalnal_top_sysctl_table[] = {
         { 0 }
 };
 
-
-
-
-
-
-/*
- *     gmnal_api_forward
- *     This function takes a pack block of arguments from the NAL API
- *     module and passes them to the NAL CB module. The CB module unpacks
- *     the args and calls the appropriate function indicated by index.
- *     Typically this function is used to pass args between kernel and use
- *     space.
- *     As lgmanl exists entirely in kernel, just pass the arg block directly 
- *     to the NAL CB, buy passing the args to lib_dispatch
- *     Arguments are
- *     nal_t   nal     Our nal
- *     int     index   the api function that initiated this call 
- *     void    *args   packed block of function args
- *     size_t  arg_len length of args block
- *     void    *ret    A return value for the API NAL
- *     size_t  ret_len Size of the return value
- *     
- */
-
-int
-gmnal_api_forward(nal_t *nal, int index, void *args, size_t arg_len,
-               void *ret, size_t ret_len)
-{
-
-       nal_cb_t        *nal_cb = NULL;
-       gmnal_data_t    *nal_data = NULL;
-
-
-
-
-
-       if (!nal || !args || (index < 0) || (arg_len < 0)) {
-                       CDEBUG(D_ERROR, "Bad args to gmnal_api_forward\n");
-               return (PTL_FAIL);
-       }
-
-       if (ret && (ret_len <= 0)) {
-               CDEBUG(D_ERROR, "Bad args to gmnal_api_forward\n");
-               return (PTL_FAIL);
-       }
-
-
-       if (!nal->nal_data) {
-               CDEBUG(D_ERROR, "bad nal, no nal data\n");      
-               return (PTL_FAIL);
-       }
-       
-       nal_data = nal->nal_data;
-       CDEBUG(D_INFO, "nal_data is [%p]\n", nal_data); 
-
-       if (!nal_data->nal_cb) {
-               CDEBUG(D_ERROR, "bad nal_data, no nal_cb\n");   
-               return (PTL_FAIL);
-       }
-       
-       nal_cb = nal_data->nal_cb;
-       CDEBUG(D_INFO, "nal_cb is [%p]\n", nal_cb);     
-       
-       CDEBUG(D_PORTALS, "gmnal_api_forward calling lib_dispatch\n");
-       lib_dispatch(nal_cb, NULL, index, args, ret);
-       CDEBUG(D_PORTALS, "gmnal_api_forward returns from lib_dispatch\n");
-
-       return(PTL_OK);
-}
-
-
 /*
  *     gmnal_api_shutdown
  *      nal_refct == 0 => called on last matching PtlNIFini()
@@ -131,7 +60,7 @@ void
 gmnal_api_shutdown(nal_t *nal, int interface)
 {
        gmnal_data_t    *nal_data;
-       nal_cb_t        *nal_cb;
+       lib_nal_t       *libnal;
 
         if (nal->nal_refct != 0)
                 return;
@@ -139,9 +68,9 @@ gmnal_api_shutdown(nal_t *nal, int interface)
        CDEBUG(D_TRACE, "gmnal_api_shutdown: nal_data [%p]\n", nal_data);
 
         LASSERT(nal == global_nal_data->nal);
-        nal_data = nal->nal_data;
+        libnal = (lib_nal_t *)nal->nal_data;
+        nal_data = (gmnal_data_t *)libnal->libnal_data;
         LASSERT(nal_data == global_nal_data);
-        nal_cb = nal_data->nal_cb;
 
         /* Stop portals calling our ioctl handler */
         libcfs_nal_cmd_unregister(GMNAL);
@@ -150,7 +79,7 @@ gmnal_api_shutdown(nal_t *nal, int interface)
          * flag so when lib calls us we fail immediately and dont queue any
          * more work but our threads can still call into lib OK.  THEN
          * shutdown our threads, THEN lib_fini() */
-        lib_fini(nal_cb);
+        lib_fini(libnal);
 
        gmnal_stop_rxthread(nal_data);
        gmnal_stop_ctthread(nal_data);
@@ -162,94 +91,22 @@ gmnal_api_shutdown(nal_t *nal, int interface)
        GMNAL_GM_UNLOCK(nal_data);
         if (nal_data->sysctl)
                 unregister_sysctl_table (nal_data->sysctl);
-       PORTAL_FREE(nal, sizeof(nal_t));        
+        /* Don't free 'nal'; it's a static struct */
        PORTAL_FREE(nal_data, sizeof(gmnal_data_t));    
-       PORTAL_FREE(nal_cb, sizeof(nal_cb_t));
+       PORTAL_FREE(libnal, sizeof(lib_nal_t));
 
         global_nal_data = NULL;
         PORTAL_MODULE_UNUSE;
 }
 
 
-/*
- *     gmnal_api_validate
- *     validate a user address for use in communications
- *     There's nothing to be done here
- */
-int
-gmnal_api_validate(nal_t *nal, void *base, size_t extent)
-{
-
-       return(PTL_OK);
-}
-
-
-
-/*
- *     gmnal_api_yield
- *     Give up the processor
- */
-void
-gmnal_api_yield(nal_t *nal, unsigned long *flags, int milliseconds)
-{
-       CDEBUG(D_TRACE, "gmnal_api_yield : nal [%p]\n", nal);
-
-        if (milliseconds != 0) {
-                CERROR("Blocking yield not implemented yet\n");
-                LBUG();
-        }
-
-        our_cond_resched();
-       return;
-}
-
-
-
-/*
- *     gmnal_api_lock
- *     Take a threadsafe lock
- */
-void
-gmnal_api_lock(nal_t *nal, unsigned long *flags)
-{
-
-       gmnal_data_t    *nal_data;
-       nal_cb_t        *nal_cb;
-
-       nal_data = nal->nal_data;
-       nal_cb = nal_data->nal_cb;
-
-       nal_cb->cb_cli(nal_cb, flags);
-
-       return;
-}
-
-/*
- *     gmnal_api_unlock
- *     Release a threadsafe lock
- */
-void
-gmnal_api_unlock(nal_t *nal, unsigned long *flags)
-{
-       gmnal_data_t    *nal_data;
-       nal_cb_t        *nal_cb;
-
-       nal_data = nal->nal_data;
-       nal_cb = nal_data->nal_cb;
-
-       nal_cb->cb_sti(nal_cb, flags);
-
-       return;
-}
-
-
 int
 gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
                   ptl_ni_limits_t *requested_limits,
                   ptl_ni_limits_t *actual_limits)
 {
 
-       nal_cb_t        *nal_cb = NULL;
+       lib_nal_t       *libnal = NULL;
        gmnal_data_t    *nal_data = NULL;
        gmnal_srxd_t    *srxd = NULL;
        gm_status_t     gm_status;
@@ -258,9 +115,8 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
 
         if (nal->nal_refct != 0) {
                 if (actual_limits != NULL) {
-                        nal_data = (gmnal_data_t *)nal->nal_data;
-                        nal_cb = nal_data->nal_cb;
-                        *actual_limits = nal->_cb->ni.actual_limits;
+                        libnal = (lib_nal_t *)nal->nal_data;
+                        *actual_limits = nal->libnal_ni.ni_actual_limits;
                 return (PTL_OK);
         }
 
@@ -283,24 +139,22 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
        CDEBUG(D_INFO, "Allocd and reset nal_data[%p]\n", nal_data);
        CDEBUG(D_INFO, "small_msg_size is [%d]\n", nal_data->small_msg_size);
 
-       PORTAL_ALLOC(nal_cb, sizeof(nal_cb_t));
-       if (!nal_cb) {
+       PORTAL_ALLOC(libnal, sizeof(lib_nal_t));
+       if (!libnal) {
                PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
                return(PTL_NO_SPACE);
        }
-       memset(nal_cb, 0, sizeof(nal_cb_t));
-       CDEBUG(D_INFO, "Allocd and reset nal_cb[%p]\n", nal_cb);
+       memset(libnal, 0, sizeof(lib_nal_t));
+       CDEBUG(D_INFO, "Allocd and reset libnal[%p]\n", libnal);
 
-       GMNAL_INIT_NAL_CB(nal_cb);
+       GMNAL_INIT_NAL_CB(libnal);
        /*
         *      String them all together
         */
-       nal->nal_data = (void*)nal_data;
-       nal_cb->nal_data = (void*)nal_data;
+       libnal->libnal_data = (void*)nal_data;
        nal_data->nal = nal;
-       nal_data->nal_cb = nal_cb;
+       nal_data->libnal = libnal;
 
-       GMNAL_CB_LOCK_INIT(nal_data);
        GMNAL_GM_LOCK_INIT(nal_data);
 
 
@@ -311,7 +165,7 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
        if (gm_init() != GM_SUCCESS) {
                CDEBUG(D_ERROR, "call to gm_init failed\n");
                PORTAL_FREE(nal_data, sizeof(gmnal_data_t));    
-               PORTAL_FREE(nal_cb, sizeof(nal_cb_t));
+               PORTAL_FREE(libnal, sizeof(lib_nal_t));
                return(PTL_FAIL);
        }
 
@@ -356,7 +210,7 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
                gm_finalize();
                GMNAL_GM_UNLOCK(nal_data);
                PORTAL_FREE(nal_data, sizeof(gmnal_data_t));    
-               PORTAL_FREE(nal_cb, sizeof(nal_cb_t));
+               PORTAL_FREE(libnal, sizeof(lib_nal_t));
                return(PTL_FAIL);
        }
 
@@ -373,7 +227,7 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
                gm_finalize();
                GMNAL_GM_UNLOCK(nal_data);
                PORTAL_FREE(nal_data, sizeof(gmnal_data_t));    
-               PORTAL_FREE(nal_cb, sizeof(nal_cb_t));
+               PORTAL_FREE(libnal, sizeof(lib_nal_t));
                return(PTL_FAIL);
        }
 
@@ -402,7 +256,7 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
                gm_finalize();
                GMNAL_GM_UNLOCK(nal_data);
                PORTAL_FREE(nal_data, sizeof(gmnal_data_t));    
-               PORTAL_FREE(nal_cb, sizeof(nal_cb_t));
+               PORTAL_FREE(libnal, sizeof(lib_nal_t));
                return(PTL_FAIL);
        }
 
@@ -434,7 +288,7 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
                gm_finalize();
                GMNAL_GM_UNLOCK(nal_data);
                PORTAL_FREE(nal_data, sizeof(gmnal_data_t));    
-               PORTAL_FREE(nal_cb, sizeof(nal_cb_t));
+               PORTAL_FREE(libnal, sizeof(lib_nal_t));
                return(PTL_FAIL);
        }
        nal_data->gm_local_nid = local_nid;
@@ -454,7 +308,7 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
                gm_finalize();
                GMNAL_GM_UNLOCK(nal_data);
                PORTAL_FREE(nal_data, sizeof(gmnal_data_t));    
-               PORTAL_FREE(nal_cb, sizeof(nal_cb_t));
+               PORTAL_FREE(libnal, sizeof(lib_nal_t));
                return(PTL_FAIL);
        }
        CDEBUG(D_INFO, "Global node id is [%u]\n", global_nid);
@@ -471,7 +325,7 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
        CDEBUG(D_INFO, "portals_nid is ["LPU64"]\n", process_id.nid);
        
        CDEBUG(D_PORTALS, "calling lib_init\n");
-       if (lib_init(nal_cb, process_id, 
+       if (lib_init(libnal, nal, process_id, 
                      requested_limits, actual_limits) != PTL_OK) {
                CDEBUG(D_ERROR, "lib_init failed\n");
                gmnal_stop_rxthread(nal_data);
@@ -483,7 +337,7 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
                gm_finalize();
                GMNAL_GM_UNLOCK(nal_data);
                PORTAL_FREE(nal_data, sizeof(gmnal_data_t));    
-               PORTAL_FREE(nal_cb, sizeof(nal_cb_t));
+               PORTAL_FREE(libnal, sizeof(lib_nal_t));
                return(PTL_FAIL);
                
        }
@@ -493,7 +347,7 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
 
                 /* XXX these cleanup cases should be restructured to
                  * minimise duplication... */
-                lib_fini(nal_cb);
+                lib_fini(libnal);
                 
                gmnal_stop_rxthread(nal_data);
                gmnal_stop_ctthread(nal_data);
@@ -504,7 +358,7 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
                gm_finalize();
                GMNAL_GM_UNLOCK(nal_data);
                PORTAL_FREE(nal_data, sizeof(gmnal_data_t));    
-               PORTAL_FREE(nal_cb, sizeof(nal_cb_t));
+               PORTAL_FREE(libnal, sizeof(lib_nal_t));
                return(PTL_FAIL);
         }
 
@@ -550,10 +404,6 @@ int gmnal_init(void)
  */
 void gmnal_fini()
 {
-       gmnal_data_t    *nal_data = global_nal_data;
-       nal_t           *nal = nal_data->nal;
-       nal_cb_t        *nal_cb = nal_data->nal_cb;
-
        CDEBUG(D_TRACE, "gmnal_fini\n");
 
         LASSERT(global_nal_data == NULL);
index ece1380..e99d3ec 100644 (file)
@@ -27,7 +27,7 @@
 
 #include "gmnal.h"
 
-int gmnal_cb_recv(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
+int gmnal_cb_recv(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
                   unsigned int niov, struct iovec *iov, size_t mlen, 
                   size_t rlen)
 {
@@ -35,19 +35,19 @@ int gmnal_cb_recv(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
        int             status = PTL_OK;
 
 
-       CDEBUG(D_TRACE, "gmnal_cb_recv nal_cb [%p], private[%p], cookie[%p], "
+       CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], cookie[%p], "
               "niov[%d], iov [%p], mlen["LPSZ"], rlen["LPSZ"]\n", 
-              nal_cb, private, cookie, niov, iov, mlen, rlen);
+              libnal, private, cookie, niov, iov, mlen, rlen);
 
        switch(srxd->type) {
        case(GMNAL_SMALL_MESSAGE):
                CDEBUG(D_INFO, "gmnal_cb_recv got small message\n");
-               status = gmnal_small_rx(nal_cb, private, cookie, niov, 
+               status = gmnal_small_rx(libnal, private, cookie, niov, 
                                         iov, mlen, rlen);
        break;
        case(GMNAL_LARGE_MESSAGE_INIT):
                CDEBUG(D_INFO, "gmnal_cb_recv got large message init\n");
-               status = gmnal_large_rx(nal_cb, private, cookie, niov, 
+               status = gmnal_large_rx(libnal, private, cookie, niov, 
                                         iov, mlen, rlen);
        }
                
@@ -56,7 +56,7 @@ int gmnal_cb_recv(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
        return(status);
 }
 
-int gmnal_cb_recv_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
+int gmnal_cb_recv_pages(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
                         unsigned int kniov, ptl_kiov_t *kiov, size_t mlen, 
                         size_t rlen)
 {
@@ -67,9 +67,9 @@ int gmnal_cb_recv_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
        ptl_kiov_t      *kiov_dup = kiov;;
 
 
-       CDEBUG(D_TRACE, "gmnal_cb_recv_pages nal_cb [%p],private[%p], "
+       CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], "
               "cookie[%p], kniov[%d], kiov [%p], mlen["LPSZ"], rlen["LPSZ"]\n",
-              nal_cb, private, cookie, kniov, kiov, mlen, rlen);
+              libnal, private, cookie, kniov, kiov, mlen, rlen);
 
        if (srxd->type == GMNAL_SMALL_MESSAGE) {
                PORTAL_ALLOC(iovec, sizeof(struct iovec)*kniov);
@@ -98,7 +98,7 @@ int gmnal_cb_recv_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
                         kiov++;
                }
                CDEBUG(D_INFO, "calling gmnal_small_rx\n");
-               status = gmnal_small_rx(nal_cb, private, cookie, kniov, 
+               status = gmnal_small_rx(libnal, private, cookie, kniov, 
                                         iovec_dup, mlen, rlen);
                for (i=0; i<kniov; i++) {
                        kunmap(kiov_dup->kiov_page);
@@ -113,7 +113,7 @@ int gmnal_cb_recv_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
 }
 
 
-int gmnal_cb_send(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
+int gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
                   ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, 
                   unsigned int niov, struct iovec *iov, size_t len)
 {
@@ -123,24 +123,25 @@ int gmnal_cb_send(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
 
        CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] len["LPSZ"] nid["LPU64"]\n", 
               niov, len, nid);
-       nal_data = nal_cb->nal_data;
+       nal_data = libnal->libnal_data;
        
        if (GMNAL_IS_SMALL_MESSAGE(nal_data, niov, iov, len)) {
                CDEBUG(D_INFO, "This is a small message send\n");
-               gmnal_small_tx(nal_cb, private, cookie, hdr, type, nid, pid, 
+               gmnal_small_tx(libnal, private, cookie, hdr, type, nid, pid, 
                                niov, iov, len);
        } else {
                CDEBUG(D_ERROR, "Large message send it is not supported\n");
-               lib_finalize(nal_cb, private, cookie, PTL_FAIL);
+               lib_finalize(libnal, private, cookie, PTL_FAIL);
                return(PTL_FAIL);
-               gmnal_large_tx(nal_cb, private, cookie, hdr, type, nid, pid, 
+               gmnal_large_tx(libnal, private, cookie, hdr, type, nid, pid, 
                                niov, iov, len);
        }
        return(PTL_OK);
 }
 
-int gmnal_cb_send_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
-                        ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,                         unsigned int kniov, ptl_kiov_t *kiov, size_t len)
+int gmnal_cb_send_pages(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
+                        ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
+                         unsigned int kniov, ptl_kiov_t *kiov, size_t len)
 {
 
        int     i = 0;
@@ -149,7 +150,7 @@ int gmnal_cb_send_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
        ptl_kiov_t      *kiov_dup = kiov;
 
        CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] len["LPSZ"]\n", nid, kniov, len);
-       nal_data = nal_cb->nal_data;
+       nal_data = libnal->libnal_data;
        PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec));
         iovec_dup = iovec;
        if (GMNAL_IS_SMALL_MESSAGE(nal_data, 0, NULL, len)) {
@@ -168,7 +169,7 @@ int gmnal_cb_send_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
                         iovec++;
                         kiov++;
                }
-               gmnal_small_tx(nal_cb, private, cookie, hdr, type, nid, 
+               gmnal_small_tx(libnal, private, cookie, hdr, type, nid, 
                                pid, kniov, iovec_dup, len);
        } else {
                CDEBUG(D_ERROR, "Large message send it is not supported yet\n");
@@ -185,7 +186,7 @@ int gmnal_cb_send_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
                         iovec++;
                         kiov++;
                }
-               gmnal_large_tx(nal_cb, private, cookie, hdr, type, nid, 
+               gmnal_large_tx(libnal, private, cookie, hdr, type, nid, 
                                pid, kniov, iovec, len);
        }
        for (i=0; i<kniov; i++) {
@@ -196,94 +197,7 @@ int gmnal_cb_send_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
        return(PTL_OK);
 }
 
-int gmnal_cb_read(nal_cb_t *nal_cb, void *private, void *dst, 
-                  user_ptr src, size_t len)
-{
-       gm_bcopy(src, dst, len);
-       return(PTL_OK);
-}
-
-int gmnal_cb_write(nal_cb_t *nal_cb, void *private, user_ptr dst, 
-                   void *src, size_t len)
-{
-       gm_bcopy(src, dst, len);
-       return(PTL_OK);
-}
-
-int gmnal_cb_callback(nal_cb_t *nal_cb, void *private, lib_eq_t *eq, 
-                      ptl_event_t *ev)
-{
-
-       if (eq->event_callback != NULL) {
-               CDEBUG(D_INFO, "found callback\n");
-               eq->event_callback(ev);
-       }
-       
-       return(PTL_OK);
-}
-
-void *gmnal_cb_malloc(nal_cb_t *nal_cb, size_t len)
-{
-       void *ptr = NULL;
-       CDEBUG(D_TRACE, "gmnal_cb_malloc len["LPSZ"]\n", len);
-       PORTAL_ALLOC(ptr, len);
-       return(ptr);
-}
-
-void gmnal_cb_free(nal_cb_t *nal_cb, void *buf, size_t len)
-{
-       CDEBUG(D_TRACE, "gmnal_cb_free :: buf[%p] len["LPSZ"]\n", buf, len);
-       PORTAL_FREE(buf, len);
-       return;
-}
-
-void gmnal_cb_unmap(nal_cb_t *nal_cb, unsigned int niov, struct iovec *iov, 
-                    void **addrkey)
-{
-       return;
-}
-
-int  gmnal_cb_map(nal_cb_t *nal_cb, unsigned int niov, struct iovec *iov, 
-                  void**addrkey)
-{
-       return(PTL_OK);
-}
-
-void gmnal_cb_printf(nal_cb_t *nal_cb, const char *fmt, ...)
-{
-       CDEBUG(D_TRACE, "gmnal_cb_printf\n");
-       printk(fmt);
-       return;
-}
-
-void gmnal_cb_cli(nal_cb_t *nal_cb, unsigned long *flags)
-{
-       gmnal_data_t    *nal_data = (gmnal_data_t*)nal_cb->nal_data;
-
-       spin_lock_irqsave(&nal_data->cb_lock, *flags);
-       return;
-}
-
-void gmnal_cb_sti(nal_cb_t *nal_cb, unsigned long *flags)
-{
-       gmnal_data_t    *nal_data = (gmnal_data_t*)nal_cb->nal_data;
-
-       spin_unlock_irqrestore(&nal_data->cb_lock, *flags);
-       return;
-}
-
-void gmnal_cb_callback(nal_cb_t *nal_cb, void *private, lib_eq_t *eq, ptl_event_t *ev)
-{
-        /* holding cb_lock */
-
-        if (eq->event_callback != NULL)
-                eq->event_callback(ev);
-
-        /* We will wake theads sleeping in yield() here, AFTER the
-         * callback, when we implement blocking yield */
-}
-
-int gmnal_cb_dist(nal_cb_t *nal_cb, ptl_nid_t nid, unsigned long *dist)
+int gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
 {
        CDEBUG(D_TRACE, "gmnal_cb_dist\n");
        if (dist)
index 1bcd9bd..4af7186 100644 (file)
@@ -189,6 +189,7 @@ gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
        unsigned int snode, sport, type, length;
        gmnal_msghdr_t  *gmnal_msghdr;
        ptl_hdr_t       *portals_hdr;
+        int              rc;
 
        CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n", 
               nal_data, we, gmnal_type);
@@ -219,10 +220,12 @@ gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
         */
        srxd = gmnal_rxbuffer_to_srxd(nal_data, buffer);
        CDEBUG(D_INFO, "Back from gmnal_rxbuffer_to_srxd\n");
-       srxd->nal_data = nal_data;
        if (!srxd) {
                CDEBUG(D_ERROR, "Failed to get receive descriptor\n");
-               lib_parse(nal_data->nal_cb, portals_hdr, srxd);
+                /* I think passing a NULL srxd to lib_parse will crash
+                 * gmnal_recv() */
+                LBUG();
+               lib_parse(nal_data->libnal, portals_hdr, srxd);
                return(GMNAL_STATUS_FAIL);
        }
 
@@ -234,6 +237,7 @@ gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
                return(GMNAL_STATUS_OK);
        }
 
+       srxd->nal_data = nal_data;
        srxd->type = gmnal_type;
        srxd->nsiov = gmnal_msghdr->niov;
        srxd->gm_source_node = gmnal_msghdr->sender_node_id;
@@ -245,7 +249,12 @@ gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
         *      cb_recv is responsible for returning the buffer 
         *      for future receive
         */
-       lib_parse(nal_data->nal_cb, portals_hdr, srxd);
+       rc = lib_parse(nal_data->libnal, portals_hdr, srxd);
+
+        if (rc != PTL_OK) {
+                /* I just received garbage; take appropriate action... */
+                LBUG();
+        }
 
        return(GMNAL_STATUS_OK);
 }
@@ -309,19 +318,19 @@ gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd)
  *     Call lib_finalize
  */
 int
-gmnal_small_rx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
+gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
                unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
 {
        gmnal_srxd_t    *srxd = NULL;
        void    *buffer = NULL;
-       gmnal_data_t    *nal_data = (gmnal_data_t*)nal_cb->nal_data;
+       gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->libnal_data;
 
 
        CDEBUG(D_TRACE, "niov [%d] mlen["LPSZ"]\n", niov, mlen);
 
        if (!private) {
                CDEBUG(D_ERROR, "gmnal_small_rx no context\n");
-               lib_finalize(nal_cb, private, cookie, PTL_FAIL);
+               lib_finalize(libnal, private, cookie, PTL_FAIL);
                return(PTL_FAIL);
        }
 
@@ -343,7 +352,7 @@ gmnal_small_rx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
         *      let portals library know receive is complete
         */
        CDEBUG(D_PORTALS, "calling lib_finalize\n");
-       lib_finalize(nal_cb, private, cookie, PTL_OK);
+       lib_finalize(libnal, private, cookie, PTL_OK);
        /*
         *      return buffer so it can be used again
         */
@@ -365,11 +374,11 @@ gmnal_small_rx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
  *     The callback function informs when the send is complete.
  */
 int
-gmnal_small_tx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
+gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
                ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid, 
                unsigned int niov, struct iovec *iov, int size)
 {
-       gmnal_data_t    *nal_data = (gmnal_data_t*)nal_cb->nal_data;
+       gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->libnal_data;
        gmnal_stxd_t    *stxd = NULL;
        void            *buffer = NULL;
        gmnal_msghdr_t  *msghdr = NULL;
@@ -377,9 +386,9 @@ gmnal_small_tx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
        unsigned int    local_nid;
        gm_status_t     gm_status = GM_SUCCESS;
 
-       CDEBUG(D_TRACE, "gmnal_small_tx nal_cb [%p] private [%p] cookie [%p] "
+       CDEBUG(D_TRACE, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] "
               "hdr [%p] type [%d] global_nid ["LPU64"] pid [%d] niov [%d] "
-              "iov [%p] size [%d]\n", nal_cb, private, cookie, hdr, type, 
+              "iov [%p] size [%d]\n", libnal, private, cookie, hdr, type, 
               global_nid, pid, niov, iov, size);
 
        CDEBUG(D_INFO, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
@@ -472,7 +481,7 @@ gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
        gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
        lib_msg_t       *cookie = stxd->cookie;
        gmnal_data_t    *nal_data = (gmnal_data_t*)stxd->nal_data;
-       nal_cb_t        *nal_cb = nal_data->nal_cb;
+       lib_nal_t       *libnal = nal_data->libnal;
 
        if (!stxd) {
                CDEBUG(D_TRACE, "send completion event for unknown stxd\n");
@@ -592,7 +601,7 @@ gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
                return;
        }
        gmnal_return_stxd(nal_data, stxd);
-       lib_finalize(nal_cb, stxd, cookie, PTL_OK);
+       lib_finalize(libnal, stxd, cookie, PTL_OK);
        return;
 }
 
@@ -645,7 +654,7 @@ void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
  *     this ack, deregister the memory. Only 1 send token is required here.
  */
 int
-gmnal_large_tx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
+gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
                ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid, 
                unsigned int niov, struct iovec *iov, int size)
 {
@@ -661,15 +670,15 @@ gmnal_large_tx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
        int             niov_dup;
 
 
-       CDEBUG(D_TRACE, "gmnal_large_tx nal_cb [%p] private [%p], cookie [%p] "
+       CDEBUG(D_TRACE, "gmnal_large_tx libnal [%p] private [%p], cookie [%p] "
               "hdr [%p], type [%d] global_nid ["LPU64"], pid [%d], niov [%d], "
-              "iov [%p], size [%d]\n", nal_cb, private, cookie, hdr, type, 
+              "iov [%p], size [%d]\n", libnal, private, cookie, hdr, type, 
               global_nid, pid, niov, iov, size);
 
-       if (nal_cb)
-               nal_data = (gmnal_data_t*)nal_cb->nal_data;
+       if (libnal)
+               nal_data = (gmnal_data_t*)libnal->libnal_data;
        else  {
-               CDEBUG(D_ERROR, "no nal_cb.\n");
+               CDEBUG(D_ERROR, "no libnal.\n");
                return(GMNAL_STATUS_FAIL);
        }
        
@@ -811,11 +820,11 @@ gmnal_large_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
  *     data from the sender.
  */
 int
-gmnal_large_rx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
+gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
                unsigned int nriov, struct iovec *riov, size_t mlen, 
                size_t rlen)
 {
-       gmnal_data_t    *nal_data = nal_cb->nal_data;
+       gmnal_data_t    *nal_data = libnal->libnal_data;
        gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
        void            *buffer = NULL;
        struct  iovec   *riov_dup;
@@ -823,13 +832,13 @@ gmnal_large_rx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
        gmnal_msghdr_t  *msghdr = NULL;
        gm_status_t     gm_status;
 
-       CDEBUG(D_TRACE, "gmnal_large_rx :: nal_cb[%p], private[%p], "
+       CDEBUG(D_TRACE, "gmnal_large_rx :: libnal[%p], private[%p], "
               "cookie[%p], niov[%d], iov[%p], mlen["LPSZ"], rlen["LPSZ"]\n",
-               nal_cb, private, cookie, nriov, riov, mlen, rlen);
+               libnal, private, cookie, nriov, riov, mlen, rlen);
 
        if (!srxd) {
                CDEBUG(D_ERROR, "gmnal_large_rx no context\n");
-               lib_finalize(nal_cb, private, cookie, PTL_FAIL);
+               lib_finalize(libnal, private, cookie, PTL_FAIL);
                return(PTL_FAIL);
        }
 
@@ -1092,7 +1101,7 @@ gmnal_remote_get_callback(gm_port_t *gm_port, void *context,
 
        gmnal_ltxd_t    *ltxd = (gmnal_ltxd_t*)context;
        gmnal_srxd_t    *srxd = ltxd->srxd;
-       nal_cb_t        *nal_cb = srxd->nal_data->nal_cb;
+       lib_nal_t       *libnal = srxd->nal_data->libnal;
        int             lastone;
        struct  iovec   *riov;
        int             nriov;
@@ -1126,7 +1135,7 @@ gmnal_remote_get_callback(gm_port_t *gm_port, void *context,
         *      Let our client application proceed
         */     
        CDEBUG(D_ERROR, "final callback context[%p]\n", srxd);
-       lib_finalize(nal_cb, srxd, srxd->cookie, PTL_OK);
+       lib_finalize(libnal, srxd, srxd->cookie, PTL_OK);
 
        /*
         *      send an ack to the sender to let him know we got the data
@@ -1276,7 +1285,7 @@ gmnal_large_tx_ack_callback(gm_port_t *gm_port, void *context,
 void 
 gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
 {
-       nal_cb_t        *nal_cb = nal_data->nal_cb;
+       lib_nal_t       *libnal = nal_data->libnal;
        gmnal_stxd_t    *stxd = NULL;
        gmnal_msghdr_t  *msghdr = NULL;
        void            *buffer = NULL;
@@ -1291,7 +1300,7 @@ gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
 
        CDEBUG(D_INFO, "gmnal_large_tx_ack_received stxd [%p]\n", stxd);
 
-       lib_finalize(nal_cb, stxd, stxd->cookie, PTL_OK);
+       lib_finalize(libnal, stxd, stxd->cookie, PTL_OK);
 
        /*
         *      extract the iovec from the stxd, deregister the memory.
index f4005de..c595450 100644 (file)
@@ -43,6 +43,9 @@ kpr_nal_interface_t kqswnal_router_interface = {
 #define QSWNAL_SYSCTL_COPY_SMALL_FWD     2
 
 static ctl_table kqswnal_ctl_table[] = {
+       {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_puts", /* NOTE(review): same ctl id as the "optimized_gets" entry that follows -- confirm a distinct id is not required */
+        &kqswnal_tunables.kqn_optimized_puts, sizeof (int),
+        0644, NULL, &proc_dointvec},
        {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_gets",
         &kqswnal_tunables.kqn_optimized_gets, sizeof (int),
         0644, NULL, &proc_dointvec},
@@ -55,88 +58,6 @@ static ctl_table kqswnal_top_ctl_table[] = {
 };
 #endif
 
-static int
-kqswnal_forward(nal_t   *nal,
-               int     id,
-               void    *args,  size_t args_len,
-               void    *ret,   size_t ret_len)
-{
-       kqswnal_data_t *k = nal->nal_data;
-       nal_cb_t       *nal_cb = k->kqn_cb;
-
-       LASSERT (nal == &kqswnal_api);
-       LASSERT (k == &kqswnal_data);
-       LASSERT (nal_cb == &kqswnal_lib);
-
-       lib_dispatch(nal_cb, k, id, args, ret); /* nal needs k */
-       return (PTL_OK);
-}
-
-static void
-kqswnal_lock (nal_t *nal, unsigned long *flags)
-{
-       kqswnal_data_t *k = nal->nal_data;
-       nal_cb_t       *nal_cb = k->kqn_cb;
-
-       LASSERT (nal == &kqswnal_api);
-       LASSERT (k == &kqswnal_data);
-       LASSERT (nal_cb == &kqswnal_lib);
-
-       nal_cb->cb_cli(nal_cb,flags);
-}
-
-static void
-kqswnal_unlock(nal_t *nal, unsigned long *flags)
-{
-       kqswnal_data_t *k = nal->nal_data;
-       nal_cb_t       *nal_cb = k->kqn_cb;
-
-       LASSERT (nal == &kqswnal_api);
-       LASSERT (k == &kqswnal_data);
-       LASSERT (nal_cb == &kqswnal_lib);
-
-       nal_cb->cb_sti(nal_cb,flags);
-}
-
-static int
-kqswnal_yield(nal_t *nal, unsigned long *flags, int milliseconds)
-{
-       /* NB called holding statelock */
-        wait_queue_t       wait;
-       unsigned long      now = jiffies;
-
-       CDEBUG (D_NET, "yield\n");
-
-       if (milliseconds == 0) {
-               if (need_resched())
-                       schedule();
-               return 0;
-       }
-
-       init_waitqueue_entry(&wait, current);
-       set_current_state(TASK_INTERRUPTIBLE);
-       add_wait_queue(&kqswnal_data.kqn_yield_waitq, &wait);
-
-       kqswnal_unlock(nal, flags);
-
-       if (milliseconds < 0)
-               schedule ();
-       else
-               schedule_timeout((milliseconds * HZ) / 1000);
-       
-       kqswnal_lock(nal, flags);
-
-       remove_wait_queue(&kqswnal_data.kqn_yield_waitq, &wait);
-
-       if (milliseconds > 0) {
-               milliseconds -= ((jiffies - now) * 1000) / HZ;
-               if (milliseconds < 0)
-                       milliseconds = 0;
-       }
-       
-       return (milliseconds);
-}
-
 int
 kqswnal_get_tx_desc (struct portals_cfg *pcfg)
 {
@@ -186,7 +107,7 @@ kqswnal_cmd (struct portals_cfg *pcfg, void *private)
                        kqswnal_data.kqn_nid_offset);
                kqswnal_data.kqn_nid_offset =
                        pcfg->pcfg_nid - kqswnal_data.kqn_elanid;
-               kqswnal_lib.ni.nid = pcfg->pcfg_nid;
+               kqswnal_lib.libnal_ni.ni_pid.nid = pcfg->pcfg_nid;
                return (0);
                
        default:
@@ -469,9 +390,11 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        ptl_process_id_t  my_process_id;
        int               pkmem = atomic_read(&portal_kmemory);
 
+       LASSERT (nal == &kqswnal_api);
+
        if (nal->nal_refct != 0) {
                if (actual_limits != NULL)
-                       *actual_limits = kqswnal_lib.ni.actual_limits;
+                       *actual_limits = kqswnal_lib.libnal_ni.ni_actual_limits;
                /* This module got the first ref */
                PORTAL_MODULE_USE;
                return (PTL_OK);
@@ -481,18 +404,9 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
 
        CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));
 
-       memset(&kqswnal_rpc_success, 0, sizeof(kqswnal_rpc_success));
-       memset(&kqswnal_rpc_failed, 0, sizeof(kqswnal_rpc_failed));
-#if MULTIRAIL_EKC
-       kqswnal_rpc_failed.Data[0] = -ECONNREFUSED;
-#else
-       kqswnal_rpc_failed.Status = -ECONNREFUSED;
-#endif
        /* ensure all pointers NULL etc */
        memset (&kqswnal_data, 0, sizeof (kqswnal_data));
 
-       kqswnal_data.kqn_cb = &kqswnal_lib;
-
        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_nblk_idletxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
@@ -507,8 +421,12 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        spin_lock_init (&kqswnal_data.kqn_sched_lock);
        init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);
 
-       spin_lock_init (&kqswnal_data.kqn_statelock);
-       init_waitqueue_head (&kqswnal_data.kqn_yield_waitq);
+       /* Leave kqn_rpc_success zeroed */
+#if MULTIRAIL_EKC
+       kqswnal_data.kqn_rpc_failed.Data[0] = -ECONNREFUSED;
+#else
+       kqswnal_data.kqn_rpc_failed.Status = -ECONNREFUSED;
+#endif
 
        /* pointers/lists/locks initialised */
        kqswnal_data.kqn_init = KQN_INIT_DATA;
@@ -517,13 +435,13 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        kqswnal_data.kqn_ep = ep_system();
        if (kqswnal_data.kqn_ep == NULL) {
                CERROR("Can't initialise EKC\n");
-               kqswnal_shutdown(&kqswnal_api);
+               kqswnal_shutdown(nal);
                return (PTL_IFACE_INVALID);
        }
 
        if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
                CERROR("Can't get elan ID\n");
-               kqswnal_shutdown(&kqswnal_api);
+               kqswnal_shutdown(nal);
                return (PTL_IFACE_INVALID);
        }
 #else
@@ -534,7 +452,7 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        if (kqswnal_data.kqn_ep == NULL)
        {
                CERROR ("Can't get elan device 0\n");
-               kqswnal_shutdown(&kqswnal_api);
+               kqswnal_shutdown(nal);
                return (PTL_IFACE_INVALID);
        }
 #endif
@@ -550,7 +468,7 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        if (kqswnal_data.kqn_eptx == NULL)
        {
                CERROR ("Can't allocate transmitter\n");
-               kqswnal_shutdown (&kqswnal_api);
+               kqswnal_shutdown (nal);
                return (PTL_NO_SPACE);
        }
 
@@ -563,7 +481,7 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        if (kqswnal_data.kqn_eprx_small == NULL)
        {
                CERROR ("Can't install small msg receiver\n");
-               kqswnal_shutdown (&kqswnal_api);
+               kqswnal_shutdown (nal);
                return (PTL_NO_SPACE);
        }
 
@@ -573,7 +491,7 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        if (kqswnal_data.kqn_eprx_large == NULL)
        {
                CERROR ("Can't install large msg receiver\n");
-               kqswnal_shutdown (&kqswnal_api);
+               kqswnal_shutdown (nal);
                return (PTL_NO_SPACE);
        }
 
@@ -588,7 +506,7 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
                CERROR("Can't reserve tx dma space\n");
-               kqswnal_shutdown(&kqswnal_api);
+               kqswnal_shutdown(nal);
                return (PTL_NO_SPACE);
        }
 #else
@@ -603,7 +521,7 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        if (rc != DDI_SUCCESS)
        {
                CERROR ("Can't reserve rx dma space\n");
-               kqswnal_shutdown (&kqswnal_api);
+               kqswnal_shutdown (nal);
                return (PTL_NO_SPACE);
        }
 #endif
@@ -617,7 +535,7 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
                CERROR("Can't reserve rx dma space\n");
-               kqswnal_shutdown(&kqswnal_api);
+               kqswnal_shutdown(nal);
                return (PTL_NO_SPACE);
        }
 #else
@@ -633,7 +551,7 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        if (rc != DDI_SUCCESS)
        {
                CERROR ("Can't reserve rx dma space\n");
-               kqswnal_shutdown (&kqswnal_api);
+               kqswnal_shutdown (nal);
                return (PTL_NO_SPACE);
        }
 #endif
@@ -644,7 +562,7 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
                     sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
        if (kqswnal_data.kqn_txds == NULL)
        {
-               kqswnal_shutdown (&kqswnal_api);
+               kqswnal_shutdown (nal);
                return (PTL_NO_SPACE);
        }
 
@@ -660,7 +578,7 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
                PORTAL_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
                if (ktx->ktx_buffer == NULL)
                {
-                       kqswnal_shutdown (&kqswnal_api);
+                       kqswnal_shutdown (nal);
                        return (PTL_NO_SPACE);
                }
 
@@ -697,7 +615,7 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
                      sizeof (kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE));
        if (kqswnal_data.kqn_rxds == NULL)
        {
-               kqswnal_shutdown (&kqswnal_api);
+               kqswnal_shutdown (nal);
                return (PTL_NO_SPACE);
        }
 
@@ -732,7 +650,7 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
                        struct page *page = alloc_page(GFP_KERNEL);
                        
                        if (page == NULL) {
-                               kqswnal_shutdown (&kqswnal_api);
+                               kqswnal_shutdown (nal);
                                return (PTL_NO_SPACE);
                        }
 
@@ -780,12 +698,12 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        my_process_id.nid = kqswnal_elanid2nid(kqswnal_data.kqn_elanid);
        my_process_id.pid = 0;
 
-       rc = lib_init(&kqswnal_lib, my_process_id,
+       rc = lib_init(&kqswnal_lib, nal, my_process_id,
                      requested_limits, actual_limits);
         if (rc != PTL_OK)
        {
                CERROR ("lib_init failed %d\n", rc);
-               kqswnal_shutdown (&kqswnal_api);
+               kqswnal_shutdown (nal);
                return (rc);
        }
 
@@ -799,6 +717,7 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
                kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
 
                /* NB this enqueue can allocate/sleep (attr == 0) */
+               krx->krx_state = KRX_POSTED;
 #if MULTIRAIL_EKC
                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      &krx->krx_elanbuffer, 0);
@@ -810,7 +729,7 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
                if (rc != EP_SUCCESS)
                {
                        CERROR ("failed ep_queue_receive %d\n", rc);
-                       kqswnal_shutdown (&kqswnal_api);
+                       kqswnal_shutdown (nal);
                        return (PTL_FAIL);
                }
        }
@@ -822,7 +741,7 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
                if (rc != 0)
                {
                        CERROR ("failed to spawn scheduling thread: %d\n", rc);
-                       kqswnal_shutdown (&kqswnal_api);
+                       kqswnal_shutdown (nal);
                        return (PTL_FAIL);
                }
        }
@@ -835,7 +754,7 @@ kqswnal_startup (nal_t *nal, ptl_pid_t requested_pid,
        rc = libcfs_nal_cmd_register (QSWNAL, &kqswnal_cmd, NULL);
        if (rc != 0) {
                CERROR ("Can't initialise command interface (rc = %d)\n", rc);
-               kqswnal_shutdown (&kqswnal_api);
+               kqswnal_shutdown (nal);
                return (PTL_FAIL);
        }
 
@@ -867,17 +786,11 @@ kqswnal_initialise (void)
 {
        int   rc;
 
-       kqswnal_api.startup  = kqswnal_startup;
-       kqswnal_api.shutdown = kqswnal_shutdown;
-       kqswnal_api.forward  = kqswnal_forward;
-       kqswnal_api.yield    = kqswnal_yield;
-       kqswnal_api.lock     = kqswnal_lock;
-       kqswnal_api.unlock   = kqswnal_unlock;
-       kqswnal_api.nal_data = &kqswnal_data;
-
-       kqswnal_lib.nal_data = &kqswnal_data;
+       kqswnal_api.nal_ni_init = kqswnal_startup;
+       kqswnal_api.nal_ni_fini = kqswnal_shutdown;
 
        /* Initialise dynamic tunables to defaults once only */
+       kqswnal_tunables.kqn_optimized_puts = KQSW_OPTIMIZED_PUTS;
        kqswnal_tunables.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;
        
        rc = ptl_register_nal(QSWNAL, &kqswnal_api);
index 6978aa0..b085caa 100644 (file)
@@ -109,7 +109,8 @@ typedef unsigned long kqsw_csum_t;
 
 #define KQSW_RESCHED                    100     /* # busy loops that forces scheduler to yield */
 
-#define KQSW_OPTIMIZED_GETS             1       /* optimized gets? */
+#define KQSW_OPTIMIZED_GETS             1       /* optimize gets >= this size */
+#define KQSW_OPTIMIZED_PUTS            (32<<10) /* optimize puts >= this size */
 #define KQSW_COPY_SMALL_FWD             0       /* copy small fwd messages to pre-mapped buffer? */
 
 /*
@@ -156,12 +157,18 @@ typedef struct
         int              krx_npages;            /* # pages in receive buffer */
         int              krx_nob;               /* Number Of Bytes received into buffer */
         int              krx_rpc_reply_needed;  /* peer waiting for EKC RPC reply */
-        int              krx_rpc_reply_sent;    /* rpc reply sent */
+        int              krx_rpc_reply_status;  /* what status to send */
+        int              krx_state;             /* what this RX is doing */
         atomic_t         krx_refcount;          /* how to tell when rpc is done */
         kpr_fwd_desc_t   krx_fwd;               /* embedded forwarding descriptor */
         ptl_kiov_t       krx_kiov[KQSW_NRXMSGPAGES_LARGE]; /* buffer frags */
 }  kqswnal_rx_t;
 
+#define KRX_POSTED       1                      /* receiving */
+#define KRX_PARSE        2                      /* ready to be parsed */
+#define KRX_COMPLETING   3                      /* waiting to be completed */
+
+
 typedef struct
 {
         struct list_head  ktx_list;             /* enqueue idle/active */
@@ -174,7 +181,7 @@ typedef struct
         int               ktx_nmappedpages;     /* # pages mapped for current message */
         int               ktx_port;             /* destination ep port */
         ptl_nid_t         ktx_nid;              /* destination node */
-        void             *ktx_args[2];          /* completion passthru */
+        void             *ktx_args[3];          /* completion passthru */
         char             *ktx_buffer;           /* pre-allocated contiguous buffer for hdr + small payloads */
         unsigned long     ktx_launchtime;       /* when (in jiffies) the transmit was launched */
 
@@ -193,13 +200,16 @@ typedef struct
 } kqswnal_tx_t;
 
 #define KTX_IDLE        0                       /* on kqn_(nblk_)idletxds */
-#define KTX_SENDING     1                       /* local send */
-#define KTX_FORWARDING  2                       /* routing a packet */
-#define KTX_GETTING     3                       /* local optimised get */
+#define KTX_FORWARDING  1                       /* sending a forwarded packet */
+#define KTX_SENDING     2                       /* normal send */
+#define KTX_GETTING     3                       /* sending optimised get */
+#define KTX_PUTTING     4                       /* sending optimised put */
+#define KTX_RDMAING     5                       /* handling optimised put/get */
 
 typedef struct
 {
         /* dynamic tunables... */
+        int                      kqn_optimized_puts;  /* optimized PUTs? */
         int                      kqn_optimized_gets;  /* optimized GETs? */
 #if CONFIG_SYSCTL
         struct ctl_table_header *kqn_sysctl;          /* sysctl interface */
@@ -230,9 +240,6 @@ typedef struct
         struct list_head   kqn_delayedfwds;     /* delayed forwards */
         struct list_head   kqn_delayedtxds;     /* delayed transmits */
 
-        spinlock_t         kqn_statelock;       /* cb_cli/cb_sti */
-        wait_queue_head_t  kqn_yield_waitq;     /* where yield waits */
-        nal_cb_t          *kqn_cb;              /* -> kqswnal_lib */
 #if MULTIRAIL_EKC
         EP_SYS            *kqn_ep;              /* elan system */
         EP_NMH            *kqn_ep_tx_nmh;       /* elan reserved tx vaddrs */
@@ -250,6 +257,9 @@ typedef struct
         ptl_nid_t          kqn_nid_offset;      /* this cluster's NID offset */
         int                kqn_nnodes;          /* this cluster's size */
         int                kqn_elanid;          /* this nodes's elan ID */
+
+        EP_STATUSBLK       kqn_rpc_success;     /* preset RPC reply status blocks */
+        EP_STATUSBLK       kqn_rpc_failed;
 }  kqswnal_data_t;
 
 /* kqn_init state */
@@ -258,21 +268,16 @@ typedef struct
 #define KQN_INIT_LIB            2
 #define KQN_INIT_ALL            3
 
-extern nal_cb_t            kqswnal_lib;
+extern lib_nal_t           kqswnal_lib;
 extern nal_t               kqswnal_api;
 extern kqswnal_tunables_t  kqswnal_tunables;
 extern kqswnal_data_t      kqswnal_data;
 
-/* global pre-prepared replies to keep off the stack */
-extern EP_STATUSBLK    kqswnal_rpc_success;
-extern EP_STATUSBLK    kqswnal_rpc_failed;
-
 extern int kqswnal_thread_start (int (*fn)(void *arg), void *arg);
 extern void kqswnal_rxhandler(EP_RXD *rxd);
 extern int kqswnal_scheduler (void *);
 extern void kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd);
-extern void kqswnal_dma_reply_complete (EP_RXD *rxd);
-extern void kqswnal_requeue_rx (kqswnal_rx_t *krx);
+extern void kqswnal_rx_done (kqswnal_rx_t *krx);
 
 static inline ptl_nid_t
 kqswnal_elanid2nid (int elanid) 
@@ -291,6 +296,12 @@ kqswnal_nid2elanid (ptl_nid_t nid)
         return (nid - kqswnal_data.kqn_nid_offset);
 }
 
+static inline ptl_nid_t
+kqswnal_rx_nid(kqswnal_rx_t *krx) 
+{       /* Return the portals NID for receive 'krx': the Elan node ID that
+         * ep_rxd_node() reports for krx->krx_rxd (presumably the sending
+         * node), converted with kqswnal_elanid2nid(). */
+        return (kqswnal_elanid2nid(ep_rxd_node(krx->krx_rxd)));
+}
+
 static inline int
 kqswnal_pages_spanned (void *base, int nob)
 {
@@ -313,11 +324,11 @@ static inline kqsw_csum_t kqsw_csum (kqsw_csum_t sum, void *base, int nob)
 }
 #endif
 
-static inline void kqswnal_rx_done (kqswnal_rx_t *krx)
+static inline void kqswnal_rx_decref (kqswnal_rx_t *krx)
 {
         LASSERT (atomic_read (&krx->krx_refcount) > 0);
         if (atomic_dec_and_test (&krx->krx_refcount))
-                kqswnal_requeue_rx(krx);
+                kqswnal_rx_done(krx);   /* that was the last reference on
+                                         * krx: pass it to kqswnal_rx_done() */
 }
 
 #if MULTIRAIL_EKC
index 2bcb853..e1237a8 100644 (file)
 
 #include "qswnal.h"
 
-EP_STATUSBLK  kqswnal_rpc_success;
-EP_STATUSBLK  kqswnal_rpc_failed;
-
 /*
  *  LIB functions follow
  *
  */
-static ptl_err_t
-kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
-             size_t len)
-{
-        CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
-                nal->ni.nid, len, src_addr, dst_addr );
-        memcpy( dst_addr, src_addr, len );
-
-        return (PTL_OK);
-}
-
-static ptl_err_t
-kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
-              size_t len)
-{
-        CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
-                nal->ni.nid, len, src_addr, dst_addr );
-        memcpy( dst_addr, src_addr, len );
-
-        return (PTL_OK);
-}
-
-static void *
-kqswnal_malloc(nal_cb_t *nal, size_t len)
-{
-        void *buf;
-
-        PORTAL_ALLOC(buf, len);
-        return (buf);
-}
-
-static void
-kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
-{
-        PORTAL_FREE(buf, len);
-}
-
-static void
-kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
-{
-        va_list ap;
-        char msg[256];
-
-        va_start (ap, fmt);
-        vsnprintf (msg, sizeof (msg), fmt, ap);        /* sprint safely */
-        va_end (ap);
-
-        msg[sizeof (msg) - 1] = 0;                /* ensure terminated */
-
-        CDEBUG (D_NET, "%s", msg);
-}
-
-#if (defined(CONFIG_SPARC32) || defined(CONFIG_SPARC64))
-# error "Can't save/restore irq contexts in different procedures"
-#endif
-
-static void
-kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
-{
-        kqswnal_data_t *data= nal->nal_data;
-
-        spin_lock_irqsave(&data->kqn_statelock, *flags);
-}
-
-
-static void
-kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
-{
-        kqswnal_data_t *data= nal->nal_data;
-
-        spin_unlock_irqrestore(&data->kqn_statelock, *flags);
-}
-
-static void
-kqswnal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev)
-{
-        /* holding kqn_statelock */
-
-        if (eq->event_callback != NULL)
-                eq->event_callback(ev);
-
-        if (waitqueue_active(&kqswnal_data.kqn_yield_waitq))
-                wake_up_all(&kqswnal_data.kqn_yield_waitq);
-}
-
 static int
-kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
+kqswnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
 {
-        if (nid == nal->ni.nid)
+        if (nid == nal->libnal_ni.ni_pid.nid)
                 *dist = 0;                      /* it's me */
         else if (kqswnal_nid2elanid (nid) >= 0)
                 *dist = 1;                      /* it's my peer */
@@ -212,11 +124,12 @@ kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_
         do {
                 int  fraglen = kiov->kiov_len - offset;
 
-                /* nob exactly spans the iovs */
-                LASSERT (fraglen <= nob);
-                /* each frag fits in a page */
+                /* each page frag is contained in one page */
                 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
 
+                if (fraglen > nob)
+                        fraglen = nob;
+
                 nmapped++;
                 if (nmapped > maxmapped) {
                         CERROR("Can't map message in %d pages (max %d)\n",
@@ -328,11 +241,12 @@ kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob,
         
         do {
                 int  fraglen = iov->iov_len - offset;
-                long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);
-
-                /* nob exactly spans the iovs */
-                LASSERT (fraglen <= nob);
+                long npages;
                 
+                if (fraglen > nob)
+                        fraglen = nob;
+                npages = kqswnal_pages_spanned (iov->iov_base, fraglen);
+
                 nmapped += npages;
                 if (nmapped > maxmapped) {
                         CERROR("Can't map message in %d pages (max %d)\n",
@@ -519,40 +433,29 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
 void
 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
 {
-        lib_msg_t     *msg;
-        lib_msg_t     *repmsg = NULL;
-
         switch (ktx->ktx_state) {
         case KTX_FORWARDING:       /* router asked me to forward this packet */
                 kpr_fwd_done (&kqswnal_data.kqn_router,
                               (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
                 break;
 
-        case KTX_SENDING:          /* packet sourced locally */
-                lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
+        case KTX_RDMAING:          /* optimized GET/PUT handled */
+        case KTX_PUTTING:          /* optimized PUT sent */
+        case KTX_SENDING:          /* normal send */
+                lib_finalize (&kqswnal_lib, NULL,
                               (lib_msg_t *)ktx->ktx_args[1],
-                              (error == 0) ? PTL_OK : 
-                              (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
+                              (error == 0) ? PTL_OK : PTL_FAIL);
                 break;
 
-        case KTX_GETTING:          /* Peer has DMA-ed direct? */
-                msg = (lib_msg_t *)ktx->ktx_args[1];
-
-                if (error == 0) {
-                        repmsg = lib_create_reply_msg (&kqswnal_lib, 
-                                                       ktx->ktx_nid, msg);
-                        if (repmsg == NULL)
-                                error = -ENOMEM;
-                }
-                
-                if (error == 0) {
-                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0], 
-                                      msg, PTL_OK);
-                        lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
-                } else {
-                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
-                                      (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
-                }
+        case KTX_GETTING:          /* optimized GET sent & REPLY received */
+                /* Complete the GET with success since we can't avoid
+                 * delivering a REPLY event; we committed to it when we
+                 * launched the GET */
+                lib_finalize (&kqswnal_lib, NULL, 
+                              (lib_msg_t *)ktx->ktx_args[1], PTL_OK);
+                lib_finalize (&kqswnal_lib, NULL,
+                              (lib_msg_t *)ktx->ktx_args[2],
+                              (error == 0) ? PTL_OK : PTL_FAIL);
                 break;
 
         default:
@@ -580,16 +483,27 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
                 kqswnal_notify_peer_down(ktx);
                 status = -EHOSTDOWN;
 
-        } else if (ktx->ktx_state == KTX_GETTING) {
-                /* RPC completed OK; what did our peer put in the status
+        } else switch (ktx->ktx_state) {
+
+        case KTX_GETTING:
+        case KTX_PUTTING:
+                /* RPC completed OK; but what did our peer put in the status
                  * block? */
 #if MULTIRAIL_EKC
                 status = ep_txd_statusblk(txd)->Data[0];
 #else
                 status = ep_txd_statusblk(txd)->Status;
 #endif
-        } else {
+                break;
+                
+        case KTX_FORWARDING:
+        case KTX_SENDING:
                 status = 0;
+                break;
+                
+        default:
+                LBUG();
+                break;
         }
 
         kqswnal_tx_done (ktx, status);
@@ -610,21 +524,20 @@ kqswnal_launch (kqswnal_tx_t *ktx)
                 return (-ESHUTDOWN);
 
         LASSERT (dest >= 0);                    /* must be a peer */
-        if (ktx->ktx_state == KTX_GETTING) {
-                /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t.  The
-                 * other frags are the GET sink which we obviously don't
-                 * send here :) */
-#if MULTIRAIL_EKC
+
+        switch (ktx->ktx_state) {
+        case KTX_GETTING:
+        case KTX_PUTTING:
+                /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
+                 * The other frags are the payload, awaiting RDMA */
                 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                      ktx->ktx_port, attr,
                                      kqswnal_txhandler, ktx,
                                      NULL, ktx->ktx_frags, 1);
-#else
-                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
-                                     ktx->ktx_port, attr, kqswnal_txhandler,
-                                     ktx, NULL, ktx->ktx_frags, 1);
-#endif
-        } else {
+                break;
+
+        case KTX_FORWARDING:
+        case KTX_SENDING:
 #if MULTIRAIL_EKC
                 rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
                                          ktx->ktx_port, attr,
@@ -636,6 +549,12 @@ kqswnal_launch (kqswnal_tx_t *ktx)
                                        kqswnal_txhandler, ktx, 
                                        ktx->ktx_frags, ktx->ktx_nfrag);
 #endif
+                break;
+                
+        default:
+                LBUG();
+                rc = -EINVAL;                   /* no compiler warning please */
+                break;
         }
 
         switch (rc) {
@@ -658,6 +577,7 @@ kqswnal_launch (kqswnal_tx_t *ktx)
         }
 }
 
+#if 0
 static char *
 hdr_type_string (ptl_hdr_t *hdr)
 {
@@ -726,6 +646,7 @@ kqswnal_cerror_hdr(ptl_hdr_t * hdr)
         }
 
 }                               /* end of print_hdr() */
+#endif
 
 #if !MULTIRAIL_EKC
 void
@@ -787,114 +708,291 @@ kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
         CERROR ("DATAVEC too small\n");
         return (-E2BIG);
 }
+#else
+int
+kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
+                    int nrfrag, EP_NMD *rfrag)
+{       /* Check that a local and a remote RDMA fragment list line up: both
+         * must have the same number of frags and each pair of corresponding
+         * frags must have the same nmd_len.  Returns 0 when they match, or
+         * -EINVAL (after logging a CERROR) on the first mismatch found. */
+        int  i;
+
+        if (nlfrag != nrfrag) {
+                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
+                       nlfrag, nrfrag);
+                return (-EINVAL);
+        }
+        
+        for (i = 0; i < nlfrag; i++)
+                if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
+                        CERROR("Can't cope with unequal frags %d(%d):"
+                               " %d local %d remote\n",
+                               i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
+                        return (-EINVAL);
+                }
+        
+        return (0);
+}
 #endif
 
-int
-kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, 
-                   struct iovec *iov, ptl_kiov_t *kiov, 
-                   int offset, int nob)
+/*
+ * Validate the header of an optimized GET/PUT request held in the first
+ * page of krx's receive buffer and locate the remote memory descriptor
+ * (RMD) stored KQSW_HDR_SIZE bytes into that buffer.
+ *
+ * krx          - receive whose buffer holds the ptl_hdr_t and the RMD
+ * type         - message type the header is required to carry
+ * expected_nid - NID the caller believes the request came from; it is
+ *                LASSERTed against the NID derived from the receive
+ *
+ * Returns a pointer to the RMD inside the receive buffer, or NULL (after
+ * logging a CERROR) if the header type or source NID is wrong, or if the
+ * message is too short to hold the RMD and all the frags it claims.
+ */
+kqswnal_remotemd_t *
+kqswnal_parse_rmd (kqswnal_rx_t *krx, int type, ptl_nid_t expected_nid)
 {
-        kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
         char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
+        ptl_hdr_t          *hdr = (ptl_hdr_t *)buffer;
         kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
-        int                 rc;
-#if MULTIRAIL_EKC
-        int                 i;
-#else
-        EP_DATAVEC          datav[EP_MAXFRAG];
-        int                 ndatav;
-#endif
-        LASSERT (krx->krx_rpc_reply_needed);
-        LASSERT ((iov == NULL) != (kiov == NULL));
+        ptl_nid_t           nid = kqswnal_rx_nid(krx);
+
+        /* Note (1) lib_parse has already flipped hdr.
+         *      (2) RDMA addresses are sent in native endian-ness.  When
+         *      EKC copes with different endian nodes, I'll fix this (and
+         *      eat my hat :) */
+
+        LASSERT (krx->krx_nob >= sizeof(*hdr));
+
+        if (hdr->type != type) {
+                /* NB the leading space in " from " keeps the two adjacent
+                 * string literals from running together in the log */
+                CERROR ("Unexpected optimized get/put type %d (%d expected)"
+                        " from "LPX64"\n", hdr->type, type, nid);
+                return (NULL);
+        }
+        
+        if (hdr->src_nid != nid) {
+                CERROR ("Unexpected optimized get/put source NID "
+                        LPX64" from "LPX64"\n", hdr->src_nid, nid);
+                return (NULL);
+        }
+
+        LASSERT (nid == expected_nid);
 
-        /* see kqswnal_sendmsg comment regarding endian-ness */
         if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                 /* msg too small to discover rmd size */
                 CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
                         krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
-                return (-EINVAL);
+                return (NULL);
         }
-        
+
         if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                 /* rmd doesn't fit in the incoming message */
                 CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
                         krx->krx_nob, rmd->kqrmd_nfrag,
                         (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
-                return (-EINVAL);
+                return (NULL);
         }
 
-        /* Map the source data... */
+        return (rmd);
+}
+
+void
+kqswnal_rdma_store_complete (EP_RXD *rxd) 
+{       /* Completion callback that kqswnal_rdma() hands to
+         * ep_complete_rpc() for an optimized GET.  The callback argument is
+         * the ktx that mapped the local buffers; its ktx_args[0] is the krx
+         * of the RPC being completed. */
+        int           status = ep_rxd_status(rxd);
+        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
+        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+        
+        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+
+        LASSERT (ktx->ktx_state == KTX_RDMAING);
+        LASSERT (krx->krx_rxd == rxd);
+        LASSERT (krx->krx_rpc_reply_needed);
+
+        krx->krx_rpc_reply_needed = 0;  /* presumably: the RPC completion has
+                                         * now been attempted, so nothing
+                                         * else should reply to it */
+        kqswnal_rx_decref (krx);        /* drop the ref kqswnal_rdma() took
+                                         * for this callback */
+
+        /* free ktx & finalize() its lib_msg_t */
+        kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
+}
+
+void
+kqswnal_rdma_fetch_complete (EP_RXD *rxd) 
+{
+        /* Completed fetching the PUT data: this is the completion callback
+         * kqswnal_rdma() passes to ep_rpc_get() for an optimized PUT.  The
+         * callback argument is the ktx that mapped the local buffers and
+         * its ktx_args[0] is the krx of the RPC.  The RPC itself has not
+         * been completed yet; the outcome of the fetch is recorded in
+         * krx_rpc_reply_status as the status to send. */
+        int           status = ep_rxd_status(rxd);
+        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
+        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+        unsigned long flags;
+        
+        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+
+        LASSERT (ktx->ktx_state == KTX_RDMAING);
+        LASSERT (krx->krx_rxd == rxd);
+        LASSERT (krx->krx_rpc_reply_needed);
+
+        /* Set the RPC completion status */
+        status = (status == EP_SUCCESS) ? 0 : -ECONNABORTED;
+        krx->krx_rpc_reply_status = status;
+
+        /* free ktx & finalize() its lib_msg_t */
+        kqswnal_tx_done(ktx, status);
+
+        if (!in_interrupt()) {
+                /* OK to complete the RPC now (iff I had the last ref) */
+                kqswnal_rx_decref (krx);
+                return;
+        }
+
+        LASSERT (krx->krx_state == KRX_PARSE);
+        krx->krx_state = KRX_COMPLETING;
+
+        /* Complete the RPC in thread context: queue krx on kqn_readyrxds
+         * and wake whoever sleeps on kqn_sched_waitq (presumably the
+         * scheduler thread, which sees KRX_COMPLETING and finishes the
+         * job) */
+        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
+
+        list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
+        wake_up (&kqswnal_data.kqn_sched_waitq);
+
+        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+}
+
+/*
+ * Service an optimized GET or PUT by RDMA between local buffers and the
+ * buffers described by the remote memory descriptor in krx's message.
+ *
+ * krx      - receive carrying the RPC request (header + RMD)
+ * libmsg   - lib message finalized when the transfer is done
+ * type     - PTL_MSG_GET: store local data to the peer by completing the
+ *            RPC with ep_complete_rpc();
+ *            PTL_MSG_PUT: fetch the peer's data with ep_rpc_get()
+ * niov, iov, kiov - local payload, either mapped (iov) or paged (kiov),
+ *            never both
+ * offset, len - region of the local payload to transfer
+ *
+ * Returns 0 if the transfer was started (or len was 0, in which case
+ * libmsg is finalized here and the RPC is left to complete with success),
+ * otherwise a negative errno: -EPROTO for a bad request, -ENOMEM if no tx
+ * descriptor is available, the mapping/descriptor error, or -ECONNABORTED
+ * if the EKC call failed.
+ *
+ * A ktx is used only to map the local buffers.  An extra krx reference is
+ * taken for the completion callback as soon as the ktx is set up, so that
+ * every failure path reaching 'out' drops a reference it actually holds.
+ */
+int
+kqswnal_rdma (kqswnal_rx_t *krx, lib_msg_t *libmsg, int type,
+              int niov, struct iovec *iov, ptl_kiov_t *kiov,
+              size_t offset, size_t len)
+{
+        kqswnal_remotemd_t *rmd;
+        kqswnal_tx_t       *ktx;
+        int                 eprc;
+        int                 rc;
+#if !MULTIRAIL_EKC
+        EP_DATAVEC          datav[EP_MAXFRAG];
+        int                 ndatav;
+#endif
+
+        LASSERT (type == PTL_MSG_GET || type == PTL_MSG_PUT);
+        /* Not both mapped and paged payload */
+        LASSERT (iov == NULL || kiov == NULL);
+        /* RPC completes with failure by default */
+        LASSERT (krx->krx_rpc_reply_needed);
+        LASSERT (krx->krx_rpc_reply_status != 0);
+
+        rmd = kqswnal_parse_rmd(krx, type, libmsg->ev.initiator.nid);
+        if (rmd == NULL)
+                return (-EPROTO);
+
+        if (len == 0) {
+                /* data got truncated to nothing. */
+                lib_finalize(&kqswnal_lib, krx, libmsg, PTL_OK);
+                /* Let kqswnal_rx_done() complete the RPC with success */
+                krx->krx_rpc_reply_status = 0;
+                return (0);
+        }
+        
+        /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
+           actually sending a portals message with it */
+        ktx = kqswnal_get_idle_tx(NULL, 0);
+        if (ktx == NULL) {
+                CERROR ("Can't get txd for RDMA with "LPX64"\n",
+                        libmsg->ev.initiator.nid);
+                return (-ENOMEM);
+        }
+
+        ktx->ktx_state   = KTX_RDMAING;
+        ktx->ktx_nid     = libmsg->ev.initiator.nid;
+        ktx->ktx_args[0] = krx;
+        ktx->ktx_args[1] = libmsg;
+
+        LASSERT (atomic_read(&krx->krx_refcount) > 0);
+        /* Take an extra ref for the completion callback.  This is done
+         * before anything that can 'goto out', because the error path
+         * there unconditionally drops this ref. */
+        atomic_inc(&krx->krx_refcount);
+
+        /* Start mapping at offset 0 (we're not mapping any headers) */
         ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
+        
         if (kiov != NULL)
-                rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
+                rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
         else
-                rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);
+                rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);
 
         if (rc != 0) {
-                CERROR ("Can't map source data: %d\n", rc);
-                return (rc);
+                CERROR ("Can't map local RDMA data: %d\n", rc);
+                goto out;
         }
 
 #if MULTIRAIL_EKC
-        if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
-                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
-                       ktx->ktx_nfrag, rmd->kqrmd_nfrag);
-                return (-EINVAL);
+        rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
+                                 rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+        if (rc != 0) {
+                CERROR ("Incompatible RDMA descriptors\n");
+                goto out;
         }
-        
-        for (i = 0; i < rmd->kqrmd_nfrag; i++)
-                if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
-                        CERROR("Can't cope with unequal frags %d(%d):"
-                               " %d local %d remote\n",
-                               i, rmd->kqrmd_nfrag, 
-                               ktx->ktx_frags[i].nmd_len, 
-                               rmd->kqrmd_frag[i].nmd_len);
-                        return (-EINVAL);
-                }
 #else
-        ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
-                                      ktx->ktx_nfrag, ktx->ktx_frags,
-                                      rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+        switch (type) {
+        default:
+                LBUG();
+
+        case PTL_MSG_GET:
+                ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
+                                             ktx->ktx_nfrag, ktx->ktx_frags,
+                                             rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+                break;
+
+        case PTL_MSG_PUT:
+                ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
+                                             rmd->kqrmd_nfrag, rmd->kqrmd_frag,
+                                             ktx->ktx_nfrag, ktx->ktx_frags);
+                break;
+        }
+                
         if (ndatav < 0) {
                 CERROR ("Can't create datavec: %d\n", ndatav);
-                return (ndatav);
+                rc = ndatav;
+                goto out;
         }
 #endif
 
-        /* Our caller will start to race with kqswnal_dma_reply_complete... */
-        LASSERT (atomic_read (&krx->krx_refcount) == 1);
-        atomic_set (&krx->krx_refcount, 2);
+        /* NB the completion callback's krx ref was already taken above; from
+         * here on the callback may run and drop it at any time */
 
-#if MULTIRAIL_EKC
-        rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx, 
-                             &kqswnal_rpc_success,
-                             ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
-        if (rc == EP_SUCCESS)
-                return (0);
+        switch (type) {
+        default:
+                LBUG();
 
-        /* Well we tried... */
-        krx->krx_rpc_reply_needed = 0;
+        case PTL_MSG_GET:
+#if MULTIRAIL_EKC
+                eprc = ep_complete_rpc(krx->krx_rxd, 
+                                       kqswnal_rdma_store_complete, ktx, 
+                                       &kqswnal_data.kqn_rpc_success,
+                                       ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
 #else
-        rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
-                              &kqswnal_rpc_success, datav, ndatav);
-        if (rc == EP_SUCCESS)
-                return (0);
-
-        /* "old" EKC destroys rxd on failed completion */
-        krx->krx_rxd = NULL;
+                eprc = ep_complete_rpc (krx->krx_rxd, 
+                                        kqswnal_rdma_store_complete, ktx,
+                                        &kqswnal_data.kqn_rpc_success, 
+                                        datav, ndatav);
+                if (eprc != EP_SUCCESS) /* "old" EKC destroys rxd on failed completion */
+                        krx->krx_rxd = NULL;
 #endif
+                if (eprc != EP_SUCCESS) {
+                        CERROR("can't complete RPC: %d\n", eprc);
+                        /* don't re-attempt RPC completion */
+                        krx->krx_rpc_reply_needed = 0;
+                        rc = -ECONNABORTED;
+                }
+                break;
+                
+        case PTL_MSG_PUT:
+#if MULTIRAIL_EKC
+                eprc = ep_rpc_get (krx->krx_rxd, 
+                                   kqswnal_rdma_fetch_complete, ktx,
+                                   rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
+#else
+                eprc = ep_rpc_get (krx->krx_rxd,
+                                   kqswnal_rdma_fetch_complete, ktx,
+                                   datav, ndatav);
+#endif
+                if (eprc != EP_SUCCESS) {
+                        CERROR("ep_rpc_get failed: %d\n", eprc);
+                        rc = -ECONNABORTED;
+                }
+                break;
+        }
 
-        CERROR("can't complete RPC: %d\n", rc);
-
-        /* reset refcount back to 1: we're not going to be racing with
-         * kqswnal_dma_reply_complete. */
-        atomic_set (&krx->krx_refcount, 1);
+ out:
+        if (rc != 0) {
+                /* No callback will run: give back its ref and the ktx */
+                kqswnal_rx_decref(krx);                 /* drop callback's ref */
+                kqswnal_put_idle_tx (ktx);
+        }
 
-        return (-ECONNABORTED);
+        /* NOTE(review): presumably balances an increment of kqn_pending_txs
+         * made when the ktx was obtained; the matching atomic_inc is not
+         * visible in this hunk - confirm */
+        atomic_dec(&kqswnal_data.kqn_pending_txs);
+        return (rc);
 }
 
 static ptl_err_t
-kqswnal_sendmsg (nal_cb_t     *nal,
+kqswnal_sendmsg (lib_nal_t    *nal,
                  void         *private,
                  lib_msg_t    *libmsg,
                  ptl_hdr_t    *hdr,
@@ -916,6 +1014,8 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         int                sumoff;
         int                sumnob;
 #endif
+        /* NB 1. hdr is in network byte order */
+        /*    2. 'private' depends on the message type */
         
         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
                " pid %u\n", payload_nob, payload_niov, nid, pid);
@@ -934,6 +1034,15 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 return (PTL_FAIL);
         }
 
+        if (type == PTL_MSG_REPLY &&            /* can I look in 'private' */
+            ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) { /* is it an RPC */
+                /* Must be a REPLY for an optimized GET */
+                rc = kqswnal_rdma ((kqswnal_rx_t *)private, libmsg, PTL_MSG_GET,
+                                   payload_niov, payload_iov, payload_kiov, 
+                                   payload_offset, payload_nob);
+                return ((rc == 0) ? PTL_OK : PTL_FAIL);
+        }
+
         targetnid = nid;
         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
                 rc = kpr_lookup (&kqswnal_data.kqn_router, nid, 
@@ -956,35 +1065,16 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                                           type == PTL_MSG_REPLY ||
                                           in_interrupt()));
         if (ktx == NULL) {
-                kqswnal_cerror_hdr (hdr);
+                CERROR ("Can't get txd for msg type %d for "LPX64"\n",
+                        type, libmsg->ev.initiator.nid);
                 return (PTL_NO_SPACE);
         }
 
+        ktx->ktx_state   = KTX_SENDING;
         ktx->ktx_nid     = targetnid;
         ktx->ktx_args[0] = private;
         ktx->ktx_args[1] = libmsg;
-
-        if (type == PTL_MSG_REPLY &&
-            ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
-                if (nid != targetnid ||
-                    kqswnal_nid2elanid(nid) != 
-                    ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
-                        CERROR("Optimized reply nid conflict: "
-                               "nid "LPX64" via "LPX64" elanID %d\n",
-                               nid, targetnid,
-                               ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
-                        rc = -EINVAL;
-                        goto out;
-                }
-
-                /* peer expects RPC completion with GET data */
-                rc = kqswnal_dma_reply (ktx, payload_niov, 
-                                        payload_iov, payload_kiov, 
-                                        payload_offset, payload_nob);
-                if (rc != 0)
-                        CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
-                goto out;
-        }
+        ktx->ktx_args[2] = NULL;    /* set when a GET commits to REPLY */
 
         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
@@ -1027,28 +1117,31 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
 #endif
 
-        if (kqswnal_tunables.kqn_optimized_gets &&
-            type == PTL_MSG_GET &&              /* doing a GET */
-            nid == targetnid) {                 /* not forwarding */
+        /* The first frag will be the pre-mapped buffer for (at least) the
+         * portals header. */
+        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+
+        if (nid == targetnid &&                 /* not forwarding */
+            ((type == PTL_MSG_GET &&            /* optimize GET? */
+              kqswnal_tunables.kqn_optimized_gets != 0 &&
+              NTOH__u32(hdr->msg.get.sink_length) >= kqswnal_tunables.kqn_optimized_gets) ||
+             (type == PTL_MSG_PUT &&            /* optimize PUT? */
+              kqswnal_tunables.kqn_optimized_puts != 0 &&
+              payload_nob >= kqswnal_tunables.kqn_optimized_puts))) {
                 lib_md_t           *md = libmsg->md;
                 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
                 
-                /* Optimised path: I send over the Elan vaddrs of the get
-                 * sink buffers, and my peer DMAs directly into them.
+                /* Optimised path: I send over the Elan vaddrs of the local
+                 * buffers, and my peer DMAs directly to/from them.
                  *
                  * First I set up ktx as if it was going to send this
                  * payload, (it needs to map it anyway).  This fills
                  * ktx_frags[1] and onward with the network addresses
                  * of the GET sink frags.  I copy these into ktx_buffer,
-                 * immediately after the header, and send that as my GET
-                 * message.
-                 *
-                 * Note that the addresses are sent in native endian-ness.
-                 * When EKC copes with different endian nodes, I'll fix
-                 * this (and eat my hat :) */
+                 * immediately after the header, and send that as my
+                 * message. */
 
-                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
-                ktx->ktx_state = KTX_GETTING;
+                ktx->ktx_state = (type == PTL_MSG_PUT) ? KTX_PUTTING : KTX_GETTING;
 
                 if ((libmsg->md->options & PTL_MD_KIOV) != 0) 
                         rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
@@ -1078,12 +1171,21 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
 #endif
+                if (type == PTL_MSG_GET) {
+                        /* Allocate reply message now while I'm in thread context */
+                        ktx->ktx_args[2] = lib_create_reply_msg (&kqswnal_lib,
+                                                                 nid, libmsg);
+                        if (ktx->ktx_args[2] == NULL)
+                                goto out;
+
+                        /* NB finalizing the REPLY message is my
+                         * responsibility now, whatever happens. */
+                }
+                
         } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
 
                 /* small message: single frag copied into the pre-mapped buffer */
 
-                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
-                ktx->ktx_state = KTX_SENDING;
 #if MULTIRAIL_EKC
                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                               0, KQSW_HDR_SIZE + payload_nob);
@@ -1105,8 +1207,6 @@ kqswnal_sendmsg (nal_cb_t     *nal,
 
                 /* large message: multiple frags: first is hdr in pre-mapped buffer */
 
-                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
-                ktx->ktx_state = KTX_SENDING;
 #if MULTIRAIL_EKC
                 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                               0, KQSW_HDR_SIZE);
@@ -1135,15 +1235,29 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                rc == 0 ? "Sent" : "Failed to send",
                payload_nob, nid, targetnid, rc);
 
-        if (rc != 0)
+        if (rc != 0) {
+                if (ktx->ktx_state == KTX_GETTING &&
+                    ktx->ktx_args[2] != NULL) {
+                        /* We committed to reply, but there was a problem
+                         * launching the GET.  We can't avoid delivering a
+                         * REPLY event since we committed above, so we
+                         * pretend the GET succeeded but the REPLY
+                         * failed. */
+                        rc = 0;
+                        lib_finalize (&kqswnal_lib, private, libmsg, PTL_OK);
+                        lib_finalize (&kqswnal_lib, private,
+                                      (lib_msg_t *)ktx->ktx_args[2], PTL_FAIL);
+                }
+                
                 kqswnal_put_idle_tx (ktx);
-
+        }
+        
         atomic_dec(&kqswnal_data.kqn_pending_txs);
         return (rc == 0 ? PTL_OK : PTL_FAIL);
 }
 
 static ptl_err_t
-kqswnal_send (nal_cb_t     *nal,
+kqswnal_send (lib_nal_t    *nal,
               void         *private,
               lib_msg_t    *libmsg,
               ptl_hdr_t    *hdr,
@@ -1161,7 +1275,7 @@ kqswnal_send (nal_cb_t     *nal,
 }
 
 static ptl_err_t
-kqswnal_send_pages (nal_cb_t     *nal,
+kqswnal_send_pages (lib_nal_t    *nal,
                     void         *private,
                     lib_msg_t    *libmsg,
                     ptl_hdr_t    *hdr,
@@ -1200,7 +1314,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
         if (ktx == NULL)        /* can't get txd right now */
                 return;         /* fwd will be scheduled when tx desc freed */
 
-        if (nid == kqswnal_lib.ni.nid)          /* gateway is me */
+        if (nid == kqswnal_lib.libnal_ni.ni_pid.nid) /* gateway is me */
                 nid = fwd->kprfd_target_nid;    /* target is final dest */
 
         if (kqswnal_nid2elanid (nid) < 0) {
@@ -1254,9 +1368,8 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
         if (rc != 0) {
                 CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
 
-                kqswnal_put_idle_tx (ktx);
                 /* complete now (with failure) */
-                kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
+                kqswnal_tx_done (ktx, rc);
         }
 
         atomic_dec(&kqswnal_data.kqn_pending_txs);
@@ -1277,29 +1390,48 @@ kqswnal_fwd_callback (void *arg, int error)
                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
         }
 
-        kqswnal_requeue_rx (krx);
+        LASSERT (atomic_read(&krx->krx_refcount) == 1);
+        kqswnal_rx_decref (krx);
 }
 
 void
-kqswnal_dma_reply_complete (EP_RXD *rxd) 
+kqswnal_requeue_rx (kqswnal_rx_t *krx)
 {
-        int           status = ep_rxd_status(rxd);
-        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
-        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
-        lib_msg_t    *msg = (lib_msg_t *)ktx->ktx_args[1];
-        
-        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
-               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+        LASSERT (atomic_read(&krx->krx_refcount) == 0);
+        LASSERT (!krx->krx_rpc_reply_needed);
 
-        LASSERT (krx->krx_rxd == rxd);
-        LASSERT (krx->krx_rpc_reply_needed);
+        krx->krx_state = KRX_POSTED;
 
-        krx->krx_rpc_reply_needed = 0;
-        kqswnal_rx_done (krx);
+#if MULTIRAIL_EKC
+        if (kqswnal_data.kqn_shuttingdown) {
+                /* free EKC rxd on shutdown */
+                ep_complete_receive(krx->krx_rxd);
+        } else {
+                /* repost receive */
+                ep_requeue_receive(krx->krx_rxd, 
+                                   kqswnal_rxhandler, krx,
+                                   &krx->krx_elanbuffer, 0);
+        }
+#else                
+        if (kqswnal_data.kqn_shuttingdown)
+                return;
 
-        lib_finalize (&kqswnal_lib, NULL, msg,
-                      (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
-        kqswnal_put_idle_tx (ktx);
+        if (krx->krx_rxd == NULL) {
+                /* We had a failed ep_complete_rpc() which nukes the
+                 * descriptor in "old" EKC */
+                int eprc = ep_queue_receive(krx->krx_eprx, 
+                                            kqswnal_rxhandler, krx,
+                                            krx->krx_elanbuffer, 
+                                            krx->krx_npages * PAGE_SIZE, 0);
+                LASSERT (eprc == EP_SUCCESS);
+                /* We don't handle failure here; it's incredibly rare
+                 * (never reported?) and only happens with "old" EKC */
+        } else {
+                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
+                                   krx->krx_elanbuffer, 
+                                   krx->krx_npages * PAGE_SIZE);
+        }
+#endif
 }
 
 void
@@ -1319,71 +1451,45 @@ kqswnal_rpc_complete (EP_RXD *rxd)
 }
 
 void
-kqswnal_requeue_rx (kqswnal_rx_t *krx) 
+kqswnal_rx_done (kqswnal_rx_t *krx) 
 {
-        int   rc;
+        int           rc;
+        EP_STATUSBLK *sblk;
 
         LASSERT (atomic_read(&krx->krx_refcount) == 0);
 
         if (krx->krx_rpc_reply_needed) {
+                /* We've not completed the peer's RPC yet... */
+                sblk = (krx->krx_rpc_reply_status == 0) ? 
+                       &kqswnal_data.kqn_rpc_success : 
+                       &kqswnal_data.kqn_rpc_failed;
 
-                /* We failed to complete the peer's optimized GET (e.g. we
-                 * couldn't map the source buffers).  We complete the
-                 * peer's EKC rpc now with failure. */
+                LASSERT (!in_interrupt());
 #if MULTIRAIL_EKC
-                rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
-                                     &kqswnal_rpc_failed, NULL, NULL, 0);
+                rc = ep_complete_rpc(krx->krx_rxd, 
+                                     kqswnal_rpc_complete, krx,
+                                     sblk, NULL, NULL, 0);
                 if (rc == EP_SUCCESS)
                         return;
-                
-                CERROR("can't complete RPC: %d\n", rc);
 #else
-                if (krx->krx_rxd != NULL) {
-                        /* We didn't try (and fail) to complete earlier... */
-                        rc = ep_complete_rpc(krx->krx_rxd, 
-                                             kqswnal_rpc_complete, krx,
-                                             &kqswnal_rpc_failed, NULL, 0);
-                        if (rc == EP_SUCCESS)
-                                return;
-
-                        CERROR("can't complete RPC: %d\n", rc);
-                }
-                
-                /* NB the old ep_complete_rpc() frees rxd on failure, so we
-                 * have to requeue from scratch here, unless we're shutting
-                 * down */
-                if (kqswnal_data.kqn_shuttingdown)
+                rc = ep_complete_rpc(krx->krx_rxd, 
+                                     kqswnal_rpc_complete, krx,
+                                     sblk, NULL, 0);
+                if (rc == EP_SUCCESS)
                         return;
 
-                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
-                                      krx->krx_elanbuffer, 
-                                      krx->krx_npages * PAGE_SIZE, 0);
-                LASSERT (rc == EP_SUCCESS);
-                /* We don't handle failure here; it's incredibly rare
-                 * (never reported?) and only happens with "old" EKC */
-                return;
+                /* "old" EKC destroys rxd on failed completion */
+                krx->krx_rxd = NULL;
 #endif
+                CERROR("can't complete RPC: %d\n", rc);
+                krx->krx_rpc_reply_needed = 0;
         }
 
-#if MULTIRAIL_EKC
-        if (kqswnal_data.kqn_shuttingdown) {
-                /* free EKC rxd on shutdown */
-                ep_complete_receive(krx->krx_rxd);
-        } else {
-                /* repost receive */
-                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
-                                   &krx->krx_elanbuffer, 0);
-        }
-#else                
-        /* don't actually requeue on shutdown */
-        if (!kqswnal_data.kqn_shuttingdown) 
-                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
-                                   krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
-#endif
+        kqswnal_requeue_rx(krx);
 }
         
 void
-kqswnal_rx (kqswnal_rx_t *krx)
+kqswnal_parse (kqswnal_rx_t *krx)
 {
         ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
         ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
@@ -1391,25 +1497,28 @@ kqswnal_rx (kqswnal_rx_t *krx)
         int             nob;
         int             niov;
 
-        LASSERT (atomic_read(&krx->krx_refcount) == 0);
+        LASSERT (atomic_read(&krx->krx_refcount) == 1);
+
+        if (dest_nid == kqswnal_lib.libnal_ni.ni_pid.nid) { /* It's for me :) */
+                /* I ignore parse errors since I'm not consuming a byte
+                 * stream */
+                (void)lib_parse (&kqswnal_lib, hdr, krx);
 
-        if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
-                atomic_set(&krx->krx_refcount, 1);
-                lib_parse (&kqswnal_lib, hdr, krx);
-                kqswnal_rx_done(krx);
+                /* Drop my ref; any RDMA activity takes an additional ref */
+                kqswnal_rx_decref(krx);
                 return;
         }
 
 #if KQSW_CHECKSUM
-        CERROR ("checksums for forwarded packets not implemented\n");
-        LBUG ();
+        LASSERTF (0, "checksums for forwarded packets not implemented\n");
 #endif
+
         if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
         {
                 CERROR("dropping packet from "LPX64" for "LPX64
                        ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
 
-                kqswnal_requeue_rx (krx);
+                kqswnal_rx_decref (krx);
                 return;
         }
 
@@ -1451,7 +1560,9 @@ kqswnal_rxhandler(EP_RXD *rxd)
                rxd, krx, nob, status);
 
         LASSERT (krx != NULL);
-
+        LASSERT (krx->krx_state == KRX_POSTED);
+        
+        krx->krx_state = KRX_PARSE;
         krx->krx_rxd = rxd;
         krx->krx_nob = nob;
 #if MULTIRAIL_EKC
@@ -1459,7 +1570,10 @@ kqswnal_rxhandler(EP_RXD *rxd)
 #else
         krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
 #endif
-        
+        /* Default to failure if an RPC reply is requested but not handled */
+        krx->krx_rpc_reply_status = -EPROTO;
+        atomic_set (&krx->krx_refcount, 1);
+
         /* must receive a whole header to be able to parse */
         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
         {
@@ -1475,12 +1589,12 @@ kqswnal_rxhandler(EP_RXD *rxd)
                         CERROR("receive status failed with status %d nob %d\n",
                                ep_rxd_status(rxd), nob);
 #endif
-                kqswnal_requeue_rx (krx);
+                kqswnal_rx_decref(krx);
                 return;
         }
 
         if (!in_interrupt()) {
-                kqswnal_rx (krx);
+                kqswnal_parse(krx);
                 return;
         }
 
@@ -1540,7 +1654,7 @@ kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
 #endif
 
 static ptl_err_t
-kqswnal_recvmsg (nal_cb_t     *nal,
+kqswnal_recvmsg (lib_nal_t    *nal,
                  void         *private,
                  lib_msg_t    *libmsg,
                  unsigned int  niov,
@@ -1552,16 +1666,18 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 {
         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
         char         *buffer = page_address(krx->krx_kiov[0].kiov_page);
+        ptl_hdr_t    *hdr = (ptl_hdr_t *)buffer;
         int           page;
         char         *page_ptr;
         int           page_nob;
         char         *iov_ptr;
         int           iov_nob;
         int           frag;
+        int           rc;
 #if KQSW_CHECKSUM
         kqsw_csum_t   senders_csum;
         kqsw_csum_t   payload_csum = 0;
-        kqsw_csum_t   hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
+        kqsw_csum_t   hdr_csum = kqsw_csum(0, hdr, sizeof(*hdr));
         size_t        csum_len = mlen;
         int           csum_frags = 0;
         int           csum_nob = 0;
@@ -1574,8 +1690,18 @@ kqswnal_recvmsg (nal_cb_t     *nal,
         if (senders_csum != hdr_csum)
                 kqswnal_csum_error (krx, 1);
 #endif
+        /* NB lib_parse() has already flipped *hdr */
+
         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
 
+        if (krx->krx_rpc_reply_needed &&
+            hdr->type == PTL_MSG_PUT) {
+                /* This must be an optimized PUT */
+                rc = kqswnal_rdma (krx, libmsg, PTL_MSG_PUT,
+                                   niov, iov, kiov, offset, mlen);
+                return (rc == 0 ? PTL_OK : PTL_FAIL);
+        }
+
         /* What was actually received must be >= payload. */
         LASSERT (mlen <= rlen);
         if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
@@ -1691,7 +1817,7 @@ kqswnal_recvmsg (nal_cb_t     *nal,
 }
 
 static ptl_err_t
-kqswnal_recv(nal_cb_t     *nal,
+kqswnal_recv(lib_nal_t    *nal,
              void         *private,
              lib_msg_t    *libmsg,
              unsigned int  niov,
@@ -1706,7 +1832,7 @@ kqswnal_recv(nal_cb_t     *nal,
 }
 
 static ptl_err_t
-kqswnal_recv_pages (nal_cb_t     *nal,
+kqswnal_recv_pages (lib_nal_t    *nal,
                     void         *private,
                     lib_msg_t    *libmsg,
                     unsigned int  niov,
@@ -1766,7 +1892,18 @@ kqswnal_scheduler (void *arg)
                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                                flags);
 
-                        kqswnal_rx (krx);
+                        switch (krx->krx_state) {
+                        case KRX_PARSE:
+                                kqswnal_parse (krx);
+                                break;
+                        case KRX_COMPLETING:
+                                /* Drop last ref to reply to RPC and requeue */
+                                LASSERT (krx->krx_rpc_reply_needed);
+                                kqswnal_rx_decref (krx);
+                                break;
+                        default:
+                                LBUG();
+                        }
 
                         did_something = 1;
                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
@@ -1835,20 +1972,12 @@ kqswnal_scheduler (void *arg)
         return (0);
 }
 
-nal_cb_t kqswnal_lib =
+lib_nal_t kqswnal_lib =
 {
-        nal_data:       &kqswnal_data,         /* NAL private data */
-        cb_send:        kqswnal_send,
-        cb_send_pages:  kqswnal_send_pages,
-        cb_recv:        kqswnal_recv,
-        cb_recv_pages:  kqswnal_recv_pages,
-        cb_read:        kqswnal_read,
-        cb_write:       kqswnal_write,
-        cb_malloc:      kqswnal_malloc,
-        cb_free:        kqswnal_free,
-        cb_printf:      kqswnal_printf,
-        cb_cli:         kqswnal_cli,
-        cb_sti:         kqswnal_sti,
-        cb_callback:    kqswnal_callback,
-        cb_dist:        kqswnal_dist
+        libnal_data:       &kqswnal_data,         /* NAL private data */
+        libnal_send:        kqswnal_send,
+        libnal_send_pages:  kqswnal_send_pages,
+        libnal_recv:        kqswnal_recv,
+        libnal_recv_pages:  kqswnal_recv_pages,
+        libnal_dist:        kqswnal_dist
 };
index 32bbbec..9d39cb1 100644 (file)
@@ -74,83 +74,9 @@ static ctl_table ksocknal_top_ctl_table[] = {
 #endif
 
 int
-ksocknal_api_forward(nal_t *nal, int id, void *args, size_t args_len,
-                       void *ret, size_t ret_len)
-{
-        ksock_nal_data_t *k;
-        nal_cb_t *nal_cb;
-
-        k = nal->nal_data;
-        nal_cb = k->ksnd_nal_cb;
-
-        lib_dispatch(nal_cb, k, id, args, ret); /* ksocknal_send needs k */
-        return PTL_OK;
-}
-
-void
-ksocknal_api_lock(nal_t *nal, unsigned long *flags)
-{
-        ksock_nal_data_t *k;
-        nal_cb_t *nal_cb;
-
-        k = nal->nal_data;
-        nal_cb = k->ksnd_nal_cb;
-        nal_cb->cb_cli(nal_cb,flags);
-}
-
-void
-ksocknal_api_unlock(nal_t *nal, unsigned long *flags)
-{
-        ksock_nal_data_t *k;
-        nal_cb_t *nal_cb;
-
-        k = nal->nal_data;
-        nal_cb = k->ksnd_nal_cb;
-        nal_cb->cb_sti(nal_cb,flags);
-}
-
-int
-ksocknal_api_yield(nal_t *nal, unsigned long *flags, int milliseconds)
-{
-       /* NB called holding statelock */
-        wait_queue_t       wait;
-       unsigned long      now = jiffies;
-
-       CDEBUG (D_NET, "yield\n");
-
-       if (milliseconds == 0) {
-                our_cond_resched();
-               return 0;
-       }
-
-       init_waitqueue_entry(&wait, current);
-       set_current_state (TASK_INTERRUPTIBLE);
-       add_wait_queue (&ksocknal_data.ksnd_yield_waitq, &wait);
-
-       ksocknal_api_unlock(nal, flags);
-
-       if (milliseconds < 0)
-               schedule ();
-       else
-               schedule_timeout((milliseconds * HZ) / 1000);
-       
-       ksocknal_api_lock(nal, flags);
-
-       remove_wait_queue (&ksocknal_data.ksnd_yield_waitq, &wait);
-
-       if (milliseconds > 0) {
-               milliseconds -= ((jiffies - now) * 1000) / HZ;
-               if (milliseconds < 0)
-                       milliseconds = 0;
-       }
-       
-       return (milliseconds);
-}
-
-int
 ksocknal_set_mynid(ptl_nid_t nid)
 {
-        lib_ni_t *ni = &ksocknal_lib.ni;
+        lib_ni_t *ni = &ksocknal_lib.libnal_ni;
 
         /* FIXME: we have to do this because we call lib_init() at module
          * insertion time, which is before we have 'mynid' available.  lib_init
@@ -159,9 +85,9 @@ ksocknal_set_mynid(ptl_nid_t nid)
          * problem. */
 
         CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
-               nid, ni->nid);
+               nid, ni->ni_pid.nid);
 
-        ni->nid = nid;
+        ni->ni_pid.nid = nid;
         return (0);
 }
 
@@ -1527,14 +1453,18 @@ ksocknal_api_shutdown (nal_t *nal)
 
                 /* flag threads to terminate; wake and wait for them to die */
                 ksocknal_data.ksnd_shuttingdown = 1;
+                mb();
                 wake_up_all (&ksocknal_data.ksnd_autoconnectd_waitq);
                 wake_up_all (&ksocknal_data.ksnd_reaper_waitq);
 
                 for (i = 0; i < SOCKNAL_N_SCHED; i++)
                        wake_up_all(&ksocknal_data.ksnd_schedulers[i].kss_waitq);
 
+                i = 4;
                 while (atomic_read (&ksocknal_data.ksnd_nthreads) != 0) {
-                        CDEBUG (D_NET, "waitinf for %d threads to terminate\n",
+                        i++;
+                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
+                               "waiting for %d threads to terminate\n",
                                 atomic_read (&ksocknal_data.ksnd_nthreads));
                         set_current_state (TASK_UNINTERRUPTIBLE);
                         schedule_timeout (HZ);
@@ -1590,7 +1520,7 @@ ksocknal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
 
         if (nal->nal_refct != 0) {
                 if (actual_limits != NULL)
-                        *actual_limits = ksocknal_lib.ni.actual_limits;
+                        *actual_limits = ksocknal_lib.libnal_ni.ni_actual_limits;
                 /* This module got the first ref */
                 PORTAL_MODULE_USE;
                 return (PTL_OK);
@@ -1613,10 +1543,6 @@ ksocknal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
 
         rwlock_init(&ksocknal_data.ksnd_global_lock);
 
-        ksocknal_data.ksnd_nal_cb = &ksocknal_lib;
-        spin_lock_init (&ksocknal_data.ksnd_nal_cb_lock);
-        init_waitqueue_head(&ksocknal_data.ksnd_yield_waitq);
-        
         spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
         INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
@@ -1646,7 +1572,7 @@ ksocknal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
         PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
                      sizeof(ksock_sched_t) * SOCKNAL_N_SCHED);
         if (ksocknal_data.ksnd_schedulers == NULL) {
-                ksocknal_api_shutdown (&ksocknal_api);
+                ksocknal_api_shutdown (nal);
                 return (-ENOMEM);
         }
 
@@ -1666,11 +1592,11 @@ ksocknal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
         process_id.pid = 0;
         process_id.nid = 0;
         
-        rc = lib_init(&ksocknal_lib, process_id,
+        rc = lib_init(&ksocknal_lib, nal, process_id,
                       requested_limits, actual_limits);
         if (rc != PTL_OK) {
                 CERROR("lib_init failed: error %d\n", rc);
-                ksocknal_api_shutdown (&ksocknal_api);
+                ksocknal_api_shutdown (nal);
                 return (rc);
         }
 
@@ -1682,7 +1608,7 @@ ksocknal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
                 if (rc != 0) {
                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
                                i, rc);
-                        ksocknal_api_shutdown (&ksocknal_api);
+                        ksocknal_api_shutdown (nal);
                         return (rc);
                 }
         }
@@ -1691,7 +1617,7 @@ ksocknal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
                 rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
                 if (rc != 0) {
                         CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
-                        ksocknal_api_shutdown (&ksocknal_api);
+                        ksocknal_api_shutdown (nal);
                         return (rc);
                 }
         }
@@ -1699,7 +1625,7 @@ ksocknal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
         if (rc != 0) {
                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
-                ksocknal_api_shutdown (&ksocknal_api);
+                ksocknal_api_shutdown (nal);
                 return (rc);
         }
 
@@ -1725,7 +1651,7 @@ ksocknal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
                         PORTAL_ALLOC(fmb, offsetof(ksock_fmb_t, 
                                                    fmb_kiov[pool->fmp_buff_pages]));
                         if (fmb == NULL) {
-                                ksocknal_api_shutdown(&ksocknal_api);
+                                ksocknal_api_shutdown(nal);
                                 return (-ENOMEM);
                         }
 
@@ -1735,7 +1661,7 @@ ksocknal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
                                 fmb->fmb_kiov[j].kiov_page = alloc_page(GFP_KERNEL);
 
                                 if (fmb->fmb_kiov[j].kiov_page == NULL) {
-                                        ksocknal_api_shutdown (&ksocknal_api);
+                                        ksocknal_api_shutdown (nal);
                                         return (-ENOMEM);
                                 }
 
@@ -1749,7 +1675,7 @@ ksocknal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
         rc = libcfs_nal_cmd_register(SOCKNAL, &ksocknal_cmd, NULL);
         if (rc != 0) {
                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
-                ksocknal_api_shutdown (&ksocknal_api);
+                ksocknal_api_shutdown (nal);
                 return (rc);
         }
 
@@ -1794,14 +1720,8 @@ ksocknal_module_init (void)
         /* check ksnr_connected/connecting field large enough */
         LASSERT(SOCKNAL_CONN_NTYPES <= 4);
         
-        ksocknal_api.startup  = ksocknal_api_startup;
-        ksocknal_api.forward  = ksocknal_api_forward;
-        ksocknal_api.shutdown = ksocknal_api_shutdown;
-        ksocknal_api.lock     = ksocknal_api_lock;
-        ksocknal_api.unlock   = ksocknal_api_unlock;
-        ksocknal_api.nal_data = &ksocknal_data;
-
-        ksocknal_lib.nal_data = &ksocknal_data;
+        ksocknal_api.nal_ni_init = ksocknal_api_startup;
+        ksocknal_api.nal_ni_fini = ksocknal_api_shutdown;
 
         /* Initialise dynamic tunables to defaults once only */
         ksocknal_tunables.ksnd_io_timeout = SOCKNAL_IO_TIMEOUT;
index 87b23dc..ff73f71 100644 (file)
@@ -160,10 +160,6 @@ typedef struct {
         struct list_head *ksnd_peers;           /* hash table of all my known peers */
         int               ksnd_peer_hash_size;  /* size of ksnd_peers */
 
-        nal_cb_t         *ksnd_nal_cb;
-        spinlock_t        ksnd_nal_cb_lock;     /* lib cli/sti lock */
-        wait_queue_head_t ksnd_yield_waitq;     /* where yield waits */
-
         atomic_t          ksnd_nthreads;        /* # live threads */
         int               ksnd_shuttingdown;    /* tell threads to exit */
         ksock_sched_t    *ksnd_schedulers;      /* scheduler state */
@@ -364,7 +360,7 @@ typedef struct ksock_peer
 } ksock_peer_t;
 
 
-extern nal_cb_t         ksocknal_lib;
+extern lib_nal_t        ksocknal_lib;
 extern ksock_nal_data_t ksocknal_data;
 extern ksock_tunables_t ksocknal_tunables;
 
index 21e0abe..5815d16 100644 (file)
  *  LIB functions follow
  *
  */
-ptl_err_t
-ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr,
-              user_ptr src_addr, size_t len)
-{
-        CDEBUG(D_NET, LPX64": reading %ld bytes from %p -> %p\n",
-               nal->ni.nid, (long)len, src_addr, dst_addr);
-
-        memcpy( dst_addr, src_addr, len );
-        return PTL_OK;
-}
-
-ptl_err_t
-ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
-               void *src_addr, size_t len)
-{
-        CDEBUG(D_NET, LPX64": writing %ld bytes from %p -> %p\n",
-               nal->ni.nid, (long)len, src_addr, dst_addr);
-
-        memcpy( dst_addr, src_addr, len );
-        return PTL_OK;
-}
-
-void *
-ksocknal_malloc(nal_cb_t *nal, size_t len)
-{
-        void *buf;
-
-        PORTAL_ALLOC(buf, len);
-
-        if (buf != NULL)
-                memset(buf, 0, len);
-
-        return (buf);
-}
-
-void
-ksocknal_free(nal_cb_t *nal, void *buf, size_t len)
-{
-        PORTAL_FREE(buf, len);
-}
-
-void
-ksocknal_printf(nal_cb_t *nal, const char *fmt, ...)
-{
-        va_list ap;
-        char msg[256];
-
-        va_start (ap, fmt);
-        vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
-        va_end (ap);
-
-        msg[sizeof (msg) - 1] = 0;              /* ensure terminated */
-
-        CDEBUG (D_NET, "%s", msg);
-}
-
-void
-ksocknal_cli(nal_cb_t *nal, unsigned long *flags)
-{
-        ksock_nal_data_t *data = nal->nal_data;
-
-        /* OK to ignore 'flags'; we're only ever serialise threads and
-         * never need to lock out interrupts */
-        spin_lock(&data->ksnd_nal_cb_lock);
-}
-
-void
-ksocknal_sti(nal_cb_t *nal, unsigned long *flags)
-{
-        ksock_nal_data_t *data;
-        data = nal->nal_data;
-
-        /* OK to ignore 'flags'; we're only ever serialise threads and
-         * never need to lock out interrupts */
-        spin_unlock(&data->ksnd_nal_cb_lock);
-}
-
-void
-ksocknal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev)
-{
-        /* holding ksnd_nal_cb_lock */
-
-        if (eq->event_callback != NULL)
-                eq->event_callback(ev);
-        
-        if (waitqueue_active(&ksocknal_data.ksnd_yield_waitq))
-                wake_up_all(&ksocknal_data.ksnd_yield_waitq);
-}
-
 int
-ksocknal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
+ksocknal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
 {
         /* I would guess that if ksocknal_get_peer (nid) == NULL,
            and we're not routing, then 'nid' is very distant :) */
-        if ( nal->ni.nid == nid ) {
+        if (nal->libnal_ni.ni_pid.nid == nid) {
                 *dist = 0;
         } else {
                 *dist = 1;
@@ -882,8 +793,8 @@ ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
 {
         struct list_head  *tmp;
         ksock_route_t     *route;
-        ksock_route_t     *candidate = NULL;
-        int                found = 0;
+        ksock_route_t     *first_lazy = NULL;
+        int                found_connecting_or_connected = 0;
         int                bits;
         
         list_for_each (tmp, &peer->ksnp_routes) {
@@ -896,7 +807,7 @@ ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
                         /* All typed connections have been established, or
                          * an untyped connection has been established, or
                          * connections are currently being established */
-                        found = 1;
+                        found_connecting_or_connected = 1;
                         continue;
                 }
 
@@ -904,20 +815,24 @@ ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
                 if (!time_after_eq (jiffies, route->ksnr_timeout))
                         continue;
                 
-                /* always do eager routes */
+                /* eager routes always want to be connected */
                 if (route->ksnr_eager)
                         return (route);
 
-                if (candidate == NULL) {
-                        /* If we don't find any other route that is fully
-                         * connected or connecting, the first connectable
-                         * route is returned.  If it fails to connect, it
-                         * will get placed at the end of the list */
-                        candidate = route;
-                }
+                if (first_lazy == NULL)
+                        first_lazy = route;
         }
-        return (found ? NULL : candidate);
+        
+        /* No eager routes need to be connected.  If some connection has
+         * already been established, or is being established there's nothing to
+         * do.  Otherwise we return the first lazy route we found.  If it fails
+         * to connect, it will go to the end of the list. */
+
+        if (!list_empty (&peer->ksnp_conns) ||
+            found_connecting_or_connected)
+                return (NULL);
+        
+        return (first_lazy);
 }
 
 ksock_route_t *
@@ -1028,7 +943,7 @@ ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
 }
 
 ptl_err_t
-ksocknal_sendmsg(nal_cb_t     *nal, 
+ksocknal_sendmsg(lib_nal_t     *nal, 
                  void         *private, 
                  lib_msg_t    *cookie,
                  ptl_hdr_t    *hdr, 
@@ -1125,7 +1040,7 @@ ksocknal_sendmsg(nal_cb_t     *nal,
 }
 
 ptl_err_t
-ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
+ksocknal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                unsigned int payload_niov, struct iovec *payload_iov,
                size_t payload_offset, size_t payload_len)
@@ -1137,7 +1052,7 @@ ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
 }
 
 ptl_err_t
-ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie, 
+ksocknal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
                      size_t payload_offset, size_t payload_len)
@@ -1159,7 +1074,7 @@ ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
                 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
 
         /* I'm the gateway; must be the last hop */
-        if (nid == ksocknal_lib.ni.nid)
+        if (nid == ksocknal_lib.libnal_ni.ni_pid.nid)
                 nid = fwd->kprfd_target_nid;
 
         /* setup iov for hdr */
@@ -1544,7 +1459,8 @@ ksocknal_process_receive (ksock_conn_t *conn)
         switch (conn->ksnc_rx_state) {
         case SOCKNAL_RX_HEADER:
                 if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) &&
-                    NTOH__u64(conn->ksnc_hdr.dest_nid) != ksocknal_lib.ni.nid) {
+                    NTOH__u64(conn->ksnc_hdr.dest_nid) != 
+                    ksocknal_lib.libnal_ni.ni_pid.nid) {
                         /* This packet isn't for me */
                         ksocknal_fwd_parse (conn);
                         switch (conn->ksnc_rx_state) {
@@ -1561,7 +1477,13 @@ ksocknal_process_receive (ksock_conn_t *conn)
                 }
 
                 /* sets wanted_len, iovs etc */
-                lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
+                rc = lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
+
+                if (rc != PTL_OK) {
+                        /* I just received garbage: give up on this conn */
+                        ksocknal_close_conn_and_siblings (conn, rc);
+                        return (-EPROTO);
+                }
 
                 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
@@ -1608,7 +1530,7 @@ ksocknal_process_receive (ksock_conn_t *conn)
 }
 
 ptl_err_t
-ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
+ksocknal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
                unsigned int niov, struct iovec *iov, 
                size_t offset, size_t mlen, size_t rlen)
 {
@@ -1636,7 +1558,7 @@ ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
 }
 
 ptl_err_t
-ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
+ksocknal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
                      unsigned int niov, ptl_kiov_t *kiov, 
                      size_t offset, size_t mlen, size_t rlen)
 {
@@ -2029,7 +1951,7 @@ ksocknal_hello (struct socket *sock, ptl_nid_t *nid, int *type,
         hmv->version_major = __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR);
         hmv->version_minor = __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR);
 
-        hdr.src_nid = __cpu_to_le64 (ksocknal_lib.ni.nid);
+        hdr.src_nid = __cpu_to_le64 (ksocknal_lib.libnal_ni.ni_pid.nid);
         hdr.type    = __cpu_to_le32 (PTL_MSG_HELLO);
 
         hdr.msg.hello.type = __cpu_to_le32 (*type);
@@ -2698,19 +2620,11 @@ ksocknal_reaper (void *arg)
         return (0);
 }
 
-nal_cb_t ksocknal_lib = {
-        nal_data:       &ksocknal_data,                /* NAL private data */
-        cb_send:         ksocknal_send,
-        cb_send_pages:   ksocknal_send_pages,
-        cb_recv:         ksocknal_recv,
-        cb_recv_pages:   ksocknal_recv_pages,
-        cb_read:         ksocknal_read,
-        cb_write:        ksocknal_write,
-        cb_malloc:       ksocknal_malloc,
-        cb_free:         ksocknal_free,
-        cb_printf:       ksocknal_printf,
-        cb_cli:          ksocknal_cli,
-        cb_sti:          ksocknal_sti,
-        cb_callback:     ksocknal_callback,
-        cb_dist:         ksocknal_dist
+lib_nal_t ksocknal_lib = {              /* socknal's lib_nal_t method table */
+        libnal_data:       &ksocknal_data,      /* NAL private data */
+        libnal_send:        ksocknal_send,       /* send, struct iovec payload */
+        libnal_send_pages:  ksocknal_send_pages, /* send, ptl_kiov_t payload */
+        libnal_recv:        ksocknal_recv,       /* receive into struct iovec */
+        libnal_recv_pages:  ksocknal_recv_pages, /* receive into ptl_kiov_t */
+        libnal_dist:        ksocknal_dist        /* distance to a NID */
+};
index 4e63c86..06f1578 100644 (file)
 #define PORTAL_MINOR 240
 
 struct nal_cmd_handler {
+        int                  nch_number;
         nal_cmd_handler_fn  *nch_handler;
         void                *nch_private;
 };
 
-static struct nal_cmd_handler nal_cmd[NAL_MAX_NR + 1];
+static struct nal_cmd_handler nal_cmd[16];
 static DECLARE_MUTEX(nal_cmd_sem);
 
 #ifdef PORTAL_DEBUG
@@ -245,23 +246,53 @@ static inline void freedata(void *data, int len)
         PORTAL_FREE(data, len);
 }
 
+struct nal_cmd_handler *
+libcfs_find_nal_cmd_handler(int nal)
+{       /* the callers in this file hold nal_cmd_sem across the lookup */
+        int    i;
+        /* Find the in-use nal_cmd[] slot registered under NAL number 'nal' */
+        for (i = 0; i < sizeof(nal_cmd)/sizeof(nal_cmd[0]); i++)
+                if (nal_cmd[i].nch_handler != NULL &&   /* NULL handler == unused slot */
+                    nal_cmd[i].nch_number == nal)
+                        return (&nal_cmd[i]);
+        /* No handler is registered for this NAL number */
+        return (NULL);
+}
+
 int
 libcfs_nal_cmd_register(int nal, nal_cmd_handler_fn *handler, void *private)
 {
-        int rc = 0;
+        struct nal_cmd_handler *cmd;
+        int                     i;
+        int                     rc;
 
         CDEBUG(D_IOCTL, "Register NAL %d, handler: %p\n", nal, handler);
 
-        if (nal > 0  && nal <= NAL_MAX_NR) {
-                down(&nal_cmd_sem);
-                if (nal_cmd[nal].nch_handler != NULL)
-                        rc = -EBUSY;
-                else {
-                        nal_cmd[nal].nch_handler = handler;
-                        nal_cmd[nal].nch_private = private;
+        down(&nal_cmd_sem);
+        /* A NAL number may have at most one registered handler */
+        if (libcfs_find_nal_cmd_handler(nal) != NULL) {
+                up (&nal_cmd_sem);
+                return (-EBUSY);
+        }
+        /* Claim the first unused slot (nch_handler == NULL) */
+        cmd = NULL;
+        for (i = 0; i < sizeof(nal_cmd)/sizeof(nal_cmd[0]); i++)
+                if (nal_cmd[i].nch_handler == NULL) {
+                        cmd = &nal_cmd[i];
+                        break;
                 }
-                up(&nal_cmd_sem);
+        /* cmd is still NULL here when every slot is already in use */
+        if (cmd == NULL) {
+                rc = -EBUSY;            /* table full: same errno as a duplicate */
+        } else {
+                rc = 0;
+                cmd->nch_number = nal;
+                cmd->nch_handler = handler;     /* NOTE(review): a NULL handler would leave the slot looking free */
+                cmd->nch_private = private;
         }
+
+        up(&nal_cmd_sem);
+
         return rc;
 }
 EXPORT_SYMBOL(libcfs_nal_cmd_register);
@@ -269,14 +300,15 @@ EXPORT_SYMBOL(libcfs_nal_cmd_register);
 void
 libcfs_nal_cmd_unregister(int nal)
 {
-        CDEBUG(D_IOCTL, "Unregister NAL %d\n", nal);
+        struct nal_cmd_handler *cmd;
 
-        LASSERT(nal > 0 && nal <= NAL_MAX_NR);
-        LASSERT(nal_cmd[nal].nch_handler != NULL);
+        CDEBUG(D_IOCTL, "Unregister NAL %d\n", nal);
 
         down(&nal_cmd_sem);
-        nal_cmd[nal].nch_handler = NULL;
-        nal_cmd[nal].nch_private = NULL;
+        cmd = libcfs_find_nal_cmd_handler(nal);
+        LASSERT (cmd != NULL);                  /* 'nal' must currently be registered */
+        cmd->nch_handler = NULL;                /* a NULL handler marks the slot unused */
+        cmd->nch_private = NULL;
         up(&nal_cmd_sem);
 }
 EXPORT_SYMBOL(libcfs_nal_cmd_unregister);
@@ -284,16 +316,17 @@ EXPORT_SYMBOL(libcfs_nal_cmd_unregister);
 int
 libcfs_nal_cmd(struct portals_cfg *pcfg)
 {
+        struct nal_cmd_handler *cmd;
         __u32 nal = pcfg->pcfg_nal;
         int   rc = -EINVAL;
         ENTRY;
 
         down(&nal_cmd_sem);
-        if (nal > 0 && nal <= NAL_MAX_NR && 
-            nal_cmd[nal].nch_handler != NULL) {
+        cmd = libcfs_find_nal_cmd_handler(nal);
+        if (cmd != NULL) {
                 CDEBUG(D_IOCTL, "calling handler nal: %d, cmd: %d\n", nal, 
                        pcfg->pcfg_command);
-                rc = nal_cmd[nal].nch_handler(pcfg, nal_cmd[nal].nch_private);
+                rc = cmd->nch_handler(pcfg, cmd->nch_private);
         }
         up(&nal_cmd_sem);
 
index 6ce334b..c0f2e71 100644 (file)
@@ -1,6 +1,6 @@
 MODULES := portals
-portals-objs := api-eq.o api-init.o api-me.o api-errno.o api-ni.o api-wrap.o
-portals-objs += lib-dispatch.o lib-init.o lib-me.o lib-msg.o lib-eq.o lib-md.o
+portals-objs := api-errno.o api-ni.o api-wrap.o
+portals-objs += lib-init.o lib-me.o lib-msg.o lib-eq.o lib-md.o
 portals-objs += lib-move.o lib-ni.o lib-pid.o module.o
 
 @INCLUDE_RULES@
index de01765..088902a 100644 (file)
@@ -6,7 +6,7 @@
 include $(src)/../Kernelenv
 
 obj-y += portals.o
-portals-objs    :=     lib-dispatch.o lib-eq.o lib-init.o lib-md.o lib-me.o \
+portals-objs    :=     lib-eq.o lib-init.o lib-md.o lib-me.o \
                        lib-move.o lib-msg.o lib-ni.o lib-pid.o \
-                       api-eq.o api-errno.o api-init.o api-me.o api-ni.o \
-                       api-wrap.o module.o
+                       api-errno.o api-ni.o api-wrap.o \
+                       module.o
diff --git a/lnet/lnet/api-eq.c b/lnet/lnet/api-eq.c
deleted file mode 100644 (file)
index 0306043..0000000
+++ /dev/null
@@ -1,120 +0,0 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * api/api-eq.c
- * User-level event queue management routines
- *
- *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
- *  Copyright (c) 2001-2002 Sandia National Laboratories
- *
- *   This file is part of Lustre, http://www.sf.net/projects/lustre/
- *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
- *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
- *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#define DEBUG_SUBSYSTEM S_PORTALS
-#include <portals/api-support.h>
-
-int ptl_get_event (ptl_eq_t *eq, ptl_event_t *ev)
-{
-        int          new_index = eq->sequence & (eq->size - 1);
-        ptl_event_t *new_event = &eq->base[new_index];
-        ENTRY;
-
-        CDEBUG(D_INFO, "new_event: %p, sequence: %lu, eq->size: %u\n",
-               new_event, eq->sequence, eq->size);
-
-        if (PTL_SEQ_GT (eq->sequence, new_event->sequence)) {
-                RETURN(PTL_EQ_EMPTY);
-        }
-
-        *ev = *new_event;
-
-        /* ensure event is delivered correctly despite possible 
-           races with lib_finalize */
-        if (eq->sequence != new_event->sequence) {
-                CERROR("DROPPING EVENT: eq seq %lu ev seq %lu\n",
-                       eq->sequence, new_event->sequence);
-                RETURN(PTL_EQ_DROPPED);
-        }
-
-        eq->sequence = new_event->sequence + 1;
-        RETURN(PTL_OK);
-}
-
-int PtlEQGet(ptl_handle_eq_t eventq, ptl_event_t * ev)
-{
-        int which;
-        
-        return (PtlEQPoll (&eventq, 1, 0, ev, &which));
-}
-
-int PtlEQWait(ptl_handle_eq_t eventq_in, ptl_event_t *event_out)
-{
-        int which;
-        
-        return (PtlEQPoll (&eventq_in, 1, PTL_TIME_FOREVER, 
-                           event_out, &which));
-}
-
-int PtlEQPoll(ptl_handle_eq_t *eventqs_in, int neq_in, int timeout,
-              ptl_event_t *event_out, int *which_out)
-{
-        nal_t        *nal;
-        int           i;
-        int           rc;
-        unsigned long flags;
-        
-        if (!ptl_init)
-                RETURN(PTL_NO_INIT);
-
-        if (neq_in < 1)
-                RETURN(PTL_EQ_INVALID);
-        
-        nal = ptl_hndl2nal(&eventqs_in[0]);
-        if (nal == NULL)
-                RETURN(PTL_EQ_INVALID);
-
-        nal->lock(nal, &flags);
-
-        for (;;) {
-                for (i = 0; i < neq_in; i++) {
-                        ptl_eq_t *eq = ptl_handle2usereq(&eventqs_in[i]);
-
-                        if (i > 0 &&
-                            ptl_hndl2nal(&eventqs_in[i]) != nal) {
-                                nal->unlock(nal, &flags);
-                                RETURN (PTL_EQ_INVALID);
-                        }
-
-                        /* size must be a power of 2 to handle a wrapped sequence # */
-                        LASSERT (eq->size != 0 &&
-                                 eq->size == LOWEST_BIT_SET (eq->size));
-
-                        rc = ptl_get_event (eq, event_out);
-                        if (rc != PTL_EQ_EMPTY) {
-                                nal->unlock(nal, &flags);
-                                *which_out = i;
-                                RETURN(rc);
-                        }
-                }
-                
-                if (timeout == 0) {
-                        nal->unlock(nal, &flags);
-                        RETURN (PTL_EQ_EMPTY);
-                }
-                        
-                timeout = nal->yield(nal, &flags, timeout);
-        }
-}
index 1c01c88..9a4e5ac 100644 (file)
@@ -40,6 +40,9 @@ const char *ptl_err_str[] = {
 
         "PTL_EQ_IN_USE",
 
+        "PTL_NI_INVALID",
+        "PTL_MD_ILLEGAL",
+
         "PTL_MAX_ERRNO"
 };
 /* If you change these, you must update the number table in portals/errno.h */
diff --git a/lnet/lnet/api-init.c b/lnet/lnet/api-init.c
deleted file mode 100644 (file)
index 9a98714..0000000
+++ /dev/null
@@ -1,49 +0,0 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * api/api-init.c
- * Initialization and global data for the p30 user side library
- *
- *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
- *  Copyright (c) 2001-2002 Sandia National Laboratories
- *
- *   This file is part of Lustre, http://www.sf.net/projects/lustre/
- *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
- *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
- *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#define DEBUG_SUBSYSTEM S_PORTALS
-#include <portals/api-support.h>
-
-int PtlInit(int *max_interfaces)
-{
-        if (max_interfaces != NULL)
-                *max_interfaces = NAL_MAX_NR;
-
-        LASSERT(!strcmp(ptl_err_str[PTL_MAX_ERRNO], "PTL_MAX_ERRNO"));
-
-        return ptl_ni_init();
-}
-
-
-void PtlFini(void)
-{
-        ptl_ni_fini();
-}
-
-
-void PtlSnprintHandle(char *str, int len, ptl_handle_any_t h)
-{
-        snprintf(str, len, "0x%lx."LPX64, h.nal_idx, h.cookie);
-}
diff --git a/lnet/lnet/api-me.c b/lnet/lnet/api-me.c
deleted file mode 100644 (file)
index 37f0150..0000000
+++ /dev/null
@@ -1,28 +0,0 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * api/api-me.c
- * Match Entry local operations.
- *
- *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
- *  Copyright (c) 2001-2002 Sandia National Laboratories
- *
- *   This file is part of Lustre, http://www.sf.net/projects/lustre/
- *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
- *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
- *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#define DEBUG_SUBSYSTEM S_PORTALS
-#include <portals/api-support.h>
-
index 4f37d13..56afd45 100644 (file)
@@ -66,6 +66,8 @@ nal_t *ptl_hndl2nal(ptl_handle_any_t *handle)
          * invalidated out from under her (or worse, swapped for a
          * completely different interface!) */
 
+        LASSERT (ptl_init);
+
         if (((idx ^ NI_HANDLE_MAGIC) & ~NI_HANDLE_MASK) != 0)
                 return NULL;
 
@@ -112,12 +114,17 @@ void ptl_unregister_nal (ptl_interface_t interface)
         ptl_mutex_exit();
 }
 
-int ptl_ni_init(void)
+int PtlInit(int *max_interfaces)
 {
+        LASSERT(!strcmp(ptl_err_str[PTL_MAX_ERRNO], "PTL_MAX_ERRNO"));
+
         /* If this assertion fails, we need more bits in NI_HANDLE_MASK and
          * to shift NI_HANDLE_MAGIC left appropriately */
         LASSERT (NAL_MAX_NR <= (NI_HANDLE_MASK + 1));
         
+        if (max_interfaces != NULL)
+                *max_interfaces = NAL_MAX_NR;
+
         ptl_mutex_enter();
 
         if (!ptl_init) {
@@ -143,7 +150,7 @@ int ptl_ni_init(void)
         return PTL_OK;
 }
 
-void ptl_ni_fini(void)
+void PtlFini(void)
 {
         nal_t  *nal;
         int     i;
@@ -160,7 +167,7 @@ void ptl_ni_fini(void)
                         if (nal->nal_refct != 0) {
                                 CWARN("NAL %d has outstanding refcount %d\n",
                                       i, nal->nal_refct);
-                                nal->shutdown(nal);
+                                nal->nal_ni_fini(nal);
                         }
                         
                         ptl_nal_table[i] = NULL;
@@ -202,9 +209,11 @@ int PtlNIInit(ptl_interface_t interface, ptl_pid_t requested_pid,
         }
 
         nal = ptl_nal_table[interface];
-
+        nal->nal_handle.nal_idx = (NI_HANDLE_MAGIC & ~NI_HANDLE_MASK) | interface;
+        nal->nal_handle.cookie = 0;
+        
         CDEBUG(D_OTHER, "Starting up NAL (%d) refs %d\n", interface, nal->nal_refct);
-        rc = nal->startup(nal, requested_pid, desired_limits, actual_limits);
+        rc = nal->nal_ni_init(nal, requested_pid, desired_limits, actual_limits);
 
         if (rc != PTL_OK) {
                 CERROR("Error %d starting up NAL %d, refs %d\n", rc,
@@ -218,10 +227,11 @@ int PtlNIInit(ptl_interface_t interface, ptl_pid_t requested_pid,
         }
         
         nal->nal_refct++;
-        handle->nal_idx = (NI_HANDLE_MAGIC & ~NI_HANDLE_MASK) | interface;
+        *handle = nal->nal_handle;
 
  out:
         ptl_mutex_exit ();
+
         return rc;
 }
 
@@ -248,15 +258,8 @@ int PtlNIFini(ptl_handle_ni_t ni)
         nal->nal_refct--;
 
-        /* nal_refct == 0 tells nal->shutdown to really shut down */
-        nal->shutdown(nal);
+        /* nal_refct == 0 tells nal->nal_ni_fini to really shut down */
+        nal->nal_ni_fini(nal);
 
         ptl_mutex_exit ();
         return PTL_OK;
 }
-
-int PtlNIHandle(ptl_handle_any_t handle_in, ptl_handle_ni_t * ni_out)
-{
-        *ni_out = handle_in;
-
-        return PTL_OK;
-}
index 3e6f9ce..d7ff020 100644 (file)
 # define DEBUG_SUBSYSTEM S_PORTALS
 #include <portals/api-support.h>
 
-static int do_forward(ptl_handle_any_t any_h, int cmd, void *argbuf,
-                      int argsize, void *retbuf, int retsize)
+void PtlSnprintHandle(char *str, int len, ptl_handle_any_t h)
 {
-        nal_t *nal;
+        snprintf(str, len, "0x%lx."LPX64, h.nal_idx, h.cookie);  /* nal_idx '.' cookie */
+}
 
-        if (!ptl_init) {
-                CERROR("Not initialized\n");
+int PtlNIHandle(ptl_handle_any_t handle_in, ptl_handle_ni_t *ni_out)
+{
+        if (!ptl_init)          /* ptl_hndl2nal() LASSERTs ptl_init, so test it first */
                 return PTL_NO_INIT;
-        }
-
-        nal = ptl_hndl2nal(&any_h);
-        if (!nal)
+        /* Reject handles that ptl_hndl2nal() cannot map to a NAL */
+        if (ptl_hndl2nal(&handle_in) == NULL)
                 return PTL_HANDLE_INVALID;
-
-        nal->forward(nal, cmd, argbuf, argsize, retbuf, retsize);
-
+        /* The handle is passed back unchanged */
+        *ni_out = handle_in;
         return PTL_OK;
 }
 
 int PtlGetId(ptl_handle_ni_t ni_handle, ptl_process_id_t *id)
 {
-        PtlGetId_in args;
-        PtlGetId_out ret;
-        int rc;
-
-        args.handle_in = ni_handle;
+        nal_t     *nal;
 
-        rc = do_forward(ni_handle, PTL_GETID, &args, sizeof(args), &ret,
-                        sizeof(ret));
-        if (rc != PTL_OK)
-                return rc;
+        if (!ptl_init)          /* ptl_hndl2nal() LASSERTs ptl_init, so test it first */
+                return PTL_NO_INIT;
         
-        if (id)
-                *id = ret.id_out;
+        nal = ptl_hndl2nal(&ni_handle);
+        if (nal == NULL)        /* handle does not map to a NAL */
+                return PTL_NI_INVALID;
 
-        return ret.rc;
+        return nal->nal_get_id(nal, id);        /* NOTE(review): 'id' is passed on without the old NULL check */
 }
 
 int PtlFailNid (ptl_handle_ni_t interface, ptl_nid_t nid, unsigned int threshold) 
 {
-        PtlFailNid_in  args;
-        PtlFailNid_out ret;
-        int            rc;
-        
-        args.interface = interface;
-        args.nid       = nid;
-        args.threshold = threshold;
+        nal_t     *nal;
+        /* Calls the NAL's nal_fail_nid method directly; no argument block is built */
+        if (!ptl_init)          /* ptl_hndl2nal() LASSERTs ptl_init, so test it first */
+                return PTL_NO_INIT;
         
-        rc = do_forward (interface, PTL_FAILNID, 
-                         &args, sizeof(args), &ret, sizeof (ret));
+        nal = ptl_hndl2nal(&interface);
+        if (nal == NULL)        /* handle does not map to a NAL */
+                return PTL_NI_INVALID;
 
-        return ((rc != PTL_OK) ? rc : ret.rc);
+        return nal->nal_fail_nid(nal, nid, threshold);
 }
 
 int PtlNIStatus(ptl_handle_ni_t interface_in, ptl_sr_index_t register_in,
-                ptl_sr_value_t * status_out)
+                ptl_sr_value_t *status_out)
 {
-        PtlNIStatus_in args;
-        PtlNIStatus_out ret;
-        int rc;
+        nal_t     *nal;
 
-        args.interface_in = interface_in;
-        args.register_in = register_in;
-
-        rc = do_forward(interface_in, PTL_NISTATUS, &args, sizeof(args), &ret,
-                        sizeof(ret));
-
-        if (rc != PTL_OK)
-                return rc;
-
-        if (status_out)
-                *status_out = ret.status_out;
+        if (!ptl_init)          /* ptl_hndl2nal() LASSERTs ptl_init, so test it first */
+                return PTL_NO_INIT;
+        /* Resolve the NI handle to its NAL before calling the method */
+        nal = ptl_hndl2nal(&interface_in);
+        if (nal == NULL)        /* handle does not map to a NAL */
+                return PTL_NI_INVALID;
 
-        return ret.rc;
+        return nal->nal_ni_status(nal, register_in, status_out);        /* NOTE(review): status_out no longer NULL-checked here */
 }
 
 int PtlNIDist(ptl_handle_ni_t interface_in, ptl_process_id_t process_in,
               unsigned long *distance_out)
 {
-        PtlNIDist_in args;
-        PtlNIDist_out ret;
-        int rc;
-
-        args.interface_in = interface_in;
-        args.process_in = process_in;
-
-        rc = do_forward(interface_in, PTL_NIDIST, &args, sizeof(args), &ret,
-                        sizeof(ret));
+        nal_t     *nal;
 
-        if (rc != PTL_OK)
-                return rc;
-
-        if (distance_out)
-                *distance_out = ret.distance_out;
+        if (!ptl_init)          /* ptl_hndl2nal() LASSERTs ptl_init, so test it first */
+                return PTL_NO_INIT;
+        /* Resolve the NI handle to its NAL before calling the method */
+        nal = ptl_hndl2nal(&interface_in);
+        if (nal == NULL)        /* handle does not map to a NAL */
+                return PTL_NI_INVALID;
 
-        return ret.rc;
+        return nal->nal_ni_dist(nal, &process_in, distance_out);        /* NOTE(review): distance_out no longer NULL-checked here */
 }
 
 int PtlMEAttach(ptl_handle_ni_t interface_in, ptl_pt_index_t index_in,
                 ptl_process_id_t match_id_in, ptl_match_bits_t match_bits_in,
                 ptl_match_bits_t ignore_bits_in, ptl_unlink_t unlink_in,
-                ptl_ins_pos_t pos_in, ptl_handle_me_t * handle_out)
+                ptl_ins_pos_t pos_in, ptl_handle_me_t *handle_out)
 {
-        PtlMEAttach_in args;
-        PtlMEAttach_out ret;
-        int rc;
-
-        args.interface_in = interface_in;
-        args.index_in = index_in;
-        args.match_id_in = match_id_in;
-        args.match_bits_in = match_bits_in;
-        args.ignore_bits_in = ignore_bits_in;
-        args.unlink_in = unlink_in;
-        args.position_in = pos_in;
-
-        rc = do_forward(interface_in, PTL_MEATTACH, &args, sizeof(args), &ret,
-                        sizeof(ret));
-
-        if (rc != PTL_OK)
-                return rc;
-
-        if (handle_out) {
-                handle_out->nal_idx = interface_in.nal_idx;
-                handle_out->cookie = ret.handle_out.cookie;
-        }
-
-        return ret.rc;
+        nal_t     *nal;
+
+        if (!ptl_init)          /* ptl_hndl2nal() LASSERTs ptl_init, so test it first */
+                return PTL_NO_INIT;
+        /* Resolve the NI handle to its NAL before calling the method */
+        nal = ptl_hndl2nal(&interface_in);
+        if (nal == NULL)        /* handle does not map to a NAL */
+                return PTL_NI_INVALID;
+        /* handle_out is now filled in (if at all) by the NAL method, not here */
+        return nal->nal_me_attach(nal, index_in, match_id_in, 
+                                  match_bits_in, ignore_bits_in,
+                                  unlink_in, pos_in, handle_out);
 }
 
 int PtlMEInsert(ptl_handle_me_t current_in, ptl_process_id_t match_id_in,
@@ -160,367 +125,226 @@ int PtlMEInsert(ptl_handle_me_t current_in, ptl_process_id_t match_id_in,
                 ptl_unlink_t unlink_in, ptl_ins_pos_t position_in,
                 ptl_handle_me_t * handle_out)
 {
-        PtlMEInsert_in args;
-        PtlMEInsert_out ret;
-        int rc;
-
-        args.current_in = current_in;
-        args.match_id_in = match_id_in;
-        args.match_bits_in = match_bits_in;
-        args.ignore_bits_in = ignore_bits_in;
-        args.unlink_in = unlink_in;
-        args.position_in = position_in;
-
-        rc = do_forward(current_in, PTL_MEINSERT, &args, sizeof(args), &ret,
-                        sizeof(ret));
-
-        if (rc != PTL_OK)
-                return (rc == PTL_HANDLE_INVALID) ? PTL_ME_INVALID : rc;
-
-        if (handle_out) {
-                handle_out->nal_idx = current_in.nal_idx;
-                handle_out->cookie = ret.handle_out.cookie;
-        }
-        return ret.rc;
+        nal_t     *nal;
+
+        if (!ptl_init)
+                return PTL_NO_INIT;
+        
+        nal = ptl_hndl2nal(&current_in);
+        if (nal == NULL)
+                return PTL_ME_INVALID;
+
+        return nal->nal_me_insert(nal, &current_in, match_id_in,
+                                  match_bits_in, ignore_bits_in,
+                                  unlink_in, position_in, handle_out);
 }
 
 int PtlMEUnlink(ptl_handle_me_t current_in)
 {
-        PtlMEUnlink_in args;
-        PtlMEUnlink_out ret;
-        int rc;
+        nal_t     *nal;
 
-        args.current_in = current_in;
-        args.unlink_in = PTL_RETAIN;
-
-        rc = do_forward(current_in, PTL_MEUNLINK, &args, sizeof(args), &ret,
-                        sizeof(ret));
-
-        if (rc != PTL_OK)
-                return (rc == PTL_HANDLE_INVALID) ? PTL_ME_INVALID : rc;
+        if (!ptl_init)
+                return PTL_NO_INIT;
+        
+        nal = ptl_hndl2nal(&current_in);
+        if (nal == NULL)
+                return PTL_ME_INVALID;
 
-        return ret.rc;
+        return nal->nal_me_unlink(nal, &current_in);
 }
 
-int PtlTblDump(ptl_handle_ni_t ni, int index_in)
+int PtlMDAttach(ptl_handle_me_t me_in, ptl_md_t md_in,
+                ptl_unlink_t unlink_in, ptl_handle_md_t * handle_out)
 {
-        PtlTblDump_in args;
-        PtlTblDump_out ret;
-        int rc;
+        nal_t     *nal;
 
-        args.index_in = index_in;
-
-        rc = do_forward(ni, PTL_TBLDUMP, &args, sizeof(args), &ret,
-                        sizeof(ret));
+        if (!ptl_init)
+                return PTL_NO_INIT;
+        
+        nal = ptl_hndl2nal(&me_in);
+        if (nal == NULL)
+                return PTL_ME_INVALID;
 
-        if (rc != PTL_OK)
-                return rc;
+        if (!PtlHandleIsEqual(md_in.eventq, PTL_EQ_NONE) &&
+            ptl_hndl2nal(&md_in.eventq) != nal)
+                return PTL_MD_ILLEGAL;
 
-        return ret.rc;
+        return (nal->nal_md_attach)(nal, &me_in, &md_in, 
+                                    unlink_in, handle_out);
 }
 
-int PtlMEDump(ptl_handle_me_t current_in)
+int PtlMDBind(ptl_handle_ni_t ni_in, ptl_md_t md_in,
+              ptl_unlink_t unlink_in, ptl_handle_md_t *handle_out)
 {
-        PtlMEDump_in args;
-        PtlMEDump_out ret;
-        int rc;
+        nal_t     *nal;
 
-        args.current_in = current_in;
-
-        rc = do_forward(current_in, PTL_MEDUMP, &args, sizeof(args), &ret,
-                        sizeof(ret));
+        if (!ptl_init)
+                return PTL_NO_INIT;
+        
+        nal = ptl_hndl2nal(&ni_in);
+        if (nal == NULL)
+                return PTL_NI_INVALID;
 
-        if (rc != PTL_OK)
-                return (rc == PTL_HANDLE_INVALID) ? PTL_ME_INVALID : rc;
+        if (!PtlHandleIsEqual(md_in.eventq, PTL_EQ_NONE) &&
+            ptl_hndl2nal(&md_in.eventq) != nal)
+                return PTL_MD_ILLEGAL;
 
-        return ret.rc;
+        return (nal->nal_md_bind)(nal, &md_in, unlink_in, handle_out);
 }
 
-static ptl_handle_eq_t md2eq (ptl_md_t *md)
+int PtlMDUpdate(ptl_handle_md_t md_in, ptl_md_t *old_inout,
+                ptl_md_t *new_inout, ptl_handle_eq_t testq_in)
 {
-        if (PtlHandleIsEqual (md->eventq, PTL_EQ_NONE))
-                return (PTL_EQ_NONE);
+        nal_t    *nal;
         
-        return (ptl_handle2usereq (&md->eventq)->cb_eq_handle);
-}
-
-
-int PtlMDAttach(ptl_handle_me_t me_in, ptl_md_t md_in,
-                ptl_unlink_t unlink_in, ptl_handle_md_t * handle_out)
-{
-        PtlMDAttach_in args;
-        PtlMDAttach_out ret;
-        int rc;
-
-        args.eq_in = md2eq(&md_in);
-        args.me_in = me_in;
-        args.md_in = md_in;
-        args.unlink_in = unlink_in;
-                
-        rc = do_forward(me_in, PTL_MDATTACH, 
-                        &args, sizeof(args), &ret, sizeof(ret));
-
-        if (rc != PTL_OK)
-                return (rc == PTL_HANDLE_INVALID) ? PTL_ME_INVALID : rc;
-
-        if (handle_out) {
-                handle_out->nal_idx = me_in.nal_idx;
-                handle_out->cookie = ret.handle_out.cookie;
-        }
-        return ret.rc;
-}
-
+        if (!ptl_init)
+                return PTL_NO_INIT;
+        
+        nal = ptl_hndl2nal(&md_in);
+        if (nal == NULL)
+                return PTL_MD_INVALID;
 
+        if (!PtlHandleIsEqual(testq_in, PTL_EQ_NONE) &&
+            ptl_hndl2nal(&testq_in) != nal)
+                return PTL_EQ_INVALID;
 
-int PtlMDBind(ptl_handle_ni_t ni_in, ptl_md_t md_in,
-              ptl_unlink_t unlink_in, ptl_handle_md_t * handle_out)
-{
-        PtlMDBind_in args;
-        PtlMDBind_out ret;
-        int rc;
-
-        args.eq_in = md2eq(&md_in);
-        args.ni_in = ni_in;
-        args.md_in = md_in;
-        args.unlink_in = unlink_in;
-
-        rc = do_forward(ni_in, PTL_MDBIND, 
-                        &args, sizeof(args), &ret, sizeof(ret));
-
-        if (rc != PTL_OK)
-                return rc;
-
-        if (handle_out) {
-                handle_out->nal_idx = ni_in.nal_idx;
-                handle_out->cookie = ret.handle_out.cookie;
-        }
-        return ret.rc;
+        return (nal->nal_md_update)(nal, &md_in, 
+                                    old_inout, new_inout, &testq_in);
 }
 
-int PtlMDUpdate(ptl_handle_md_t md_in, ptl_md_t *old_inout,
-                ptl_md_t *new_inout, ptl_handle_eq_t testq_in)
+int PtlMDUnlink(ptl_handle_md_t md_in)
 {
-        PtlMDUpdate_internal_in args;
-        PtlMDUpdate_internal_out ret;
-        int rc;
-
-        args.md_in = md_in;
-
-        if (old_inout) {
-                args.old_inout = *old_inout;
-                args.old_inout_valid = 1;
-        } else
-                args.old_inout_valid = 0;
-
-        if (new_inout) {
-                args.new_inout = *new_inout;
-                args.new_inout_valid = 1;
-        } else
-                args.new_inout_valid = 0;
-
-        if (PtlHandleIsEqual (testq_in, PTL_EQ_NONE)) {
-                args.testq_in = PTL_EQ_NONE;
-                args.sequence_in = -1;
-        } else {
-                ptl_eq_t *eq = ptl_handle2usereq (&testq_in);
-                
-                args.testq_in = eq->cb_eq_handle;
-                args.sequence_in = eq->sequence;
-        }
-
-        rc = do_forward(md_in, PTL_MDUPDATE, &args, sizeof(args), &ret,
-                        sizeof(ret));
-        if (rc != PTL_OK)
-                return (rc == PTL_HANDLE_INVALID) ? PTL_MD_INVALID : rc;
-
-        if (old_inout)
-                *old_inout = ret.old_inout;
-
-        return ret.rc;
+        nal_t    *nal;
+        
+        if (!ptl_init)
+                return PTL_NO_INIT;
+        
+        nal = ptl_hndl2nal(&md_in);
+        if (nal == NULL)
+                return PTL_MD_INVALID;
+        
+        return (nal->nal_md_unlink)(nal, &md_in);
 }
 
-int PtlMDUnlink(ptl_handle_md_t md_in)
+int PtlEQAlloc(ptl_handle_ni_t interface, ptl_size_t count,
+               ptl_eq_handler_t callback,
+               ptl_handle_eq_t *handle_out)
 {
-        PtlMDUnlink_in args;
-        PtlMDUnlink_out ret;
-        int rc;
-
-        args.md_in = md_in;
-        rc = do_forward(md_in, PTL_MDUNLINK, &args, sizeof(args), &ret,
-                        sizeof(ret));
-        if (rc != PTL_OK)
-                return (rc == PTL_HANDLE_INVALID) ? PTL_MD_INVALID : rc;
+        nal_t    *nal;
+        
+        if (!ptl_init)
+                return PTL_NO_INIT;
+        
+        nal = ptl_hndl2nal(&interface);
+        if (nal == NULL)
+                return PTL_NI_INVALID;
 
-        return ret.rc;
+        return (nal->nal_eq_alloc)(nal, count, callback, handle_out);
 }
 
-int PtlEQAlloc(ptl_handle_ni_t interface, ptl_size_t count,
-               ptl_eq_handler_t callback,
-               ptl_handle_eq_t * handle_out)
+int PtlEQFree(ptl_handle_eq_t eventq)
 {
-        ptl_eq_t *eq = NULL;
-        ptl_event_t *ev = NULL;
-        PtlEQAlloc_in args;
-        PtlEQAlloc_out ret;
-        int rc, i;
-        nal_t *nal;
+        nal_t       *nal;
 
         if (!ptl_init)
                 return PTL_NO_INIT;
         
-        nal = ptl_hndl2nal (&interface);
+        nal = ptl_hndl2nal(&eventq);
         if (nal == NULL)
-                return PTL_HANDLE_INVALID;
+                return PTL_EQ_INVALID;
 
-        if (count != LOWEST_BIT_SET(count)) {   /* not a power of 2 already */
-                do {                    /* knock off all but the top bit... */
-                        count &= ~LOWEST_BIT_SET (count);
-                } while (count != LOWEST_BIT_SET(count));
-
-                count <<= 1;                             /* ...and round up */
-        }
-
-        if (count == 0)        /* catch bad parameter / overflow on roundup */
-                return (PTL_VAL_FAILED);
-
-        PORTAL_ALLOC(ev, count * sizeof(ptl_event_t));
-        if (!ev)
-                return PTL_NO_SPACE;
-
-        for (i = 0; i < count; i++)
-                ev[i].sequence = 0;
-
-        args.ni_in = interface;
-        args.count_in = count;
-        args.base_in = ev;
-        args.len_in = count * sizeof(*ev);
-        args.callback_in = callback;
-
-        rc = do_forward(interface, PTL_EQALLOC, &args, sizeof(args), &ret,
-                        sizeof(ret));
-        if (rc != PTL_OK)
-                goto fail;
-        if (ret.rc)
-                GOTO(fail, rc = ret.rc);
-
-        PORTAL_ALLOC(eq, sizeof(*eq));
-        if (!eq) {
-                rc = PTL_NO_SPACE;
-                goto fail;
-        }
-
-        eq->sequence = 1;
-        eq->size = count;
-        eq->base = ev;
-
-        /* EQ handles are a little wierd.  PtlEQGet() just looks at the
-         * queued events in shared memory.  It doesn't want to do_forward()
-         * at all, so the cookie in the EQ handle we pass out of here is
-         * simply a pointer to the event queue we just set up.  We stash
-         * the handle returned by do_forward(), so we can pass it back via
-         * do_forward() when we need to. */
-
-        eq->cb_eq_handle.nal_idx = interface.nal_idx;
-        eq->cb_eq_handle.cookie = ret.handle_out.cookie;
-
-        handle_out->nal_idx = interface.nal_idx;
-        handle_out->cookie = (__u64)((unsigned long)eq);
-        return PTL_OK;
+        return (nal->nal_eq_free)(nal, &eventq);
+}
 
-fail:
-        PORTAL_FREE(ev, count * sizeof(ptl_event_t));
-        return rc;
+int PtlEQGet(ptl_handle_eq_t eventq, ptl_event_t *ev)
+{
+        int which;
+        
+        return (PtlEQPoll (&eventq, 1, 0, ev, &which));
 }
 
-int PtlEQFree(ptl_handle_eq_t eventq)
+int PtlEQWait(ptl_handle_eq_t eventq_in, ptl_event_t *event_out)
 {
-        PtlEQFree_in args;
-        PtlEQFree_out ret;
-        ptl_eq_t *eq;
-        int rc;
+        int which;
+        
+        return (PtlEQPoll (&eventq_in, 1, PTL_TIME_FOREVER, 
+                           event_out, &which));
+}
 
-        eq = ptl_handle2usereq (&eventq);
-        args.eventq_in = eq->cb_eq_handle;
+int PtlEQPoll(ptl_handle_eq_t *eventqs_in, int neq_in, int timeout,
+              ptl_event_t *event_out, int *which_out)
+{
+        int           i;
+        nal_t        *nal;
 
-        rc = do_forward(eq->cb_eq_handle, PTL_EQFREE, &args,
-                        sizeof(args), &ret, sizeof(ret));
+        if (!ptl_init)
+                return PTL_NO_INIT;
+
+        if (neq_in < 1)
+                return PTL_EQ_INVALID;
+
+        nal = ptl_hndl2nal(&eventqs_in[0]);
+        if (nal == NULL)
+                return PTL_EQ_INVALID;
 
-        /* XXX we're betting rc == PTL_OK here */
-        PORTAL_FREE(eq->base, eq->size * sizeof(ptl_event_t));
-        PORTAL_FREE(eq, sizeof(*eq));
+        for (i = 1; i < neq_in; i++)
+                if (ptl_hndl2nal(&eventqs_in[i]) != nal)
+                        return PTL_EQ_INVALID;
 
-        return rc;
+        return (nal->nal_eq_poll)(nal, eventqs_in, neq_in, timeout,
+                                  event_out, which_out);
 }
 
+
 int PtlACEntry(ptl_handle_ni_t ni_in, ptl_ac_index_t index_in,
                ptl_process_id_t match_id_in, ptl_pt_index_t portal_in)
 {
-        PtlACEntry_in args;
-        PtlACEntry_out ret;
-        int rc;
-
-        /*
-         * Copy arguments into the argument block to
-         * hand to the forwarding object
-         */
-        args.ni_in = ni_in;
-        args.index_in = index_in;
-        args.match_id_in = match_id_in;
-        args.portal_in = portal_in;
-
-        rc = do_forward(ni_in, PTL_ACENTRY, &args, sizeof(args), &ret,
-                        sizeof(ret));
-
-        return (rc != PTL_OK) ? rc : ret.rc;
+        nal_t    *nal;
+
+        if (!ptl_init)
+                return PTL_NO_INIT;
+        
+        nal = ptl_hndl2nal(&ni_in);
+        if (nal == NULL)
+                return PTL_NI_INVALID;
+        
+        return (nal->nal_ace_entry)(nal, index_in, match_id_in, portal_in);
 }
 
 int PtlPut(ptl_handle_md_t md_in, ptl_ack_req_t ack_req_in,
            ptl_process_id_t target_in, ptl_pt_index_t portal_in,
-           ptl_ac_index_t cookie_in, ptl_match_bits_t match_bits_in,
+           ptl_ac_index_t ac_in, ptl_match_bits_t match_bits_in,
            ptl_size_t offset_in, ptl_hdr_data_t hdr_data_in)
 {
-        PtlPut_in args;
-        PtlPut_out ret;
-        int rc;
-
-        /*
-         * Copy arguments into the argument block to
-         * hand to the forwarding object
-         */
-        args.md_in = md_in;
-        args.ack_req_in = ack_req_in;
-        args.target_in = target_in;
-        args.portal_in = portal_in;
-        args.cookie_in = cookie_in;
-        args.match_bits_in = match_bits_in;
-        args.offset_in = offset_in;
-        args.hdr_data_in = hdr_data_in;
-
-        rc = do_forward(md_in, PTL_PUT, &args, sizeof(args), &ret, sizeof(ret));
-
-        return (rc != PTL_OK) ? rc : ret.rc;
+        nal_t    *nal;
+
+        if (!ptl_init)
+                return PTL_NO_INIT;
+        
+        nal = ptl_hndl2nal(&md_in);
+        if (nal == NULL)
+                return PTL_MD_INVALID;
+
+        return (nal->nal_put)(nal, &md_in, ack_req_in,
+                              &target_in, portal_in, ac_in,
+                              match_bits_in, offset_in, hdr_data_in);
 }
 
 int PtlGet(ptl_handle_md_t md_in, ptl_process_id_t target_in,
-           ptl_pt_index_t portal_in, ptl_ac_index_t cookie_in,
+           ptl_pt_index_t portal_in, ptl_ac_index_t ac_in,
            ptl_match_bits_t match_bits_in, ptl_size_t offset_in)
 {
-        PtlGet_in args;
-        PtlGet_out ret;
-        int rc;
-
-        /*
-         * Copy arguments into the argument block to
-         * hand to the forwarding object
-         */
-        args.md_in = md_in;
-        args.target_in = target_in;
-        args.portal_in = portal_in;
-        args.cookie_in = cookie_in;
-        args.match_bits_in = match_bits_in;
-        args.offset_in = offset_in;
-
-        rc = do_forward(md_in, PTL_GET, &args, sizeof(args), &ret, sizeof(ret));
-
-        return (rc != PTL_OK) ? rc : ret.rc;
+        nal_t  *nal;
+
+        if (!ptl_init)
+                return PTL_NO_INIT;
+
+        nal = ptl_hndl2nal(&md_in);
+        if (nal == NULL)
+                return PTL_MD_INVALID;
+
+        return (nal->nal_get)(nal, &md_in, 
+                              &target_in, portal_in, ac_in,
+                              match_bits_in, offset_in);
 }
+
index bf7a107..285f8fe 100644 (file)
@@ -3,8 +3,8 @@
 # This code is issued under the GNU General Public License.
 # See the file COPYING in this distribution
 
-my_sources = api-eq.c api-init.c api-me.c api-errno.c api-ni.c api-wrap.c \
-               lib-dispatch.c lib-init.c lib-me.c lib-msg.c lib-eq.c \
+my_sources =    api-errno.c api-ni.c api-wrap.c \
+               lib-init.c lib-me.c lib-msg.c lib-eq.c \
                lib-md.c lib-move.c lib-ni.c lib-pid.c
 
 if !CRAY_PORTALS
diff --git a/lnet/lnet/lib-dispatch.c b/lnet/lnet/lib-dispatch.c
deleted file mode 100644 (file)
index 798e117..0000000
+++ /dev/null
@@ -1,79 +0,0 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * lib/lib-dispatch.c
- *
- *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
- *  Copyright (c) 2001-2002 Sandia National Laboratories
- *
- *   This file is part of Lustre, http://www.sf.net/projects/lustre/
- *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
- *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
- *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#define DEBUG_SUBSYSTEM S_PORTALS
-#include <portals/lib-p30.h>
-#include <portals/lib-dispatch.h>
-
-typedef struct {
-        int (*fun) (nal_cb_t * nal, void *private, void *in, void *out);
-        char *name;
-} dispatch_table_t;
-
-static dispatch_table_t dispatch_table[] = {
-        [PTL_GETID] {do_PtlGetId, "PtlGetId"},
-        [PTL_NISTATUS] {do_PtlNIStatus, "PtlNIStatus"},
-      &