Index: linux-stage/fs/ext3/Makefile
===================================================================
---- linux-stage.orig/fs/ext3/Makefile 2004-05-07 16:00:16.000000000 -0400
-+++ linux-stage/fs/ext3/Makefile 2004-05-07 16:00:17.000000000 -0400
+--- linux-stage.orig/fs/ext3/Makefile 2004-05-11 17:21:20.000000000 -0400
++++ linux-stage/fs/ext3/Makefile 2004-05-11 17:21:21.000000000 -0400
@@ -4,7 +4,7 @@
obj-$(CONFIG_EXT3_FS) += ext3.o
ext3-$(CONFIG_EXT3_FS_XATTR) += xattr.o xattr_user.o xattr_trusted.o
Index: linux-stage/fs/ext3/inode.c
===================================================================
---- linux-stage.orig/fs/ext3/inode.c 2004-05-07 16:00:16.000000000 -0400
-+++ linux-stage/fs/ext3/inode.c 2004-05-07 17:21:59.000000000 -0400
+--- linux-stage.orig/fs/ext3/inode.c 2004-05-11 17:21:21.000000000 -0400
++++ linux-stage/fs/ext3/inode.c 2004-05-11 17:21:21.000000000 -0400
@@ -37,6 +37,7 @@
#include <linux/mpage.h>
#include <linux/uio.h>
bh = iloc.bh;
Index: linux-stage/fs/ext3/iopen.c
===================================================================
---- linux-stage.orig/fs/ext3/iopen.c 2004-05-07 16:00:17.000000000 -0400
-+++ linux-stage/fs/ext3/iopen.c 2004-05-07 17:22:37.000000000 -0400
+--- linux-stage.orig/fs/ext3/iopen.c 1969-12-31 19:00:00.000000000 -0500
++++ linux-stage/fs/ext3/iopen.c 2004-05-11 17:21:21.000000000 -0400
@@ -0,0 +1,272 @@
+/*
+ * linux/fs/ext3/iopen.c
+}
Index: linux-stage/fs/ext3/iopen.h
===================================================================
---- linux-stage.orig/fs/ext3/iopen.h 2004-05-07 16:00:17.000000000 -0400
-+++ linux-stage/fs/ext3/iopen.h 2004-05-07 16:00:17.000000000 -0400
+--- linux-stage.orig/fs/ext3/iopen.h 1969-12-31 19:00:00.000000000 -0500
++++ linux-stage/fs/ext3/iopen.h 2004-05-11 17:21:21.000000000 -0400
@@ -0,0 +1,15 @@
+/*
+ * iopen.h
+ struct inode *inode, int rehash);
Index: linux-stage/fs/ext3/namei.c
===================================================================
---- linux-stage.orig/fs/ext3/namei.c 2004-05-07 16:00:16.000000000 -0400
-+++ linux-stage/fs/ext3/namei.c 2004-05-07 16:00:17.000000000 -0400
+--- linux-stage.orig/fs/ext3/namei.c 2004-05-11 17:21:20.000000000 -0400
++++ linux-stage/fs/ext3/namei.c 2004-05-11 17:21:21.000000000 -0400
@@ -37,6 +37,7 @@
#include <linux/buffer_head.h>
#include <linux/smp_lock.h>
}
Index: linux-stage/fs/ext3/super.c
===================================================================
---- linux-stage.orig/fs/ext3/super.c 2004-05-07 16:00:16.000000000 -0400
-+++ linux-stage/fs/ext3/super.c 2004-05-07 17:21:59.000000000 -0400
+--- linux-stage.orig/fs/ext3/super.c 2004-05-11 17:21:21.000000000 -0400
++++ linux-stage/fs/ext3/super.c 2004-05-11 17:44:53.000000000 -0400
@@ -536,7 +536,7 @@
Opt_user_xattr, Opt_nouser_xattr, Opt_acl, Opt_noacl, Opt_noload,
Opt_commit, Opt_journal_update, Opt_journal_inum,
Opt_abort, Opt_data_journal, Opt_data_ordered, Opt_data_writeback,
-- Opt_ignore, Opt_err,
-+ Opt_ignore, Opt_err, Opt_iopen, Opt_noiopen, Opt_iopen_nopriv,
+- Opt_ignore, Opt_barrier,
++ Opt_ignore, Opt_barrier, Opt_iopen, Opt_noiopen, Opt_iopen_nopriv,
+ Opt_err,
};
- static match_table_t tokens = {
-@@ -575,6 +575,9 @@
- {Opt_ignore, "noquota"},
+@@ -577,6 +577,9 @@
{Opt_ignore, "quota"},
{Opt_ignore, "usrquota"},
-+ {Opt_iopen, "iopen"},
-+ {Opt_noiopen, "noiopen"},
-+ {Opt_iopen_nopriv, "iopen_nopriv"},
+ {Opt_barrier, "barrier=%u"},
++ {Opt_iopen, "iopen"},
++ {Opt_noiopen, "noiopen"},
++ {Opt_iopen_nopriv, "iopen_nopriv"},
{Opt_err, NULL}
};
-@@ -762,6 +765,18 @@
- case Opt_abort:
- set_opt(sbi->s_mount_opt, ABORT);
+@@ -772,6 +775,18 @@
+ else
+ clear_opt(sbi->s_mount_opt, BARRIER);
break;
+ case Opt_iopen:
+ set_opt (sbi->s_mount_opt, IOPEN);
default:
Index: linux-stage/include/linux/ext3_fs.h
===================================================================
---- linux-stage.orig/include/linux/ext3_fs.h 2004-05-07 16:00:16.000000000 -0400
-+++ linux-stage/include/linux/ext3_fs.h 2004-05-07 16:00:17.000000000 -0400
-@@ -325,6 +325,8 @@
- #define EXT3_MOUNT_NO_UID32 0x2000 /* Disable 32-bit UIDs */
+--- linux-stage.orig/include/linux/ext3_fs.h 2004-05-11 17:21:20.000000000 -0400
++++ linux-stage/include/linux/ext3_fs.h 2004-05-11 17:21:21.000000000 -0400
+@@ -326,6 +326,8 @@
#define EXT3_MOUNT_XATTR_USER 0x4000 /* Extended user attributes */
#define EXT3_MOUNT_POSIX_ACL 0x8000 /* POSIX Access Control Lists */
-+#define EXT3_MOUNT_IOPEN 0x10000 /* Allow access via iopen */
-+#define EXT3_MOUNT_IOPEN_NOPRIV 0x20000 /* Make iopen world-readable */
+ #define EXT3_MOUNT_BARRIER 0x10000 /* Use block barriers */
++#define EXT3_MOUNT_IOPEN 0x20000 /* Allow access via iopen */
++#define EXT3_MOUNT_IOPEN_NOPRIV 0x40000 /* Make iopen world-readable */
/* Compatibility, for having both ext2_fs.h and ext3_fs.h included at once */
#ifndef _LINUX_EXT2_FS_H
ext3-ea-in-inode-2.6-suse.patch
export-ext3-2.6-suse.patch
ext3-include-fixes-2.6-suse.patch
+ext3-htree-rename_fix.patch
+if MODULES
if LDISKFS
modulefs_DATA = ldiskfs$(KMODEXT)
endif
+endif
ldiskfs_linux_headers := $(addprefix linux/,$(subst ext3,ldiskfs,$(notdir $(linux_headers))))
fi
LUSTRE_MODULE_TRY_MAKE(
[#include <linux/version.h>],
- [LINUXRELEASE=UTS_RELEASE],
+ [char *LINUXRELEASE;
+ LINUXRELEASE=UTS_RELEASE;],
[$makerule LUSTRE_KERNEL_TEST=conftest.i],
[test -s kernel-tests/conftest.i],
[
# LINUXRELEASE="UTS_RELEASE"
- eval $(grep LINUXRELEASE kernel-tests/conftest.i)
+ eval $(grep "LINUXRELEASE=" kernel-tests/conftest.i)
],[
AC_MSG_RESULT([unknown])
AC_MSG_ERROR([Could not preprocess test program. Consult config.log for details.])
#include <linux/libcfs.h>
#define PORTAL_DEBUG
-#ifndef offsetof
-# define offsetof(typ,memb) ((unsigned long)((char *)&(((typ *)0)->memb)))
-#endif
-
-#define LOWEST_BIT_SET(x) ((x) & ~((x) - 1))
-
#ifdef __KERNEL__
# include <linux/vmalloc.h>
# include <linux/time.h>
TCPNAL = 5,
ROUTER = 6,
IBNAL = 7,
- CRAY_KB_ERNAL = 8,
NAL_ENUM_END_MARKER
};
#ifndef _KPR_H
#define _KPR_H
-# include <portals/lib-nal.h> /* for ptl_hdr_t */
+# include <portals/lib-types.h> /* for ptl_hdr_t */
/******************************************************************************/
/* Kernel Portals Router interface */
#define S_PTLROUTER 0x00100000
#define S_COBD 0x00200000
#define S_IBNAL 0x00400000
-#define S_LMV 0x00800000
-#define S_SM 0x01000000
-#define S_CMOBD 0x02000000
+#define S_SM 0x00800000
+#define S_ASOBD 0x01000000
+#define S_LMV 0x02000000
+#define S_CMOBD 0x04000000
+
/* If you change these values, please keep portals/utils/debug.c
* up to date! */
#endif
#ifdef __KERNEL__
+# define NTOH__u16(var) le16_to_cpu(var)
# define NTOH__u32(var) le32_to_cpu(var)
# define NTOH__u64(var) le64_to_cpu(var)
+# define HTON__u16(var) cpu_to_le16(var)
# define HTON__u32(var) cpu_to_le32(var)
# define HTON__u64(var) cpu_to_le64(var)
#else
}; \
(ret); \
})
+# define NTOH__u16(var) (var)
# define NTOH__u32(var) (var)
# define NTOH__u64(var) (expansion_u64(var))
+# define HTON__u16(var) (var)
# define HTON__u32(var) (var)
# define HTON__u64(var) (expansion_u64(var))
#endif
#include <portals/internal.h>
#include <portals/nal.h>
-#include <portals/arg-blocks.h>
-/* Hack for 2.4.18 macro name collision */
-#ifdef yield
-#undef yield
-#endif
#include <portals/types.h>
-#ifndef PTL_NO_WRAP
int PtlInit(int *);
void PtlFini(void);
int PtlNIFini(ptl_handle_ni_t interface_in);
-#endif
-
int PtlGetId(ptl_handle_ni_t ni_handle, ptl_process_id_t *id);
int PtlNIDist(ptl_handle_ni_t interface_in, ptl_process_id_t process_in,
unsigned long *distance_out);
-#ifndef PTL_NO_WRAP
int PtlNIHandle(ptl_handle_any_t handle_in, ptl_handle_ni_t * interface_out);
-#endif
/*
int PtlMEUnlinkList(ptl_handle_me_t current_in);
-int PtlTblDump(ptl_handle_ni_t ni, int index_in);
-int PtlMEDump(ptl_handle_me_t current_in);
-
/*
* Memory descriptors
*/
-#ifndef PTL_NO_WRAP
int PtlMDAttach(ptl_handle_me_t current_in, ptl_md_t md_in,
ptl_unlink_t unlink_in, ptl_handle_md_t * handle_out);
int PtlMDUpdate(ptl_handle_md_t md_in, ptl_md_t * old_inout,
ptl_md_t * new_inout, ptl_handle_eq_t testq_in);
-#endif
/* These should not be called by users */
int PtlMDUpdate_internal(ptl_handle_md_t md_in, ptl_md_t * old_inout,
/*
* Event queues
*/
-#ifndef PTL_NO_WRAP
-
-/* These should be called by users */
int PtlEQAlloc(ptl_handle_ni_t ni_in, ptl_size_t count_in,
ptl_eq_handler_t handler,
ptl_handle_eq_t *handle_out);
int PtlEQFree(ptl_handle_eq_t eventq_in);
-int PtlEQCount(ptl_handle_eq_t eventq_in, ptl_size_t * count_out);
-
int PtlEQGet(ptl_handle_eq_t eventq_in, ptl_event_t * event_out);
int PtlEQPoll(ptl_handle_eq_t *eventqs_in, int neq_in, int timeout,
ptl_event_t *event_out, int *which_out);
-#endif
/*
* Access Control Table
+++ /dev/null
-#ifndef PTL_BLOCKS_H
-#define PTL_BLOCKS_H
-
-#include "build_check.h"
-
-/*
- * blocks.h
- *
- * Argument block types for the Portals 3.0 library
- * Generated by idl
- *
- */
-
-#include <portals/types.h>
-
-/* put LIB_MAX_DISPATCH last here -- these must match the
- assignements to the dispatch table in lib-p30/dispatch.c */
-#define PTL_GETID 1
-#define PTL_NISTATUS 2
-#define PTL_NIDIST 3
-// #define PTL_NIDEBUG 4
-#define PTL_MEATTACH 5
-#define PTL_MEINSERT 6
-// #define PTL_MEPREPEND 7
-#define PTL_MEUNLINK 8
-#define PTL_TBLDUMP 9
-#define PTL_MEDUMP 10
-#define PTL_MDATTACH 11
-// #define PTL_MDINSERT 12
-#define PTL_MDBIND 13
-#define PTL_MDUPDATE 14
-#define PTL_MDUNLINK 15
-#define PTL_EQALLOC 16
-#define PTL_EQFREE 17
-#define PTL_ACENTRY 18
-#define PTL_PUT 19
-#define PTL_GET 20
-#define PTL_FAILNID 21
-#define LIB_MAX_DISPATCH 21
-
-typedef struct PtlFailNid_in {
- ptl_handle_ni_t interface;
- ptl_nid_t nid;
- unsigned int threshold;
-} PtlFailNid_in;
-
-typedef struct PtlFailNid_out {
- int rc;
-} PtlFailNid_out;
-
-typedef struct PtlGetId_in {
- ptl_handle_ni_t handle_in;
-} PtlGetId_in;
-
-typedef struct PtlGetId_out {
- int rc;
- ptl_process_id_t id_out;
-} PtlGetId_out;
-
-typedef struct PtlNIStatus_in {
- ptl_handle_ni_t interface_in;
- ptl_sr_index_t register_in;
-} PtlNIStatus_in;
-
-typedef struct PtlNIStatus_out {
- int rc;
- ptl_sr_value_t status_out;
-} PtlNIStatus_out;
-
-
-typedef struct PtlNIDist_in {
- ptl_handle_ni_t interface_in;
- ptl_process_id_t process_in;
-} PtlNIDist_in;
-
-typedef struct PtlNIDist_out {
- int rc;
- unsigned long distance_out;
-} PtlNIDist_out;
-
-
-typedef struct PtlNIDebug_in {
- unsigned int mask_in;
-} PtlNIDebug_in;
-
-typedef struct PtlNIDebug_out {
- unsigned int rc;
-} PtlNIDebug_out;
-
-
-typedef struct PtlMEAttach_in {
- ptl_handle_ni_t interface_in;
- ptl_pt_index_t index_in;
- ptl_ins_pos_t position_in;
- ptl_process_id_t match_id_in;
- ptl_match_bits_t match_bits_in;
- ptl_match_bits_t ignore_bits_in;
- ptl_unlink_t unlink_in;
-} PtlMEAttach_in;
-
-typedef struct PtlMEAttach_out {
- int rc;
- ptl_handle_me_t handle_out;
-} PtlMEAttach_out;
-
-
-typedef struct PtlMEInsert_in {
- ptl_handle_me_t current_in;
- ptl_process_id_t match_id_in;
- ptl_match_bits_t match_bits_in;
- ptl_match_bits_t ignore_bits_in;
- ptl_unlink_t unlink_in;
- ptl_ins_pos_t position_in;
-} PtlMEInsert_in;
-
-typedef struct PtlMEInsert_out {
- int rc;
- ptl_handle_me_t handle_out;
-} PtlMEInsert_out;
-
-typedef struct PtlMEUnlink_in {
- ptl_handle_me_t current_in;
- ptl_unlink_t unlink_in;
-} PtlMEUnlink_in;
-
-typedef struct PtlMEUnlink_out {
- int rc;
-} PtlMEUnlink_out;
-
-
-typedef struct PtlTblDump_in {
- int index_in;
-} PtlTblDump_in;
-
-typedef struct PtlTblDump_out {
- int rc;
-} PtlTblDump_out;
-
-
-typedef struct PtlMEDump_in {
- ptl_handle_me_t current_in;
-} PtlMEDump_in;
-
-typedef struct PtlMEDump_out {
- int rc;
-} PtlMEDump_out;
-
-
-typedef struct PtlMDAttach_in {
- ptl_handle_me_t me_in;
- ptl_handle_eq_t eq_in;
- ptl_md_t md_in;
- ptl_unlink_t unlink_in;
-} PtlMDAttach_in;
-
-typedef struct PtlMDAttach_out {
- int rc;
- ptl_handle_md_t handle_out;
-} PtlMDAttach_out;
-
-
-typedef struct PtlMDBind_in {
- ptl_handle_ni_t ni_in;
- ptl_handle_eq_t eq_in;
- ptl_md_t md_in;
- ptl_unlink_t unlink_in;
-} PtlMDBind_in;
-
-typedef struct PtlMDBind_out {
- int rc;
- ptl_handle_md_t handle_out;
-} PtlMDBind_out;
-
-
-typedef struct PtlMDUpdate_internal_in {
- ptl_handle_md_t md_in;
- ptl_handle_eq_t testq_in;
- ptl_seq_t sequence_in;
-
- ptl_md_t old_inout;
- int old_inout_valid;
- ptl_md_t new_inout;
- int new_inout_valid;
-} PtlMDUpdate_internal_in;
-
-typedef struct PtlMDUpdate_internal_out {
- int rc;
- ptl_md_t old_inout;
- ptl_md_t new_inout;
-} PtlMDUpdate_internal_out;
-
-
-typedef struct PtlMDUnlink_in {
- ptl_handle_md_t md_in;
-} PtlMDUnlink_in;
-
-typedef struct PtlMDUnlink_out {
- int rc;
- ptl_md_t status_out;
-} PtlMDUnlink_out;
-
-
-typedef struct PtlEQAlloc_in {
- ptl_handle_ni_t ni_in;
- ptl_size_t count_in;
- void *base_in;
- int len_in;
- ptl_eq_handler_t callback_in;
-} PtlEQAlloc_in;
-
-typedef struct PtlEQAlloc_out {
- int rc;
- ptl_handle_eq_t handle_out;
-} PtlEQAlloc_out;
-
-
-typedef struct PtlEQFree_in {
- ptl_handle_eq_t eventq_in;
-} PtlEQFree_in;
-
-typedef struct PtlEQFree_out {
- int rc;
-} PtlEQFree_out;
-
-
-typedef struct PtlACEntry_in {
- ptl_handle_ni_t ni_in;
- ptl_ac_index_t index_in;
- ptl_process_id_t match_id_in;
- ptl_pt_index_t portal_in;
-} PtlACEntry_in;
-
-typedef struct PtlACEntry_out {
- int rc;
-} PtlACEntry_out;
-
-
-typedef struct PtlPut_in {
- ptl_handle_md_t md_in;
- ptl_ack_req_t ack_req_in;
- ptl_process_id_t target_in;
- ptl_pt_index_t portal_in;
- ptl_ac_index_t cookie_in;
- ptl_match_bits_t match_bits_in;
- ptl_size_t offset_in;
- ptl_hdr_data_t hdr_data_in;
-} PtlPut_in;
-
-typedef struct PtlPut_out {
- int rc;
-} PtlPut_out;
-
-
-typedef struct PtlGet_in {
- ptl_handle_md_t md_in;
- ptl_process_id_t target_in;
- ptl_pt_index_t portal_in;
- ptl_ac_index_t cookie_in;
- ptl_match_bits_t match_bits_in;
- ptl_size_t offset_in;
-} PtlGet_in;
-
-typedef struct PtlGet_out {
- int rc;
-} PtlGet_out;
-
-
-#endif
PTL_EQ_IN_USE = 21,
- PTL_MAX_ERRNO = 22
+ PTL_NI_INVALID = 22,
+ PTL_MD_ILLEGAL = 23,
+
+ PTL_MAX_ERRNO = 24
} ptl_err_t;
/* If you change these, you must update the string table in api-errno.c */
extern int ptl_init; /* Has the library been initialized */
-extern int ptl_ni_init(void);
-extern void ptl_ni_fini(void);
-
-static inline ptl_eq_t *
-ptl_handle2usereq (ptl_handle_eq_t *handle)
-{
- /* EQ handles are a little wierd. On the "user" side, the cookie
- * is just a pointer to a queue of events in shared memory. It's
- * cb_eq_handle is the "real" handle which we pass when we
- * call do_forward(). */
- return (ptl_eq_t *)((unsigned long)handle->cookie);
-}
-
#endif
+++ /dev/null
-#ifndef PTL_DISPATCH_H
-#define PTL_DISPATCH_H
-
-#include "build_check.h"
-/*
- * include/dispatch.h
- *
- * Dispatch table header and externs for remote side
- * operations
- *
- * Generated by idl
- *
- */
-
-#include <portals/lib-p30.h>
-#include <portals/arg-blocks.h>
-
-extern int do_PtlGetId(nal_cb_t * nal, void *private, void *args, void *ret);
-extern int do_PtlNIStatus(nal_cb_t * nal, void *private, void *args, void *ret);
-extern int do_PtlNIDist(nal_cb_t * nal, void *private, void *args, void *ret);
-extern int do_PtlMEAttach(nal_cb_t * nal, void *private, void *args, void *ret);
-extern int do_PtlMEInsert(nal_cb_t * nal, void *private, void *args, void *ret);
-extern int do_PtlMEPrepend(nal_cb_t * nal, void *private, void *args,
- void *ret);
-extern int do_PtlMEUnlink(nal_cb_t * nal, void *private, void *args, void *ret);
-extern int do_PtlTblDump(nal_cb_t * nal, void *private, void *args, void *ret);
-extern int do_PtlMEDump(nal_cb_t * nal, void *private, void *args, void *ret);
-extern int do_PtlMDAttach(nal_cb_t * nal, void *private, void *args,
- void *ret);
-extern int do_PtlMDBind(nal_cb_t * nal, void *private, void *args,
- void *ret);
-extern int do_PtlMDUpdate_internal(nal_cb_t * nal, void *private, void *args,
- void *ret);
-extern int do_PtlMDUnlink(nal_cb_t * nal, void *private, void *args,
- void *ret);
-extern int do_PtlEQAlloc_internal(nal_cb_t * nal, void *private, void *args,
- void *ret);
-extern int do_PtlEQFree_internal(nal_cb_t * nal, void *private, void *args,
- void *ret);
-extern int do_PtlPut(nal_cb_t * nal, void *private, void *args, void *ret);
-extern int do_PtlGet(nal_cb_t * nal, void *private, void *args, void *ret);
-extern int do_PtlFailNid (nal_cb_t *nal, void *private, void *args, void *ret);
-
-extern char *dispatch_name(int index);
-#endif
#else
# include <portals/list.h>
# include <string.h>
+# include <pthread.h>
#endif
#include <portals/types.h>
#include <linux/kp30.h>
#include <portals/p30.h>
+#include <portals/nal.h>
#include <portals/lib-types.h>
-#include <portals/lib-nal.h>
-#include <portals/lib-dispatch.h>
static inline int ptl_is_wire_handle_none (ptl_handle_wire_t *wh)
{
wh->wh_object_cookie == PTL_WIRE_HANDLE_NONE.wh_object_cookie);
}
-#define state_lock(nal,flagsp) \
-do { \
- CDEBUG(D_PORTALS, "taking state lock\n"); \
- nal->cb_cli(nal, flagsp); \
-} while (0)
+#ifdef __KERNEL__
+#define LIB_LOCK(nal,flags) \
+ spin_lock_irqsave(&(nal)->libnal_ni.ni_lock, flags)
+#define LIB_UNLOCK(nal,flags) \
+ spin_unlock_irqrestore(&(nal)->libnal_ni.ni_lock, flags)
+#else
+#define LIB_LOCK(nal,flags) \
+ (pthread_mutex_lock(&(nal)->libnal_ni.ni_mutex), (flags) = 0)
+#define LIB_UNLOCK(nal,flags) \
+ pthread_mutex_unlock(&(nal)->libnal_ni.ni_mutex)
+#endif
-#define state_unlock(nal,flagsp) \
-{ \
- CDEBUG(D_PORTALS, "releasing state lock\n"); \
- nal->cb_sti(nal, flagsp); \
-}
#ifdef PTL_USE_LIB_FREELIST
#define MAX_MSGS 2048 /* Outstanding messages */
#define MAX_EQS 512
-extern int lib_freelist_init (nal_cb_t *nal, lib_freelist_t *fl, int nobj, int objsize);
-extern void lib_freelist_fini (nal_cb_t *nal, lib_freelist_t *fl);
+extern int lib_freelist_init (lib_nal_t *nal, lib_freelist_t *fl, int nobj, int objsize);
+extern void lib_freelist_fini (lib_nal_t *nal, lib_freelist_t *fl);
static inline void *
lib_freelist_alloc (lib_freelist_t *fl)
{
- /* ALWAYS called with statelock held */
+ /* ALWAYS called with liblock held */
lib_freeobj_t *o;
if (list_empty (&fl->fl_list))
static inline void
lib_freelist_free (lib_freelist_t *fl, void *obj)
{
- /* ALWAYS called with statelock held */
+ /* ALWAYS called with liblock held */
lib_freeobj_t *o = list_entry (obj, lib_freeobj_t, fo_contents);
list_add (&o->fo_list, &fl->fl_list);
static inline lib_eq_t *
-lib_eq_alloc (nal_cb_t *nal)
+lib_eq_alloc (lib_nal_t *nal)
{
- /* NEVER called with statelock held */
+ /* NEVER called with liblock held */
unsigned long flags;
lib_eq_t *eq;
- state_lock (nal, &flags);
- eq = (lib_eq_t *)lib_freelist_alloc (&nal->ni.ni_free_eqs);
- state_unlock (nal, &flags);
+ LIB_LOCK (nal, flags);
+ eq = (lib_eq_t *)lib_freelist_alloc (&nal->libnal_ni.ni_free_eqs);
+ LIB_UNLOCK (nal, flags);
return (eq);
}
static inline void
-lib_eq_free (nal_cb_t *nal, lib_eq_t *eq)
+lib_eq_free (lib_nal_t *nal, lib_eq_t *eq)
{
- /* ALWAYS called with statelock held */
- lib_freelist_free (&nal->ni.ni_free_eqs, eq);
+ /* ALWAYS called with liblock held */
+ lib_freelist_free (&nal->libnal_ni.ni_free_eqs, eq);
}
static inline lib_md_t *
-lib_md_alloc (nal_cb_t *nal, ptl_md_t *umd)
+lib_md_alloc (lib_nal_t *nal, ptl_md_t *umd)
{
- /* NEVER called with statelock held */
+ /* NEVER called with liblock held */
unsigned long flags;
lib_md_t *md;
- state_lock (nal, &flags);
- md = (lib_md_t *)lib_freelist_alloc (&nal->ni.ni_free_mds);
- state_unlock (nal, &flags);
+ LIB_LOCK (nal, flags);
+ md = (lib_md_t *)lib_freelist_alloc (&nal->libnal_ni.ni_free_mds);
+ LIB_UNLOCK (nal, flags);
return (md);
}
static inline void
-lib_md_free (nal_cb_t *nal, lib_md_t *md)
+lib_md_free (lib_nal_t *nal, lib_md_t *md)
{
- /* ALWAYS called with statelock held */
- lib_freelist_free (&nal->ni.ni_free_mds, md);
+ /* ALWAYS called with liblock held */
+ lib_freelist_free (&nal->libnal_ni.ni_free_mds, md);
}
static inline lib_me_t *
-lib_me_alloc (nal_cb_t *nal)
+lib_me_alloc (lib_nal_t *nal)
{
- /* NEVER called with statelock held */
+ /* NEVER called with liblock held */
unsigned long flags;
lib_me_t *me;
- state_lock (nal, &flags);
- me = (lib_me_t *)lib_freelist_alloc (&nal->ni.ni_free_mes);
- state_unlock (nal, &flags);
+ LIB_LOCK (nal, flags);
+ me = (lib_me_t *)lib_freelist_alloc (&nal->libnal_ni.ni_free_mes);
+ LIB_UNLOCK (nal, flags);
return (me);
}
static inline void
-lib_me_free (nal_cb_t *nal, lib_me_t *me)
+lib_me_free (lib_nal_t *nal, lib_me_t *me)
{
- /* ALWAYS called with statelock held */
- lib_freelist_free (&nal->ni.ni_free_mes, me);
+ /* ALWAYS called with liblock held */
+ lib_freelist_free (&nal->libnal_ni.ni_free_mes, me);
}
static inline lib_msg_t *
-lib_msg_alloc (nal_cb_t *nal)
+lib_msg_alloc (lib_nal_t *nal)
{
- /* NEVER called with statelock held */
+ /* NEVER called with liblock held */
unsigned long flags;
lib_msg_t *msg;
- state_lock (nal, &flags);
- msg = (lib_msg_t *)lib_freelist_alloc (&nal->ni.ni_free_msgs);
- state_unlock (nal, &flags);
+ LIB_LOCK (nal, flags);
+ msg = (lib_msg_t *)lib_freelist_alloc (&nal->libnal_ni.ni_free_msgs);
+ LIB_UNLOCK (nal, flags);
if (msg != NULL) {
/* NULL pointers, clear flags etc */
}
static inline void
-lib_msg_free (nal_cb_t *nal, lib_msg_t *msg)
+lib_msg_free (lib_nal_t *nal, lib_msg_t *msg)
{
- /* ALWAYS called with statelock held */
- lib_freelist_free (&nal->ni.ni_free_msgs, msg);
+ /* ALWAYS called with liblock held */
+ lib_freelist_free (&nal->libnal_ni.ni_free_msgs, msg);
}
#else
static inline lib_eq_t *
-lib_eq_alloc (nal_cb_t *nal)
+lib_eq_alloc (lib_nal_t *nal)
{
- /* NEVER called with statelock held */
+ /* NEVER called with liblock held */
lib_eq_t *eq;
PORTAL_ALLOC(eq, sizeof(*eq));
}
static inline void
-lib_eq_free (nal_cb_t *nal, lib_eq_t *eq)
+lib_eq_free (lib_nal_t *nal, lib_eq_t *eq)
{
- /* ALWAYS called with statelock held */
+ /* ALWAYS called with liblock held */
PORTAL_FREE(eq, sizeof(*eq));
}
static inline lib_md_t *
-lib_md_alloc (nal_cb_t *nal, ptl_md_t *umd)
+lib_md_alloc (lib_nal_t *nal, ptl_md_t *umd)
{
- /* NEVER called with statelock held */
+ /* NEVER called with liblock held */
lib_md_t *md;
int size;
int niov;
}
static inline void
-lib_md_free (nal_cb_t *nal, lib_md_t *md)
+lib_md_free (lib_nal_t *nal, lib_md_t *md)
{
- /* ALWAYS called with statelock held */
+ /* ALWAYS called with liblock held */
int size;
if ((md->options & PTL_MD_KIOV) != 0)
}
static inline lib_me_t *
-lib_me_alloc (nal_cb_t *nal)
+lib_me_alloc (lib_nal_t *nal)
{
- /* NEVER called with statelock held */
+ /* NEVER called with liblock held */
lib_me_t *me;
PORTAL_ALLOC(me, sizeof(*me));
}
static inline void
-lib_me_free(nal_cb_t *nal, lib_me_t *me)
+lib_me_free(lib_nal_t *nal, lib_me_t *me)
{
- /* ALWAYS called with statelock held */
+ /* ALWAYS called with liblock held */
PORTAL_FREE(me, sizeof(*me));
}
static inline lib_msg_t *
-lib_msg_alloc(nal_cb_t *nal)
+lib_msg_alloc(lib_nal_t *nal)
{
- /* NEVER called with statelock held; may be in interrupt... */
+ /* NEVER called with liblock held; may be in interrupt... */
lib_msg_t *msg;
if (in_interrupt())
}
static inline void
-lib_msg_free(nal_cb_t *nal, lib_msg_t *msg)
+lib_msg_free(lib_nal_t *nal, lib_msg_t *msg)
{
- /* ALWAYS called with statelock held */
+ /* ALWAYS called with liblock held */
PORTAL_FREE(msg, sizeof(*msg));
}
#endif
-extern lib_handle_t *lib_lookup_cookie (nal_cb_t *nal, __u64 cookie, int type);
-extern void lib_initialise_handle (nal_cb_t *nal, lib_handle_t *lh, int type);
-extern void lib_invalidate_handle (nal_cb_t *nal, lib_handle_t *lh);
+extern lib_handle_t *lib_lookup_cookie (lib_nal_t *nal, __u64 cookie, int type);
+extern void lib_initialise_handle (lib_nal_t *nal, lib_handle_t *lh, int type);
+extern void lib_invalidate_handle (lib_nal_t *nal, lib_handle_t *lh);
static inline void
-ptl_eq2handle (ptl_handle_eq_t *handle, lib_eq_t *eq)
+ptl_eq2handle (ptl_handle_eq_t *handle, lib_nal_t *nal, lib_eq_t *eq)
{
+ handle->nal_idx = nal->libnal_ni.ni_api->nal_handle.nal_idx;
handle->cookie = eq->eq_lh.lh_cookie;
}
static inline lib_eq_t *
-ptl_handle2eq (ptl_handle_eq_t *handle, nal_cb_t *nal)
+ptl_handle2eq (ptl_handle_eq_t *handle, lib_nal_t *nal)
{
- /* ALWAYS called with statelock held */
+ /* ALWAYS called with liblock held */
lib_handle_t *lh = lib_lookup_cookie (nal, handle->cookie,
PTL_COOKIE_TYPE_EQ);
if (lh == NULL)
}
static inline void
-ptl_md2handle (ptl_handle_md_t *handle, lib_md_t *md)
+ptl_md2handle (ptl_handle_md_t *handle, lib_nal_t *nal, lib_md_t *md)
{
+ handle->nal_idx = nal->libnal_ni.ni_api->nal_handle.nal_idx;
handle->cookie = md->md_lh.lh_cookie;
}
static inline lib_md_t *
-ptl_handle2md (ptl_handle_md_t *handle, nal_cb_t *nal)
+ptl_handle2md (ptl_handle_md_t *handle, lib_nal_t *nal)
{
- /* ALWAYS called with statelock held */
+ /* ALWAYS called with liblock held */
lib_handle_t *lh = lib_lookup_cookie (nal, handle->cookie,
PTL_COOKIE_TYPE_MD);
if (lh == NULL)
}
static inline lib_md_t *
-ptl_wire_handle2md (ptl_handle_wire_t *wh, nal_cb_t *nal)
+ptl_wire_handle2md (ptl_handle_wire_t *wh, lib_nal_t *nal)
{
- /* ALWAYS called with statelock held */
+ /* ALWAYS called with liblock held */
lib_handle_t *lh;
- if (wh->wh_interface_cookie != nal->ni.ni_interface_cookie)
+ if (wh->wh_interface_cookie != nal->libnal_ni.ni_interface_cookie)
return (NULL);
lh = lib_lookup_cookie (nal, wh->wh_object_cookie,
}
static inline void
-ptl_me2handle (ptl_handle_me_t *handle, lib_me_t *me)
+ptl_me2handle (ptl_handle_me_t *handle, lib_nal_t *nal, lib_me_t *me)
{
+ handle->nal_idx = nal->libnal_ni.ni_api->nal_handle.nal_idx;
handle->cookie = me->me_lh.lh_cookie;
}
static inline lib_me_t *
-ptl_handle2me (ptl_handle_me_t *handle, nal_cb_t *nal)
+ptl_handle2me (ptl_handle_me_t *handle, lib_nal_t *nal)
{
- /* ALWAYS called with statelock held */
+ /* ALWAYS called with liblock held */
lib_handle_t *lh = lib_lookup_cookie (nal, handle->cookie,
PTL_COOKIE_TYPE_ME);
if (lh == NULL)
return (lh_entry (lh, lib_me_t, me_lh));
}
-extern int lib_init(nal_cb_t *cb, ptl_process_id_t pid,
+extern int lib_init(lib_nal_t *libnal, nal_t *apinal,
+ ptl_process_id_t pid,
ptl_ni_limits_t *desired_limits,
ptl_ni_limits_t *actual_limits);
-extern int lib_fini(nal_cb_t * cb);
-extern void lib_dispatch(nal_cb_t * cb, void *private, int index,
- void *arg_block, void *ret_block);
-extern char *dispatch_name(int index);
+extern int lib_fini(lib_nal_t *libnal);
/*
- * When the NAL detects an incoming message, it should call
- * lib_parse() decode it. The NAL callbacks will be handed
- * the private cookie as a way for the NAL to maintain state
- * about which transaction is being processed. An extra parameter,
- * lib_cookie will contain the necessary information for
- * finalizing the message.
- *
- * After it has finished the handling the message, it should
- * call lib_finalize() with the lib_cookie parameter.
- * Call backs will be made to write events, send acks or
- * replies and so on.
+ * When the NAL detects an incoming message header, it should call
+ * lib_parse() decode it. If the message header is garbage, lib_parse()
+ * returns immediately with failure, otherwise the NAL callbacks will be
+ * called to receive the message body. They are handed the private cookie
+ * as a way for the NAL to maintain state about which transaction is being
+ * processed. An extra parameter, lib_msg contains the lib-level message
+ * state for passing to lib_finalize() when the message body has been
+ * received.
*/
-extern void lib_enq_event_locked (nal_cb_t *nal, void *private,
+extern void lib_enq_event_locked (lib_nal_t *nal, void *private,
lib_eq_t *eq, ptl_event_t *ev);
-extern void lib_finalize (nal_cb_t *nal, void *private, lib_msg_t *msg,
+extern void lib_finalize (lib_nal_t *nal, void *private, lib_msg_t *msg,
ptl_ni_fail_t ni_fail_type);
-extern void lib_parse (nal_cb_t *nal, ptl_hdr_t *hdr, void *private);
-extern lib_msg_t *lib_create_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid,
+extern ptl_err_t lib_parse (lib_nal_t *nal, ptl_hdr_t *hdr, void *private);
+extern lib_msg_t *lib_create_reply_msg (lib_nal_t *nal, ptl_nid_t peer_nid,
lib_msg_t *get_msg);
-extern void print_hdr (nal_cb_t * nal, ptl_hdr_t * hdr);
+extern void print_hdr (lib_nal_t * nal, ptl_hdr_t * hdr);
extern ptl_size_t lib_iov_nob (int niov, struct iovec *iov);
extern void lib_assert_wire_constants (void);
-extern ptl_err_t lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
+extern ptl_err_t lib_recv (lib_nal_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen);
-extern ptl_err_t lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg,
+extern ptl_err_t lib_send (lib_nal_t *nal, void *private, lib_msg_t *msg,
ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
lib_md_t *md, ptl_size_t offset, ptl_size_t len);
-extern void lib_md_deconstruct(nal_cb_t * nal, lib_md_t * md_in,
- ptl_md_t * md_out);
-extern void lib_md_unlink(nal_cb_t * nal, lib_md_t * md_in);
-extern void lib_me_unlink(nal_cb_t * nal, lib_me_t * me_in);
+extern int lib_api_ni_status (nal_t *nal, ptl_sr_index_t sr_idx,
+ ptl_sr_value_t *status);
+extern int lib_api_ni_dist (nal_t *nal, ptl_process_id_t *pid,
+ unsigned long *dist);
+
+extern int lib_api_eq_alloc (nal_t *nal, ptl_size_t count,
+ ptl_eq_handler_t callback,
+ ptl_handle_eq_t *handle);
+extern int lib_api_eq_free(nal_t *nal, ptl_handle_eq_t *eqh);
+extern int lib_api_eq_poll (nal_t *nal,
+ ptl_handle_eq_t *eventqs, int neq, int timeout_ms,
+ ptl_event_t *event, int *which);
+
+extern int lib_api_me_attach(nal_t *nal,
+ ptl_pt_index_t portal,
+ ptl_process_id_t match_id,
+ ptl_match_bits_t match_bits,
+ ptl_match_bits_t ignore_bits,
+ ptl_unlink_t unlink, ptl_ins_pos_t pos,
+ ptl_handle_me_t *handle);
+extern int lib_api_me_insert(nal_t *nal,
+ ptl_handle_me_t *current_meh,
+ ptl_process_id_t match_id,
+ ptl_match_bits_t match_bits,
+ ptl_match_bits_t ignore_bits,
+ ptl_unlink_t unlink, ptl_ins_pos_t pos,
+ ptl_handle_me_t *handle);
+extern int lib_api_me_unlink (nal_t *nal, ptl_handle_me_t *meh);
+extern void lib_me_unlink(lib_nal_t *nal, lib_me_t *me);
+
+extern int lib_api_get_id(nal_t *nal, ptl_process_id_t *pid);
+
+extern void lib_md_unlink(lib_nal_t *nal, lib_md_t *md);
+extern void lib_md_deconstruct(lib_nal_t *nal, lib_md_t *lmd, ptl_md_t *umd);
+extern int lib_api_md_attach(nal_t *nal, ptl_handle_me_t *meh,
+ ptl_md_t *umd, ptl_unlink_t unlink,
+ ptl_handle_md_t *handle);
+extern int lib_api_md_bind(nal_t *nal, ptl_md_t *umd, ptl_unlink_t unlink,
+ ptl_handle_md_t *handle);
+extern int lib_api_md_unlink (nal_t *nal, ptl_handle_md_t *mdh);
+extern int lib_api_md_update (nal_t *nal, ptl_handle_md_t *mdh,
+ ptl_md_t *oldumd, ptl_md_t *newumd,
+ ptl_handle_eq_t *testqh);
+
+extern int lib_api_get(nal_t *apinal, ptl_handle_md_t *mdh,
+ ptl_process_id_t *id,
+ ptl_pt_index_t portal, ptl_ac_index_t ac,
+ ptl_match_bits_t match_bits, ptl_size_t offset);
+extern int lib_api_put(nal_t *apinal, ptl_handle_md_t *mdh,
+ ptl_ack_req_t ack, ptl_process_id_t *id,
+ ptl_pt_index_t portal, ptl_ac_index_t ac,
+ ptl_match_bits_t match_bits,
+ ptl_size_t offset, ptl_hdr_data_t hdr_data);
+extern int lib_api_fail_nid(nal_t *apinal, ptl_nid_t nid, unsigned int threshold);
+
#endif
+++ /dev/null
-#ifndef _LIB_NAL_H_
-#define _LIB_NAL_H_
-
-#include "build_check.h"
-/*
- * nal.h
- *
- * Library side headers that define the abstraction layer's
- * responsibilities and interfaces
- */
-
-#include <portals/lib-types.h>
-
-struct nal_cb_t {
- /*
- * Per interface portal table, access control table
- * and NAL private data field;
- */
- lib_ni_t ni;
- void *nal_data;
- /*
- * send: Sends a preformatted header and payload data to a
- * specified remote process. The payload is scattered over 'niov'
- * fragments described by iov, starting at 'offset' for 'mlen'
- * bytes.
- * NB the NAL may NOT overwrite iov.
- * PTL_OK on success => NAL has committed to send and will call
- * lib_finalize on completion
- */
- ptl_err_t (*cb_send) (nal_cb_t * nal, void *private, lib_msg_t * cookie,
- ptl_hdr_t * hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
- unsigned int niov, struct iovec *iov,
- size_t offset, size_t mlen);
-
- /* as send, but with a set of page fragments (NULL if not supported) */
- ptl_err_t (*cb_send_pages) (nal_cb_t * nal, void *private, lib_msg_t * cookie,
- ptl_hdr_t * hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
- unsigned int niov, ptl_kiov_t *iov,
- size_t offset, size_t mlen);
- /*
- * recv: Receives an incoming message from a remote process. The
- * payload is to be received into the scattered buffer of 'niov'
- * fragments described by iov, starting at 'offset' for 'mlen'
- * bytes. Payload bytes after 'mlen' up to 'rlen' are to be
- * discarded.
- * NB the NAL may NOT overwrite iov.
- * PTL_OK on success => NAL has committed to receive and will call
- * lib_finalize on completion
- */
- ptl_err_t (*cb_recv) (nal_cb_t * nal, void *private, lib_msg_t * cookie,
- unsigned int niov, struct iovec *iov,
- size_t offset, size_t mlen, size_t rlen);
-
- /* as recv, but with a set of page fragments (NULL if not supported) */
- ptl_err_t (*cb_recv_pages) (nal_cb_t * nal, void *private, lib_msg_t * cookie,
- unsigned int niov, ptl_kiov_t *iov,
- size_t offset, size_t mlen, size_t rlen);
- /*
- * read: Reads a block of data from a specified user address
- */
- ptl_err_t (*cb_read) (nal_cb_t * nal, void *private, void *dst_addr,
- user_ptr src_addr, size_t len);
-
- /*
- * write: Writes a block of data into a specified user address
- */
- ptl_err_t (*cb_write) (nal_cb_t * nal, void *private, user_ptr dsr_addr,
- void *src_addr, size_t len);
-
- /*
- * callback: Calls an event callback
- * NULL => lib calls eq's callback (if any) directly.
- */
- void (*cb_callback) (nal_cb_t * nal, void *private, lib_eq_t *eq,
- ptl_event_t *ev);
-
- /*
- * malloc: Acquire a block of memory in a system independent
- * fashion.
- */
- void *(*cb_malloc) (nal_cb_t * nal, size_t len);
-
- void (*cb_free) (nal_cb_t * nal, void *buf, size_t len);
-
- /*
- * (un)map: Tell the NAL about some memory it will access.
- * *addrkey passed to cb_unmap() is what cb_map() set it to.
- * type of *iov depends on options.
- * Set to NULL if not required.
- */
- ptl_err_t (*cb_map) (nal_cb_t * nal, unsigned int niov, struct iovec *iov,
- void **addrkey);
- void (*cb_unmap) (nal_cb_t * nal, unsigned int niov, struct iovec *iov,
- void **addrkey);
-
- /* as (un)map, but with a set of page fragments */
- ptl_err_t (*cb_map_pages) (nal_cb_t * nal, unsigned int niov, ptl_kiov_t *iov,
- void **addrkey);
- void (*cb_unmap_pages) (nal_cb_t * nal, unsigned int niov, ptl_kiov_t *iov,
- void **addrkey);
-
- void (*cb_printf) (nal_cb_t * nal, const char *fmt, ...);
-
- /* Turn interrupts off (begin of protected area) */
- void (*cb_cli) (nal_cb_t * nal, unsigned long *flags);
-
- /* Turn interrupts on (end of protected area) */
- void (*cb_sti) (nal_cb_t * nal, unsigned long *flags);
-
- /*
- * Calculate a network "distance" to given node
- */
- int (*cb_dist) (nal_cb_t * nal, ptl_nid_t nid, unsigned long *dist);
-};
-
-#endif
#else
# include <portals/list.h>
# include <string.h>
+# include <pthread.h>
#endif
#include <portals/types.h>
#include <linux/kp30.h>
#include <portals/p30.h>
+#include <portals/nal.h>
#include <portals/lib-types.h>
-#include <portals/lib-nal.h>
-#include <portals/lib-dispatch.h>
static inline int ptl_is_wire_handle_none (ptl_handle_wire_t *wh)
{
wh->wh_object_cookie == PTL_WIRE_HANDLE_NONE.wh_object_cookie);
}
-#define state_lock(nal,flagsp) \
-do { \
- CDEBUG(D_PORTALS, "taking state lock\n"); \
- nal->cb_cli(nal, flagsp); \
-} while (0)
+#ifdef __KERNEL__
+#define LIB_LOCK(nal,flags) \
+ spin_lock_irqsave(&(nal)->libnal_ni.ni_lock, flags)
+#define LIB_UNLOCK(nal,flags) \
+ spin_unlock_irqrestore(&(nal)->libnal_ni.ni_lock, flags)
+#else
+#define LIB_LOCK(nal,flags) \
+ (pthread_mutex_lock(&(nal)->libnal_ni.ni_mutex), (flags) = 0)
+#define LIB_UNLOCK(nal,flags) \
+ pthread_mutex_unlock(&(nal)->libnal_ni.ni_mutex)
+#endif
-#define state_unlock(nal,flagsp) \
-{ \
- CDEBUG(D_PORTALS, "releasing state lock\n"); \
- nal->cb_sti(nal, flagsp); \
-}
#ifdef PTL_USE_LIB_FREELIST
#define MAX_MSGS 2048 /* Outstanding messages */
#define MAX_EQS 512
-extern int lib_freelist_init (nal_cb_t *nal, lib_freelist_t *fl, int nobj, int objsize);
-extern void lib_freelist_fini (nal_cb_t *nal, lib_freelist_t *fl);
+extern int lib_freelist_init (lib_nal_t *nal, lib_freelist_t *fl, int nobj, int objsize);
+extern void lib_freelist_fini (lib_nal_t *nal, lib_freelist_t *fl);
static inline void *
lib_freelist_alloc (lib_freelist_t *fl)
{
- /* ALWAYS called with statelock held */
+ /* ALWAYS called with liblock held */
lib_freeobj_t *o;
if (list_empty (&fl->fl_list))
static inline void
lib_freelist_free (lib_freelist_t *fl, void *obj)
{
- /* ALWAYS called with statelock held */
+ /* ALWAYS called with liblock held */
lib_freeobj_t *o = list_entry (obj, lib_freeobj_t, fo_contents);
list_add (&o->fo_list, &fl->fl_list);
static inline lib_eq_t *
-lib_eq_alloc (nal_cb_t *nal)
+lib_eq_alloc (lib_nal_t *nal)
{
- /* NEVER called with statelock held */
+ /* NEVER called with liblock held */
unsigned long flags;
lib_eq_t *eq;
- state_lock (nal, &flags);
- eq = (lib_eq_t *)lib_freelist_alloc (&nal->ni.ni_free_eqs);
- state_unlock (nal, &flags);
+ LIB_LOCK (nal, flags);
+ eq = (lib_eq_t *)lib_freelist_alloc (&nal->libnal_ni.ni_free_eqs);
+ LIB_UNLOCK (nal, flags);
return (eq);
}
static inline void
-lib_eq_free (nal_cb_t *nal, lib_eq_t *eq)
+lib_eq_free (lib_nal_t *nal, lib_eq_t *eq)
{
- /* ALWAYS called with statelock held */
- lib_freelist_free (&nal->ni.ni_free_eqs, eq);
+ /* ALWAYS called with liblock held */
+ lib_freelist_free (&nal->libnal_ni.ni_free_eqs, eq);
}
static inline lib_md_t *
-lib_md_alloc (nal_cb_t *nal, ptl_md_t *umd)
+lib_md_alloc (lib_nal_t *nal, ptl_md_t *umd)
{
- /* NEVER called with statelock held */
+ /* NEVER called with liblock held */
unsigned long flags;
lib_md_t *md;
- state_lock (nal, &flags);
- md = (lib_md_t *)lib_freelist_alloc (&nal->ni.ni_free_mds);
- state_unlock (nal, &flags);
+ LIB_LOCK (nal, flags);
+ md = (lib_md_t *)lib_freelist_alloc (&nal->libnal_ni.ni_free_mds);
+ LIB_UNLOCK (nal, flags);
return (md);
}
static inline void
-lib_md_free (nal_cb_t *nal, lib_md_t *md)
+lib_md_free (lib_nal_t *nal, lib_md_t *md)
{
- /* ALWAYS called with statelock held */
- lib_freelist_free (&nal->ni.ni_free_mds, md);
+ /* ALWAYS called with liblock held */
+ lib_freelist_free (&nal->libnal_ni.ni_free_mds, md);
}
static inline lib_me_t *
-lib_me_alloc (nal_cb_t *nal)
+lib_me_alloc (lib_nal_t *nal)
{
- /* NEVER called with statelock held */
+ /* NEVER called with liblock held */
unsigned long flags;
lib_me_t *me;
- state_lock (nal, &flags);
- me = (lib_me_t *)lib_freelist_alloc (&nal->ni.ni_free_mes);
- state_unlock (nal, &flags);
+ LIB_LOCK (nal, flags);
+ me = (lib_me_t *)lib_freelist_alloc (&nal->libnal_ni.ni_free_mes);
+ LIB_UNLOCK (nal, flags);
return (me);
}
static inline void
-lib_me_free (nal_cb_t *nal, lib_me_t *me)
+lib_me_free (lib_nal_t *nal, lib_me_t *me)
{
- /* ALWAYS called with statelock held */
- lib_freelist_free (&nal->ni.ni_free_mes, me);
+ /* ALWAYS called with liblock held */
+ lib_freelist_free (&nal->libnal_ni.ni_free_mes, me);
}
static inline lib_msg_t *
-lib_msg_alloc (nal_cb_t *nal)
+lib_msg_alloc (lib_nal_t *nal)
{
- /* NEVER called with statelock held */
+ /* NEVER called with liblock held */
unsigned long flags;
lib_msg_t *msg;
- state_lock (nal, &flags);
- msg = (lib_msg_t *)lib_freelist_alloc (&nal->ni.ni_free_msgs);
- state_unlock (nal, &flags);
+ LIB_LOCK (nal, flags);
+ msg = (lib_msg_t *)lib_freelist_alloc (&nal->libnal_ni.ni_free_msgs);
+ LIB_UNLOCK (nal, flags);
if (msg != NULL) {
/* NULL pointers, clear flags etc */
}
static inline void
-lib_msg_free (nal_cb_t *nal, lib_msg_t *msg)
+lib_msg_free (lib_nal_t *nal, lib_msg_t *msg)
{
- /* ALWAYS called with statelock held */
- lib_freelist_free (&nal->ni.ni_free_msgs, msg);
+ /* ALWAYS called with liblock held */
+ lib_freelist_free (&nal->libnal_ni.ni_free_msgs, msg);
}
#else
static inline lib_eq_t *
-lib_eq_alloc (nal_cb_t *nal)
+lib_eq_alloc (lib_nal_t *nal)
{
- /* NEVER called with statelock held */
+ /* NEVER called with liblock held */
lib_eq_t *eq;
PORTAL_ALLOC(eq, sizeof(*eq));
}
static inline void
-lib_eq_free (nal_cb_t *nal, lib_eq_t *eq)
+lib_eq_free (lib_nal_t *nal, lib_eq_t *eq)
{
- /* ALWAYS called with statelock held */
+ /* ALWAYS called with liblock held */
PORTAL_FREE(eq, sizeof(*eq));
}
static inline lib_md_t *
-lib_md_alloc (nal_cb_t *nal, ptl_md_t *umd)
+lib_md_alloc (lib_nal_t *nal, ptl_md_t *umd)
{
- /* NEVER called with statelock held */
+ /* NEVER called with liblock held */
lib_md_t *md;
int size;
int niov;
}
static inline void
-lib_md_free (nal_cb_t *nal, lib_md_t *md)
+lib_md_free (lib_nal_t *nal, lib_md_t *md)
{
- /* ALWAYS called with statelock held */
+ /* ALWAYS called with liblock held */
int size;
if ((md->options & PTL_MD_KIOV) != 0)
}
static inline lib_me_t *
-lib_me_alloc (nal_cb_t *nal)
+lib_me_alloc (lib_nal_t *nal)
{
- /* NEVER called with statelock held */
+ /* NEVER called with liblock held */
lib_me_t *me;
PORTAL_ALLOC(me, sizeof(*me));
}
static inline void
-lib_me_free(nal_cb_t *nal, lib_me_t *me)
+lib_me_free(lib_nal_t *nal, lib_me_t *me)
{
- /* ALWAYS called with statelock held */
+ /* ALWAYS called with liblock held */
PORTAL_FREE(me, sizeof(*me));
}
static inline lib_msg_t *
-lib_msg_alloc(nal_cb_t *nal)
+lib_msg_alloc(lib_nal_t *nal)
{
- /* NEVER called with statelock held; may be in interrupt... */
+ /* NEVER called with liblock held; may be in interrupt... */
lib_msg_t *msg;
if (in_interrupt())
}
static inline void
-lib_msg_free(nal_cb_t *nal, lib_msg_t *msg)
+lib_msg_free(lib_nal_t *nal, lib_msg_t *msg)
{
- /* ALWAYS called with statelock held */
+ /* ALWAYS called with liblock held */
PORTAL_FREE(msg, sizeof(*msg));
}
#endif
-extern lib_handle_t *lib_lookup_cookie (nal_cb_t *nal, __u64 cookie, int type);
-extern void lib_initialise_handle (nal_cb_t *nal, lib_handle_t *lh, int type);
-extern void lib_invalidate_handle (nal_cb_t *nal, lib_handle_t *lh);
+extern lib_handle_t *lib_lookup_cookie (lib_nal_t *nal, __u64 cookie, int type);
+extern void lib_initialise_handle (lib_nal_t *nal, lib_handle_t *lh, int type);
+extern void lib_invalidate_handle (lib_nal_t *nal, lib_handle_t *lh);
static inline void
-ptl_eq2handle (ptl_handle_eq_t *handle, lib_eq_t *eq)
+ptl_eq2handle (ptl_handle_eq_t *handle, lib_nal_t *nal, lib_eq_t *eq)
{
+ handle->nal_idx = nal->libnal_ni.ni_api->nal_handle.nal_idx;
handle->cookie = eq->eq_lh.lh_cookie;
}
static inline lib_eq_t *
-ptl_handle2eq (ptl_handle_eq_t *handle, nal_cb_t *nal)
+ptl_handle2eq (ptl_handle_eq_t *handle, lib_nal_t *nal)
{
- /* ALWAYS called with statelock held */
+ /* ALWAYS called with liblock held */
lib_handle_t *lh = lib_lookup_cookie (nal, handle->cookie,
PTL_COOKIE_TYPE_EQ);
if (lh == NULL)
}
static inline void
-ptl_md2handle (ptl_handle_md_t *handle, lib_md_t *md)
+ptl_md2handle (ptl_handle_md_t *handle, lib_nal_t *nal, lib_md_t *md)
{
+ handle->nal_idx = nal->libnal_ni.ni_api->nal_handle.nal_idx;
handle->cookie = md->md_lh.lh_cookie;
}
static inline lib_md_t *
-ptl_handle2md (ptl_handle_md_t *handle, nal_cb_t *nal)
+ptl_handle2md (ptl_handle_md_t *handle, lib_nal_t *nal)
{
- /* ALWAYS called with statelock held */
+ /* ALWAYS called with liblock held */
lib_handle_t *lh = lib_lookup_cookie (nal, handle->cookie,
PTL_COOKIE_TYPE_MD);
if (lh == NULL)
}
static inline lib_md_t *
-ptl_wire_handle2md (ptl_handle_wire_t *wh, nal_cb_t *nal)
+ptl_wire_handle2md (ptl_handle_wire_t *wh, lib_nal_t *nal)
{
- /* ALWAYS called with statelock held */
+ /* ALWAYS called with liblock held */
lib_handle_t *lh;
- if (wh->wh_interface_cookie != nal->ni.ni_interface_cookie)
+ if (wh->wh_interface_cookie != nal->libnal_ni.ni_interface_cookie)
return (NULL);
lh = lib_lookup_cookie (nal, wh->wh_object_cookie,
}
static inline void
-ptl_me2handle (ptl_handle_me_t *handle, lib_me_t *me)
+ptl_me2handle (ptl_handle_me_t *handle, lib_nal_t *nal, lib_me_t *me)
{
+ handle->nal_idx = nal->libnal_ni.ni_api->nal_handle.nal_idx;
handle->cookie = me->me_lh.lh_cookie;
}
static inline lib_me_t *
-ptl_handle2me (ptl_handle_me_t *handle, nal_cb_t *nal)
+ptl_handle2me (ptl_handle_me_t *handle, lib_nal_t *nal)
{
- /* ALWAYS called with statelock held */
+ /* ALWAYS called with liblock held */
lib_handle_t *lh = lib_lookup_cookie (nal, handle->cookie,
PTL_COOKIE_TYPE_ME);
if (lh == NULL)
return (lh_entry (lh, lib_me_t, me_lh));
}
-extern int lib_init(nal_cb_t *cb, ptl_process_id_t pid,
+extern int lib_init(lib_nal_t *libnal, nal_t *apinal,
+ ptl_process_id_t pid,
ptl_ni_limits_t *desired_limits,
ptl_ni_limits_t *actual_limits);
-extern int lib_fini(nal_cb_t * cb);
-extern void lib_dispatch(nal_cb_t * cb, void *private, int index,
- void *arg_block, void *ret_block);
-extern char *dispatch_name(int index);
+extern int lib_fini(lib_nal_t *libnal);
/*
- * When the NAL detects an incoming message, it should call
- * lib_parse() decode it. The NAL callbacks will be handed
- * the private cookie as a way for the NAL to maintain state
- * about which transaction is being processed. An extra parameter,
- * lib_cookie will contain the necessary information for
- * finalizing the message.
- *
- * After it has finished the handling the message, it should
- * call lib_finalize() with the lib_cookie parameter.
- * Call backs will be made to write events, send acks or
- * replies and so on.
+ * When the NAL detects an incoming message header, it should call
+ * lib_parse() decode it. If the message header is garbage, lib_parse()
+ * returns immediately with failure, otherwise the NAL callbacks will be
+ * called to receive the message body. They are handed the private cookie
+ * as a way for the NAL to maintain state about which transaction is being
+ * processed. An extra parameter, lib_msg contains the lib-level message
+ * state for passing to lib_finalize() when the message body has been
+ * received.
*/
-extern void lib_enq_event_locked (nal_cb_t *nal, void *private,
+extern void lib_enq_event_locked (lib_nal_t *nal, void *private,
lib_eq_t *eq, ptl_event_t *ev);
-extern void lib_finalize (nal_cb_t *nal, void *private, lib_msg_t *msg,
+extern void lib_finalize (lib_nal_t *nal, void *private, lib_msg_t *msg,
ptl_ni_fail_t ni_fail_type);
-extern void lib_parse (nal_cb_t *nal, ptl_hdr_t *hdr, void *private);
-extern lib_msg_t *lib_create_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid,
+extern ptl_err_t lib_parse (lib_nal_t *nal, ptl_hdr_t *hdr, void *private);
+extern lib_msg_t *lib_create_reply_msg (lib_nal_t *nal, ptl_nid_t peer_nid,
lib_msg_t *get_msg);
-extern void print_hdr (nal_cb_t * nal, ptl_hdr_t * hdr);
+extern void print_hdr (lib_nal_t * nal, ptl_hdr_t * hdr);
extern ptl_size_t lib_iov_nob (int niov, struct iovec *iov);
extern void lib_assert_wire_constants (void);
-extern ptl_err_t lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
+extern ptl_err_t lib_recv (lib_nal_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen);
-extern ptl_err_t lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg,
+extern ptl_err_t lib_send (lib_nal_t *nal, void *private, lib_msg_t *msg,
ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
lib_md_t *md, ptl_size_t offset, ptl_size_t len);
-extern void lib_md_deconstruct(nal_cb_t * nal, lib_md_t * md_in,
- ptl_md_t * md_out);
-extern void lib_md_unlink(nal_cb_t * nal, lib_md_t * md_in);
-extern void lib_me_unlink(nal_cb_t * nal, lib_me_t * me_in);
+extern int lib_api_ni_status (nal_t *nal, ptl_sr_index_t sr_idx,
+ ptl_sr_value_t *status);
+extern int lib_api_ni_dist (nal_t *nal, ptl_process_id_t *pid,
+ unsigned long *dist);
+
+extern int lib_api_eq_alloc (nal_t *nal, ptl_size_t count,
+ ptl_eq_handler_t callback,
+ ptl_handle_eq_t *handle);
+extern int lib_api_eq_free(nal_t *nal, ptl_handle_eq_t *eqh);
+extern int lib_api_eq_poll (nal_t *nal,
+ ptl_handle_eq_t *eventqs, int neq, int timeout_ms,
+ ptl_event_t *event, int *which);
+
+extern int lib_api_me_attach(nal_t *nal,
+ ptl_pt_index_t portal,
+ ptl_process_id_t match_id,
+ ptl_match_bits_t match_bits,
+ ptl_match_bits_t ignore_bits,
+ ptl_unlink_t unlink, ptl_ins_pos_t pos,
+ ptl_handle_me_t *handle);
+extern int lib_api_me_insert(nal_t *nal,
+ ptl_handle_me_t *current_meh,
+ ptl_process_id_t match_id,
+ ptl_match_bits_t match_bits,
+ ptl_match_bits_t ignore_bits,
+ ptl_unlink_t unlink, ptl_ins_pos_t pos,
+ ptl_handle_me_t *handle);
+extern int lib_api_me_unlink (nal_t *nal, ptl_handle_me_t *meh);
+extern void lib_me_unlink(lib_nal_t *nal, lib_me_t *me);
+
+extern int lib_api_get_id(nal_t *nal, ptl_process_id_t *pid);
+
+extern void lib_md_unlink(lib_nal_t *nal, lib_md_t *md);
+extern void lib_md_deconstruct(lib_nal_t *nal, lib_md_t *lmd, ptl_md_t *umd);
+extern int lib_api_md_attach(nal_t *nal, ptl_handle_me_t *meh,
+ ptl_md_t *umd, ptl_unlink_t unlink,
+ ptl_handle_md_t *handle);
+extern int lib_api_md_bind(nal_t *nal, ptl_md_t *umd, ptl_unlink_t unlink,
+ ptl_handle_md_t *handle);
+extern int lib_api_md_unlink (nal_t *nal, ptl_handle_md_t *mdh);
+extern int lib_api_md_update (nal_t *nal, ptl_handle_md_t *mdh,
+ ptl_md_t *oldumd, ptl_md_t *newumd,
+ ptl_handle_eq_t *testqh);
+
+extern int lib_api_get(nal_t *apinal, ptl_handle_md_t *mdh,
+ ptl_process_id_t *id,
+ ptl_pt_index_t portal, ptl_ac_index_t ac,
+ ptl_match_bits_t match_bits, ptl_size_t offset);
+extern int lib_api_put(nal_t *apinal, ptl_handle_md_t *mdh,
+ ptl_ack_req_t ack, ptl_process_id_t *id,
+ ptl_pt_index_t portal, ptl_ac_index_t ac,
+ ptl_match_bits_t match_bits,
+ ptl_size_t offset, ptl_hdr_data_t hdr_data);
+extern int lib_api_fail_nid(nal_t *apinal, ptl_nid_t nid, unsigned int threshold);
+
#endif
#include "build_check.h"
#include <portals/types.h>
+#include <portals/nal.h>
#ifdef __KERNEL__
# include <linux/uio.h>
# include <linux/smp_lock.h>
# include <sys/types.h>
#endif
-/* struct nal_cb_t is defined in lib-nal.h */
-typedef struct nal_cb_t nal_cb_t;
-
typedef char *user_ptr;
typedef struct lib_msg_t lib_msg_t;
typedef struct lib_ptl_t lib_ptl_t;
struct lib_eq_t {
struct list_head eq_list;
lib_handle_t eq_lh;
- ptl_seq_t sequence;
- ptl_size_t size;
- ptl_event_t *base;
+ ptl_seq_t eq_enq_seq;
+ ptl_seq_t eq_deq_seq;
+ ptl_size_t eq_size;
+ ptl_event_t *eq_events;
int eq_refcount;
- ptl_eq_handler_t event_callback;
+ ptl_eq_handler_t eq_callback;
void *eq_addrkey;
};
/* PTL_COOKIE_TYPES must be a power of 2, so the cookie type can be
* extracted by masking with (PTL_COOKIE_TYPES - 1) */
-typedef struct {
- ptl_nid_t nid;
- ptl_pid_t pid;
- lib_ptl_t tbl;
- lib_counters_t counters;
- ptl_ni_limits_t actual_limits;
+typedef struct lib_ni
+{
+ nal_t *ni_api;
+ ptl_process_id_t ni_pid;
+ lib_ptl_t ni_portals;
+ lib_counters_t ni_counters;
+ ptl_ni_limits_t ni_actual_limits;
int ni_lh_hash_size; /* size of lib handle hash table */
struct list_head *ni_lh_hash_table; /* all extant lib handles, this interface */
__u64 ni_next_object_cookie; /* cookie generator */
__u64 ni_interface_cookie; /* uniquely identifies this ni in this epoch */
- struct list_head ni_test_peers;
+ struct list_head ni_test_peers;
#ifdef PTL_USE_LIB_FREELIST
- lib_freelist_t ni_free_mes;
- lib_freelist_t ni_free_msgs;
- lib_freelist_t ni_free_mds;
- lib_freelist_t ni_free_eqs;
+ lib_freelist_t ni_free_mes;
+ lib_freelist_t ni_free_msgs;
+ lib_freelist_t ni_free_mds;
+ lib_freelist_t ni_free_eqs;
+#endif
+
+ struct list_head ni_active_msgs;
+ struct list_head ni_active_mds;
+ struct list_head ni_active_eqs;
+
+#ifdef __KERNEL__
+ spinlock_t ni_lock;
+ wait_queue_head_t ni_waitq;
+#else
+ pthread_mutex_t ni_mutex;
+ pthread_cond_t ni_cond;
#endif
- struct list_head ni_active_msgs;
- struct list_head ni_active_mds;
- struct list_head ni_active_eqs;
} lib_ni_t;
+
+typedef struct lib_nal
+{
+ /* lib-level interface state */
+ lib_ni_t libnal_ni;
+
+ /* NAL-private data */
+ void *libnal_data;
+
+ /*
+ * send: Sends a preformatted header and payload data to a
+ * specified remote process. The payload is scattered over 'niov'
+ * fragments described by iov, starting at 'offset' for 'mlen'
+ * bytes.
+ * NB the NAL may NOT overwrite iov.
+ * PTL_OK on success => NAL has committed to send and will call
+ * lib_finalize on completion
+ */
+ ptl_err_t (*libnal_send)
+ (struct lib_nal *nal, void *private, lib_msg_t *cookie,
+ ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
+ unsigned int niov, struct iovec *iov,
+ size_t offset, size_t mlen);
+
+ /* as send, but with a set of page fragments (NULL if not supported) */
+ ptl_err_t (*libnal_send_pages)
+ (struct lib_nal *nal, void *private, lib_msg_t * cookie,
+ ptl_hdr_t * hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
+ unsigned int niov, ptl_kiov_t *iov,
+ size_t offset, size_t mlen);
+ /*
+ * recv: Receives an incoming message from a remote process. The
+ * payload is to be received into the scattered buffer of 'niov'
+ * fragments described by iov, starting at 'offset' for 'mlen'
+ * bytes. Payload bytes after 'mlen' up to 'rlen' are to be
+ * discarded.
+ * NB the NAL may NOT overwrite iov.
+ * PTL_OK on success => NAL has committed to receive and will call
+ * lib_finalize on completion
+ */
+ ptl_err_t (*libnal_recv)
+ (struct lib_nal *nal, void *private, lib_msg_t * cookie,
+ unsigned int niov, struct iovec *iov,
+ size_t offset, size_t mlen, size_t rlen);
+
+ /* as recv, but with a set of page fragments (NULL if not supported) */
+ ptl_err_t (*libnal_recv_pages)
+ (struct lib_nal *nal, void *private, lib_msg_t * cookie,
+ unsigned int niov, ptl_kiov_t *iov,
+ size_t offset, size_t mlen, size_t rlen);
+
+ /*
+ * (un)map: Tell the NAL about some memory it will access.
+ * *addrkey passed to libnal_unmap() is what libnal_map() set it to.
+ * type of *iov depends on options.
+ * Set to NULL if not required.
+ */
+ ptl_err_t (*libnal_map)
+ (struct lib_nal *nal, unsigned int niov, struct iovec *iov,
+ void **addrkey);
+ void (*libnal_unmap)
+ (struct lib_nal *nal, unsigned int niov, struct iovec *iov,
+ void **addrkey);
+
+ /* as (un)map, but with a set of page fragments */
+ ptl_err_t (*libnal_map_pages)
+ (struct lib_nal *nal, unsigned int niov, ptl_kiov_t *iov,
+ void **addrkey);
+ void (*libnal_unmap_pages)
+ (struct lib_nal *nal, unsigned int niov, ptl_kiov_t *iov,
+ void **addrkey);
+
+ void (*libnal_printf)(struct lib_nal *nal, const char *fmt, ...);
+
+ /* Calculate a network "distance" to given node */
+ int (*libnal_dist) (struct lib_nal *nal, ptl_nid_t nid, unsigned long *dist);
+} lib_nal_t;
+
#endif
#include <portals/types.h>
-#ifdef yield
-#undef yield
-#endif
-
typedef struct nal_t nal_t;
struct nal_t {
+ /* common interface state */
int nal_refct;
+ ptl_handle_ni_t nal_handle;
+
+ /* NAL-private data */
void *nal_data;
- int (*startup) (nal_t *nal, ptl_pid_t requested_pid,
- ptl_ni_limits_t *req, ptl_ni_limits_t *actual);
+ /* NAL API implementation
+ * NB only nal_ni_init needs to be set when the NAL registers itself */
+ int (*nal_ni_init) (nal_t *nal, ptl_pid_t requested_pid,
+ ptl_ni_limits_t *req, ptl_ni_limits_t *actual);
- void (*shutdown) (nal_t *nal);
+ void (*nal_ni_fini) (nal_t *nal);
- int (*forward) (nal_t *nal, int index, /* Function ID */
- void *args, size_t arg_len, void *ret, size_t ret_len);
+ int (*nal_get_id) (nal_t *nal, ptl_process_id_t *id);
+ int (*nal_ni_status) (nal_t *nal, ptl_sr_index_t register, ptl_sr_value_t *status);
+ int (*nal_ni_dist) (nal_t *nal, ptl_process_id_t *id, unsigned long *distance);
+ int (*nal_fail_nid) (nal_t *nal, ptl_nid_t nid, unsigned int threshold);
- int (*yield) (nal_t *nal, unsigned long *flags, int milliseconds);
+ int (*nal_me_attach) (nal_t *nal, ptl_pt_index_t portal,
+ ptl_process_id_t match_id,
+ ptl_match_bits_t match_bits, ptl_match_bits_t ignore_bits,
+ ptl_unlink_t unlink, ptl_ins_pos_t pos,
+ ptl_handle_me_t *handle);
+ int (*nal_me_insert) (nal_t *nal, ptl_handle_me_t *me,
+ ptl_process_id_t match_id,
+ ptl_match_bits_t match_bits, ptl_match_bits_t ignore_bits,
+ ptl_unlink_t unlink, ptl_ins_pos_t pos,
+ ptl_handle_me_t *handle);
+ int (*nal_me_unlink) (nal_t *nal, ptl_handle_me_t *me);
+
+ int (*nal_md_attach) (nal_t *nal, ptl_handle_me_t *me,
+ ptl_md_t *md, ptl_unlink_t unlink,
+ ptl_handle_md_t *handle);
+ int (*nal_md_bind) (nal_t *nal,
+ ptl_md_t *md, ptl_unlink_t unlink,
+ ptl_handle_md_t *handle);
+ int (*nal_md_unlink) (nal_t *nal, ptl_handle_md_t *md);
+ int (*nal_md_update) (nal_t *nal, ptl_handle_md_t *md,
+ ptl_md_t *old_md, ptl_md_t *new_md,
+ ptl_handle_eq_t *testq);
- void (*lock) (nal_t *nal, unsigned long *flags);
+ int (*nal_eq_alloc) (nal_t *nal, ptl_size_t count,
+ ptl_eq_handler_t handler,
+ ptl_handle_eq_t *handle);
+ int (*nal_eq_free) (nal_t *nal, ptl_handle_eq_t *eq);
+ int (*nal_eq_poll) (nal_t *nal,
+ ptl_handle_eq_t *eqs, int neqs, int timeout,
+ ptl_event_t *event, int *which);
- void (*unlock) (nal_t *nal, unsigned long *flags);
+ int (*nal_ace_entry) (nal_t *nal, ptl_ac_index_t index,
+ ptl_process_id_t match_id, ptl_pt_index_t portal);
+
+ int (*nal_put) (nal_t *nal, ptl_handle_md_t *md, ptl_ack_req_t ack,
+ ptl_process_id_t *target, ptl_pt_index_t portal,
+ ptl_ac_index_t ac, ptl_match_bits_t match,
+ ptl_size_t offset, ptl_hdr_data_t hdr_data);
+ int (*nal_get) (nal_t *nal, ptl_handle_md_t *md,
+ ptl_process_id_t *target, ptl_pt_index_t portal,
+ ptl_ac_index_t ac, ptl_match_bits_t match,
+ ptl_size_t offset);
};
-extern nal_t *ptl_hndl2nal(ptl_handle_any_t * any);
+extern nal_t *ptl_hndl2nal(ptl_handle_any_t *any);
#ifdef __KERNEL__
extern int ptl_register_nal(ptl_interface_t interface, nal_t *nal);
#define PTL_EQ_HANDLER_NONE NULL
typedef struct {
- volatile ptl_seq_t sequence;
- ptl_size_t size;
- ptl_event_t *base;
- ptl_handle_any_t cb_eq_handle;
-} ptl_eq_t;
-
-typedef struct {
- ptl_eq_t *eq;
-} ptl_ni_t;
-
-typedef struct {
int max_mes;
int max_mds;
int max_eqs;
#define NRXTHREADS 10 /* max number of receiver threads */
typedef struct _gmnal_data_t {
- spinlock_t cb_lock;
spinlock_t stxd_lock;
struct semaphore stxd_token;
gmnal_stxd_t *stxd;
gmnal_srxd_t *srxd;
struct gm_hash *srxd_hash;
nal_t *nal;
- nal_cb_t *nal_cb;
+ lib_nal_t *libnal;
struct gm_port *gm_port;
unsigned int gm_local_nid;
unsigned int gm_global_nid;
#define GMNAL_GM_LOCK_INIT(a) spin_lock_init(&a->gm_lock);
#define GMNAL_GM_LOCK(a) spin_lock(&a->gm_lock);
#define GMNAL_GM_UNLOCK(a) spin_unlock(&a->gm_lock);
-#define GMNAL_CB_LOCK_INIT(a) spin_lock_init(&a->cb_lock);
/*
* CB NAL
*/
-int gmnal_cb_send(nal_cb_t *, void *, lib_msg_t *, ptl_hdr_t *,
+int gmnal_cb_send(lib_nal_t *, void *, lib_msg_t *, ptl_hdr_t *,
int, ptl_nid_t, ptl_pid_t, unsigned int, struct iovec *, size_t);
-int gmnal_cb_send_pages(nal_cb_t *, void *, lib_msg_t *, ptl_hdr_t *,
+int gmnal_cb_send_pages(lib_nal_t *, void *, lib_msg_t *, ptl_hdr_t *,
int, ptl_nid_t, ptl_pid_t, unsigned int, ptl_kiov_t *, size_t);
-int gmnal_cb_recv(nal_cb_t *, void *, lib_msg_t *,
+int gmnal_cb_recv(lib_nal_t *, void *, lib_msg_t *,
unsigned int, struct iovec *, size_t, size_t);
-int gmnal_cb_recv_pages(nal_cb_t *, void *, lib_msg_t *,
+int gmnal_cb_recv_pages(lib_nal_t *, void *, lib_msg_t *,
unsigned int, ptl_kiov_t *, size_t, size_t);
-int gmnal_cb_read(nal_cb_t *, void *private, void *, user_ptr, size_t);
-
-int gmnal_cb_write(nal_cb_t *, void *private, user_ptr, void *, size_t);
-
-int gmnal_cb_callback(nal_cb_t *, void *, lib_eq_t *, ptl_event_t *);
-
-void *gmnal_cb_malloc(nal_cb_t *, size_t);
-
-void gmnal_cb_free(nal_cb_t *, void *, size_t);
-
-void gmnal_cb_unmap(nal_cb_t *, unsigned int, struct iovec*, void **);
-
-int gmnal_cb_map(nal_cb_t *, unsigned int, struct iovec*, void **);
-
-void gmnal_cb_printf(nal_cb_t *, const char *fmt, ...);
-
-void gmnal_cb_cli(nal_cb_t *, unsigned long *);
-
-void gmnal_cb_sti(nal_cb_t *, unsigned long *);
-
-int gmnal_cb_dist(nal_cb_t *, ptl_nid_t, unsigned long *);
+int gmnal_cb_dist(lib_nal_t *, ptl_nid_t, unsigned long *);
int gmnal_init(void);
#define GMNAL_INIT_NAL_CB(a) do { \
- a->cb_send = gmnal_cb_send; \
- a->cb_send_pages = gmnal_cb_send_pages; \
- a->cb_recv = gmnal_cb_recv; \
- a->cb_recv_pages = gmnal_cb_recv_pages; \
- a->cb_read = gmnal_cb_read; \
- a->cb_write = gmnal_cb_write; \
- a->cb_callback = gmnal_cb_callback; \
- a->cb_malloc = gmnal_cb_malloc; \
- a->cb_free = gmnal_cb_free; \
- a->cb_map = NULL; \
- a->cb_unmap = NULL; \
- a->cb_printf = gmnal_cb_printf; \
- a->cb_cli = gmnal_cb_cli; \
- a->cb_sti = gmnal_cb_sti; \
- a->cb_dist = gmnal_cb_dist; \
- a->nal_data = NULL; \
+ a->libnal_send = gmnal_cb_send; \
+ a->libnal_send_pages = gmnal_cb_send_pages; \
+ a->libnal_recv = gmnal_cb_recv; \
+ a->libnal_recv_pages = gmnal_cb_recv_pages; \
+ a->libnal_map = NULL; \
+ a->libnal_unmap = NULL; \
+ a->libnal_dist = gmnal_cb_dist; \
+ a->libnal_data = NULL; \
} while (0)
/*
* Small messages
*/
-int gmnal_small_rx(nal_cb_t *, void *, lib_msg_t *, unsigned int,
+int gmnal_small_rx(lib_nal_t *, void *, lib_msg_t *, unsigned int,
struct iovec *, size_t, size_t);
-int gmnal_small_tx(nal_cb_t *, void *, lib_msg_t *, ptl_hdr_t *,
+int gmnal_small_tx(lib_nal_t *, void *, lib_msg_t *, ptl_hdr_t *,
int, ptl_nid_t, ptl_pid_t,
unsigned int, struct iovec*, int);
void gmnal_small_tx_callback(gm_port_t *, void *, gm_status_t);
/*
* Large messages
*/
-int gmnal_large_rx(nal_cb_t *, void *, lib_msg_t *, unsigned int,
+int gmnal_large_rx(lib_nal_t *, void *, lib_msg_t *, unsigned int,
struct iovec *, size_t, size_t);
-int gmnal_large_tx(nal_cb_t *, void *, lib_msg_t *, ptl_hdr_t *,
+int gmnal_large_tx(lib_nal_t *, void *, lib_msg_t *, ptl_hdr_t *,
int, ptl_nid_t, ptl_pid_t, unsigned int,
struct iovec*, int);
{ 0 }
};
-
-
-
-
-
-/*
- * gmnal_api_forward
- * This function takes a pack block of arguments from the NAL API
- * module and passes them to the NAL CB module. The CB module unpacks
- * the args and calls the appropriate function indicated by index.
- * Typically this function is used to pass args between kernel and use
- * space.
- * As lgmanl exists entirely in kernel, just pass the arg block directly
- * to the NAL CB, buy passing the args to lib_dispatch
- * Arguments are
- * nal_t nal Our nal
- * int index the api function that initiated this call
- * void *args packed block of function args
- * size_t arg_len length of args block
- * void *ret A return value for the API NAL
- * size_t ret_len Size of the return value
- *
- */
-
-int
-gmnal_api_forward(nal_t *nal, int index, void *args, size_t arg_len,
- void *ret, size_t ret_len)
-{
-
- nal_cb_t *nal_cb = NULL;
- gmnal_data_t *nal_data = NULL;
-
-
-
-
-
- if (!nal || !args || (index < 0) || (arg_len < 0)) {
- CDEBUG(D_ERROR, "Bad args to gmnal_api_forward\n");
- return (PTL_FAIL);
- }
-
- if (ret && (ret_len <= 0)) {
- CDEBUG(D_ERROR, "Bad args to gmnal_api_forward\n");
- return (PTL_FAIL);
- }
-
-
- if (!nal->nal_data) {
- CDEBUG(D_ERROR, "bad nal, no nal data\n");
- return (PTL_FAIL);
- }
-
- nal_data = nal->nal_data;
- CDEBUG(D_INFO, "nal_data is [%p]\n", nal_data);
-
- if (!nal_data->nal_cb) {
- CDEBUG(D_ERROR, "bad nal_data, no nal_cb\n");
- return (PTL_FAIL);
- }
-
- nal_cb = nal_data->nal_cb;
- CDEBUG(D_INFO, "nal_cb is [%p]\n", nal_cb);
-
- CDEBUG(D_PORTALS, "gmnal_api_forward calling lib_dispatch\n");
- lib_dispatch(nal_cb, NULL, index, args, ret);
- CDEBUG(D_PORTALS, "gmnal_api_forward returns from lib_dispatch\n");
-
- return(PTL_OK);
-}
-
-
/*
* gmnal_api_shutdown
* nal_refct == 0 => called on last matching PtlNIFini()
gmnal_api_shutdown(nal_t *nal, int interface)
{
gmnal_data_t *nal_data;
- nal_cb_t *nal_cb;
+ lib_nal_t *libnal;
if (nal->nal_refct != 0)
return;
CDEBUG(D_TRACE, "gmnal_api_shutdown: nal_data [%p]\n", nal_data);
LASSERT(nal == global_nal_data->nal);
- nal_data = nal->nal_data;
+ libnal = (lib_nal_t *)nal->nal_data;
+ nal_data = (gmnal_data_t *)libnal->libnal_data;
LASSERT(nal_data == global_nal_data);
- nal_cb = nal_data->nal_cb;
/* Stop portals calling our ioctl handler */
libcfs_nal_cmd_unregister(GMNAL);
* flag so when lib calls us we fail immediately and dont queue any
* more work but our threads can still call into lib OK. THEN
* shutdown our threads, THEN lib_fini() */
- lib_fini(nal_cb);
+ lib_fini(libnal);
gmnal_stop_rxthread(nal_data);
gmnal_stop_ctthread(nal_data);
GMNAL_GM_UNLOCK(nal_data);
if (nal_data->sysctl)
unregister_sysctl_table (nal_data->sysctl);
- PORTAL_FREE(nal, sizeof(nal_t));
+ /* Don't free 'nal'; it's a static struct */
PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
- PORTAL_FREE(nal_cb, sizeof(nal_cb_t));
+ PORTAL_FREE(libnal, sizeof(lib_nal_t));
global_nal_data = NULL;
PORTAL_MODULE_UNUSE;
}
-/*
- * gmnal_api_validate
- * validate a user address for use in communications
- * There's nothing to be done here
- */
-int
-gmnal_api_validate(nal_t *nal, void *base, size_t extent)
-{
-
- return(PTL_OK);
-}
-
-
-
-/*
- * gmnal_api_yield
- * Give up the processor
- */
-void
-gmnal_api_yield(nal_t *nal, unsigned long *flags, int milliseconds)
-{
- CDEBUG(D_TRACE, "gmnal_api_yield : nal [%p]\n", nal);
-
- if (milliseconds != 0) {
- CERROR("Blocking yield not implemented yet\n");
- LBUG();
- }
-
- our_cond_resched();
- return;
-}
-
-
-
-/*
- * gmnal_api_lock
- * Take a threadsafe lock
- */
-void
-gmnal_api_lock(nal_t *nal, unsigned long *flags)
-{
-
- gmnal_data_t *nal_data;
- nal_cb_t *nal_cb;
-
- nal_data = nal->nal_data;
- nal_cb = nal_data->nal_cb;
-
- nal_cb->cb_cli(nal_cb, flags);
-
- return;
-}
-
-/*
- * gmnal_api_unlock
- * Release a threadsafe lock
- */
-void
-gmnal_api_unlock(nal_t *nal, unsigned long *flags)
-{
- gmnal_data_t *nal_data;
- nal_cb_t *nal_cb;
-
- nal_data = nal->nal_data;
- nal_cb = nal_data->nal_cb;
-
- nal_cb->cb_sti(nal_cb, flags);
-
- return;
-}
-
-
int
gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
ptl_ni_limits_t *requested_limits,
ptl_ni_limits_t *actual_limits)
{
- nal_cb_t *nal_cb = NULL;
+ lib_nal_t *libnal = NULL;
gmnal_data_t *nal_data = NULL;
gmnal_srxd_t *srxd = NULL;
gm_status_t gm_status;
if (nal->nal_refct != 0) {
if (actual_limits != NULL) {
- nal_data = (gmnal_data_t *)nal->nal_data;
- nal_cb = nal_data->nal_cb;
- *actual_limits = nal->_cb->ni.actual_limits;
+ libnal = (lib_nal_t *)nal->nal_data;
+ *actual_limits = nal->libnal_ni.ni_actual_limits;
return (PTL_OK);
}
CDEBUG(D_INFO, "Allocd and reset nal_data[%p]\n", nal_data);
CDEBUG(D_INFO, "small_msg_size is [%d]\n", nal_data->small_msg_size);
- PORTAL_ALLOC(nal_cb, sizeof(nal_cb_t));
- if (!nal_cb) {
+ PORTAL_ALLOC(libnal, sizeof(lib_nal_t));
+ if (!libnal) {
PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
return(PTL_NO_SPACE);
}
- memset(nal_cb, 0, sizeof(nal_cb_t));
- CDEBUG(D_INFO, "Allocd and reset nal_cb[%p]\n", nal_cb);
+ memset(libnal, 0, sizeof(lib_nal_t));
+ CDEBUG(D_INFO, "Allocd and reset libnal[%p]\n", libnal);
- GMNAL_INIT_NAL_CB(nal_cb);
+ GMNAL_INIT_NAL_CB(libnal);
/*
* String them all together
*/
- nal->nal_data = (void*)nal_data;
- nal_cb->nal_data = (void*)nal_data;
+ libnal->libnal_data = (void*)nal_data;
nal_data->nal = nal;
- nal_data->nal_cb = nal_cb;
+ nal_data->libnal = libnal;
- GMNAL_CB_LOCK_INIT(nal_data);
GMNAL_GM_LOCK_INIT(nal_data);
if (gm_init() != GM_SUCCESS) {
CDEBUG(D_ERROR, "call to gm_init failed\n");
PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
- PORTAL_FREE(nal_cb, sizeof(nal_cb_t));
+ PORTAL_FREE(libnal, sizeof(lib_nal_t));
return(PTL_FAIL);
}
gm_finalize();
GMNAL_GM_UNLOCK(nal_data);
PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
- PORTAL_FREE(nal_cb, sizeof(nal_cb_t));
+ PORTAL_FREE(libnal, sizeof(lib_nal_t));
return(PTL_FAIL);
}
gm_finalize();
GMNAL_GM_UNLOCK(nal_data);
PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
- PORTAL_FREE(nal_cb, sizeof(nal_cb_t));
+ PORTAL_FREE(libnal, sizeof(lib_nal_t));
return(PTL_FAIL);
}
gm_finalize();
GMNAL_GM_UNLOCK(nal_data);
PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
- PORTAL_FREE(nal_cb, sizeof(nal_cb_t));
+ PORTAL_FREE(libnal, sizeof(lib_nal_t));
return(PTL_FAIL);
}
gm_finalize();
GMNAL_GM_UNLOCK(nal_data);
PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
- PORTAL_FREE(nal_cb, sizeof(nal_cb_t));
+ PORTAL_FREE(libnal, sizeof(lib_nal_t));
return(PTL_FAIL);
}
nal_data->gm_local_nid = local_nid;
gm_finalize();
GMNAL_GM_UNLOCK(nal_data);
PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
- PORTAL_FREE(nal_cb, sizeof(nal_cb_t));
+ PORTAL_FREE(libnal, sizeof(lib_nal_t));
return(PTL_FAIL);
}
CDEBUG(D_INFO, "Global node id is [%u]\n", global_nid);
CDEBUG(D_INFO, "portals_nid is ["LPU64"]\n", process_id.nid);
CDEBUG(D_PORTALS, "calling lib_init\n");
- if (lib_init(nal_cb, process_id,
+ if (lib_init(libnal, nal, process_id,
requested_limits, actual_limits) != PTL_OK) {
CDEBUG(D_ERROR, "lib_init failed\n");
gmnal_stop_rxthread(nal_data);
gm_finalize();
GMNAL_GM_UNLOCK(nal_data);
PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
- PORTAL_FREE(nal_cb, sizeof(nal_cb_t));
+ PORTAL_FREE(libnal, sizeof(lib_nal_t));
return(PTL_FAIL);
}
/* XXX these cleanup cases should be restructured to
* minimise duplication... */
- lib_fini(nal_cb);
+ lib_fini(libnal);
gmnal_stop_rxthread(nal_data);
gmnal_stop_ctthread(nal_data);
gm_finalize();
GMNAL_GM_UNLOCK(nal_data);
PORTAL_FREE(nal_data, sizeof(gmnal_data_t));
- PORTAL_FREE(nal_cb, sizeof(nal_cb_t));
+ PORTAL_FREE(libnal, sizeof(lib_nal_t));
return(PTL_FAIL);
}
*/
void gmnal_fini()
{
- gmnal_data_t *nal_data = global_nal_data;
- nal_t *nal = nal_data->nal;
- nal_cb_t *nal_cb = nal_data->nal_cb;
-
CDEBUG(D_TRACE, "gmnal_fini\n");
LASSERT(global_nal_data == NULL);
#include "gmnal.h"
-int gmnal_cb_recv(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
+int gmnal_cb_recv(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
unsigned int niov, struct iovec *iov, size_t mlen,
size_t rlen)
{
int status = PTL_OK;
- CDEBUG(D_TRACE, "gmnal_cb_recv nal_cb [%p], private[%p], cookie[%p], "
+ CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], cookie[%p], "
"niov[%d], iov [%p], mlen["LPSZ"], rlen["LPSZ"]\n",
- nal_cb, private, cookie, niov, iov, mlen, rlen);
+ libnal, private, cookie, niov, iov, mlen, rlen);
switch(srxd->type) {
case(GMNAL_SMALL_MESSAGE):
CDEBUG(D_INFO, "gmnal_cb_recv got small message\n");
- status = gmnal_small_rx(nal_cb, private, cookie, niov,
+ status = gmnal_small_rx(libnal, private, cookie, niov,
iov, mlen, rlen);
break;
case(GMNAL_LARGE_MESSAGE_INIT):
CDEBUG(D_INFO, "gmnal_cb_recv got large message init\n");
- status = gmnal_large_rx(nal_cb, private, cookie, niov,
+ status = gmnal_large_rx(libnal, private, cookie, niov,
iov, mlen, rlen);
}
return(status);
}
-int gmnal_cb_recv_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
+int gmnal_cb_recv_pages(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
unsigned int kniov, ptl_kiov_t *kiov, size_t mlen,
size_t rlen)
{
ptl_kiov_t *kiov_dup = kiov;;
- CDEBUG(D_TRACE, "gmnal_cb_recv_pages nal_cb [%p],private[%p], "
+ CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], "
"cookie[%p], kniov[%d], kiov [%p], mlen["LPSZ"], rlen["LPSZ"]\n",
- nal_cb, private, cookie, kniov, kiov, mlen, rlen);
+ libnal, private, cookie, kniov, kiov, mlen, rlen);
if (srxd->type == GMNAL_SMALL_MESSAGE) {
PORTAL_ALLOC(iovec, sizeof(struct iovec)*kniov);
kiov++;
}
CDEBUG(D_INFO, "calling gmnal_small_rx\n");
- status = gmnal_small_rx(nal_cb, private, cookie, kniov,
+ status = gmnal_small_rx(libnal, private, cookie, kniov,
iovec_dup, mlen, rlen);
for (i=0; i<kniov; i++) {
kunmap(kiov_dup->kiov_page);
}
-int gmnal_cb_send(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
+int gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
unsigned int niov, struct iovec *iov, size_t len)
{
CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] len["LPSZ"] nid["LPU64"]\n",
niov, len, nid);
- nal_data = nal_cb->nal_data;
+ nal_data = libnal->libnal_data;
if (GMNAL_IS_SMALL_MESSAGE(nal_data, niov, iov, len)) {
CDEBUG(D_INFO, "This is a small message send\n");
- gmnal_small_tx(nal_cb, private, cookie, hdr, type, nid, pid,
+ gmnal_small_tx(libnal, private, cookie, hdr, type, nid, pid,
niov, iov, len);
} else {
CDEBUG(D_ERROR, "Large message send it is not supported\n");
- lib_finalize(nal_cb, private, cookie, PTL_FAIL);
+ lib_finalize(libnal, private, cookie, PTL_FAIL);
return(PTL_FAIL);
- gmnal_large_tx(nal_cb, private, cookie, hdr, type, nid, pid,
+ gmnal_large_tx(libnal, private, cookie, hdr, type, nid, pid,
niov, iov, len);
}
return(PTL_OK);
}
-int gmnal_cb_send_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
- ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, unsigned int kniov, ptl_kiov_t *kiov, size_t len)
+int gmnal_cb_send_pages(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
+ ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
+ unsigned int kniov, ptl_kiov_t *kiov, size_t len)
{
int i = 0;
ptl_kiov_t *kiov_dup = kiov;
CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] len["LPSZ"]\n", nid, kniov, len);
- nal_data = nal_cb->nal_data;
+ nal_data = libnal->libnal_data;
PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec));
iovec_dup = iovec;
if (GMNAL_IS_SMALL_MESSAGE(nal_data, 0, NULL, len)) {
iovec++;
kiov++;
}
- gmnal_small_tx(nal_cb, private, cookie, hdr, type, nid,
+ gmnal_small_tx(libnal, private, cookie, hdr, type, nid,
pid, kniov, iovec_dup, len);
} else {
CDEBUG(D_ERROR, "Large message send it is not supported yet\n");
iovec++;
kiov++;
}
- gmnal_large_tx(nal_cb, private, cookie, hdr, type, nid,
+ gmnal_large_tx(libnal, private, cookie, hdr, type, nid,
pid, kniov, iovec, len);
}
for (i=0; i<kniov; i++) {
return(PTL_OK);
}
-int gmnal_cb_read(nal_cb_t *nal_cb, void *private, void *dst,
- user_ptr src, size_t len)
-{
- gm_bcopy(src, dst, len);
- return(PTL_OK);
-}
-
-int gmnal_cb_write(nal_cb_t *nal_cb, void *private, user_ptr dst,
- void *src, size_t len)
-{
- gm_bcopy(src, dst, len);
- return(PTL_OK);
-}
-
-int gmnal_cb_callback(nal_cb_t *nal_cb, void *private, lib_eq_t *eq,
- ptl_event_t *ev)
-{
-
- if (eq->event_callback != NULL) {
- CDEBUG(D_INFO, "found callback\n");
- eq->event_callback(ev);
- }
-
- return(PTL_OK);
-}
-
-void *gmnal_cb_malloc(nal_cb_t *nal_cb, size_t len)
-{
- void *ptr = NULL;
- CDEBUG(D_TRACE, "gmnal_cb_malloc len["LPSZ"]\n", len);
- PORTAL_ALLOC(ptr, len);
- return(ptr);
-}
-
-void gmnal_cb_free(nal_cb_t *nal_cb, void *buf, size_t len)
-{
- CDEBUG(D_TRACE, "gmnal_cb_free :: buf[%p] len["LPSZ"]\n", buf, len);
- PORTAL_FREE(buf, len);
- return;
-}
-
-void gmnal_cb_unmap(nal_cb_t *nal_cb, unsigned int niov, struct iovec *iov,
- void **addrkey)
-{
- return;
-}
-
-int gmnal_cb_map(nal_cb_t *nal_cb, unsigned int niov, struct iovec *iov,
- void**addrkey)
-{
- return(PTL_OK);
-}
-
-void gmnal_cb_printf(nal_cb_t *nal_cb, const char *fmt, ...)
-{
- CDEBUG(D_TRACE, "gmnal_cb_printf\n");
- printk(fmt);
- return;
-}
-
-void gmnal_cb_cli(nal_cb_t *nal_cb, unsigned long *flags)
-{
- gmnal_data_t *nal_data = (gmnal_data_t*)nal_cb->nal_data;
-
- spin_lock_irqsave(&nal_data->cb_lock, *flags);
- return;
-}
-
-void gmnal_cb_sti(nal_cb_t *nal_cb, unsigned long *flags)
-{
- gmnal_data_t *nal_data = (gmnal_data_t*)nal_cb->nal_data;
-
- spin_unlock_irqrestore(&nal_data->cb_lock, *flags);
- return;
-}
-
-void gmnal_cb_callback(nal_cb_t *nal_cb, void *private, lib_eq_t *eq, ptl_event_t *ev)
-{
- /* holding cb_lock */
-
- if (eq->event_callback != NULL)
- eq->event_callback(ev);
-
- /* We will wake theads sleeping in yield() here, AFTER the
- * callback, when we implement blocking yield */
-}
-
-int gmnal_cb_dist(nal_cb_t *nal_cb, ptl_nid_t nid, unsigned long *dist)
+int gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
{
CDEBUG(D_TRACE, "gmnal_cb_dist\n");
if (dist)
unsigned int snode, sport, type, length;
gmnal_msghdr_t *gmnal_msghdr;
ptl_hdr_t *portals_hdr;
+ int rc;
CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n",
nal_data, we, gmnal_type);
*/
srxd = gmnal_rxbuffer_to_srxd(nal_data, buffer);
CDEBUG(D_INFO, "Back from gmnal_rxbuffer_to_srxd\n");
- srxd->nal_data = nal_data;
if (!srxd) {
CDEBUG(D_ERROR, "Failed to get receive descriptor\n");
- lib_parse(nal_data->nal_cb, portals_hdr, srxd);
+ /* I think passing a NULL srxd to lib_parse will crash
+ * gmnal_recv() */
+ LBUG();
+ lib_parse(nal_data->libnal, portals_hdr, srxd);
return(GMNAL_STATUS_FAIL);
}
return(GMNAL_STATUS_OK);
}
+ srxd->nal_data = nal_data;
srxd->type = gmnal_type;
srxd->nsiov = gmnal_msghdr->niov;
srxd->gm_source_node = gmnal_msghdr->sender_node_id;
* cb_recv is responsible for returning the buffer
* for future receive
*/
- lib_parse(nal_data->nal_cb, portals_hdr, srxd);
+ rc = lib_parse(nal_data->libnal, portals_hdr, srxd);
+
+ if (rc != PTL_OK) {
+ /* I just received garbage; take appropriate action... */
+ LBUG();
+ }
return(GMNAL_STATUS_OK);
}
* Call lib_finalize
*/
int
-gmnal_small_rx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
+gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
{
gmnal_srxd_t *srxd = NULL;
void *buffer = NULL;
- gmnal_data_t *nal_data = (gmnal_data_t*)nal_cb->nal_data;
+ gmnal_data_t *nal_data = (gmnal_data_t*)libnal->nal_data;
CDEBUG(D_TRACE, "niov [%d] mlen["LPSZ"]\n", niov, mlen);
if (!private) {
CDEBUG(D_ERROR, "gmnal_small_rx no context\n");
- lib_finalize(nal_cb, private, cookie, PTL_FAIL);
+ lib_finalize(libnal, private, cookie, PTL_FAIL);
return(PTL_FAIL);
}
* let portals library know receive is complete
*/
CDEBUG(D_PORTALS, "calling lib_finalize\n");
- lib_finalize(nal_cb, private, cookie, PTL_OK);
+ lib_finalize(libnal, private, cookie, PTL_OK);
/*
* return buffer so it can be used again
*/
* The callback function informs when the send is complete.
*/
int
-gmnal_small_tx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
+gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
unsigned int niov, struct iovec *iov, int size)
{
- gmnal_data_t *nal_data = (gmnal_data_t*)nal_cb->nal_data;
+ gmnal_data_t *nal_data = (gmnal_data_t*)libnal->nal_data;
gmnal_stxd_t *stxd = NULL;
void *buffer = NULL;
gmnal_msghdr_t *msghdr = NULL;
unsigned int local_nid;
gm_status_t gm_status = GM_SUCCESS;
- CDEBUG(D_TRACE, "gmnal_small_tx nal_cb [%p] private [%p] cookie [%p] "
+ CDEBUG(D_TRACE, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] "
"hdr [%p] type [%d] global_nid ["LPU64"] pid [%d] niov [%d] "
- "iov [%p] size [%d]\n", nal_cb, private, cookie, hdr, type,
+ "iov [%p] size [%d]\n", libnal, private, cookie, hdr, type,
global_nid, pid, niov, iov, size);
CDEBUG(D_INFO, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
lib_msg_t *cookie = stxd->cookie;
gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data;
- nal_cb_t *nal_cb = nal_data->nal_cb;
+ lib_nal_t *libnal = nal_data->libnal;
if (!stxd) {
CDEBUG(D_TRACE, "send completion event for unknown stxd\n");
return;
}
gmnal_return_stxd(nal_data, stxd);
- lib_finalize(nal_cb, stxd, cookie, PTL_OK);
+ lib_finalize(libnal, stxd, cookie, PTL_OK);
return;
}
* this ack, deregister the memory. Only 1 send token is required here.
*/
int
-gmnal_large_tx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
+gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
unsigned int niov, struct iovec *iov, int size)
{
int niov_dup;
- CDEBUG(D_TRACE, "gmnal_large_tx nal_cb [%p] private [%p], cookie [%p] "
+ CDEBUG(D_TRACE, "gmnal_large_tx libnal [%p] private [%p], cookie [%p] "
"hdr [%p], type [%d] global_nid ["LPU64"], pid [%d], niov [%d], "
- "iov [%p], size [%d]\n", nal_cb, private, cookie, hdr, type,
+ "iov [%p], size [%d]\n", libnal, private, cookie, hdr, type,
global_nid, pid, niov, iov, size);
- if (nal_cb)
- nal_data = (gmnal_data_t*)nal_cb->nal_data;
+ if (libnal)
+ nal_data = (gmnal_data_t*)libnal->nal_data;
else {
- CDEBUG(D_ERROR, "no nal_cb.\n");
+ CDEBUG(D_ERROR, "no libnal.\n");
return(GMNAL_STATUS_FAIL);
}
* data from the sender.
*/
int
-gmnal_large_rx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
+gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
unsigned int nriov, struct iovec *riov, size_t mlen,
size_t rlen)
{
- gmnal_data_t *nal_data = nal_cb->nal_data;
+ gmnal_data_t *nal_data = libnal->nal_data;
gmnal_srxd_t *srxd = (gmnal_srxd_t*)private;
void *buffer = NULL;
struct iovec *riov_dup;
gmnal_msghdr_t *msghdr = NULL;
gm_status_t gm_status;
- CDEBUG(D_TRACE, "gmnal_large_rx :: nal_cb[%p], private[%p], "
+ CDEBUG(D_TRACE, "gmnal_large_rx :: libnal[%p], private[%p], "
"cookie[%p], niov[%d], iov[%p], mlen["LPSZ"], rlen["LPSZ"]\n",
- nal_cb, private, cookie, nriov, riov, mlen, rlen);
+ libnal, private, cookie, nriov, riov, mlen, rlen);
if (!srxd) {
CDEBUG(D_ERROR, "gmnal_large_rx no context\n");
- lib_finalize(nal_cb, private, cookie, PTL_FAIL);
+ lib_finalize(libnal, private, cookie, PTL_FAIL);
return(PTL_FAIL);
}
gmnal_ltxd_t *ltxd = (gmnal_ltxd_t*)context;
gmnal_srxd_t *srxd = ltxd->srxd;
- nal_cb_t *nal_cb = srxd->nal_data->nal_cb;
+ lib_nal_t *libnal = srxd->nal_data->libnal;
int lastone;
struct iovec *riov;
int nriov;
* Let our client application proceed
*/
CDEBUG(D_ERROR, "final callback context[%p]\n", srxd);
- lib_finalize(nal_cb, srxd, srxd->cookie, PTL_OK);
+ lib_finalize(libnal, srxd, srxd->cookie, PTL_OK);
/*
* send an ack to the sender to let him know we got the data
void
gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
{
- nal_cb_t *nal_cb = nal_data->nal_cb;
+ lib_nal_t *libnal = nal_data->libnal;
gmnal_stxd_t *stxd = NULL;
gmnal_msghdr_t *msghdr = NULL;
void *buffer = NULL;
CDEBUG(D_INFO, "gmnal_large_tx_ack_received stxd [%p]\n", stxd);
- lib_finalize(nal_cb, stxd, stxd->cookie, PTL_OK);
+ lib_finalize(libnal, stxd, stxd->cookie, PTL_OK);
/*
* extract the iovec from the stxd, deregister the memory.
#define QSWNAL_SYSCTL_COPY_SMALL_FWD 2
static ctl_table kqswnal_ctl_table[] = {
+ {QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_puts",
+ &kqswnal_tunables.kqn_optimized_puts, sizeof (int),
+ 0644, NULL, &proc_dointvec},
{QSWNAL_SYSCTL_OPTIMIZED_GETS, "optimized_gets",
&kqswnal_tunables.kqn_optimized_gets, sizeof (int),
0644, NULL, &proc_dointvec},
};
#endif
-static int
-kqswnal_forward(nal_t *nal,
- int id,
- void *args, size_t args_len,
- void *ret, size_t ret_len)
-{
- kqswnal_data_t *k = nal->nal_data;
- nal_cb_t *nal_cb = k->kqn_cb;
-
- LASSERT (nal == &kqswnal_api);
- LASSERT (k == &kqswnal_data);
- LASSERT (nal_cb == &kqswnal_lib);
-
- lib_dispatch(nal_cb, k, id, args, ret); /* nal needs k */
- return (PTL_OK);
-}
-
-static void
-kqswnal_lock (nal_t *nal, unsigned long *flags)
-{
- kqswnal_data_t *k = nal->nal_data;
- nal_cb_t *nal_cb = k->kqn_cb;
-
- LASSERT (nal == &kqswnal_api);
- LASSERT (k == &kqswnal_data);
- LASSERT (nal_cb == &kqswnal_lib);
-
- nal_cb->cb_cli(nal_cb,flags);
-}
-
-static void
-kqswnal_unlock(nal_t *nal, unsigned long *flags)
-{
- kqswnal_data_t *k = nal->nal_data;
- nal_cb_t *nal_cb = k->kqn_cb;
-
- LASSERT (nal == &kqswnal_api);
- LASSERT (k == &kqswnal_data);
- LASSERT (nal_cb == &kqswnal_lib);
-
- nal_cb->cb_sti(nal_cb,flags);
-}
-
-static int
-kqswnal_yield(nal_t *nal, unsigned long *flags, int milliseconds)
-{
- /* NB called holding statelock */
- wait_queue_t wait;
- unsigned long now = jiffies;
-
- CDEBUG (D_NET, "yield\n");
-
- if (milliseconds == 0) {
- if (need_resched())
- schedule();
- return 0;
- }
-
- init_waitqueue_entry(&wait, current);
- set_current_state(TASK_INTERRUPTIBLE);
- add_wait_queue(&kqswnal_data.kqn_yield_waitq, &wait);
-
- kqswnal_unlock(nal, flags);
-
- if (milliseconds < 0)
- schedule ();
- else
- schedule_timeout((milliseconds * HZ) / 1000);
-
- kqswnal_lock(nal, flags);
-
- remove_wait_queue(&kqswnal_data.kqn_yield_waitq, &wait);
-
- if (milliseconds > 0) {
- milliseconds -= ((jiffies - now) * 1000) / HZ;
- if (milliseconds < 0)
- milliseconds = 0;
- }
-
- return (milliseconds);
-}
-
int
kqswnal_get_tx_desc (struct portals_cfg *pcfg)
{
kqswnal_data.kqn_nid_offset);
kqswnal_data.kqn_nid_offset =
pcfg->pcfg_nid - kqswnal_data.kqn_elanid;
- kqswnal_lib.ni.nid = pcfg->pcfg_nid;
+ kqswnal_lib.libnal_ni.ni_pid.nid = pcfg->pcfg_nid;
return (0);
default:
ptl_process_id_t my_process_id;
int pkmem = atomic_read(&portal_kmemory);
+ LASSERT (nal == &kqswnal_api);
+
if (nal->nal_refct != 0) {
if (actual_limits != NULL)
- *actual_limits = kqswnal_lib.ni.actual_limits;
+ *actual_limits = kqswnal_lib.libnal_ni.ni_actual_limits;
/* This module got the first ref */
PORTAL_MODULE_USE;
return (PTL_OK);
CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&portal_kmemory));
- memset(&kqswnal_rpc_success, 0, sizeof(kqswnal_rpc_success));
- memset(&kqswnal_rpc_failed, 0, sizeof(kqswnal_rpc_failed));
-#if MULTIRAIL_EKC
- kqswnal_rpc_failed.Data[0] = -ECONNREFUSED;
-#else
- kqswnal_rpc_failed.Status = -ECONNREFUSED;
-#endif
/* ensure all pointers NULL etc */
memset (&kqswnal_data, 0, sizeof (kqswnal_data));
- kqswnal_data.kqn_cb = &kqswnal_lib;
-
INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
INIT_LIST_HEAD (&kqswnal_data.kqn_nblk_idletxds);
INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
spin_lock_init (&kqswnal_data.kqn_sched_lock);
init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);
- spin_lock_init (&kqswnal_data.kqn_statelock);
- init_waitqueue_head (&kqswnal_data.kqn_yield_waitq);
+ /* Leave kqn_rpc_success zeroed */
+#if MULTIRAIL_EKC
+ kqswnal_data.kqn_rpc_failed.Data[0] = -ECONNREFUSED;
+#else
+ kqswnal_data.kqn_rpc_failed.Status = -ECONNREFUSED;
+#endif
/* pointers/lists/locks initialised */
kqswnal_data.kqn_init = KQN_INIT_DATA;
kqswnal_data.kqn_ep = ep_system();
if (kqswnal_data.kqn_ep == NULL) {
CERROR("Can't initialise EKC\n");
- kqswnal_shutdown(&kqswnal_api);
+ kqswnal_shutdown(nal);
return (PTL_IFACE_INVALID);
}
if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
CERROR("Can't get elan ID\n");
- kqswnal_shutdown(&kqswnal_api);
+ kqswnal_shutdown(nal);
return (PTL_IFACE_INVALID);
}
#else
if (kqswnal_data.kqn_ep == NULL)
{
CERROR ("Can't get elan device 0\n");
- kqswnal_shutdown(&kqswnal_api);
+ kqswnal_shutdown(nal);
return (PTL_IFACE_INVALID);
}
#endif
if (kqswnal_data.kqn_eptx == NULL)
{
CERROR ("Can't allocate transmitter\n");
- kqswnal_shutdown (&kqswnal_api);
+ kqswnal_shutdown (nal);
return (PTL_NO_SPACE);
}
if (kqswnal_data.kqn_eprx_small == NULL)
{
CERROR ("Can't install small msg receiver\n");
- kqswnal_shutdown (&kqswnal_api);
+ kqswnal_shutdown (nal);
return (PTL_NO_SPACE);
}
if (kqswnal_data.kqn_eprx_large == NULL)
{
CERROR ("Can't install large msg receiver\n");
- kqswnal_shutdown (&kqswnal_api);
+ kqswnal_shutdown (nal);
return (PTL_NO_SPACE);
}
EP_PERM_WRITE);
if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
CERROR("Can't reserve tx dma space\n");
- kqswnal_shutdown(&kqswnal_api);
+ kqswnal_shutdown(nal);
return (PTL_NO_SPACE);
}
#else
if (rc != DDI_SUCCESS)
{
CERROR ("Can't reserve rx dma space\n");
- kqswnal_shutdown (&kqswnal_api);
+ kqswnal_shutdown (nal);
return (PTL_NO_SPACE);
}
#endif
EP_PERM_WRITE);
if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
CERROR("Can't reserve rx dma space\n");
- kqswnal_shutdown(&kqswnal_api);
+ kqswnal_shutdown(nal);
return (PTL_NO_SPACE);
}
#else
if (rc != DDI_SUCCESS)
{
CERROR ("Can't reserve rx dma space\n");
- kqswnal_shutdown (&kqswnal_api);
+ kqswnal_shutdown (nal);
return (PTL_NO_SPACE);
}
#endif
sizeof(kqswnal_tx_t) * (KQSW_NTXMSGS + KQSW_NNBLK_TXMSGS));
if (kqswnal_data.kqn_txds == NULL)
{
- kqswnal_shutdown (&kqswnal_api);
+ kqswnal_shutdown (nal);
return (PTL_NO_SPACE);
}
PORTAL_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
if (ktx->ktx_buffer == NULL)
{
- kqswnal_shutdown (&kqswnal_api);
+ kqswnal_shutdown (nal);
return (PTL_NO_SPACE);
}
sizeof (kqswnal_rx_t) * (KQSW_NRXMSGS_SMALL + KQSW_NRXMSGS_LARGE));
if (kqswnal_data.kqn_rxds == NULL)
{
- kqswnal_shutdown (&kqswnal_api);
+ kqswnal_shutdown (nal);
return (PTL_NO_SPACE);
}
struct page *page = alloc_page(GFP_KERNEL);
if (page == NULL) {
- kqswnal_shutdown (&kqswnal_api);
+ kqswnal_shutdown (nal);
return (PTL_NO_SPACE);
}
my_process_id.nid = kqswnal_elanid2nid(kqswnal_data.kqn_elanid);
my_process_id.pid = 0;
- rc = lib_init(&kqswnal_lib, my_process_id,
+ rc = lib_init(&kqswnal_lib, nal, my_process_id,
requested_limits, actual_limits);
if (rc != PTL_OK)
{
CERROR ("lib_init failed %d\n", rc);
- kqswnal_shutdown (&kqswnal_api);
+ kqswnal_shutdown (nal);
return (rc);
}
kqswnal_rx_t *krx = &kqswnal_data.kqn_rxds[i];
/* NB this enqueue can allocate/sleep (attr == 0) */
+ krx->krx_state = KRX_POSTED;
#if MULTIRAIL_EKC
rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
&krx->krx_elanbuffer, 0);
if (rc != EP_SUCCESS)
{
CERROR ("failed ep_queue_receive %d\n", rc);
- kqswnal_shutdown (&kqswnal_api);
+ kqswnal_shutdown (nal);
return (PTL_FAIL);
}
}
if (rc != 0)
{
CERROR ("failed to spawn scheduling thread: %d\n", rc);
- kqswnal_shutdown (&kqswnal_api);
+ kqswnal_shutdown (nal);
return (PTL_FAIL);
}
}
rc = libcfs_nal_cmd_register (QSWNAL, &kqswnal_cmd, NULL);
if (rc != 0) {
CERROR ("Can't initialise command interface (rc = %d)\n", rc);
- kqswnal_shutdown (&kqswnal_api);
+ kqswnal_shutdown (nal);
return (PTL_FAIL);
}
{
int rc;
- kqswnal_api.startup = kqswnal_startup;
- kqswnal_api.shutdown = kqswnal_shutdown;
- kqswnal_api.forward = kqswnal_forward;
- kqswnal_api.yield = kqswnal_yield;
- kqswnal_api.lock = kqswnal_lock;
- kqswnal_api.unlock = kqswnal_unlock;
- kqswnal_api.nal_data = &kqswnal_data;
-
- kqswnal_lib.nal_data = &kqswnal_data;
+ kqswnal_api.nal_ni_init = kqswnal_startup;
+ kqswnal_api.nal_ni_fini = kqswnal_shutdown;
/* Initialise dynamic tunables to defaults once only */
+ kqswnal_tunables.kqn_optimized_puts = KQSW_OPTIMIZED_PUTS;
kqswnal_tunables.kqn_optimized_gets = KQSW_OPTIMIZED_GETS;
rc = ptl_register_nal(QSWNAL, &kqswnal_api);
#define KQSW_RESCHED 100 /* # busy loops that forces scheduler to yield */
-#define KQSW_OPTIMIZED_GETS 1 /* optimized gets? */
+#define KQSW_OPTIMIZED_GETS 1 /* optimize gets >= this size */
+#define KQSW_OPTIMIZED_PUTS (32<<10) /* optimize puts >= this size */
#define KQSW_COPY_SMALL_FWD 0 /* copy small fwd messages to pre-mapped buffer? */
/*
int krx_npages; /* # pages in receive buffer */
int krx_nob; /* Number Of Bytes received into buffer */
int krx_rpc_reply_needed; /* peer waiting for EKC RPC reply */
- int krx_rpc_reply_sent; /* rpc reply sent */
+ int krx_rpc_reply_status; /* what status to send */
+ int krx_state; /* what this RX is doing */
atomic_t krx_refcount; /* how to tell when rpc is done */
kpr_fwd_desc_t krx_fwd; /* embedded forwarding descriptor */
ptl_kiov_t krx_kiov[KQSW_NRXMSGPAGES_LARGE]; /* buffer frags */
} kqswnal_rx_t;
+#define KRX_POSTED 1 /* receiving */
+#define KRX_PARSE 2 /* ready to be parsed */
+#define KRX_COMPLETING 3 /* waiting to be completed */
+
+
typedef struct
{
struct list_head ktx_list; /* enqueue idle/active */
int ktx_nmappedpages; /* # pages mapped for current message */
int ktx_port; /* destination ep port */
ptl_nid_t ktx_nid; /* destination node */
- void *ktx_args[2]; /* completion passthru */
+ void *ktx_args[3]; /* completion passthru */
char *ktx_buffer; /* pre-allocated contiguous buffer for hdr + small payloads */
unsigned long ktx_launchtime; /* when (in jiffies) the transmit was launched */
} kqswnal_tx_t;
#define KTX_IDLE 0 /* on kqn_(nblk_)idletxds */
-#define KTX_SENDING 1 /* local send */
-#define KTX_FORWARDING 2 /* routing a packet */
-#define KTX_GETTING 3 /* local optimised get */
+#define KTX_FORWARDING 1 /* sending a forwarded packet */
+#define KTX_SENDING 2 /* normal send */
+#define KTX_GETTING 3 /* sending optimised get */
+#define KTX_PUTTING 4 /* sending optimised put */
+#define KTX_RDMAING 5 /* handling optimised put/get */
typedef struct
{
/* dynamic tunables... */
+ int kqn_optimized_puts; /* optimized PUTs? */
int kqn_optimized_gets; /* optimized GETs? */
#if CONFIG_SYSCTL
struct ctl_table_header *kqn_sysctl; /* sysctl interface */
struct list_head kqn_delayedfwds; /* delayed forwards */
struct list_head kqn_delayedtxds; /* delayed transmits */
- spinlock_t kqn_statelock; /* cb_cli/cb_sti */
- wait_queue_head_t kqn_yield_waitq; /* where yield waits */
- nal_cb_t *kqn_cb; /* -> kqswnal_lib */
#if MULTIRAIL_EKC
EP_SYS *kqn_ep; /* elan system */
EP_NMH *kqn_ep_tx_nmh; /* elan reserved tx vaddrs */
ptl_nid_t kqn_nid_offset; /* this cluster's NID offset */
int kqn_nnodes; /* this cluster's size */
int kqn_elanid; /* this nodes's elan ID */
+
+ EP_STATUSBLK kqn_rpc_success; /* preset RPC reply status blocks */
+ EP_STATUSBLK kqn_rpc_failed;
} kqswnal_data_t;
/* kqn_init state */
#define KQN_INIT_LIB 2
#define KQN_INIT_ALL 3
-extern nal_cb_t kqswnal_lib;
+extern lib_nal_t kqswnal_lib;
extern nal_t kqswnal_api;
extern kqswnal_tunables_t kqswnal_tunables;
extern kqswnal_data_t kqswnal_data;
-/* global pre-prepared replies to keep off the stack */
-extern EP_STATUSBLK kqswnal_rpc_success;
-extern EP_STATUSBLK kqswnal_rpc_failed;
-
extern int kqswnal_thread_start (int (*fn)(void *arg), void *arg);
extern void kqswnal_rxhandler(EP_RXD *rxd);
extern int kqswnal_scheduler (void *);
extern void kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd);
-extern void kqswnal_dma_reply_complete (EP_RXD *rxd);
-extern void kqswnal_requeue_rx (kqswnal_rx_t *krx);
+extern void kqswnal_rx_done (kqswnal_rx_t *krx);
static inline ptl_nid_t
kqswnal_elanid2nid (int elanid)
return (nid - kqswnal_data.kqn_nid_offset);
}
+static inline ptl_nid_t
+kqswnal_rx_nid(kqswnal_rx_t *krx)
+{
+ return (kqswnal_elanid2nid(ep_rxd_node(krx->krx_rxd)));
+}
+
static inline int
kqswnal_pages_spanned (void *base, int nob)
{
}
#endif
-static inline void kqswnal_rx_done (kqswnal_rx_t *krx)
+static inline void kqswnal_rx_decref (kqswnal_rx_t *krx)
{
LASSERT (atomic_read (&krx->krx_refcount) > 0);
if (atomic_dec_and_test (&krx->krx_refcount))
- kqswnal_requeue_rx(krx);
+ kqswnal_rx_done(krx);
}
#if MULTIRAIL_EKC
#include "qswnal.h"
-EP_STATUSBLK kqswnal_rpc_success;
-EP_STATUSBLK kqswnal_rpc_failed;
-
/*
* LIB functions follow
*
*/
-static ptl_err_t
-kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
- size_t len)
-{
- CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
- nal->ni.nid, len, src_addr, dst_addr );
- memcpy( dst_addr, src_addr, len );
-
- return (PTL_OK);
-}
-
-static ptl_err_t
-kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
- size_t len)
-{
- CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
- nal->ni.nid, len, src_addr, dst_addr );
- memcpy( dst_addr, src_addr, len );
-
- return (PTL_OK);
-}
-
-static void *
-kqswnal_malloc(nal_cb_t *nal, size_t len)
-{
- void *buf;
-
- PORTAL_ALLOC(buf, len);
- return (buf);
-}
-
-static void
-kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
-{
- PORTAL_FREE(buf, len);
-}
-
-static void
-kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
-{
- va_list ap;
- char msg[256];
-
- va_start (ap, fmt);
- vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
- va_end (ap);
-
- msg[sizeof (msg) - 1] = 0; /* ensure terminated */
-
- CDEBUG (D_NET, "%s", msg);
-}
-
-#if (defined(CONFIG_SPARC32) || defined(CONFIG_SPARC64))
-# error "Can't save/restore irq contexts in different procedures"
-#endif
-
-static void
-kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
-{
- kqswnal_data_t *data= nal->nal_data;
-
- spin_lock_irqsave(&data->kqn_statelock, *flags);
-}
-
-
-static void
-kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
-{
- kqswnal_data_t *data= nal->nal_data;
-
- spin_unlock_irqrestore(&data->kqn_statelock, *flags);
-}
-
-static void
-kqswnal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev)
-{
- /* holding kqn_statelock */
-
- if (eq->event_callback != NULL)
- eq->event_callback(ev);
-
- if (waitqueue_active(&kqswnal_data.kqn_yield_waitq))
- wake_up_all(&kqswnal_data.kqn_yield_waitq);
-}
-
static int
-kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
+kqswnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
- if (nid == nal->ni.nid)
+ if (nid == nal->libnal_ni.ni_pid.nid)
*dist = 0; /* it's me */
else if (kqswnal_nid2elanid (nid) >= 0)
*dist = 1; /* it's my peer */
do {
int fraglen = kiov->kiov_len - offset;
- /* nob exactly spans the iovs */
- LASSERT (fraglen <= nob);
- /* each frag fits in a page */
+ /* each page frag is contained in one page */
LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
+ if (fraglen > nob)
+ fraglen = nob;
+
nmapped++;
if (nmapped > maxmapped) {
CERROR("Can't map message in %d pages (max %d)\n",
do {
int fraglen = iov->iov_len - offset;
- long npages = kqswnal_pages_spanned (iov->iov_base, fraglen);
-
- /* nob exactly spans the iovs */
- LASSERT (fraglen <= nob);
+ long npages;
+ if (fraglen > nob)
+ fraglen = nob;
+ npages = kqswnal_pages_spanned (iov->iov_base, fraglen);
+
nmapped += npages;
if (nmapped > maxmapped) {
CERROR("Can't map message in %d pages (max %d)\n",
void
kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
{
- lib_msg_t *msg;
- lib_msg_t *repmsg = NULL;
-
switch (ktx->ktx_state) {
case KTX_FORWARDING: /* router asked me to forward this packet */
kpr_fwd_done (&kqswnal_data.kqn_router,
(kpr_fwd_desc_t *)ktx->ktx_args[0], error);
break;
- case KTX_SENDING: /* packet sourced locally */
- lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
+ case KTX_RDMAING: /* optimized GET/PUT handled */
+ case KTX_PUTTING: /* optimized PUT sent */
+ case KTX_SENDING: /* normal send */
+ lib_finalize (&kqswnal_lib, NULL,
(lib_msg_t *)ktx->ktx_args[1],
- (error == 0) ? PTL_OK :
- (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
+ (error == 0) ? PTL_OK : PTL_FAIL);
break;
- case KTX_GETTING: /* Peer has DMA-ed direct? */
- msg = (lib_msg_t *)ktx->ktx_args[1];
-
- if (error == 0) {
- repmsg = lib_create_reply_msg (&kqswnal_lib,
- ktx->ktx_nid, msg);
- if (repmsg == NULL)
- error = -ENOMEM;
- }
-
- if (error == 0) {
- lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
- msg, PTL_OK);
- lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
- } else {
- lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
- (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
- }
+ case KTX_GETTING: /* optimized GET sent & REPLY received */
+ /* Complete the GET with success since we can't avoid
+ * delivering a REPLY event; we committed to it when we
+ * launched the GET */
+ lib_finalize (&kqswnal_lib, NULL,
+ (lib_msg_t *)ktx->ktx_args[1], PTL_OK);
+ lib_finalize (&kqswnal_lib, NULL,
+ (lib_msg_t *)ktx->ktx_args[2],
+ (error == 0) ? PTL_OK : PTL_FAIL);
break;
default:
kqswnal_notify_peer_down(ktx);
status = -EHOSTDOWN;
- } else if (ktx->ktx_state == KTX_GETTING) {
- /* RPC completed OK; what did our peer put in the status
+ } else switch (ktx->ktx_state) {
+
+ case KTX_GETTING:
+ case KTX_PUTTING:
+ /* RPC completed OK; but what did our peer put in the status
* block? */
#if MULTIRAIL_EKC
status = ep_txd_statusblk(txd)->Data[0];
#else
status = ep_txd_statusblk(txd)->Status;
#endif
- } else {
+ break;
+
+ case KTX_FORWARDING:
+ case KTX_SENDING:
status = 0;
+ break;
+
+ default:
+ LBUG();
+ break;
}
kqswnal_tx_done (ktx, status);
return (-ESHUTDOWN);
LASSERT (dest >= 0); /* must be a peer */
- if (ktx->ktx_state == KTX_GETTING) {
- /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t. The
- * other frags are the GET sink which we obviously don't
- * send here :) */
-#if MULTIRAIL_EKC
+
+ switch (ktx->ktx_state) {
+ case KTX_GETTING:
+ case KTX_PUTTING:
+ /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
+ * The other frags are the payload, awaiting RDMA */
rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
ktx->ktx_port, attr,
kqswnal_txhandler, ktx,
NULL, ktx->ktx_frags, 1);
-#else
- rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
- ktx->ktx_port, attr, kqswnal_txhandler,
- ktx, NULL, ktx->ktx_frags, 1);
-#endif
- } else {
+ break;
+
+ case KTX_FORWARDING:
+ case KTX_SENDING:
#if MULTIRAIL_EKC
rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
ktx->ktx_port, attr,
kqswnal_txhandler, ktx,
ktx->ktx_frags, ktx->ktx_nfrag);
#endif
+ break;
+
+ default:
+ LBUG();
+ rc = -EINVAL; /* no compiler warning please */
+ break;
}
switch (rc) {
}
}
+#if 0
static char *
hdr_type_string (ptl_hdr_t *hdr)
{
}
} /* end of print_hdr() */
+#endif
#if !MULTIRAIL_EKC
void
CERROR ("DATAVEC too small\n");
return (-E2BIG);
}
+#else
+int
+kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
+ int nrfrag, EP_NMD *rfrag)
+{
+ int i;
+
+ if (nlfrag != nrfrag) {
+ CERROR("Can't cope with unequal # frags: %d local %d remote\n",
+ nlfrag, nrfrag);
+ return (-EINVAL);
+ }
+
+ for (i = 0; i < nlfrag; i++)
+ if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
+ CERROR("Can't cope with unequal frags %d(%d):"
+ " %d local %d remote\n",
+ i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
+ return (-EINVAL);
+ }
+
+ return (0);
+}
#endif
-int
-kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
- struct iovec *iov, ptl_kiov_t *kiov,
- int offset, int nob)
+kqswnal_remotemd_t *
+kqswnal_parse_rmd (kqswnal_rx_t *krx, int type, ptl_nid_t expected_nid)
{
- kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
char *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
+ ptl_hdr_t *hdr = (ptl_hdr_t *)buffer;
kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
- int rc;
-#if MULTIRAIL_EKC
- int i;
-#else
- EP_DATAVEC datav[EP_MAXFRAG];
- int ndatav;
-#endif
- LASSERT (krx->krx_rpc_reply_needed);
- LASSERT ((iov == NULL) != (kiov == NULL));
+ ptl_nid_t nid = kqswnal_rx_nid(krx);
+
+ /* Note (1) lib_parse has already flipped hdr.
+ * (2) RDMA addresses are sent in native endian-ness. When
+ * EKC copes with different endian nodes, I'll fix this (and
+ * eat my hat :) */
+
+ LASSERT (krx->krx_nob >= sizeof(*hdr));
+
+ if (hdr->type != type) {
+ CERROR ("Unexpected optimized get/put type %d (%d expected)"
+ "from "LPX64"\n", hdr->type, type, nid);
+ return (NULL);
+ }
+
+ if (hdr->src_nid != nid) {
+ CERROR ("Unexpected optimized get/put source NID "
+ LPX64" from "LPX64"\n", hdr->src_nid, nid);
+ return (NULL);
+ }
+
+ LASSERT (nid == expected_nid);
- /* see kqswnal_sendmsg comment regarding endian-ness */
if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
/* msg too small to discover rmd size */
CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
- return (-EINVAL);
+ return (NULL);
}
-
+
if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
/* rmd doesn't fit in the incoming message */
CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
krx->krx_nob, rmd->kqrmd_nfrag,
(int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
- return (-EINVAL);
+ return (NULL);
}
- /* Map the source data... */
+ return (rmd);
+}
+
+void
+kqswnal_rdma_store_complete (EP_RXD *rxd)
+{
+ int status = ep_rxd_status(rxd);
+ kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
+ kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+
+ CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+ "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+
+ LASSERT (ktx->ktx_state == KTX_RDMAING);
+ LASSERT (krx->krx_rxd == rxd);
+ LASSERT (krx->krx_rpc_reply_needed);
+
+ krx->krx_rpc_reply_needed = 0;
+ kqswnal_rx_decref (krx);
+
+ /* free ktx & finalize() its lib_msg_t */
+ kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
+}
+
+void
+kqswnal_rdma_fetch_complete (EP_RXD *rxd)
+{
+ /* Completed fetching the PUT data */
+ int status = ep_rxd_status(rxd);
+ kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
+ kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
+ unsigned long flags;
+
+ CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
+ "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+
+ LASSERT (ktx->ktx_state == KTX_RDMAING);
+ LASSERT (krx->krx_rxd == rxd);
+ LASSERT (krx->krx_rpc_reply_needed);
+
+ /* Set the RPC completion status */
+ status = (status == EP_SUCCESS) ? 0 : -ECONNABORTED;
+ krx->krx_rpc_reply_status = status;
+
+ /* free ktx & finalize() its lib_msg_t */
+ kqswnal_tx_done(ktx, status);
+
+ if (!in_interrupt()) {
+ /* OK to complete the RPC now (iff I had the last ref) */
+ kqswnal_rx_decref (krx);
+ return;
+ }
+
+ LASSERT (krx->krx_state == KRX_PARSE);
+ krx->krx_state = KRX_COMPLETING;
+
+ /* Complete the RPC in thread context */
+ spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
+
+ list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
+ wake_up (&kqswnal_data.kqn_sched_waitq);
+
+ spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+}
+
+int
+kqswnal_rdma (kqswnal_rx_t *krx, lib_msg_t *libmsg, int type,
+ int niov, struct iovec *iov, ptl_kiov_t *kiov,
+ size_t offset, size_t len)
+{
+ kqswnal_remotemd_t *rmd;
+ kqswnal_tx_t *ktx;
+ int eprc;
+ int rc;
+#if !MULTIRAIL_EKC
+ EP_DATAVEC datav[EP_MAXFRAG];
+ int ndatav;
+#endif
+
+ LASSERT (type == PTL_MSG_GET || type == PTL_MSG_PUT);
+ /* Not both mapped and paged payload */
+ LASSERT (iov == NULL || kiov == NULL);
+ /* RPC completes with failure by default */
+ LASSERT (krx->krx_rpc_reply_needed);
+ LASSERT (krx->krx_rpc_reply_status != 0);
+
+ rmd = kqswnal_parse_rmd(krx, type, libmsg->ev.initiator.nid);
+ if (rmd == NULL)
+ return (-EPROTO);
+
+ if (len == 0) {
+ /* data got truncated to nothing. */
+ lib_finalize(&kqswnal_lib, krx, libmsg, PTL_OK);
+ /* Let kqswnal_rx_done() complete the RPC with success */
+ krx->krx_rpc_reply_status = 0;
+ return (0);
+ }
+
+ /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
+ actually sending a portals message with it */
+ ktx = kqswnal_get_idle_tx(NULL, 0);
+ if (ktx == NULL) {
+ CERROR ("Can't get txd for RDMA with "LPX64"\n",
+ libmsg->ev.initiator.nid);
+ return (-ENOMEM);
+ }
+
+ ktx->ktx_state = KTX_RDMAING;
+ ktx->ktx_nid = libmsg->ev.initiator.nid;
+ ktx->ktx_args[0] = krx;
+ ktx->ktx_args[1] = libmsg;
+
+ /* Start mapping at offset 0 (we're not mapping any headers) */
ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
+
if (kiov != NULL)
- rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
+ rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
else
- rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);
+ rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);
if (rc != 0) {
- CERROR ("Can't map source data: %d\n", rc);
- return (rc);
+ CERROR ("Can't map local RDMA data: %d\n", rc);
+ goto out;
}
#if MULTIRAIL_EKC
- if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
- CERROR("Can't cope with unequal # frags: %d local %d remote\n",
- ktx->ktx_nfrag, rmd->kqrmd_nfrag);
- return (-EINVAL);
+ rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
+ rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+ if (rc != 0) {
+ CERROR ("Incompatible RDMA descriptors\n");
+ goto out;
}
-
- for (i = 0; i < rmd->kqrmd_nfrag; i++)
- if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
- CERROR("Can't cope with unequal frags %d(%d):"
- " %d local %d remote\n",
- i, rmd->kqrmd_nfrag,
- ktx->ktx_frags[i].nmd_len,
- rmd->kqrmd_frag[i].nmd_len);
- return (-EINVAL);
- }
#else
- ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
- ktx->ktx_nfrag, ktx->ktx_frags,
- rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+ switch (type) {
+ default:
+ LBUG();
+
+ case PTL_MSG_GET:
+ ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
+ ktx->ktx_nfrag, ktx->ktx_frags,
+ rmd->kqrmd_nfrag, rmd->kqrmd_frag);
+ break;
+
+ case PTL_MSG_PUT:
+ ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
+ rmd->kqrmd_nfrag, rmd->kqrmd_frag,
+ ktx->ktx_nfrag, ktx->ktx_frags);
+ break;
+ }
+
if (ndatav < 0) {
CERROR ("Can't create datavec: %d\n", ndatav);
- return (ndatav);
+ rc = ndatav;
+ goto out;
}
#endif
- /* Our caller will start to race with kqswnal_dma_reply_complete... */
- LASSERT (atomic_read (&krx->krx_refcount) == 1);
- atomic_set (&krx->krx_refcount, 2);
+ LASSERT (atomic_read(&krx->krx_refcount) > 0);
+ /* Take an extra ref for the completion callback */
+ atomic_inc(&krx->krx_refcount);
-#if MULTIRAIL_EKC
- rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
- &kqswnal_rpc_success,
- ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
- if (rc == EP_SUCCESS)
- return (0);
+ switch (type) {
+ default:
+ LBUG();
- /* Well we tried... */
- krx->krx_rpc_reply_needed = 0;
+ case PTL_MSG_GET:
+#if MULTIRAIL_EKC
+ eprc = ep_complete_rpc(krx->krx_rxd,
+ kqswnal_rdma_store_complete, ktx,
+ &kqswnal_data.kqn_rpc_success,
+ ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
#else
- rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
- &kqswnal_rpc_success, datav, ndatav);
- if (rc == EP_SUCCESS)
- return (0);
-
- /* "old" EKC destroys rxd on failed completion */
- krx->krx_rxd = NULL;
+ eprc = ep_complete_rpc (krx->krx_rxd,
+ kqswnal_rdma_store_complete, ktx,
+ &kqswnal_data.kqn_rpc_success,
+ datav, ndatav);
+ if (eprc != EP_SUCCESS) /* "old" EKC destroys rxd on failed completion */
+ krx->krx_rxd = NULL;
#endif
+ if (eprc != EP_SUCCESS) {
+ CERROR("can't complete RPC: %d\n", eprc);
+ /* don't re-attempt RPC completion */
+ krx->krx_rpc_reply_needed = 0;
+ rc = -ECONNABORTED;
+ }
+ break;
+
+ case PTL_MSG_PUT:
+#if MULTIRAIL_EKC
+ eprc = ep_rpc_get (krx->krx_rxd,
+ kqswnal_rdma_fetch_complete, ktx,
+ rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
+#else
+ eprc = ep_rpc_get (krx->krx_rxd,
+ kqswnal_rdma_fetch_complete, ktx,
+ datav, ndatav);
+#endif
+ if (eprc != EP_SUCCESS) {
+ CERROR("ep_rpc_get failed: %d\n", eprc);
+ rc = -ECONNABORTED;
+ }
+ break;
+ }
- CERROR("can't complete RPC: %d\n", rc);
-
- /* reset refcount back to 1: we're not going to be racing with
- * kqswnal_dma_reply_complete. */
- atomic_set (&krx->krx_refcount, 1);
+ out:
+ if (rc != 0) {
+ kqswnal_rx_decref(krx); /* drop callback's ref */
+ kqswnal_put_idle_tx (ktx);
+ }
- return (-ECONNABORTED);
+ atomic_dec(&kqswnal_data.kqn_pending_txs);
+ return (rc);
}
static ptl_err_t
-kqswnal_sendmsg (nal_cb_t *nal,
+kqswnal_sendmsg (lib_nal_t *nal,
void *private,
lib_msg_t *libmsg,
ptl_hdr_t *hdr,
int sumoff;
int sumnob;
#endif
+ /* NB 1. hdr is in network byte order */
+ /* 2. 'private' depends on the message type */
CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
" pid %u\n", payload_nob, payload_niov, nid, pid);
return (PTL_FAIL);
}
+ if (type == PTL_MSG_REPLY && /* can I look in 'private' */
+ ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) { /* is it an RPC */
+ /* Must be a REPLY for an optimized GET */
+ rc = kqswnal_rdma ((kqswnal_rx_t *)private, libmsg, PTL_MSG_GET,
+ payload_niov, payload_iov, payload_kiov,
+ payload_offset, payload_nob);
+ return ((rc == 0) ? PTL_OK : PTL_FAIL);
+ }
+
targetnid = nid;
if (kqswnal_nid2elanid (nid) < 0) { /* Can't send direct: find gateway? */
rc = kpr_lookup (&kqswnal_data.kqn_router, nid,
type == PTL_MSG_REPLY ||
in_interrupt()));
if (ktx == NULL) {
- kqswnal_cerror_hdr (hdr);
+ CERROR ("Can't get txd for msg type %d for "LPX64"\n",
+ type, libmsg->ev.initiator.nid);
return (PTL_NO_SPACE);
}
+ ktx->ktx_state = KTX_SENDING;
ktx->ktx_nid = targetnid;
ktx->ktx_args[0] = private;
ktx->ktx_args[1] = libmsg;
-
- if (type == PTL_MSG_REPLY &&
- ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
- if (nid != targetnid ||
- kqswnal_nid2elanid(nid) !=
- ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
- CERROR("Optimized reply nid conflict: "
- "nid "LPX64" via "LPX64" elanID %d\n",
- nid, targetnid,
- ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
- rc = -EINVAL;
- goto out;
- }
-
- /* peer expects RPC completion with GET data */
- rc = kqswnal_dma_reply (ktx, payload_niov,
- payload_iov, payload_kiov,
- payload_offset, payload_nob);
- if (rc != 0)
- CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
- goto out;
- }
+ ktx->ktx_args[2] = NULL; /* set when a GET commits to REPLY */
memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
#endif
- if (kqswnal_tunables.kqn_optimized_gets &&
- type == PTL_MSG_GET && /* doing a GET */
- nid == targetnid) { /* not forwarding */
+ /* The first frag will be the pre-mapped buffer for (at least) the
+ * portals header. */
+ ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
+
+ if (nid == targetnid && /* not forwarding */
+ ((type == PTL_MSG_GET && /* optimize GET? */
+ kqswnal_tunables.kqn_optimized_gets != 0 &&
+ NTOH__u32(hdr->msg.get.sink_length) >= kqswnal_tunables.kqn_optimized_gets) ||
+ (type == PTL_MSG_PUT && /* optimize PUT? */
+ kqswnal_tunables.kqn_optimized_puts != 0 &&
+ payload_nob >= kqswnal_tunables.kqn_optimized_puts))) {
lib_md_t *md = libmsg->md;
kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
- /* Optimised path: I send over the Elan vaddrs of the get
- * sink buffers, and my peer DMAs directly into them.
+ /* Optimised path: I send over the Elan vaddrs of the local
+ * buffers, and my peer DMAs directly to/from them.
*
* First I set up ktx as if it was going to send this
* payload, (it needs to map it anyway). This fills
* ktx_frags[1] and onward with the network addresses
* of the GET sink frags. I copy these into ktx_buffer,
- * immediately after the header, and send that as my GET
- * message.
- *
- * Note that the addresses are sent in native endian-ness.
- * When EKC copes with different endian nodes, I'll fix
- * this (and eat my hat :) */
+ * immediately after the header, and send that as my
+ * message. */
- ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
- ktx->ktx_state = KTX_GETTING;
+ ktx->ktx_state = (type == PTL_MSG_PUT) ? KTX_PUTTING : KTX_GETTING;
if ((libmsg->md->options & PTL_MD_KIOV) != 0)
rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
#endif
+ if (type == PTL_MSG_GET) {
+ /* Allocate reply message now while I'm in thread context */
+ ktx->ktx_args[2] = lib_create_reply_msg (&kqswnal_lib,
+ nid, libmsg);
+ if (ktx->ktx_args[2] == NULL)
+ goto out;
+
+ /* NB finalizing the REPLY message is my
+ * responsibility now, whatever happens. */
+ }
+
} else if (payload_nob <= KQSW_TX_MAXCONTIG) {
/* small message: single frag copied into the pre-mapped buffer */
- ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
- ktx->ktx_state = KTX_SENDING;
#if MULTIRAIL_EKC
ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
0, KQSW_HDR_SIZE + payload_nob);
/* large message: multiple frags: first is hdr in pre-mapped buffer */
- ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
- ktx->ktx_state = KTX_SENDING;
#if MULTIRAIL_EKC
ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
0, KQSW_HDR_SIZE);
rc == 0 ? "Sent" : "Failed to send",
payload_nob, nid, targetnid, rc);
- if (rc != 0)
+ if (rc != 0) {
+ if (ktx->ktx_state == KTX_GETTING &&
+ ktx->ktx_args[2] != NULL) {
+ /* We committed to reply, but there was a problem
+ * launching the GET. We can't avoid delivering a
+ * REPLY event since we committed above, so we
+ * pretend the GET succeeded but the REPLY
+ * failed. */
+ rc = 0;
+ lib_finalize (&kqswnal_lib, private, libmsg, PTL_OK);
+ lib_finalize (&kqswnal_lib, private,
+ (lib_msg_t *)ktx->ktx_args[2], PTL_FAIL);
+ }
+
kqswnal_put_idle_tx (ktx);
-
+ }
+
atomic_dec(&kqswnal_data.kqn_pending_txs);
return (rc == 0 ? PTL_OK : PTL_FAIL);
}
static ptl_err_t
-kqswnal_send (nal_cb_t *nal,
+kqswnal_send (lib_nal_t *nal,
void *private,
lib_msg_t *libmsg,
ptl_hdr_t *hdr,
}
static ptl_err_t
-kqswnal_send_pages (nal_cb_t *nal,
+kqswnal_send_pages (lib_nal_t *nal,
void *private,
lib_msg_t *libmsg,
ptl_hdr_t *hdr,
if (ktx == NULL) /* can't get txd right now */
return; /* fwd will be scheduled when tx desc freed */
- if (nid == kqswnal_lib.ni.nid) /* gateway is me */
+ if (nid == kqswnal_lib.libnal_ni.ni_pid.nid) /* gateway is me */
nid = fwd->kprfd_target_nid; /* target is final dest */
if (kqswnal_nid2elanid (nid) < 0) {
if (rc != 0) {
CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
- kqswnal_put_idle_tx (ktx);
/* complete now (with failure) */
- kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
+ kqswnal_tx_done (ktx, rc);
}
atomic_dec(&kqswnal_data.kqn_pending_txs);
NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
}
- kqswnal_requeue_rx (krx);
+ LASSERT (atomic_read(&krx->krx_refcount) == 1);
+ kqswnal_rx_decref (krx);
}
void
-kqswnal_dma_reply_complete (EP_RXD *rxd)
+kqswnal_requeue_rx (kqswnal_rx_t *krx)
{
- int status = ep_rxd_status(rxd);
- kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
- kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
- lib_msg_t *msg = (lib_msg_t *)ktx->ktx_args[1];
-
- CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
- "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
+ LASSERT (atomic_read(&krx->krx_refcount) == 0);
+ LASSERT (!krx->krx_rpc_reply_needed);
- LASSERT (krx->krx_rxd == rxd);
- LASSERT (krx->krx_rpc_reply_needed);
+ krx->krx_state = KRX_POSTED;
- krx->krx_rpc_reply_needed = 0;
- kqswnal_rx_done (krx);
+#if MULTIRAIL_EKC
+ if (kqswnal_data.kqn_shuttingdown) {
+ /* free EKC rxd on shutdown */
+ ep_complete_receive(krx->krx_rxd);
+ } else {
+ /* repost receive */
+ ep_requeue_receive(krx->krx_rxd,
+ kqswnal_rxhandler, krx,
+ &krx->krx_elanbuffer, 0);
+ }
+#else
+ if (kqswnal_data.kqn_shuttingdown)
+ return;
- lib_finalize (&kqswnal_lib, NULL, msg,
- (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
- kqswnal_put_idle_tx (ktx);
+ if (krx->krx_rxd == NULL) {
+ /* We had a failed ep_complete_rpc() which nukes the
+ * descriptor in "old" EKC */
+ int eprc = ep_queue_receive(krx->krx_eprx,
+ kqswnal_rxhandler, krx,
+ krx->krx_elanbuffer,
+ krx->krx_npages * PAGE_SIZE, 0);
+ LASSERT (eprc == EP_SUCCESS);
+ /* We don't handle failure here; it's incredibly rare
+ * (never reported?) and only happens with "old" EKC */
+ } else {
+ ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
+ krx->krx_elanbuffer,
+ krx->krx_npages * PAGE_SIZE);
+ }
+#endif
}
void
}
void
-kqswnal_requeue_rx (kqswnal_rx_t *krx)
+kqswnal_rx_done (kqswnal_rx_t *krx)
{
- int rc;
+ int rc;
+ EP_STATUSBLK *sblk;
LASSERT (atomic_read(&krx->krx_refcount) == 0);
if (krx->krx_rpc_reply_needed) {
+ /* We've not completed the peer's RPC yet... */
+ sblk = (krx->krx_rpc_reply_status == 0) ?
+ &kqswnal_data.kqn_rpc_success :
+ &kqswnal_data.kqn_rpc_failed;
- /* We failed to complete the peer's optimized GET (e.g. we
- * couldn't map the source buffers). We complete the
- * peer's EKC rpc now with failure. */
+ LASSERT (!in_interrupt());
#if MULTIRAIL_EKC
- rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
- &kqswnal_rpc_failed, NULL, NULL, 0);
+ rc = ep_complete_rpc(krx->krx_rxd,
+ kqswnal_rpc_complete, krx,
+ sblk, NULL, NULL, 0);
if (rc == EP_SUCCESS)
return;
-
- CERROR("can't complete RPC: %d\n", rc);
#else
- if (krx->krx_rxd != NULL) {
- /* We didn't try (and fail) to complete earlier... */
- rc = ep_complete_rpc(krx->krx_rxd,
- kqswnal_rpc_complete, krx,
- &kqswnal_rpc_failed, NULL, 0);
- if (rc == EP_SUCCESS)
- return;
-
- CERROR("can't complete RPC: %d\n", rc);
- }
-
- /* NB the old ep_complete_rpc() frees rxd on failure, so we
- * have to requeue from scratch here, unless we're shutting
- * down */
- if (kqswnal_data.kqn_shuttingdown)
+ rc = ep_complete_rpc(krx->krx_rxd,
+ kqswnal_rpc_complete, krx,
+ sblk, NULL, 0);
+ if (rc == EP_SUCCESS)
return;
- rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
- krx->krx_elanbuffer,
- krx->krx_npages * PAGE_SIZE, 0);
- LASSERT (rc == EP_SUCCESS);
- /* We don't handle failure here; it's incredibly rare
- * (never reported?) and only happens with "old" EKC */
- return;
+ /* "old" EKC destroys rxd on failed completion */
+ krx->krx_rxd = NULL;
#endif
+ CERROR("can't complete RPC: %d\n", rc);
+ krx->krx_rpc_reply_needed = 0;
}
-#if MULTIRAIL_EKC
- if (kqswnal_data.kqn_shuttingdown) {
- /* free EKC rxd on shutdown */
- ep_complete_receive(krx->krx_rxd);
- } else {
- /* repost receive */
- ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
- &krx->krx_elanbuffer, 0);
- }
-#else
- /* don't actually requeue on shutdown */
- if (!kqswnal_data.kqn_shuttingdown)
- ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
- krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
-#endif
+ kqswnal_requeue_rx(krx);
}
void
-kqswnal_rx (kqswnal_rx_t *krx)
+kqswnal_parse (kqswnal_rx_t *krx)
{
ptl_hdr_t *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
ptl_nid_t dest_nid = NTOH__u64 (hdr->dest_nid);
int nob;
int niov;
- LASSERT (atomic_read(&krx->krx_refcount) == 0);
+ LASSERT (atomic_read(&krx->krx_refcount) == 1);
+
+ if (dest_nid == kqswnal_lib.libnal_ni.ni_pid.nid) { /* It's for me :) */
+ /* I ignore parse errors since I'm not consuming a byte
+ * stream */
+ (void)lib_parse (&kqswnal_lib, hdr, krx);
- if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
- atomic_set(&krx->krx_refcount, 1);
- lib_parse (&kqswnal_lib, hdr, krx);
- kqswnal_rx_done(krx);
+ /* Drop my ref; any RDMA activity takes an additional ref */
+ kqswnal_rx_decref(krx);
return;
}
#if KQSW_CHECKSUM
- CERROR ("checksums for forwarded packets not implemented\n");
- LBUG ();
+ LASSERTF (0, "checksums for forwarded packets not implemented\n");
#endif
+
if (kqswnal_nid2elanid (dest_nid) >= 0) /* should have gone direct to peer */
{
CERROR("dropping packet from "LPX64" for "LPX64
": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
- kqswnal_requeue_rx (krx);
+ kqswnal_rx_decref (krx);
return;
}
rxd, krx, nob, status);
LASSERT (krx != NULL);
-
+ LASSERT (krx->krx_state = KRX_POSTED);
+
+ krx->krx_state = KRX_PARSE;
krx->krx_rxd = rxd;
krx->krx_nob = nob;
#if MULTIRAIL_EKC
#else
krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
#endif
-
+ /* Default to failure if an RPC reply is requested but not handled */
+ krx->krx_rpc_reply_status = -EPROTO;
+ atomic_set (&krx->krx_refcount, 1);
+
/* must receive a whole header to be able to parse */
if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
{
CERROR("receive status failed with status %d nob %d\n",
ep_rxd_status(rxd), nob);
#endif
- kqswnal_requeue_rx (krx);
+ kqswnal_rx_decref(krx);
return;
}
if (!in_interrupt()) {
- kqswnal_rx (krx);
+ kqswnal_parse(krx);
return;
}
#endif
static ptl_err_t
-kqswnal_recvmsg (nal_cb_t *nal,
+kqswnal_recvmsg (lib_nal_t *nal,
void *private,
lib_msg_t *libmsg,
unsigned int niov,
{
kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
char *buffer = page_address(krx->krx_kiov[0].kiov_page);
+ ptl_hdr_t *hdr = (ptl_hdr_t *)buffer;
int page;
char *page_ptr;
int page_nob;
char *iov_ptr;
int iov_nob;
int frag;
+ int rc;
#if KQSW_CHECKSUM
kqsw_csum_t senders_csum;
kqsw_csum_t payload_csum = 0;
- kqsw_csum_t hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
+ kqsw_csum_t hdr_csum = kqsw_csum(0, hdr, sizeof(*hdr));
size_t csum_len = mlen;
int csum_frags = 0;
int csum_nob = 0;
if (senders_csum != hdr_csum)
kqswnal_csum_error (krx, 1);
#endif
+ /* NB lib_parse() has already flipped *hdr */
+
CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
+ if (krx->krx_rpc_reply_needed &&
+ hdr->type == PTL_MSG_PUT) {
+ /* This must be an optimized PUT */
+ rc = kqswnal_rdma (krx, libmsg, PTL_MSG_PUT,
+ niov, iov, kiov, offset, mlen);
+ return (rc == 0 ? PTL_OK : PTL_FAIL);
+ }
+
/* What was actually received must be >= payload. */
LASSERT (mlen <= rlen);
if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
}
static ptl_err_t
-kqswnal_recv(nal_cb_t *nal,
+kqswnal_recv(lib_nal_t *nal,
void *private,
lib_msg_t *libmsg,
unsigned int niov,
}
static ptl_err_t
-kqswnal_recv_pages (nal_cb_t *nal,
+kqswnal_recv_pages (lib_nal_t *nal,
void *private,
lib_msg_t *libmsg,
unsigned int niov,
spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
flags);
- kqswnal_rx (krx);
+ switch (krx->krx_state) {
+ case KRX_PARSE:
+ kqswnal_parse (krx);
+ break;
+ case KRX_COMPLETING:
+ /* Drop last ref to reply to RPC and requeue */
+ LASSERT (krx->krx_rpc_reply_needed);
+ kqswnal_rx_decref (krx);
+ break;
+ default:
+ LBUG();
+ }
did_something = 1;
spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
return (0);
}
-nal_cb_t kqswnal_lib =
+lib_nal_t kqswnal_lib =
{
- nal_data: &kqswnal_data, /* NAL private data */
- cb_send: kqswnal_send,
- cb_send_pages: kqswnal_send_pages,
- cb_recv: kqswnal_recv,
- cb_recv_pages: kqswnal_recv_pages,
- cb_read: kqswnal_read,
- cb_write: kqswnal_write,
- cb_malloc: kqswnal_malloc,
- cb_free: kqswnal_free,
- cb_printf: kqswnal_printf,
- cb_cli: kqswnal_cli,
- cb_sti: kqswnal_sti,
- cb_callback: kqswnal_callback,
- cb_dist: kqswnal_dist
+ libnal_data: &kqswnal_data, /* NAL private data */
+ libnal_send: kqswnal_send,
+ libnal_send_pages: kqswnal_send_pages,
+ libnal_recv: kqswnal_recv,
+ libnal_recv_pages: kqswnal_recv_pages,
+ libnal_dist: kqswnal_dist
};
#endif
int
-ksocknal_api_forward(nal_t *nal, int id, void *args, size_t args_len,
- void *ret, size_t ret_len)
-{
- ksock_nal_data_t *k;
- nal_cb_t *nal_cb;
-
- k = nal->nal_data;
- nal_cb = k->ksnd_nal_cb;
-
- lib_dispatch(nal_cb, k, id, args, ret); /* ksocknal_send needs k */
- return PTL_OK;
-}
-
-void
-ksocknal_api_lock(nal_t *nal, unsigned long *flags)
-{
- ksock_nal_data_t *k;
- nal_cb_t *nal_cb;
-
- k = nal->nal_data;
- nal_cb = k->ksnd_nal_cb;
- nal_cb->cb_cli(nal_cb,flags);
-}
-
-void
-ksocknal_api_unlock(nal_t *nal, unsigned long *flags)
-{
- ksock_nal_data_t *k;
- nal_cb_t *nal_cb;
-
- k = nal->nal_data;
- nal_cb = k->ksnd_nal_cb;
- nal_cb->cb_sti(nal_cb,flags);
-}
-
-int
-ksocknal_api_yield(nal_t *nal, unsigned long *flags, int milliseconds)
-{
- /* NB called holding statelock */
- wait_queue_t wait;
- unsigned long now = jiffies;
-
- CDEBUG (D_NET, "yield\n");
-
- if (milliseconds == 0) {
- our_cond_resched();
- return 0;
- }
-
- init_waitqueue_entry(&wait, current);
- set_current_state (TASK_INTERRUPTIBLE);
- add_wait_queue (&ksocknal_data.ksnd_yield_waitq, &wait);
-
- ksocknal_api_unlock(nal, flags);
-
- if (milliseconds < 0)
- schedule ();
- else
- schedule_timeout((milliseconds * HZ) / 1000);
-
- ksocknal_api_lock(nal, flags);
-
- remove_wait_queue (&ksocknal_data.ksnd_yield_waitq, &wait);
-
- if (milliseconds > 0) {
- milliseconds -= ((jiffies - now) * 1000) / HZ;
- if (milliseconds < 0)
- milliseconds = 0;
- }
-
- return (milliseconds);
-}
-
-int
ksocknal_set_mynid(ptl_nid_t nid)
{
- lib_ni_t *ni = &ksocknal_lib.ni;
+ lib_ni_t *ni = &ksocknal_lib.libnal_ni;
/* FIXME: we have to do this because we call lib_init() at module
* insertion time, which is before we have 'mynid' available. lib_init
* problem. */
CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
- nid, ni->nid);
+ nid, ni->ni_pid.nid);
- ni->nid = nid;
+ ni->ni_pid.nid = nid;
return (0);
}
/* flag threads to terminate; wake and wait for them to die */
ksocknal_data.ksnd_shuttingdown = 1;
+ mb();
wake_up_all (&ksocknal_data.ksnd_autoconnectd_waitq);
wake_up_all (&ksocknal_data.ksnd_reaper_waitq);
for (i = 0; i < SOCKNAL_N_SCHED; i++)
wake_up_all(&ksocknal_data.ksnd_schedulers[i].kss_waitq);
+ i = 4;
while (atomic_read (&ksocknal_data.ksnd_nthreads) != 0) {
- CDEBUG (D_NET, "waitinf for %d threads to terminate\n",
+ i++;
+ CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
+ "waiting for %d threads to terminate\n",
atomic_read (&ksocknal_data.ksnd_nthreads));
set_current_state (TASK_UNINTERRUPTIBLE);
schedule_timeout (HZ);
if (nal->nal_refct != 0) {
if (actual_limits != NULL)
- *actual_limits = ksocknal_lib.ni.actual_limits;
+ *actual_limits = ksocknal_lib.libnal_ni.ni_actual_limits;
/* This module got the first ref */
PORTAL_MODULE_USE;
return (PTL_OK);
rwlock_init(&ksocknal_data.ksnd_global_lock);
- ksocknal_data.ksnd_nal_cb = &ksocknal_lib;
- spin_lock_init (&ksocknal_data.ksnd_nal_cb_lock);
- init_waitqueue_head(&ksocknal_data.ksnd_yield_waitq);
-
spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
sizeof(ksock_sched_t) * SOCKNAL_N_SCHED);
if (ksocknal_data.ksnd_schedulers == NULL) {
- ksocknal_api_shutdown (&ksocknal_api);
+ ksocknal_api_shutdown (nal);
return (-ENOMEM);
}
process_id.pid = 0;
process_id.nid = 0;
- rc = lib_init(&ksocknal_lib, process_id,
+ rc = lib_init(&ksocknal_lib, nal, process_id,
requested_limits, actual_limits);
if (rc != PTL_OK) {
CERROR("lib_init failed: error %d\n", rc);
- ksocknal_api_shutdown (&ksocknal_api);
+ ksocknal_api_shutdown (nal);
return (rc);
}
if (rc != 0) {
CERROR("Can't spawn socknal scheduler[%d]: %d\n",
i, rc);
- ksocknal_api_shutdown (&ksocknal_api);
+ ksocknal_api_shutdown (nal);
return (rc);
}
}
rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
if (rc != 0) {
CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
- ksocknal_api_shutdown (&ksocknal_api);
+ ksocknal_api_shutdown (nal);
return (rc);
}
}
rc = ksocknal_thread_start (ksocknal_reaper, NULL);
if (rc != 0) {
CERROR ("Can't spawn socknal reaper: %d\n", rc);
- ksocknal_api_shutdown (&ksocknal_api);
+ ksocknal_api_shutdown (nal);
return (rc);
}
PORTAL_ALLOC(fmb, offsetof(ksock_fmb_t,
fmb_kiov[pool->fmp_buff_pages]));
if (fmb == NULL) {
- ksocknal_api_shutdown(&ksocknal_api);
+ ksocknal_api_shutdown(nal);
return (-ENOMEM);
}
fmb->fmb_kiov[j].kiov_page = alloc_page(GFP_KERNEL);
if (fmb->fmb_kiov[j].kiov_page == NULL) {
- ksocknal_api_shutdown (&ksocknal_api);
+ ksocknal_api_shutdown (nal);
return (-ENOMEM);
}
rc = libcfs_nal_cmd_register(SOCKNAL, &ksocknal_cmd, NULL);
if (rc != 0) {
CERROR ("Can't initialise command interface (rc = %d)\n", rc);
- ksocknal_api_shutdown (&ksocknal_api);
+ ksocknal_api_shutdown (nal);
return (rc);
}
/* check ksnr_connected/connecting field large enough */
LASSERT(SOCKNAL_CONN_NTYPES <= 4);
- ksocknal_api.startup = ksocknal_api_startup;
- ksocknal_api.forward = ksocknal_api_forward;
- ksocknal_api.shutdown = ksocknal_api_shutdown;
- ksocknal_api.lock = ksocknal_api_lock;
- ksocknal_api.unlock = ksocknal_api_unlock;
- ksocknal_api.nal_data = &ksocknal_data;
-
- ksocknal_lib.nal_data = &ksocknal_data;
+ ksocknal_api.nal_ni_init = ksocknal_api_startup;
+ ksocknal_api.nal_ni_fini = ksocknal_api_shutdown;
/* Initialise dynamic tunables to defaults once only */
ksocknal_tunables.ksnd_io_timeout = SOCKNAL_IO_TIMEOUT;
struct list_head *ksnd_peers; /* hash table of all my known peers */
int ksnd_peer_hash_size; /* size of ksnd_peers */
- nal_cb_t *ksnd_nal_cb;
- spinlock_t ksnd_nal_cb_lock; /* lib cli/sti lock */
- wait_queue_head_t ksnd_yield_waitq; /* where yield waits */
-
atomic_t ksnd_nthreads; /* # live threads */
int ksnd_shuttingdown; /* tell threads to exit */
ksock_sched_t *ksnd_schedulers; /* scheduler state */
} ksock_peer_t;
-extern nal_cb_t ksocknal_lib;
+extern lib_nal_t ksocknal_lib;
extern ksock_nal_data_t ksocknal_data;
extern ksock_tunables_t ksocknal_tunables;
* LIB functions follow
*
*/
-ptl_err_t
-ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr,
- user_ptr src_addr, size_t len)
-{
- CDEBUG(D_NET, LPX64": reading %ld bytes from %p -> %p\n",
- nal->ni.nid, (long)len, src_addr, dst_addr);
-
- memcpy( dst_addr, src_addr, len );
- return PTL_OK;
-}
-
-ptl_err_t
-ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
- void *src_addr, size_t len)
-{
- CDEBUG(D_NET, LPX64": writing %ld bytes from %p -> %p\n",
- nal->ni.nid, (long)len, src_addr, dst_addr);
-
- memcpy( dst_addr, src_addr, len );
- return PTL_OK;
-}
-
-void *
-ksocknal_malloc(nal_cb_t *nal, size_t len)
-{
- void *buf;
-
- PORTAL_ALLOC(buf, len);
-
- if (buf != NULL)
- memset(buf, 0, len);
-
- return (buf);
-}
-
-void
-ksocknal_free(nal_cb_t *nal, void *buf, size_t len)
-{
- PORTAL_FREE(buf, len);
-}
-
-void
-ksocknal_printf(nal_cb_t *nal, const char *fmt, ...)
-{
- va_list ap;
- char msg[256];
-
- va_start (ap, fmt);
- vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
- va_end (ap);
-
- msg[sizeof (msg) - 1] = 0; /* ensure terminated */
-
- CDEBUG (D_NET, "%s", msg);
-}
-
-void
-ksocknal_cli(nal_cb_t *nal, unsigned long *flags)
-{
- ksock_nal_data_t *data = nal->nal_data;
-
- /* OK to ignore 'flags'; we're only ever serialise threads and
- * never need to lock out interrupts */
- spin_lock(&data->ksnd_nal_cb_lock);
-}
-
-void
-ksocknal_sti(nal_cb_t *nal, unsigned long *flags)
-{
- ksock_nal_data_t *data;
- data = nal->nal_data;
-
- /* OK to ignore 'flags'; we're only ever serialise threads and
- * never need to lock out interrupts */
- spin_unlock(&data->ksnd_nal_cb_lock);
-}
-
-void
-ksocknal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev)
-{
- /* holding ksnd_nal_cb_lock */
-
- if (eq->event_callback != NULL)
- eq->event_callback(ev);
-
- if (waitqueue_active(&ksocknal_data.ksnd_yield_waitq))
- wake_up_all(&ksocknal_data.ksnd_yield_waitq);
-}
-
int
-ksocknal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
+ksocknal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
/* I would guess that if ksocknal_get_peer (nid) == NULL,
and we're not routing, then 'nid' is very distant :) */
- if ( nal->ni.nid == nid ) {
+ if (nal->libnal_ni.ni_pid.nid == nid) {
*dist = 0;
} else {
*dist = 1;
{
struct list_head *tmp;
ksock_route_t *route;
- ksock_route_t *candidate = NULL;
- int found = 0;
+ ksock_route_t *first_lazy = NULL;
+ int found_connecting_or_connected = 0;
int bits;
list_for_each (tmp, &peer->ksnp_routes) {
/* All typed connections have been established, or
* an untyped connection has been established, or
* connections are currently being established */
- found = 1;
+ found_connecting_or_connected = 1;
continue;
}
if (!time_after_eq (jiffies, route->ksnr_timeout))
continue;
- /* always do eager routes */
+ /* eager routes always want to be connected */
if (route->ksnr_eager)
return (route);
- if (candidate == NULL) {
- /* If we don't find any other route that is fully
- * connected or connecting, the first connectable
- * route is returned. If it fails to connect, it
- * will get placed at the end of the list */
- candidate = route;
- }
+ if (first_lazy == NULL)
+ first_lazy = route;
}
-
- return (found ? NULL : candidate);
+
+ /* No eager routes need to be connected. If some connection has
+ * already been established, or is being established there's nothing to
+ * do. Otherwise we return the first lazy route we found. If it fails
+ * to connect, it will go to the end of the list. */
+
+ if (!list_empty (&peer->ksnp_conns) ||
+ found_connecting_or_connected)
+ return (NULL);
+
+ return (first_lazy);
}
ksock_route_t *
}
ptl_err_t
-ksocknal_sendmsg(nal_cb_t *nal,
+ksocknal_sendmsg(lib_nal_t *nal,
void *private,
lib_msg_t *cookie,
ptl_hdr_t *hdr,
}
ptl_err_t
-ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
+ksocknal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
unsigned int payload_niov, struct iovec *payload_iov,
size_t payload_offset, size_t payload_len)
}
ptl_err_t
-ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie,
+ksocknal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
unsigned int payload_niov, ptl_kiov_t *payload_kiov,
size_t payload_offset, size_t payload_len)
fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
/* I'm the gateway; must be the last hop */
- if (nid == ksocknal_lib.ni.nid)
+ if (nid == ksocknal_lib.libnal_ni.ni_pid.nid)
nid = fwd->kprfd_target_nid;
/* setup iov for hdr */
switch (conn->ksnc_rx_state) {
case SOCKNAL_RX_HEADER:
if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) &&
- NTOH__u64(conn->ksnc_hdr.dest_nid) != ksocknal_lib.ni.nid) {
+ NTOH__u64(conn->ksnc_hdr.dest_nid) !=
+ ksocknal_lib.libnal_ni.ni_pid.nid) {
/* This packet isn't for me */
ksocknal_fwd_parse (conn);
switch (conn->ksnc_rx_state) {
}
/* sets wanted_len, iovs etc */
- lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
+ rc = lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
+
+ if (rc != PTL_OK) {
+ /* I just received garbage: give up on this conn */
+ ksocknal_close_conn_and_siblings (conn, rc);
+ return (-EPROTO);
+ }
if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
conn->ksnc_rx_state = SOCKNAL_RX_BODY;
}
ptl_err_t
-ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
+ksocknal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
unsigned int niov, struct iovec *iov,
size_t offset, size_t mlen, size_t rlen)
{
}
ptl_err_t
-ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
+ksocknal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
unsigned int niov, ptl_kiov_t *kiov,
size_t offset, size_t mlen, size_t rlen)
{
hmv->version_major = __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR);
hmv->version_minor = __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR);
- hdr.src_nid = __cpu_to_le64 (ksocknal_lib.ni.nid);
+ hdr.src_nid = __cpu_to_le64 (ksocknal_lib.libnal_ni.ni_pid.nid);
hdr.type = __cpu_to_le32 (PTL_MSG_HELLO);
hdr.msg.hello.type = __cpu_to_le32 (*type);
return (0);
}
-nal_cb_t ksocknal_lib = {
- nal_data: &ksocknal_data, /* NAL private data */
- cb_send: ksocknal_send,
- cb_send_pages: ksocknal_send_pages,
- cb_recv: ksocknal_recv,
- cb_recv_pages: ksocknal_recv_pages,
- cb_read: ksocknal_read,
- cb_write: ksocknal_write,
- cb_malloc: ksocknal_malloc,
- cb_free: ksocknal_free,
- cb_printf: ksocknal_printf,
- cb_cli: ksocknal_cli,
- cb_sti: ksocknal_sti,
- cb_callback: ksocknal_callback,
- cb_dist: ksocknal_dist
+lib_nal_t ksocknal_lib = {
+ libnal_data: &ksocknal_data, /* NAL private data */
+ libnal_send: ksocknal_send,
+ libnal_send_pages: ksocknal_send_pages,
+ libnal_recv: ksocknal_recv,
+ libnal_recv_pages: ksocknal_recv_pages,
+ libnal_dist: ksocknal_dist
};
#define PORTAL_MINOR 240
struct nal_cmd_handler {
+ int nch_number;
nal_cmd_handler_fn *nch_handler;
void *nch_private;
};
-static struct nal_cmd_handler nal_cmd[NAL_MAX_NR + 1];
+static struct nal_cmd_handler nal_cmd[16];
static DECLARE_MUTEX(nal_cmd_sem);
#ifdef PORTAL_DEBUG
PORTAL_FREE(data, len);
}
+struct nal_cmd_handler *
+libcfs_find_nal_cmd_handler(int nal)
+{
+ int i;
+
+ for (i = 0; i < sizeof(nal_cmd)/sizeof(nal_cmd[0]); i++)
+ if (nal_cmd[i].nch_handler != NULL &&
+ nal_cmd[i].nch_number == nal)
+ return (&nal_cmd[i]);
+
+ return (NULL);
+}
+
int
libcfs_nal_cmd_register(int nal, nal_cmd_handler_fn *handler, void *private)
{
- int rc = 0;
+ struct nal_cmd_handler *cmd;
+ int i;
+ int rc;
CDEBUG(D_IOCTL, "Register NAL %d, handler: %p\n", nal, handler);
- if (nal > 0 && nal <= NAL_MAX_NR) {
- down(&nal_cmd_sem);
- if (nal_cmd[nal].nch_handler != NULL)
- rc = -EBUSY;
- else {
- nal_cmd[nal].nch_handler = handler;
- nal_cmd[nal].nch_private = private;
+ down(&nal_cmd_sem);
+
+ if (libcfs_find_nal_cmd_handler(nal) != NULL) {
+ up (&nal_cmd_sem);
+ return (-EBUSY);
+ }
+
+ cmd = NULL;
+ for (i = 0; i < sizeof(nal_cmd)/sizeof(nal_cmd[0]); i++)
+ if (nal_cmd[i].nch_handler == NULL) {
+ cmd = &nal_cmd[i];
+ break;
}
- up(&nal_cmd_sem);
+
+ if (cmd == NULL) {
+ rc = -EBUSY;
+ } else {
+ rc = 0;
+ cmd->nch_number = nal;
+ cmd->nch_handler = handler;
+ cmd->nch_private = private;
}
+
+ up(&nal_cmd_sem);
+
return rc;
}
EXPORT_SYMBOL(libcfs_nal_cmd_register);
void
libcfs_nal_cmd_unregister(int nal)
{
- CDEBUG(D_IOCTL, "Unregister NAL %d\n", nal);
+ struct nal_cmd_handler *cmd;
- LASSERT(nal > 0 && nal <= NAL_MAX_NR);
- LASSERT(nal_cmd[nal].nch_handler != NULL);
+ CDEBUG(D_IOCTL, "Unregister NAL %d\n", nal);
down(&nal_cmd_sem);
- nal_cmd[nal].nch_handler = NULL;
- nal_cmd[nal].nch_private = NULL;
+ cmd = libcfs_find_nal_cmd_handler(nal);
+ LASSERT (cmd != NULL);
+ cmd->nch_handler = NULL;
+ cmd->nch_private = NULL;
up(&nal_cmd_sem);
}
EXPORT_SYMBOL(libcfs_nal_cmd_unregister);
int
libcfs_nal_cmd(struct portals_cfg *pcfg)
{
+ struct nal_cmd_handler *cmd;
__u32 nal = pcfg->pcfg_nal;
int rc = -EINVAL;
ENTRY;
down(&nal_cmd_sem);
- if (nal > 0 && nal <= NAL_MAX_NR &&
- nal_cmd[nal].nch_handler != NULL) {
+ cmd = libcfs_find_nal_cmd_handler(nal);
+ if (cmd != NULL) {
CDEBUG(D_IOCTL, "calling handler nal: %d, cmd: %d\n", nal,
pcfg->pcfg_command);
- rc = nal_cmd[nal].nch_handler(pcfg, nal_cmd[nal].nch_private);
+ rc = cmd->nch_handler(pcfg, cmd->nch_private);
}
up(&nal_cmd_sem);
MODULES := portals
-portals-objs := api-eq.o api-init.o api-me.o api-errno.o api-ni.o api-wrap.o
-portals-objs += lib-dispatch.o lib-init.o lib-me.o lib-msg.o lib-eq.o lib-md.o
+portals-objs := api-errno.o api-ni.o api-wrap.o
+portals-objs += lib-init.o lib-me.o lib-msg.o lib-eq.o lib-md.o
portals-objs += lib-move.o lib-ni.o lib-pid.o module.o
@INCLUDE_RULES@
include $(src)/../Kernelenv
obj-y += portals.o
-portals-objs := lib-dispatch.o lib-eq.o lib-init.o lib-md.o lib-me.o \
+portals-objs := lib-eq.o lib-init.o lib-md.o lib-me.o \
lib-move.o lib-msg.o lib-ni.o lib-pid.o \
- api-eq.o api-errno.o api-init.o api-me.o api-ni.o \
- api-wrap.o module.o
+ api-errno.o api-ni.o api-wrap.o \
+ module.o
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * api/api-eq.c
- * User-level event queue management routines
- *
- * Copyright (c) 2001-2003 Cluster File Systems, Inc.
- * Copyright (c) 2001-2002 Sandia National Laboratories
- *
- * This file is part of Lustre, http://www.sf.net/projects/lustre/
- *
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
- *
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#define DEBUG_SUBSYSTEM S_PORTALS
-#include <portals/api-support.h>
-
-int ptl_get_event (ptl_eq_t *eq, ptl_event_t *ev)
-{
- int new_index = eq->sequence & (eq->size - 1);
- ptl_event_t *new_event = &eq->base[new_index];
- ENTRY;
-
- CDEBUG(D_INFO, "new_event: %p, sequence: %lu, eq->size: %u\n",
- new_event, eq->sequence, eq->size);
-
- if (PTL_SEQ_GT (eq->sequence, new_event->sequence)) {
- RETURN(PTL_EQ_EMPTY);
- }
-
- *ev = *new_event;
-
- /* ensure event is delivered correctly despite possible
- races with lib_finalize */
- if (eq->sequence != new_event->sequence) {
- CERROR("DROPPING EVENT: eq seq %lu ev seq %lu\n",
- eq->sequence, new_event->sequence);
- RETURN(PTL_EQ_DROPPED);
- }
-
- eq->sequence = new_event->sequence + 1;
- RETURN(PTL_OK);
-}
-
-int PtlEQGet(ptl_handle_eq_t eventq, ptl_event_t * ev)
-{
- int which;
-
- return (PtlEQPoll (&eventq, 1, 0, ev, &which));
-}
-
-int PtlEQWait(ptl_handle_eq_t eventq_in, ptl_event_t *event_out)
-{
- int which;
-
- return (PtlEQPoll (&eventq_in, 1, PTL_TIME_FOREVER,
- event_out, &which));
-}
-
-int PtlEQPoll(ptl_handle_eq_t *eventqs_in, int neq_in, int timeout,
- ptl_event_t *event_out, int *which_out)
-{
- nal_t *nal;
- int i;
- int rc;
- unsigned long flags;
-
- if (!ptl_init)
- RETURN(PTL_NO_INIT);
-
- if (neq_in < 1)
- RETURN(PTL_EQ_INVALID);
-
- nal = ptl_hndl2nal(&eventqs_in[0]);
- if (nal == NULL)
- RETURN(PTL_EQ_INVALID);
-
- nal->lock(nal, &flags);
-
- for (;;) {
- for (i = 0; i < neq_in; i++) {
- ptl_eq_t *eq = ptl_handle2usereq(&eventqs_in[i]);
-
- if (i > 0 &&
- ptl_hndl2nal(&eventqs_in[i]) != nal) {
- nal->unlock(nal, &flags);
- RETURN (PTL_EQ_INVALID);
- }
-
- /* size must be a power of 2 to handle a wrapped sequence # */
- LASSERT (eq->size != 0 &&
- eq->size == LOWEST_BIT_SET (eq->size));
-
- rc = ptl_get_event (eq, event_out);
- if (rc != PTL_EQ_EMPTY) {
- nal->unlock(nal, &flags);
- *which_out = i;
- RETURN(rc);
- }
- }
-
- if (timeout == 0) {
- nal->unlock(nal, &flags);
- RETURN (PTL_EQ_EMPTY);
- }
-
- timeout = nal->yield(nal, &flags, timeout);
- }
-}
"PTL_EQ_IN_USE",
+ "PTL_NI_INVALID",
+ "PTL_MD_ILLEGAL",
+
"PTL_MAX_ERRNO"
};
/* If you change these, you must update the number table in portals/errno.h */
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * api/api-init.c
- * Initialization and global data for the p30 user side library
- *
- * Copyright (c) 2001-2003 Cluster File Systems, Inc.
- * Copyright (c) 2001-2002 Sandia National Laboratories
- *
- * This file is part of Lustre, http://www.sf.net/projects/lustre/
- *
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
- *
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#define DEBUG_SUBSYSTEM S_PORTALS
-#include <portals/api-support.h>
-
-int PtlInit(int *max_interfaces)
-{
- if (max_interfaces != NULL)
- *max_interfaces = NAL_MAX_NR;
-
- LASSERT(!strcmp(ptl_err_str[PTL_MAX_ERRNO], "PTL_MAX_ERRNO"));
-
- return ptl_ni_init();
-}
-
-
-void PtlFini(void)
-{
- ptl_ni_fini();
-}
-
-
-void PtlSnprintHandle(char *str, int len, ptl_handle_any_t h)
-{
- snprintf(str, len, "0x%lx."LPX64, h.nal_idx, h.cookie);
-}
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * api/api-me.c
- * Match Entry local operations.
- *
- * Copyright (c) 2001-2003 Cluster File Systems, Inc.
- * Copyright (c) 2001-2002 Sandia National Laboratories
- *
- * This file is part of Lustre, http://www.sf.net/projects/lustre/
- *
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
- *
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#define DEBUG_SUBSYSTEM S_PORTALS
-#include <portals/api-support.h>
-
* invalidated out from under her (or worse, swapped for a
* completely different interface!) */
+ LASSERT (ptl_init);
+
if (((idx ^ NI_HANDLE_MAGIC) & ~NI_HANDLE_MASK) != 0)
return NULL;
ptl_mutex_exit();
}
-int ptl_ni_init(void)
+int PtlInit(int *max_interfaces)
{
+ LASSERT(!strcmp(ptl_err_str[PTL_MAX_ERRNO], "PTL_MAX_ERRNO"));
+
/* If this assertion fails, we need more bits in NI_HANDLE_MASK and
* to shift NI_HANDLE_MAGIC left appropriately */
LASSERT (NAL_MAX_NR <= (NI_HANDLE_MASK + 1));
+ if (max_interfaces != NULL)
+ *max_interfaces = NAL_MAX_NR;
+
ptl_mutex_enter();
if (!ptl_init) {
return PTL_OK;
}
-void ptl_ni_fini(void)
+void PtlFini(void)
{
nal_t *nal;
int i;
if (nal->nal_refct != 0) {
CWARN("NAL %d has outstanding refcount %d\n",
i, nal->nal_refct);
- nal->shutdown(nal);
+ nal->nal_ni_fini(nal);
}
ptl_nal_table[i] = NULL;
}
nal = ptl_nal_table[interface];
-
+ nal->nal_handle.nal_idx = (NI_HANDLE_MAGIC & ~NI_HANDLE_MASK) | interface;
+ nal->nal_handle.cookie = 0;
+
CDEBUG(D_OTHER, "Starting up NAL (%d) refs %d\n", interface, nal->nal_refct);
- rc = nal->startup(nal, requested_pid, desired_limits, actual_limits);
+ rc = nal->nal_ni_init(nal, requested_pid, desired_limits, actual_limits);
if (rc != PTL_OK) {
CERROR("Error %d starting up NAL %d, refs %d\n", rc,
}
nal->nal_refct++;
- handle->nal_idx = (NI_HANDLE_MAGIC & ~NI_HANDLE_MASK) | interface;
+ *handle = nal->nal_handle;
out:
ptl_mutex_exit ();
+
return rc;
}
nal->nal_refct--;
/* nal_refct == 0 tells nal->shutdown to really shut down */
- nal->shutdown(nal);
+ nal->nal_ni_fini(nal);
ptl_mutex_exit ();
return PTL_OK;
}
-
-int PtlNIHandle(ptl_handle_any_t handle_in, ptl_handle_ni_t * ni_out)
-{
- *ni_out = handle_in;
-
- return PTL_OK;
-}
# define DEBUG_SUBSYSTEM S_PORTALS
#include <portals/api-support.h>
-static int do_forward(ptl_handle_any_t any_h, int cmd, void *argbuf,
- int argsize, void *retbuf, int retsize)
+void PtlSnprintHandle(char *str, int len, ptl_handle_any_t h)
{
- nal_t *nal;
+ snprintf(str, len, "0x%lx."LPX64, h.nal_idx, h.cookie);
+}
- if (!ptl_init) {
- CERROR("Not initialized\n");
+int PtlNIHandle(ptl_handle_any_t handle_in, ptl_handle_ni_t *ni_out)
+{
+ if (!ptl_init)
return PTL_NO_INIT;
- }
-
- nal = ptl_hndl2nal(&any_h);
- if (!nal)
+
+ if (ptl_hndl2nal(&handle_in) == NULL)
return PTL_HANDLE_INVALID;
-
- nal->forward(nal, cmd, argbuf, argsize, retbuf, retsize);
-
+
+ *ni_out = handle_in;
return PTL_OK;
}
int PtlGetId(ptl_handle_ni_t ni_handle, ptl_process_id_t *id)
{
- PtlGetId_in args;
- PtlGetId_out ret;
- int rc;
-
- args.handle_in = ni_handle;
+ nal_t *nal;
- rc = do_forward(ni_handle, PTL_GETID, &args, sizeof(args), &ret,
- sizeof(ret));
- if (rc != PTL_OK)
- return rc;
+ if (!ptl_init)
+ return PTL_NO_INIT;
- if (id)
- *id = ret.id_out;
+ nal = ptl_hndl2nal(&ni_handle);
+ if (nal == NULL)
+ return PTL_NI_INVALID;
- return ret.rc;
+ return nal->nal_get_id(nal, id);
}
int PtlFailNid (ptl_handle_ni_t interface, ptl_nid_t nid, unsigned int threshold)
{
- PtlFailNid_in args;
- PtlFailNid_out ret;
- int rc;
-
- args.interface = interface;
- args.nid = nid;
- args.threshold = threshold;
+ nal_t *nal;
+
+ if (!ptl_init)
+ return PTL_NO_INIT;
- rc = do_forward (interface, PTL_FAILNID,
- &args, sizeof(args), &ret, sizeof (ret));
+ nal = ptl_hndl2nal(&interface);
+ if (nal == NULL)
+ return PTL_NI_INVALID;
- return ((rc != PTL_OK) ? rc : ret.rc);
+ return nal->nal_fail_nid(nal, nid, threshold);
}
int PtlNIStatus(ptl_handle_ni_t interface_in, ptl_sr_index_t register_in,
- ptl_sr_value_t * status_out)
+ ptl_sr_value_t *status_out)
{
- PtlNIStatus_in args;
- PtlNIStatus_out ret;
- int rc;
+ nal_t *nal;
- args.interface_in = interface_in;
- args.register_in = register_in;
-
- rc = do_forward(interface_in, PTL_NISTATUS, &args, sizeof(args), &ret,
- sizeof(ret));
-
- if (rc != PTL_OK)
- return rc;
-
- if (status_out)
- *status_out = ret.status_out;
+ if (!ptl_init)
+ return PTL_NO_INIT;
+
+ nal = ptl_hndl2nal(&interface_in);
+ if (nal == NULL)
+ return PTL_NI_INVALID;
- return ret.rc;
+ return nal->nal_ni_status(nal, register_in, status_out);
}
int PtlNIDist(ptl_handle_ni_t interface_in, ptl_process_id_t process_in,
unsigned long *distance_out)
{
- PtlNIDist_in args;
- PtlNIDist_out ret;
- int rc;
-
- args.interface_in = interface_in;
- args.process_in = process_in;
-
- rc = do_forward(interface_in, PTL_NIDIST, &args, sizeof(args), &ret,
- sizeof(ret));
+ nal_t *nal;
- if (rc != PTL_OK)
- return rc;
-
- if (distance_out)
- *distance_out = ret.distance_out;
+ if (!ptl_init)
+ return PTL_NO_INIT;
+
+ nal = ptl_hndl2nal(&interface_in);
+ if (nal == NULL)
+ return PTL_NI_INVALID;
- return ret.rc;
+ return nal->nal_ni_dist(nal, &process_in, distance_out);
}
int PtlMEAttach(ptl_handle_ni_t interface_in, ptl_pt_index_t index_in,
ptl_process_id_t match_id_in, ptl_match_bits_t match_bits_in,
ptl_match_bits_t ignore_bits_in, ptl_unlink_t unlink_in,
- ptl_ins_pos_t pos_in, ptl_handle_me_t * handle_out)
+ ptl_ins_pos_t pos_in, ptl_handle_me_t *handle_out)
{
- PtlMEAttach_in args;
- PtlMEAttach_out ret;
- int rc;
-
- args.interface_in = interface_in;
- args.index_in = index_in;
- args.match_id_in = match_id_in;
- args.match_bits_in = match_bits_in;
- args.ignore_bits_in = ignore_bits_in;
- args.unlink_in = unlink_in;
- args.position_in = pos_in;
-
- rc = do_forward(interface_in, PTL_MEATTACH, &args, sizeof(args), &ret,
- sizeof(ret));
-
- if (rc != PTL_OK)
- return rc;
-
- if (handle_out) {
- handle_out->nal_idx = interface_in.nal_idx;
- handle_out->cookie = ret.handle_out.cookie;
- }
-
- return ret.rc;
+ nal_t *nal;
+
+ if (!ptl_init)
+ return PTL_NO_INIT;
+
+ nal = ptl_hndl2nal(&interface_in);
+ if (nal == NULL)
+ return PTL_NI_INVALID;
+
+ return nal->nal_me_attach(nal, index_in, match_id_in,
+ match_bits_in, ignore_bits_in,
+ unlink_in, pos_in, handle_out);
}
int PtlMEInsert(ptl_handle_me_t current_in, ptl_process_id_t match_id_in,
ptl_unlink_t unlink_in, ptl_ins_pos_t position_in,
ptl_handle_me_t * handle_out)
{
- PtlMEInsert_in args;
- PtlMEInsert_out ret;
- int rc;
-
- args.current_in = current_in;
- args.match_id_in = match_id_in;
- args.match_bits_in = match_bits_in;
- args.ignore_bits_in = ignore_bits_in;
- args.unlink_in = unlink_in;
- args.position_in = position_in;
-
- rc = do_forward(current_in, PTL_MEINSERT, &args, sizeof(args), &ret,
- sizeof(ret));
-
- if (rc != PTL_OK)
- return (rc == PTL_HANDLE_INVALID) ? PTL_ME_INVALID : rc;
-
- if (handle_out) {
- handle_out->nal_idx = current_in.nal_idx;
- handle_out->cookie = ret.handle_out.cookie;
- }
- return ret.rc;
+ nal_t *nal;
+
+ if (!ptl_init)
+ return PTL_NO_INIT;
+
+ nal = ptl_hndl2nal(¤t_in);
+ if (nal == NULL)
+ return PTL_ME_INVALID;
+
+ return nal->nal_me_insert(nal, ¤t_in, match_id_in,
+ match_bits_in, ignore_bits_in,
+ unlink_in, position_in, handle_out);
}
int PtlMEUnlink(ptl_handle_me_t current_in)
{
- PtlMEUnlink_in args;
- PtlMEUnlink_out ret;
- int rc;
+ nal_t *nal;
- args.current_in = current_in;
- args.unlink_in = PTL_RETAIN;
-
- rc = do_forward(current_in, PTL_MEUNLINK, &args, sizeof(args), &ret,
- sizeof(ret));
-
- if (rc != PTL_OK)
- return (rc == PTL_HANDLE_INVALID) ? PTL_ME_INVALID : rc;
+ if (!ptl_init)
+ return PTL_NO_INIT;
+
+ nal = ptl_hndl2nal(¤t_in);
+ if (nal == NULL)
+ return PTL_ME_INVALID;
- return ret.rc;
+ return nal->nal_me_unlink(nal, ¤t_in);
}
-int PtlTblDump(ptl_handle_ni_t ni, int index_in)
+int PtlMDAttach(ptl_handle_me_t me_in, ptl_md_t md_in,
+ ptl_unlink_t unlink_in, ptl_handle_md_t * handle_out)
{
- PtlTblDump_in args;
- PtlTblDump_out ret;
- int rc;
+ nal_t *nal;
- args.index_in = index_in;
-
- rc = do_forward(ni, PTL_TBLDUMP, &args, sizeof(args), &ret,
- sizeof(ret));
+ if (!ptl_init)
+ return PTL_NO_INIT;
+
+ nal = ptl_hndl2nal(&me_in);
+ if (nal == NULL)
+ return PTL_ME_INVALID;
- if (rc != PTL_OK)
- return rc;
+ if (!PtlHandleIsEqual(md_in.eventq, PTL_EQ_NONE) &&
+ ptl_hndl2nal(&md_in.eventq) != nal)
+ return PTL_MD_ILLEGAL;
- return ret.rc;
+ return (nal->nal_md_attach)(nal, &me_in, &md_in,
+ unlink_in, handle_out);
}
-int PtlMEDump(ptl_handle_me_t current_in)
+int PtlMDBind(ptl_handle_ni_t ni_in, ptl_md_t md_in,
+ ptl_unlink_t unlink_in, ptl_handle_md_t *handle_out)
{
- PtlMEDump_in args;
- PtlMEDump_out ret;
- int rc;
+ nal_t *nal;
- args.current_in = current_in;
-
- rc = do_forward(current_in, PTL_MEDUMP, &args, sizeof(args), &ret,
- sizeof(ret));
+ if (!ptl_init)
+ return PTL_NO_INIT;
+
+ nal = ptl_hndl2nal(&ni_in);
+ if (nal == NULL)
+ return PTL_NI_INVALID;
- if (rc != PTL_OK)
- return (rc == PTL_HANDLE_INVALID) ? PTL_ME_INVALID : rc;
+ if (!PtlHandleIsEqual(md_in.eventq, PTL_EQ_NONE) &&
+ ptl_hndl2nal(&md_in.eventq) != nal)
+ return PTL_MD_ILLEGAL;
- return ret.rc;
+ return (nal->nal_md_bind)(nal, &md_in, unlink_in, handle_out);
}
-static ptl_handle_eq_t md2eq (ptl_md_t *md)
+int PtlMDUpdate(ptl_handle_md_t md_in, ptl_md_t *old_inout,
+ ptl_md_t *new_inout, ptl_handle_eq_t testq_in)
{
- if (PtlHandleIsEqual (md->eventq, PTL_EQ_NONE))
- return (PTL_EQ_NONE);
+ nal_t *nal;
- return (ptl_handle2usereq (&md->eventq)->cb_eq_handle);
-}
-
-
-int PtlMDAttach(ptl_handle_me_t me_in, ptl_md_t md_in,
- ptl_unlink_t unlink_in, ptl_handle_md_t * handle_out)
-{
- PtlMDAttach_in args;
- PtlMDAttach_out ret;
- int rc;
-
- args.eq_in = md2eq(&md_in);
- args.me_in = me_in;
- args.md_in = md_in;
- args.unlink_in = unlink_in;
-
- rc = do_forward(me_in, PTL_MDATTACH,
- &args, sizeof(args), &ret, sizeof(ret));
-
- if (rc != PTL_OK)
- return (rc == PTL_HANDLE_INVALID) ? PTL_ME_INVALID : rc;
-
- if (handle_out) {
- handle_out->nal_idx = me_in.nal_idx;
- handle_out->cookie = ret.handle_out.cookie;
- }
- return ret.rc;
-}
-
+ if (!ptl_init)
+ return PTL_NO_INIT;
+
+ nal = ptl_hndl2nal(&md_in);
+ if (nal == NULL)
+ return PTL_MD_INVALID;
+ if (!PtlHandleIsEqual(testq_in, PTL_EQ_NONE) &&
+ ptl_hndl2nal(&testq_in) != nal)
+ return PTL_EQ_INVALID;
-int PtlMDBind(ptl_handle_ni_t ni_in, ptl_md_t md_in,
- ptl_unlink_t unlink_in, ptl_handle_md_t * handle_out)
-{
- PtlMDBind_in args;
- PtlMDBind_out ret;
- int rc;
-
- args.eq_in = md2eq(&md_in);
- args.ni_in = ni_in;
- args.md_in = md_in;
- args.unlink_in = unlink_in;
-
- rc = do_forward(ni_in, PTL_MDBIND,
- &args, sizeof(args), &ret, sizeof(ret));
-
- if (rc != PTL_OK)
- return rc;
-
- if (handle_out) {
- handle_out->nal_idx = ni_in.nal_idx;
- handle_out->cookie = ret.handle_out.cookie;
- }
- return ret.rc;
+ return (nal->nal_md_update)(nal, &md_in,
+ old_inout, new_inout, &testq_in);
}
-int PtlMDUpdate(ptl_handle_md_t md_in, ptl_md_t *old_inout,
- ptl_md_t *new_inout, ptl_handle_eq_t testq_in)
+int PtlMDUnlink(ptl_handle_md_t md_in)
{
- PtlMDUpdate_internal_in args;
- PtlMDUpdate_internal_out ret;
- int rc;
-
- args.md_in = md_in;
-
- if (old_inout) {
- args.old_inout = *old_inout;
- args.old_inout_valid = 1;
- } else
- args.old_inout_valid = 0;
-
- if (new_inout) {
- args.new_inout = *new_inout;
- args.new_inout_valid = 1;
- } else
- args.new_inout_valid = 0;
-
- if (PtlHandleIsEqual (testq_in, PTL_EQ_NONE)) {
- args.testq_in = PTL_EQ_NONE;
- args.sequence_in = -1;
- } else {
- ptl_eq_t *eq = ptl_handle2usereq (&testq_in);
-
- args.testq_in = eq->cb_eq_handle;
- args.sequence_in = eq->sequence;
- }
-
- rc = do_forward(md_in, PTL_MDUPDATE, &args, sizeof(args), &ret,
- sizeof(ret));
- if (rc != PTL_OK)
- return (rc == PTL_HANDLE_INVALID) ? PTL_MD_INVALID : rc;
-
- if (old_inout)
- *old_inout = ret.old_inout;
-
- return ret.rc;
+ nal_t *nal;
+
+ if (!ptl_init)
+ return PTL_NO_INIT;
+
+ nal = ptl_hndl2nal(&md_in);
+ if (nal == NULL)
+ return PTL_MD_INVALID;
+
+ return (nal->nal_md_unlink)(nal, &md_in);
}
-int PtlMDUnlink(ptl_handle_md_t md_in)
+int PtlEQAlloc(ptl_handle_ni_t interface, ptl_size_t count,
+ ptl_eq_handler_t callback,
+ ptl_handle_eq_t *handle_out)
{
- PtlMDUnlink_in args;
- PtlMDUnlink_out ret;
- int rc;
-
- args.md_in = md_in;
- rc = do_forward(md_in, PTL_MDUNLINK, &args, sizeof(args), &ret,
- sizeof(ret));
- if (rc != PTL_OK)
- return (rc == PTL_HANDLE_INVALID) ? PTL_MD_INVALID : rc;
+ nal_t *nal;
+
+ if (!ptl_init)
+ return PTL_NO_INIT;
+
+ nal = ptl_hndl2nal(&interface);
+ if (nal == NULL)
+ return PTL_NI_INVALID;
- return ret.rc;
+ return (nal->nal_eq_alloc)(nal, count, callback, handle_out);
}
-int PtlEQAlloc(ptl_handle_ni_t interface, ptl_size_t count,
- ptl_eq_handler_t callback,
- ptl_handle_eq_t * handle_out)
+int PtlEQFree(ptl_handle_eq_t eventq)
{
- ptl_eq_t *eq = NULL;
- ptl_event_t *ev = NULL;
- PtlEQAlloc_in args;
- PtlEQAlloc_out ret;
- int rc, i;
- nal_t *nal;
+ nal_t *nal;
if (!ptl_init)
return PTL_NO_INIT;
- nal = ptl_hndl2nal (&interface);
+ nal = ptl_hndl2nal(&eventq);
if (nal == NULL)
- return PTL_HANDLE_INVALID;
+ return PTL_EQ_INVALID;
- if (count != LOWEST_BIT_SET(count)) { /* not a power of 2 already */
- do { /* knock off all but the top bit... */
- count &= ~LOWEST_BIT_SET (count);
- } while (count != LOWEST_BIT_SET(count));
-
- count <<= 1; /* ...and round up */
- }
-
- if (count == 0) /* catch bad parameter / overflow on roundup */
- return (PTL_VAL_FAILED);
-
- PORTAL_ALLOC(ev, count * sizeof(ptl_event_t));
- if (!ev)
- return PTL_NO_SPACE;
-
- for (i = 0; i < count; i++)
- ev[i].sequence = 0;
-
- args.ni_in = interface;
- args.count_in = count;
- args.base_in = ev;
- args.len_in = count * sizeof(*ev);
- args.callback_in = callback;
-
- rc = do_forward(interface, PTL_EQALLOC, &args, sizeof(args), &ret,
- sizeof(ret));
- if (rc != PTL_OK)
- goto fail;
- if (ret.rc)
- GOTO(fail, rc = ret.rc);
-
- PORTAL_ALLOC(eq, sizeof(*eq));
- if (!eq) {
- rc = PTL_NO_SPACE;
- goto fail;
- }
-
- eq->sequence = 1;
- eq->size = count;
- eq->base = ev;
-
- /* EQ handles are a little wierd. PtlEQGet() just looks at the
- * queued events in shared memory. It doesn't want to do_forward()
- * at all, so the cookie in the EQ handle we pass out of here is
- * simply a pointer to the event queue we just set up. We stash
- * the handle returned by do_forward(), so we can pass it back via
- * do_forward() when we need to. */
-
- eq->cb_eq_handle.nal_idx = interface.nal_idx;
- eq->cb_eq_handle.cookie = ret.handle_out.cookie;
-
- handle_out->nal_idx = interface.nal_idx;
- handle_out->cookie = (__u64)((unsigned long)eq);
- return PTL_OK;
+ return (nal->nal_eq_free)(nal, &eventq);
+}
-fail:
- PORTAL_FREE(ev, count * sizeof(ptl_event_t));
- return rc;
+int PtlEQGet(ptl_handle_eq_t eventq, ptl_event_t *ev)
+{
+ int which;
+
+ return (PtlEQPoll (&eventq, 1, 0, ev, &which));
}
-int PtlEQFree(ptl_handle_eq_t eventq)
+int PtlEQWait(ptl_handle_eq_t eventq_in, ptl_event_t *event_out)
{
- PtlEQFree_in args;
- PtlEQFree_out ret;
- ptl_eq_t *eq;
- int rc;
+ int which;
+
+ return (PtlEQPoll (&eventq_in, 1, PTL_TIME_FOREVER,
+ event_out, &which));
+}
- eq = ptl_handle2usereq (&eventq);
- args.eventq_in = eq->cb_eq_handle;
+int PtlEQPoll(ptl_handle_eq_t *eventqs_in, int neq_in, int timeout,
+ ptl_event_t *event_out, int *which_out)
+{
+ int i;
+ nal_t *nal;
- rc = do_forward(eq->cb_eq_handle, PTL_EQFREE, &args,
- sizeof(args), &ret, sizeof(ret));
+ if (!ptl_init)
+ return PTL_NO_INIT;
+
+ if (neq_in < 1)
+ return PTL_EQ_INVALID;
+
+ nal = ptl_hndl2nal(&eventqs_in[0]);
+ if (nal == NULL)
+ return PTL_EQ_INVALID;
- /* XXX we're betting rc == PTL_OK here */
- PORTAL_FREE(eq->base, eq->size * sizeof(ptl_event_t));
- PORTAL_FREE(eq, sizeof(*eq));
+ for (i = 1; i < neq_in; i++)
+ if (ptl_hndl2nal(&eventqs_in[i]) != nal)
+ return PTL_EQ_INVALID;
- return rc;
+ return (nal->nal_eq_poll)(nal, eventqs_in, neq_in, timeout,
+ event_out, which_out);
}
+
int PtlACEntry(ptl_handle_ni_t ni_in, ptl_ac_index_t index_in,
ptl_process_id_t match_id_in, ptl_pt_index_t portal_in)
{
- PtlACEntry_in args;
- PtlACEntry_out ret;
- int rc;
-
- /*
- * Copy arguments into the argument block to
- * hand to the forwarding object
- */
- args.ni_in = ni_in;
- args.index_in = index_in;
- args.match_id_in = match_id_in;
- args.portal_in = portal_in;
-
- rc = do_forward(ni_in, PTL_ACENTRY, &args, sizeof(args), &ret,
- sizeof(ret));
-
- return (rc != PTL_OK) ? rc : ret.rc;
+ nal_t *nal;
+
+ if (!ptl_init)
+ return PTL_NO_INIT;
+
+ nal = ptl_hndl2nal(&ni_in);
+ if (nal == NULL)
+ return PTL_NI_INVALID;
+
+ return (nal->nal_ace_entry)(nal, index_in, match_id_in, portal_in);
}
int PtlPut(ptl_handle_md_t md_in, ptl_ack_req_t ack_req_in,
ptl_process_id_t target_in, ptl_pt_index_t portal_in,
- ptl_ac_index_t cookie_in, ptl_match_bits_t match_bits_in,
+ ptl_ac_index_t ac_in, ptl_match_bits_t match_bits_in,
ptl_size_t offset_in, ptl_hdr_data_t hdr_data_in)
{
- PtlPut_in args;
- PtlPut_out ret;
- int rc;
-
- /*
- * Copy arguments into the argument block to
- * hand to the forwarding object
- */
- args.md_in = md_in;
- args.ack_req_in = ack_req_in;
- args.target_in = target_in;
- args.portal_in = portal_in;
- args.cookie_in = cookie_in;
- args.match_bits_in = match_bits_in;
- args.offset_in = offset_in;
- args.hdr_data_in = hdr_data_in;
-
- rc = do_forward(md_in, PTL_PUT, &args, sizeof(args), &ret, sizeof(ret));
-
- return (rc != PTL_OK) ? rc : ret.rc;
+ nal_t *nal;
+
+ if (!ptl_init)
+ return PTL_NO_INIT;
+
+ nal = ptl_hndl2nal(&md_in);
+ if (nal == NULL)
+ return PTL_MD_INVALID;
+
+ return (nal->nal_put)(nal, &md_in, ack_req_in,
+ &target_in, portal_in, ac_in,
+ match_bits_in, offset_in, hdr_data_in);
}
int PtlGet(ptl_handle_md_t md_in, ptl_process_id_t target_in,
- ptl_pt_index_t portal_in, ptl_ac_index_t cookie_in,
+ ptl_pt_index_t portal_in, ptl_ac_index_t ac_in,
ptl_match_bits_t match_bits_in, ptl_size_t offset_in)
{
- PtlGet_in args;
- PtlGet_out ret;
- int rc;
-
- /*
- * Copy arguments into the argument block to
- * hand to the forwarding object
- */
- args.md_in = md_in;
- args.target_in = target_in;
- args.portal_in = portal_in;
- args.cookie_in = cookie_in;
- args.match_bits_in = match_bits_in;
- args.offset_in = offset_in;
-
- rc = do_forward(md_in, PTL_GET, &args, sizeof(args), &ret, sizeof(ret));
-
- return (rc != PTL_OK) ? rc : ret.rc;
+ nal_t *nal;
+
+ if (!ptl_init)
+ return PTL_NO_INIT;
+
+ nal = ptl_hndl2nal(&md_in);
+ if (nal == NULL)
+ return PTL_MD_INVALID;
+
+ return (nal->nal_get)(nal, &md_in,
+ &target_in, portal_in, ac_in,
+ match_bits_in, offset_in);
}
+
# This code is issued under the GNU General Public License.
# See the file COPYING in this distribution
-my_sources = api-eq.c api-init.c api-me.c api-errno.c api-ni.c api-wrap.c \
- lib-dispatch.c lib-init.c lib-me.c lib-msg.c lib-eq.c \
+my_sources = api-errno.c api-ni.c api-wrap.c \
+ lib-init.c lib-me.c lib-msg.c lib-eq.c \
lib-md.c lib-move.c lib-ni.c lib-pid.c
if !CRAY_PORTALS
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * lib/lib-dispatch.c
- *
- * Copyright (c) 2001-2003 Cluster File Systems, Inc.
- * Copyright (c) 2001-2002 Sandia National Laboratories
- *
- * This file is part of Lustre, http://www.sf.net/projects/lustre/
- *
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
- *
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#define DEBUG_SUBSYSTEM S_PORTALS
-#include <portals/lib-p30.h>
-#include <portals/lib-dispatch.h>
-
-typedef struct {
- int (*fun) (nal_cb_t * nal, void *private, void *in, void *out);
- char *name;
-} dispatch_table_t;
-
-static dispatch_table_t dispatch_table[] = {
- [PTL_GETID] {do_PtlGetId, "PtlGetId"},
- [PTL_NISTATUS] {do_PtlNIStatus, "PtlNIStatus"},
- &