Whamcloud - gitweb
LU-56 lnet: move "match" functions to lib-ptl.c
author Liang Zhen <liang@whamcloud.com>
Mon, 28 May 2012 06:40:00 +0000 (14:40 +0800)
committer Oleg Drokin <green@whamcloud.com>
Mon, 11 Jun 2012 13:22:08 +0000 (09:22 -0400)
This is still an intermediate patch for LNet SMP improvements.
It covers a few things:
- create a new file, lib-ptl.c; all functions about portals
  are moved into this file.
- always pre-create the match hash-table for all portals; it's a little
  wasteful for unique-portal, but it will save a lot of problems for
  upcoming patches.
- instead of storing all portals in a contiguous buffer, we allocate
  memory for each portal in turn; this change is also for upcoming
  patches.

Signed-off-by: Liang Zhen <liang@whamcloud.com>
Change-Id: I19e61c7f3a01f1c90a9f3f78d48d81dc00cd037d
Reviewed-on: http://review.whamcloud.com/2926
Tested-by: Hudson
Reviewed-by: Doug Oucharek <doug@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Bobi Jam <bobijam@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-types.h
lnet/lnet/Makefile.in
lnet/lnet/api-ni.c
lnet/lnet/autoMakefile.am
lnet/lnet/lib-md.c
lnet/lnet/lib-me.c
lnet/lnet/lib-move.c
lnet/lnet/lib-ptl.c [new file with mode: 0644]

index 8a8b7a4..f5a49b7 100644 (file)
@@ -499,62 +499,6 @@ lnet_handle2me(lnet_handle_me_t *handle)
        return lh_entry(lh, lnet_me_t, me_lh);
 }
 
        return lh_entry(lh, lnet_me_t, me_lh);
 }
 
-static inline int
-lnet_portal_is_lazy(lnet_portal_t *ptl)
-{
-        return !!(ptl->ptl_options & LNET_PTL_LAZY);
-}
-
-static inline int
-lnet_portal_is_unique(lnet_portal_t *ptl)
-{
-        return !!(ptl->ptl_options & LNET_PTL_MATCH_UNIQUE); 
-}
-
-static inline int
-lnet_portal_is_wildcard(lnet_portal_t *ptl)
-{
-        return !!(ptl->ptl_options & LNET_PTL_MATCH_WILDCARD);
-}
-
-static inline void
-lnet_portal_setopt(lnet_portal_t *ptl, int opt)
-{
-        ptl->ptl_options |= opt;
-}
-
-static inline void
-lnet_portal_unsetopt(lnet_portal_t *ptl, int opt)
-{
-        ptl->ptl_options &= ~opt;
-}
-
-static inline int
-lnet_match_is_unique(lnet_process_id_t match_id,
-                     __u64 match_bits, __u64 ignore_bits)
-{
-        return ignore_bits == 0 &&
-               match_id.nid != LNET_NID_ANY &&
-               match_id.pid != LNET_PID_ANY;
-}
-
-static inline cfs_list_t *
-lnet_portal_me_head(int index, lnet_process_id_t id, __u64 mbits)
-{
-        lnet_portal_t *ptl = &the_lnet.ln_portals[index];
-
-        if (lnet_portal_is_wildcard(ptl)) {
-                return &ptl->ptl_mlist;
-        } else if (lnet_portal_is_unique(ptl)) {
-                LASSERT (ptl->ptl_mhash != NULL);
-                return &ptl->ptl_mhash[lnet_match_to_hash(id, mbits)];
-        }
-        return NULL;
-}
-
-cfs_list_t *lnet_portal_mhash_alloc(void);
-void lnet_portal_mhash_free(cfs_list_t *mhash);
-
 static inline void
 lnet_peer_addref_locked(lnet_peer_t *lp)
 {
 static inline void
 lnet_peer_addref_locked(lnet_peer_t *lp)
 {
@@ -679,13 +623,72 @@ lnet_remotenet_t *lnet_find_net_locked (__u32 net);
 int lnet_islocalnid(lnet_nid_t nid);
 int lnet_islocalnet(__u32 net);
 
 int lnet_islocalnid(lnet_nid_t nid);
 int lnet_islocalnet(__u32 net);
 
+void lnet_commit_md(lnet_libmd_t *md, lnet_msg_t *msg);
 void lnet_build_unlink_event(lnet_libmd_t *md, lnet_event_t *ev);
 void lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev);
 void lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
                     unsigned int offset, unsigned int len);
 int lnet_send(lnet_nid_t nid, lnet_msg_t *msg);
 void lnet_return_credits_locked (lnet_msg_t *msg);
 void lnet_build_unlink_event(lnet_libmd_t *md, lnet_event_t *ev);
 void lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev);
 void lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
                     unsigned int offset, unsigned int len);
 int lnet_send(lnet_nid_t nid, lnet_msg_t *msg);
 void lnet_return_credits_locked (lnet_msg_t *msg);
+
+/* portals functions */
+static inline int
+lnet_ptl_is_lazy(lnet_portal_t *ptl)
+{
+       return !!(ptl->ptl_options & LNET_PTL_LAZY);
+}
+
+static inline int
+lnet_ptl_is_unique(lnet_portal_t *ptl)
+{
+       return !!(ptl->ptl_options & LNET_PTL_MATCH_UNIQUE);
+}
+
+static inline int
+lnet_ptl_is_wildcard(lnet_portal_t *ptl)
+{
+       return !!(ptl->ptl_options & LNET_PTL_MATCH_WILDCARD);
+}
+
+static inline void
+lnet_ptl_setopt(lnet_portal_t *ptl, int opt)
+{
+       ptl->ptl_options |= opt;
+}
+
+static inline void
+lnet_ptl_unsetopt(lnet_portal_t *ptl, int opt)
+{
+       ptl->ptl_options &= ~opt;
+}
+
+static inline cfs_list_t *
+lnet_ptl_me_head(int index, lnet_process_id_t id, __u64 mbits)
+{
+       lnet_portal_t *ptl = the_lnet.ln_portals[index];
+
+       if (lnet_ptl_is_wildcard(ptl)) {
+               return &ptl->ptl_mlist;
+       } else if (lnet_ptl_is_unique(ptl)) {
+               LASSERT(ptl->ptl_mhash != NULL);
+               return &ptl->ptl_mhash[lnet_match_to_hash(id, mbits)];
+       }
+       return NULL;
+}
+
+int lnet_portals_create(void);
+void lnet_portals_destroy(void);
+
+int lnet_ptl_type_match(struct lnet_portal *ptl, lnet_process_id_t id,
+                       __u64 mbits, __u64 ignore_bits);
 void lnet_match_blocked_msg(lnet_libmd_t *md);
 void lnet_match_blocked_msg(lnet_libmd_t *md);
+int lnet_match_md(int index, int op_mask, lnet_process_id_t src,
+                 unsigned int rlength, unsigned int roffset,
+                 __u64 match_bits, lnet_msg_t *msg,
+                 unsigned int *mlength_out, unsigned int *offset_out,
+                 lnet_libmd_t **md_out);
+
+/* message functions */
 int lnet_parse (lnet_ni_t *ni, lnet_hdr_t *hdr,
                 lnet_nid_t fromnid, void *private, int rdma_req);
 void lnet_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
 int lnet_parse (lnet_ni_t *ni, lnet_hdr_t *hdr,
                 lnet_nid_t fromnid, void *private, int rdma_req);
 void lnet_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
@@ -693,6 +696,8 @@ void lnet_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
 lnet_msg_t *lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *get_msg);
 void lnet_set_reply_msg_len(lnet_ni_t *ni, lnet_msg_t *msg, unsigned int len);
 void lnet_finalize(lnet_ni_t *ni, lnet_msg_t *msg, int rc);
 lnet_msg_t *lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *get_msg);
 void lnet_set_reply_msg_len(lnet_ni_t *ni, lnet_msg_t *msg, unsigned int len);
 void lnet_finalize(lnet_ni_t *ni, lnet_msg_t *msg, int rc);
+void lnet_drop_delayed_msg_list(cfs_list_t *head, char *reason);
+void lnet_recv_delayed_msg_list(cfs_list_t *head);
 
 int lnet_msg_container_setup(struct lnet_msg_container *container);
 void lnet_msg_container_cleanup(struct lnet_msg_container *container);
 
 int lnet_msg_container_setup(struct lnet_msg_container *container);
 void lnet_msg_container_cleanup(struct lnet_msg_container *container);
index caec262..bec4c2b 100644 (file)
@@ -516,6 +516,15 @@ typedef struct {
 
 #define LNET_NRBPOOLS         3                 /* # different router buffer pools */
 
 
 #define LNET_NRBPOOLS         3                 /* # different router buffer pools */
 
+enum {
+       /* Didn't match anything */
+       LNET_MATCHMD_NONE       = (1 << 0),
+       /* Matched OK */
+       LNET_MATCHMD_OK         = (1 << 1),
+       /* Must be discarded */
+       LNET_MATCHMD_DROP       = (1 << 2),
+};
+
 /* Options for lnet_portal_t::ptl_options */
 #define LNET_PTL_LAZY               (1 << 0)
 #define LNET_PTL_MATCH_UNIQUE       (1 << 1)    /* unique match, for RDMA */
 /* Options for lnet_portal_t::ptl_options */
 #define LNET_PTL_LAZY               (1 << 0)
 #define LNET_PTL_MATCH_UNIQUE       (1 << 1)    /* unique match, for RDMA */
@@ -525,7 +534,8 @@ typedef struct {
 #define LNET_PORTAL_HASH_BITS        8
 #define LNET_PORTAL_HASH_SIZE       (1 << LNET_PORTAL_HASH_BITS)
 
 #define LNET_PORTAL_HASH_BITS        8
 #define LNET_PORTAL_HASH_SIZE       (1 << LNET_PORTAL_HASH_BITS)
 
-typedef struct {
+typedef struct lnet_portal {
+       unsigned int            ptl_index;      /* portal ID, reserved */
         cfs_list_t       *ptl_mhash;            /* match hash */
         cfs_list_t        ptl_mlist;            /* match list */
         cfs_list_t        ptl_msgq;             /* messages blocking for MD */
         cfs_list_t       *ptl_mhash;            /* match hash */
         cfs_list_t        ptl_mlist;            /* match list */
         cfs_list_t        ptl_msgq;             /* messages blocking for MD */
@@ -577,6 +587,8 @@ typedef struct
         int                    ln_init;             /* LNetInit() called? */
         int                    ln_refcount;         /* LNetNIInit/LNetNIFini counter */
         int                    ln_niinit_self;      /* Have I called LNetNIInit myself? */
         int                    ln_init;             /* LNetInit() called? */
         int                    ln_refcount;         /* LNetNIInit/LNetNIFini counter */
         int                    ln_niinit_self;      /* Have I called LNetNIInit myself? */
+       /* shutdown in progress */
+       int                             ln_shutdown;
 
         cfs_list_t             ln_lnds;             /* registered LNDs */
 
 
         cfs_list_t             ln_lnds;             /* registered LNDs */
 
@@ -604,11 +616,10 @@ typedef struct
        /* Event Queue container */
        struct lnet_res_container       ln_eq_container;
 
        /* Event Queue container */
        struct lnet_res_container       ln_eq_container;
 
-        /* Stuff initialised at LNetNIInit() */
-
-        int                    ln_shutdown;         /* shutdown in progress */
-        int                    ln_nportals;         /* # portals */
-        lnet_portal_t         *ln_portals;          /* the vector of portals */
+       /* # portals */
+       int                             ln_nportals;
+       /* the vector of portals */
+       lnet_portal_t                   **ln_portals;
 
         lnet_pid_t             ln_pid;              /* requested pid */
 
 
         lnet_pid_t             ln_pid;              /* requested pid */
 
index 3bc86f6..498abfc 100644 (file)
@@ -1,7 +1,7 @@
 MODULES := lnet
 
 lnet-objs := api-errno.o api-ni.o config.o
 MODULES := lnet
 
 lnet-objs := api-errno.o api-ni.o config.o
-lnet-objs += lib-me.o lib-msg.o lib-eq.o lib-md.o
+lnet-objs += lib-me.o lib-msg.o lib-eq.o lib-md.o lib-ptl.o
 lnet-objs += lib-move.o module.o lo.o
 lnet-objs += router.o router_proc.o acceptor.o peer.o
 
 lnet-objs += lib-move.o module.o lo.o
 lnet-objs += router.o router_proc.o acceptor.o peer.o
 
index 720682d..e69edf6 100644 (file)
@@ -556,39 +556,6 @@ lnet_res_lh_initialize(struct lnet_res_container *rec, lnet_libhandle_t *lh)
        cfs_list_add(&lh->lh_hash_chain, &rec->rec_lh_hash[hash]);
 }
 
        cfs_list_add(&lh->lh_hash_chain, &rec->rec_lh_hash[hash]);
 }
 
-cfs_list_t *
-lnet_portal_mhash_alloc(void)
-{
-        cfs_list_t       *mhash;
-        int               i;
-
-        LIBCFS_ALLOC(mhash, sizeof(cfs_list_t) * LNET_PORTAL_HASH_SIZE);
-        if (mhash == NULL)
-                return NULL;
-
-        for (i = 0; i < LNET_PORTAL_HASH_SIZE; i++)
-                CFS_INIT_LIST_HEAD(&mhash[i]);
-
-        return mhash;
-}
-
-void
-lnet_portal_mhash_free(cfs_list_t *mhash)
-{
-        int     i;
-
-        for (i = 0; i < LNET_PORTAL_HASH_SIZE; i++) {
-                while (!cfs_list_empty(&mhash[i])) {
-                        lnet_me_t *me = cfs_list_entry(mhash[i].next,
-                                                       lnet_me_t, me_list);
-                        CERROR ("Active ME %p on exit portal mhash\n", me);
-                        cfs_list_del(&me->me_list);
-                        lnet_me_free(me);
-                }
-        }
-        LIBCFS_FREE(mhash, sizeof(cfs_list_t) * LNET_PORTAL_HASH_SIZE);
-}
-
 #ifndef __KERNEL__
 /**
  * Reserved API - do not use.
 #ifndef __KERNEL__
 /**
  * Reserved API - do not use.
@@ -607,7 +574,6 @@ lnet_prepare(lnet_pid_t requested_pid)
 {
         /* Prepare to bring up the network */
         int               rc = 0;
 {
         /* Prepare to bring up the network */
         int               rc = 0;
-        int               i;
 
         LASSERT (the_lnet.ln_refcount == 0);
 
 
         LASSERT (the_lnet.ln_refcount == 0);
 
@@ -679,22 +645,13 @@ lnet_prepare(lnet_pid_t requested_pid)
                goto failed3;
        }
 
                goto failed3;
        }
 
-        the_lnet.ln_nportals = MAX_PORTALS;
-        LIBCFS_ALLOC(the_lnet.ln_portals,
-                     the_lnet.ln_nportals *
-                     sizeof(*the_lnet.ln_portals));
-        if (the_lnet.ln_portals == NULL) {
-                rc = -ENOMEM;
-                goto failed3;
-        }
-
-        for (i = 0; i < the_lnet.ln_nportals; i++) {
-                CFS_INIT_LIST_HEAD(&(the_lnet.ln_portals[i].ptl_mlist));
-                CFS_INIT_LIST_HEAD(&(the_lnet.ln_portals[i].ptl_msgq));
-                the_lnet.ln_portals[i].ptl_options = 0;
-        }
+       rc = lnet_portals_create();
+       if (rc != 0) {
+               CERROR("Failed to create portals for LNet: %d\n", rc);
+               goto failed3;
+       }
 
 
-        return 0;
+       return 0;
 
  failed3:
        /* NB: lnet_res_container_cleanup is safe to call for
 
  failed3:
        /* NB: lnet_res_container_cleanup is safe to call for
@@ -713,8 +670,6 @@ lnet_prepare(lnet_pid_t requested_pid)
 int
 lnet_unprepare (void)
 {
 int
 lnet_unprepare (void)
 {
-        int       idx;
-
         /* NB no LNET_LOCK since this is the last reference.  All LND instances
          * have shut down already, so it is safe to unlink and free all
          * descriptors, even those that appear committed to a network op (eg MD
         /* NB no LNET_LOCK since this is the last reference.  All LND instances
          * have shut down already, so it is safe to unlink and free all
          * descriptors, even those that appear committed to a network op (eg MD
@@ -728,33 +683,13 @@ lnet_unprepare (void)
         LASSERT (cfs_list_empty(&the_lnet.ln_zombie_nis));
         LASSERT (the_lnet.ln_nzombie_nis == 0);
 
         LASSERT (cfs_list_empty(&the_lnet.ln_zombie_nis));
         LASSERT (the_lnet.ln_nzombie_nis == 0);
 
-        for (idx = 0; idx < the_lnet.ln_nportals; idx++) {
-                lnet_portal_t *ptl = &the_lnet.ln_portals[idx];
-
-                LASSERT (cfs_list_empty(&ptl->ptl_msgq));
-
-                while (!cfs_list_empty(&ptl->ptl_mlist)) {
-                        lnet_me_t *me = cfs_list_entry(ptl->ptl_mlist.next,
-                                                       lnet_me_t, me_list);
-                        CERROR ("Active ME %p on exit\n", me);
-                        cfs_list_del (&me->me_list);
-                        lnet_me_free (me);
-                }
-
-                if (ptl->ptl_mhash != NULL) {
-                        LASSERT (lnet_portal_is_unique(ptl));
-                        lnet_portal_mhash_free(ptl->ptl_mhash);
-                }
-        }
+       lnet_portals_destroy();
 
        lnet_res_container_cleanup(&the_lnet.ln_md_container);
        lnet_res_container_cleanup(&the_lnet.ln_me_container);
        lnet_res_container_cleanup(&the_lnet.ln_eq_container);
 
 
        lnet_res_container_cleanup(&the_lnet.ln_md_container);
        lnet_res_container_cleanup(&the_lnet.ln_me_container);
        lnet_res_container_cleanup(&the_lnet.ln_eq_container);
 
-       LIBCFS_FREE(the_lnet.ln_portals,
-                   the_lnet.ln_nportals * sizeof(*the_lnet.ln_portals));
-
-       lnet_free_rtrpools();
+        lnet_free_rtrpools();
        lnet_msg_container_cleanup(&the_lnet.ln_msg_container);
        lnet_peer_table_destroy();
 
        lnet_msg_container_cleanup(&the_lnet.ln_msg_container);
        lnet_peer_table_destroy();
 
index 4f6d14b..298e17d 100644 (file)
@@ -1,6 +1,6 @@
 my_sources =    api-errno.c api-ni.c config.c \
                lib-me.c lib-msg.c lib-eq.c \
 my_sources =    api-errno.c api-ni.c config.c \
                lib-me.c lib-msg.c lib-eq.c \
-               lib-md.c lib-move.c lo.c \
+               lib-md.c lib-ptl.c lib-move.c lo.c \
                router.c router_proc.c \
                acceptor.c peer.c
 
                router.c router_proc.c \
                acceptor.c peer.c
 
@@ -22,7 +22,7 @@ if DARWIN
 macos_PROGRAMS := lnet
 
 lnet_SOURCES := api-errno.c api-ni.c config.c
 macos_PROGRAMS := lnet
 
 lnet_SOURCES := api-errno.c api-ni.c config.c
-lnet_SOURCES += lib-me.c lib-msg.c lib-eq.c lib-md.c
+lnet_SOURCES += lib-me.c lib-msg.c lib-eq.c lib-md.c lib-ptl.c
 lnet_SOURCES += lib-move.c module.c lo.c router.c router_proc.c
 lnet_SOURCES += acceptor.c peer.c
 
 lnet_SOURCES += lib-move.c module.c lo.c router.c router_proc.c
 lnet_SOURCES += acceptor.c peer.c
 
index 821178d..137299c 100644 (file)
@@ -287,7 +287,7 @@ LNetMDAttach(lnet_handle_me_t meh, lnet_md_t umd,
         } else {
                 rc = lib_md_build(md, &umd, unlink);
                 if (rc == 0) {
         } else {
                 rc = lib_md_build(md, &umd, unlink);
                 if (rc == 0) {
-                        the_lnet.ln_portals[me->me_portal].ptl_ml_version++;
+                       the_lnet.ln_portals[me->me_portal]->ptl_ml_version++;
 
                         me->me_md = md;
                         md->md_me = me;
 
                         me->me_md = md;
                         md->md_me = me;
index 31a8575..d798b19 100644 (file)
 
 #include <lnet/lib-lnet.h>
 
 
 #include <lnet/lib-lnet.h>
 
-static int
-lnet_me_match_portal(lnet_portal_t *ptl, lnet_process_id_t id,
-                     __u64 match_bits, __u64 ignore_bits)
-{
-        cfs_list_t       *mhash = NULL;
-        int               unique;
-
-        LASSERT (!(lnet_portal_is_unique(ptl) &&
-                   lnet_portal_is_wildcard(ptl)));
-
-        /* prefer to check w/o any lock */
-        unique = lnet_match_is_unique(id, match_bits, ignore_bits);
-        if (likely(lnet_portal_is_unique(ptl) ||
-                   lnet_portal_is_wildcard(ptl)))
-                goto match;
-
-        /* unset, new portal */
-        if (unique) {
-                mhash = lnet_portal_mhash_alloc();
-                if (mhash == NULL)
-                        return -ENOMEM;
-        }
-
-        LNET_LOCK();
-        if (lnet_portal_is_unique(ptl) ||
-            lnet_portal_is_wildcard(ptl)) {
-                /* someone set it before me */
-                if (mhash != NULL)
-                        lnet_portal_mhash_free(mhash);
-                LNET_UNLOCK();
-                goto match;
-        }
-
-        /* still not set */
-        LASSERT (ptl->ptl_mhash == NULL);
-        if (unique) {
-                ptl->ptl_mhash = mhash;
-                lnet_portal_setopt(ptl, LNET_PTL_MATCH_UNIQUE);
-        } else {
-                lnet_portal_setopt(ptl, LNET_PTL_MATCH_WILDCARD);
-        }
-        LNET_UNLOCK();
-        return 0;
-
- match:
-        if (lnet_portal_is_unique(ptl) && !unique)
-                return -EPERM;
-
-        if (lnet_portal_is_wildcard(ptl) && unique)
-                return -EPERM;
-
-        return 0;
-}
-
 /**
  * Create and attach a match entry to the match list of \a portal. The new
  * ME is empty, i.e. not associated with a memory descriptor. LNetMDAttach()
 /**
  * Create and attach a match entry to the match list of \a portal. The new
  * ME is empty, i.e. not associated with a memory descriptor. LNetMDAttach()
@@ -142,10 +88,10 @@ LNetMEAttach(unsigned int portal,
         if ((int)portal >= the_lnet.ln_nportals)
                 return -EINVAL;
 
         if ((int)portal >= the_lnet.ln_nportals)
                 return -EINVAL;
 
-        ptl = &the_lnet.ln_portals[portal];
-        rc = lnet_me_match_portal(ptl, match_id, match_bits, ignore_bits);
-        if (rc != 0)
-                return rc;
+       ptl = the_lnet.ln_portals[portal];
+       rc = lnet_ptl_type_match(ptl, match_id, match_bits, ignore_bits);
+       if (!rc)
+               return -EPERM;
 
         me = lnet_me_alloc();
         if (me == NULL)
 
         me = lnet_me_alloc();
         if (me == NULL)
@@ -161,7 +107,7 @@ LNetMEAttach(unsigned int portal,
         me->me_md = NULL;
 
        lnet_res_lh_initialize(&the_lnet.ln_me_container, &me->me_lh);
         me->me_md = NULL;
 
        lnet_res_lh_initialize(&the_lnet.ln_me_container, &me->me_lh);
-        head = lnet_portal_me_head(portal, match_id, match_bits);
+       head = lnet_ptl_me_head(portal, match_id, match_bits);
         LASSERT (head != NULL);
 
         if (pos == LNET_INS_AFTER)
         LASSERT (head != NULL);
 
         if (pos == LNET_INS_AFTER)
@@ -223,8 +169,8 @@ LNetMEInsert(lnet_handle_me_t current_meh,
 
         LASSERT (current_me->me_portal < the_lnet.ln_nportals);
 
 
         LASSERT (current_me->me_portal < the_lnet.ln_nportals);
 
-        ptl = &the_lnet.ln_portals[current_me->me_portal];
-        if (lnet_portal_is_unique(ptl)) {
+       ptl = the_lnet.ln_portals[current_me->me_portal];
+       if (lnet_ptl_is_unique(ptl)) {
                 /* nosense to insertion on unique portal */
                lnet_me_free_locked(new_me);
                 LNET_UNLOCK();
                 /* nosense to insertion on unique portal */
                lnet_me_free_locked(new_me);
                 LNET_UNLOCK();
index 9661ec3..683d555 100644 (file)
@@ -44,172 +44,6 @@ static int local_nid_dist_zero = 1;
 CFS_MODULE_PARM(local_nid_dist_zero, "i", int, 0444,
                 "Reserved");
 
 CFS_MODULE_PARM(local_nid_dist_zero, "i", int, 0444,
                 "Reserved");
 
-/* forward ref */
-static void lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg);
-
-#define LNET_MATCHMD_NONE     0   /* Didn't match */
-#define LNET_MATCHMD_OK       1   /* Matched OK */
-#define LNET_MATCHMD_DROP     2   /* Must be discarded */
-
-static int
-lnet_try_match_md (int index, int op_mask, lnet_process_id_t src,
-                   unsigned int rlength, unsigned int roffset,
-                   __u64 match_bits, lnet_libmd_t *md, lnet_msg_t *msg,
-                   unsigned int *mlength_out, unsigned int *offset_out)
-{
-        /* ALWAYS called holding the LNET_LOCK, and can't LNET_UNLOCK;
-         * lnet_match_blocked_msg() relies on this to avoid races */
-        unsigned int  offset;
-        unsigned int  mlength;
-        lnet_me_t    *me = md->md_me;
-
-        /* mismatched MD op */
-        if ((md->md_options & op_mask) == 0)
-                return LNET_MATCHMD_NONE;
-
-        /* MD exhausted */
-        if (lnet_md_exhausted(md))
-                return LNET_MATCHMD_NONE;
-
-        /* mismatched ME nid/pid? */
-        if (me->me_match_id.nid != LNET_NID_ANY &&
-            me->me_match_id.nid != src.nid)
-                return LNET_MATCHMD_NONE;
-
-        if (me->me_match_id.pid != LNET_PID_ANY &&
-            me->me_match_id.pid != src.pid)
-                return LNET_MATCHMD_NONE;
-
-        /* mismatched ME matchbits? */
-        if (((me->me_match_bits ^ match_bits) & ~me->me_ignore_bits) != 0)
-                return LNET_MATCHMD_NONE;
-
-        /* Hurrah! This _is_ a match; check it out... */
-
-        if ((md->md_options & LNET_MD_MANAGE_REMOTE) == 0)
-                offset = md->md_offset;
-        else
-                offset = roffset;
-
-        if ((md->md_options & LNET_MD_MAX_SIZE) != 0) {
-                mlength = md->md_max_size;
-                LASSERT (md->md_offset + mlength <= md->md_length);
-        } else {
-                mlength = md->md_length - offset;
-        }
-
-        if (rlength <= mlength) {        /* fits in allowed space */
-                mlength = rlength;
-        } else if ((md->md_options & LNET_MD_TRUNCATE) == 0) {
-                /* this packet _really_ is too big */
-                CERROR("Matching packet from %s, match "LPU64
-                       " length %d too big: %d left, %d allowed\n",
-                       libcfs_id2str(src), match_bits, rlength,
-                       md->md_length - offset, mlength);
-
-                return LNET_MATCHMD_DROP;
-        }
-
-        /* Commit to this ME/MD */
-        CDEBUG(D_NET, "Incoming %s index %x from %s of "
-               "length %d/%d into md "LPX64" [%d] + %d\n",
-               (op_mask == LNET_MD_OP_PUT) ? "put" : "get",
-               index, libcfs_id2str(src), mlength, rlength,
-               md->md_lh.lh_cookie, md->md_niov, offset);
-
-        lnet_commit_md(md, msg);
-        md->md_offset = offset + mlength;
-
-        /* NB Caller will set ev.type and ev.hdr_data */
-        msg->msg_ev.initiator = src;
-        msg->msg_ev.pt_index = index;
-        msg->msg_ev.match_bits = match_bits;
-        msg->msg_ev.rlength = rlength;
-        msg->msg_ev.mlength = mlength;
-        msg->msg_ev.offset = offset;
-
-        lnet_md_deconstruct(md, &msg->msg_ev.md);
-        lnet_md2handle(&msg->msg_ev.md_handle, md);
-
-        *offset_out = offset;
-        *mlength_out = mlength;
-
-        /* Auto-unlink NOW, so the ME gets unlinked if required.
-         * We bumped md->md_refcount above so the MD just gets flagged
-         * for unlink when it is finalized. */
-        if ((md->md_flags & LNET_MD_FLAG_AUTO_UNLINK) != 0 &&
-            lnet_md_exhausted(md)) {
-                lnet_md_unlink(md);
-        }
-
-        return LNET_MATCHMD_OK;
-}
-
-static int
-lnet_match_md(int index, int op_mask, lnet_process_id_t src,
-              unsigned int rlength, unsigned int roffset,
-              __u64 match_bits, lnet_msg_t *msg,
-              unsigned int *mlength_out, unsigned int *offset_out,
-              lnet_libmd_t **md_out)
-{
-        lnet_portal_t    *ptl = &the_lnet.ln_portals[index];
-        cfs_list_t       *head;
-        lnet_me_t        *me;
-        lnet_me_t        *tmp;
-        lnet_libmd_t     *md;
-        int               rc;
-
-        CDEBUG (D_NET, "Request from %s of length %d into portal %d "
-                "MB="LPX64"\n", libcfs_id2str(src), rlength, index, match_bits);
-
-        if (index < 0 || index >= the_lnet.ln_nportals) {
-                CERROR("Invalid portal %d not in [0-%d]\n",
-                       index, the_lnet.ln_nportals);
-                return LNET_MATCHMD_DROP;
-        }
-
-        head = lnet_portal_me_head(index, src, match_bits);
-        if (head == NULL) /* nobody posted anything on this portal */
-                goto out;
-
-        cfs_list_for_each_entry_safe_typed (me, tmp, head,
-                                            lnet_me_t, me_list) {
-                md = me->me_md;
-
-                /* ME attached but MD not attached yet */
-                if (md == NULL)
-                        continue;
-
-                LASSERT (me == md->md_me);
-
-                rc = lnet_try_match_md(index, op_mask, src, rlength,
-                                       roffset, match_bits, md, msg,
-                                       mlength_out, offset_out);
-                switch (rc) {
-                default:
-                        LBUG();
-
-                case LNET_MATCHMD_NONE:
-                        continue;
-
-                case LNET_MATCHMD_OK:
-                        *md_out = md;
-                        return LNET_MATCHMD_OK;
-
-                case LNET_MATCHMD_DROP:
-                        return LNET_MATCHMD_DROP;
-                }
-                /* not reached */
-        }
-
- out:
-        if (op_mask == LNET_MD_OP_GET ||
-            !lnet_portal_is_lazy(ptl))
-                return LNET_MATCHMD_DROP;
-
-        return LNET_MATCHMD_NONE;
-}
-
 int
 lnet_fail_nid (lnet_nid_t nid, unsigned int threshold)
 {
 int
 lnet_fail_nid (lnet_nid_t nid, unsigned int threshold)
 {
@@ -1502,7 +1336,7 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
         return 0;
 }
 
         return 0;
 }
 
-static void
+void
 lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg)
 {
         /* ALWAYS called holding the LNET_LOCK */
 lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg)
 {
         /* ALWAYS called holding the LNET_LOCK */
@@ -1543,140 +1377,6 @@ lnet_drop_message (lnet_ni_t *ni, void *private, unsigned int nob)
 }
 
 static void
 }
 
 static void
-lnet_drop_delayed_put(lnet_msg_t *msg, char *reason)
-{
-        lnet_process_id_t id = {0};
-
-        id.nid = msg->msg_hdr.src_nid;
-        id.pid = msg->msg_hdr.src_pid;
-
-        LASSERT (msg->msg_md == NULL);
-        LASSERT (msg->msg_delayed);
-        LASSERT (msg->msg_rxpeer != NULL);
-        LASSERT (msg->msg_hdr.type == LNET_MSG_PUT);
-
-        CWARN("Dropping delayed PUT from %s portal %d match "LPU64
-              " offset %d length %d: %s\n", 
-              libcfs_id2str(id),
-              msg->msg_hdr.msg.put.ptl_index,
-              msg->msg_hdr.msg.put.match_bits,
-              msg->msg_hdr.msg.put.offset,
-              msg->msg_hdr.payload_length,
-              reason);
-
-        /* NB I can't drop msg's ref on msg_rxpeer until after I've
-         * called lnet_drop_message(), so I just hang onto msg as well
-         * until that's done */
-
-        lnet_drop_message(msg->msg_rxpeer->lp_ni,
-                          msg->msg_private, msg->msg_len);
-
-        LNET_LOCK();
-
-        lnet_peer_decref_locked(msg->msg_rxpeer);
-        msg->msg_rxpeer = NULL;
-
-       lnet_msg_free_locked(msg);
-
-        LNET_UNLOCK();
-}
-
-/**
- * Turn on the lazy portal attribute. Use with caution!
- *
- * This portal attribute only affects incoming PUT requests to the portal,
- * and is off by default. By default, if there's no matching MD for an
- * incoming PUT request, it is simply dropped. With the lazy attribute on,
- * such requests are queued indefinitely until either a matching MD is
- * posted to the portal or the lazy attribute is turned off.
- *
- * It would prevent dropped requests, however it should be regarded as the
- * last line of defense - i.e. users must keep a close watch on active
- * buffers on a lazy portal and once it becomes too low post more buffers as
- * soon as possible. This is because delayed requests usually have detrimental
- * effects on underlying network connections. A few delayed requests often
- * suffice to bring an underlying connection to a complete halt, due to flow
- * control mechanisms.
- *
- * There's also a DOS attack risk. If users don't post match-all MDs on a
- * lazy portal, a malicious peer can easily stop a service by sending some
- * PUT requests with match bits that won't match any MD. A routed server is
- * especially vulnerable since the connections to its neighbor routers are
- * shared among all clients.
- *
- * \param portal Index of the portal to enable the lazy attribute on.
- *
- * \retval 0       On success.
- * \retval -EINVAL If \a portal is not a valid index.
- */
-int
-LNetSetLazyPortal(int portal)
-{
-        lnet_portal_t *ptl = &the_lnet.ln_portals[portal];
-
-        if (portal < 0 || portal >= the_lnet.ln_nportals)
-                return -EINVAL;
-
-        CDEBUG(D_NET, "Setting portal %d lazy\n", portal);
-
-        LNET_LOCK();
-        lnet_portal_setopt(ptl, LNET_PTL_LAZY);
-        LNET_UNLOCK();
-
-        return 0;
-}
-
-/**
- * Turn off the lazy portal attribute. Delayed requests on the portal,
- * if any, will be all dropped when this function returns.
- *
- * \param portal Index of the portal to disable the lazy attribute on.
- *
- * \retval 0       On success.
- * \retval -EINVAL If \a portal is not a valid index.
- */
-int
-LNetClearLazyPortal(int portal)
-{
-        cfs_list_t        zombies;
-        lnet_portal_t    *ptl = &the_lnet.ln_portals[portal];
-        lnet_msg_t       *msg;
-
-        if (portal < 0 || portal >= the_lnet.ln_nportals)
-                return -EINVAL;
-
-        LNET_LOCK();
-
-        if (!lnet_portal_is_lazy(ptl)) {
-                LNET_UNLOCK();
-                return 0;
-        }
-
-        if (the_lnet.ln_shutdown)
-                CWARN ("Active lazy portal %d on exit\n", portal);
-        else
-                CDEBUG (D_NET, "clearing portal %d lazy\n", portal);
-
-        /* grab all the blocked messages atomically */
-        cfs_list_add(&zombies, &ptl->ptl_msgq);
-        cfs_list_del_init(&ptl->ptl_msgq);
-
-        ptl->ptl_msgq_version++;
-        lnet_portal_unsetopt(ptl, LNET_PTL_LAZY);
-
-        LNET_UNLOCK();
-
-        while (!cfs_list_empty(&zombies)) {
-                msg = cfs_list_entry(zombies.next, lnet_msg_t, msg_list);
-                cfs_list_del(&msg->msg_list);
-
-                lnet_drop_delayed_put(msg, "Clearing lazy portal attr");
-        }
-
-        return 0;
-}
-
-static void
 lnet_recv_put(lnet_libmd_t *md, lnet_msg_t *msg, int delayed,
               unsigned int offset, unsigned int mlength)
 {
 lnet_recv_put(lnet_libmd_t *md, lnet_msg_t *msg, int delayed,
               unsigned int offset, unsigned int mlength)
 {
@@ -1708,104 +1408,6 @@ lnet_recv_put(lnet_libmd_t *md, lnet_msg_t *msg, int delayed,
                      hdr->payload_length);
 }
 
                      hdr->payload_length);
 }
 
-/* called with LNET_LOCK held */
-void
-lnet_match_blocked_msg(lnet_libmd_t *md)
-{
-        CFS_LIST_HEAD    (drops);
-        CFS_LIST_HEAD    (matches);
-        cfs_list_t       *tmp;
-        cfs_list_t       *entry;
-        lnet_msg_t       *msg;
-        lnet_portal_t    *ptl;
-        lnet_me_t        *me  = md->md_me;
-
-        LASSERT (me->me_portal < (unsigned int)the_lnet.ln_nportals);
-
-        ptl = &the_lnet.ln_portals[me->me_portal];
-        if (!lnet_portal_is_lazy(ptl)) {
-                LASSERT (cfs_list_empty(&ptl->ptl_msgq));
-                return;
-        }
-
-        LASSERT (md->md_refcount == 0); /* a brand new MD */
-
-        cfs_list_for_each_safe (entry, tmp, &ptl->ptl_msgq) {
-                int               rc;
-                int               index;
-                unsigned int      mlength;
-                unsigned int      offset;
-                lnet_hdr_t       *hdr;
-                lnet_process_id_t src;
-
-                msg = cfs_list_entry(entry, lnet_msg_t, msg_list);
-
-                LASSERT (msg->msg_delayed);
-
-                hdr   = &msg->msg_hdr;
-                index = hdr->msg.put.ptl_index;
-
-                src.nid = hdr->src_nid;
-                src.pid = hdr->src_pid;
-
-                rc = lnet_try_match_md(index, LNET_MD_OP_PUT, src,
-                                       hdr->payload_length,
-                                       hdr->msg.put.offset,
-                                       hdr->msg.put.match_bits,
-                                       md, msg, &mlength, &offset);
-
-                if (rc == LNET_MATCHMD_NONE)
-                        continue;
-
-                /* Hurrah! This _is_ a match */
-                cfs_list_del(&msg->msg_list);
-                ptl->ptl_msgq_version++;
-
-                if (rc == LNET_MATCHMD_OK) {
-                        cfs_list_add_tail(&msg->msg_list, &matches);
-
-                        CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
-                               "match "LPU64" offset %d length %d.\n",
-                               libcfs_id2str(src),
-                               hdr->msg.put.ptl_index,
-                               hdr->msg.put.match_bits,
-                               hdr->msg.put.offset,
-                               hdr->payload_length);
-                } else {
-                        LASSERT (rc == LNET_MATCHMD_DROP);
-
-                        cfs_list_add_tail(&msg->msg_list, &drops);
-                }
-
-                if (lnet_md_exhausted(md))
-                        break;
-        }
-
-        LNET_UNLOCK();
-
-        cfs_list_for_each_safe (entry, tmp, &drops) {
-                msg = cfs_list_entry(entry, lnet_msg_t, msg_list);
-
-                cfs_list_del(&msg->msg_list);
-
-                lnet_drop_delayed_put(msg, "Bad match");
-        }
-
-        cfs_list_for_each_safe (entry, tmp, &matches) {
-                msg = cfs_list_entry(entry, lnet_msg_t, msg_list);
-
-                cfs_list_del(&msg->msg_list);
-
-                /* md won't disappear under me, since each msg
-                 * holds a ref on it */
-                lnet_recv_put(md, msg, 1,
-                              msg->msg_ev.offset,
-                              msg->msg_ev.mlength);
-        }
-
-        LNET_LOCK();
-}
-
 static int
 lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
 {
 static int
 lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
 {
@@ -1847,7 +1449,7 @@ lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
                 return 0;
 
         case LNET_MATCHMD_NONE:
                 return 0;
 
         case LNET_MATCHMD_NONE:
-                ptl = &the_lnet.ln_portals[index];
+               ptl = the_lnet.ln_portals[index];
                 version = ptl->ptl_ml_version;
 
                 rc = 0;
                 version = ptl->ptl_ml_version;
 
                 rc = 0;
@@ -1856,7 +1458,7 @@ lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
 
                 if (rc == 0 &&
                     !the_lnet.ln_shutdown &&
 
                 if (rc == 0 &&
                     !the_lnet.ln_shutdown &&
-                    lnet_portal_is_lazy(ptl)) {
+                   lnet_ptl_is_lazy(ptl)) {
                         if (version != ptl->ptl_ml_version)
                                 goto again;
 
                         if (version != ptl->ptl_ml_version)
                                 goto again;
 
@@ -2410,6 +2012,80 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
         return 0;
 }
 
         return 0;
 }
 
+/**
+ * Drop every delayed PUT message queued on \a head.
+ *
+ * Messages are removed from the front of the list one at a time; each one
+ * is logged with \a reason, handed to lnet_drop_message(), has its
+ * msg_rxpeer reference released under LNET_LOCK, and is then freed.
+ * The list is empty on return.
+ *
+ * LNET_LOCK is taken inside the loop; the callers visible in this file
+ * invoke this function only after LNET_UNLOCK().
+ *
+ * \param head   List of lnet_msg_t linked through msg_list.
+ * \param reason Text appended to the warning logged for each message.
+ */
+void
+lnet_drop_delayed_msg_list(cfs_list_t *head, char *reason)
+{
+       while (!cfs_list_empty(head)) {
+               lnet_process_id_t       id = {0};
+               lnet_msg_t              *msg;
+
+               /* detach the message at the front of the list */
+               msg = cfs_list_entry(head->next, lnet_msg_t, msg_list);
+               cfs_list_del(&msg->msg_list);
+
+               /* sender identity, used only for the warning below */
+               id.nid = msg->msg_hdr.src_nid;
+               id.pid = msg->msg_hdr.src_pid;
+
+               /* only delayed PUTs that never got an MD may be dropped here */
+               LASSERT(msg->msg_md == NULL);
+               LASSERT(msg->msg_delayed);
+               LASSERT(msg->msg_rxpeer != NULL);
+               LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
+
+               CWARN("Dropping delayed PUT from %s portal %d match "LPU64
+                     " offset %d length %d: %s\n",
+                     libcfs_id2str(id),
+                     msg->msg_hdr.msg.put.ptl_index,
+                     msg->msg_hdr.msg.put.match_bits,
+                     msg->msg_hdr.msg.put.offset,
+                     msg->msg_hdr.payload_length, reason);
+
+               /* NB I can't drop msg's ref on msg_rxpeer until after I've
+                * called lnet_drop_message(), so I just hang onto msg as well
+                * until that's done */
+
+               lnet_drop_message(msg->msg_rxpeer->lp_ni,
+                                 msg->msg_private, msg->msg_len);
+
+               /* the peer refcount is only changed under LNET_LOCK */
+               LNET_LOCK();
+               lnet_peer_decref_locked(msg->msg_rxpeer);
+               LNET_UNLOCK();
+
+               lnet_msg_free(msg);
+       }
+}
+
+/**
+ * Deliver every delayed PUT on \a head to the MD it was matched with.
+ *
+ * Messages are taken from the front of the list one at a time and passed
+ * to lnet_recv_put() with the offset and mlength recorded in msg_ev.
+ * The list is empty on return.
+ *
+ * \param head List of lnet_msg_t linked through msg_list.
+ */
+void
+lnet_recv_delayed_msg_list(cfs_list_t *head)
+{
+       lnet_process_id_t  src;
+       lnet_msg_t        *cur;
+
+       for (; !cfs_list_empty(head); ) {
+               /* detach the message at the front of the list */
+               cur = cfs_list_entry(head->next, lnet_msg_t, msg_list);
+               cfs_list_del(&cur->msg_list);
+
+               /* only a delayed PUT already bound to an MD belongs here */
+               LASSERT(cur->msg_delayed);
+               LASSERT(cur->msg_md != NULL);
+               LASSERT(cur->msg_rxpeer != NULL);
+               LASSERT(cur->msg_hdr.type == LNET_MSG_PUT);
+
+               /* sender identity, used only for the debug message */
+               src.pid = cur->msg_hdr.src_pid;
+               src.nid = cur->msg_hdr.src_nid;
+
+               CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
+                      "match "LPU64" offset %d length %d.\n",
+                      libcfs_id2str(src),
+                      cur->msg_hdr.msg.put.ptl_index,
+                      cur->msg_hdr.msg.put.match_bits,
+                      cur->msg_hdr.msg.put.offset,
+                      cur->msg_hdr.payload_length);
+
+               /* md won't disappear under me, since each msg
+                * holds a ref on it */
+               lnet_recv_put(cur->msg_md, cur, 1,
+                             cur->msg_ev.offset, cur->msg_ev.mlength);
+       }
+}
+
 /**
  * Initiate an asynchronous PUT operation.
  *
 /**
  * Initiate an asynchronous PUT operation.
  *
diff --git a/lnet/lnet/lib-ptl.c b/lnet/lnet/lib-ptl.c
new file mode 100644 (file)
index 0000000..0f7e527
--- /dev/null
@@ -0,0 +1,510 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lnet/lnet/lib-ptl.c
+ *
+ * portal & match routines
+ *
+ * Author: liang@whamcloud.com
+ */
+
+#define DEBUG_SUBSYSTEM S_LNET
+
+#include <lnet/lib-lnet.h>
+
+/**
+ * Check whether a match entry described by \a match_id / \a ignore_bits is
+ * compatible with the matching type of portal \a ptl, fixing the portal's
+ * type on first use.
+ *
+ * An entry counts as "unique" when it has no ignore bits and names a
+ * specific NID and PID; anything else counts as "wildcard".  A portal that
+ * has neither LNET_PTL_MATCH_UNIQUE nor LNET_PTL_MATCH_WILDCARD set yet
+ * gets the type of this entry, under LNET_LOCK.
+ *
+ * \param ptl         Portal to check (and possibly mark).
+ * \param match_id    NID/PID the entry matches against.
+ * \param mbits       Match bits of the entry; not used by this function.
+ * \param ignore_bits Ignore mask of the entry.
+ *
+ * \retval 1 The portal type was just set, or already agrees with the entry.
+ * \retval 0 The portal is unique but the entry is not, or vice versa.
+ */
+int
+lnet_ptl_type_match(struct lnet_portal *ptl, lnet_process_id_t match_id,
+                   __u64 mbits, __u64 ignore_bits)
+{
+       int unique;
+
+       unique = ignore_bits == 0 &&
+                match_id.nid != LNET_NID_ANY &&
+                match_id.pid != LNET_PID_ANY;
+
+       /* a portal is never both unique and wildcard */
+       LASSERT(!lnet_ptl_is_unique(ptl) || !lnet_ptl_is_wildcard(ptl));
+
+       /* prefer to check w/o any lock */
+       if (likely(lnet_ptl_is_unique(ptl) || lnet_ptl_is_wildcard(ptl)))
+               goto match;
+
+       /* unset, new portal */
+       LNET_LOCK();
+       /* check again with lock */
+       if (unlikely(lnet_ptl_is_unique(ptl) || lnet_ptl_is_wildcard(ptl))) {
+               LNET_UNLOCK();
+               goto match;
+       }
+
+       /* still not set */
+       if (unique)
+               lnet_ptl_setopt(ptl, LNET_PTL_MATCH_UNIQUE);
+       else
+               lnet_ptl_setopt(ptl, LNET_PTL_MATCH_WILDCARD);
+
+       LNET_UNLOCK();
+
+       return 1;
+
+ match:
+       /* the type is already fixed: the entry has to agree with it */
+       if ((lnet_ptl_is_unique(ptl) && !unique) ||
+           (lnet_ptl_is_wildcard(ptl) && unique))
+               return 0;
+       return 1;
+}
+
+/**
+ * Test whether \a md (through its ME, md->md_me) accepts an incoming
+ * request and, if it does, commit \a msg to it.
+ *
+ * \param index       Portal index; only logged and recorded in msg_ev here.
+ * \param op_mask     Operation bit(s) tested against md->md_options.
+ * \param src         Sender NID/PID, compared with the ME's match id.
+ * \param rlength     Length requested by the sender.
+ * \param roffset     Offset requested by the sender; used only when the MD
+ *                    has LNET_MD_MANAGE_REMOTE set.
+ * \param match_bits  Match bits of the request.
+ * \param md          Candidate MD.
+ * \param msg         Message committed to \a md on a match.
+ * \param mlength_out Set on LNET_MATCHMD_OK to the length that will be used.
+ * \param offset_out  Set on LNET_MATCHMD_OK to the offset into the MD.
+ *
+ * \retval LNET_MATCHMD_NONE The MD does not accept this operation, is
+ *                           exhausted, or its ME does not match the sender
+ *                           or the match bits.
+ * \retval LNET_MATCHMD_DROP The ME matched but the request is too long and
+ *                           the MD does not allow truncation.
+ * \retval LNET_MATCHMD_OK   \a msg was committed to \a md and msg_ev filled.
+ */
+static int
+lnet_try_match_md(int index, int op_mask, lnet_process_id_t src,
+                 unsigned int rlength, unsigned int roffset,
+                 __u64 match_bits, lnet_libmd_t *md, lnet_msg_t *msg,
+                 unsigned int *mlength_out, unsigned int *offset_out)
+{
+       /* ALWAYS called holding the LNET_LOCK, and can't LNET_UNLOCK;
+        * lnet_match_blocked_msg() relies on this to avoid races */
+       unsigned int    offset;
+       unsigned int    mlength;
+       lnet_me_t       *me = md->md_me;
+
+       /* mismatched MD op */
+       if ((md->md_options & op_mask) == 0)
+               return LNET_MATCHMD_NONE;
+
+       /* MD exhausted */
+       if (lnet_md_exhausted(md))
+               return LNET_MATCHMD_NONE;
+
+       /* mismatched ME nid/pid? */
+       if (me->me_match_id.nid != LNET_NID_ANY &&
+           me->me_match_id.nid != src.nid)
+               return LNET_MATCHMD_NONE;
+
+       if (me->me_match_id.pid != LNET_PID_ANY &&
+           me->me_match_id.pid != src.pid)
+               return LNET_MATCHMD_NONE;
+
+       /* mismatched ME matchbits? */
+       if (((me->me_match_bits ^ match_bits) & ~me->me_ignore_bits) != 0)
+               return LNET_MATCHMD_NONE;
+
+       /* Hurrah! This _is_ a match; check it out... */
+
+       /* locally managed MDs use their own running offset; otherwise the
+        * sender's offset is taken */
+       if ((md->md_options & LNET_MD_MANAGE_REMOTE) == 0)
+               offset = md->md_offset;
+       else
+               offset = roffset;
+
+       /* space this request is allowed to consume */
+       if ((md->md_options & LNET_MD_MAX_SIZE) != 0) {
+               mlength = md->md_max_size;
+               LASSERT(md->md_offset + mlength <= md->md_length);
+       } else {
+               mlength = md->md_length - offset;
+       }
+
+       if (rlength <= mlength) {        /* fits in allowed space */
+               mlength = rlength;
+       } else if ((md->md_options & LNET_MD_TRUNCATE) == 0) {
+               /* this packet _really_ is too big */
+               CERROR("Matching packet from %s, match "LPU64
+                      " length %d too big: %d left, %d allowed\n",
+                      libcfs_id2str(src), match_bits, rlength,
+                      md->md_length - offset, mlength);
+
+               return LNET_MATCHMD_DROP;
+       }
+       /* else: too long but LNET_MD_TRUNCATE is set, so mlength stays at
+        * the allowed space */
+
+       /* Commit to this ME/MD */
+       CDEBUG(D_NET, "Incoming %s index %x from %s of "
+              "length %d/%d into md "LPX64" [%d] + %d\n",
+              (op_mask == LNET_MD_OP_PUT) ? "put" : "get",
+              index, libcfs_id2str(src), mlength, rlength,
+              md->md_lh.lh_cookie, md->md_niov, offset);
+
+       lnet_commit_md(md, msg);
+       /* advance the MD past the region handed to this message */
+       md->md_offset = offset + mlength;
+
+       /* NB Caller will set ev.type and ev.hdr_data */
+       msg->msg_ev.initiator = src;
+       msg->msg_ev.pt_index = index;
+       msg->msg_ev.match_bits = match_bits;
+       msg->msg_ev.rlength = rlength;
+       msg->msg_ev.mlength = mlength;
+       msg->msg_ev.offset = offset;
+
+       lnet_md_deconstruct(md, &msg->msg_ev.md);
+       lnet_md2handle(&msg->msg_ev.md_handle, md);
+
+       *offset_out = offset;
+       *mlength_out = mlength;
+
+       /* Auto-unlink NOW, so the ME gets unlinked if required.
+        * We bumped md->md_refcount above so the MD just gets flagged
+        * for unlink when it is finalized. */
+       if ((md->md_flags & LNET_MD_FLAG_AUTO_UNLINK) != 0 &&
+           lnet_md_exhausted(md)) {
+               lnet_md_unlink(md);
+       }
+
+       return LNET_MATCHMD_OK;
+}
+
+/**
+ * Search portal \a index for an MD that matches an incoming request and
+ * commit \a msg to the first one that does.
+ *
+ * The MEs on the list returned by lnet_ptl_me_head() are tried in order
+ * with lnet_try_match_md(); MEs without an attached MD are skipped.
+ * lnet_try_match_md() must be called with LNET_LOCK held, so the same
+ * holds for this function.
+ *
+ * \param index       Portal index taken from the request.
+ * \param op_mask     Operation bit(s), e.g. LNET_MD_OP_PUT or LNET_MD_OP_GET.
+ * \param src         Sender NID/PID.
+ * \param rlength     Length requested by the sender.
+ * \param roffset     Offset requested by the sender.
+ * \param match_bits  Match bits of the request.
+ * \param msg         Message to commit on a match.
+ * \param mlength_out Set on LNET_MATCHMD_OK to the length to be used.
+ * \param offset_out  Set on LNET_MATCHMD_OK to the offset into the MD.
+ * \param md_out      Set on LNET_MATCHMD_OK to the matching MD.
+ *
+ * \retval LNET_MATCHMD_OK   A matching MD was found and committed.
+ * \retval LNET_MATCHMD_DROP \a index is invalid, a matching MD rejected the
+ *                           request, or nothing matched and the request is
+ *                           a GET or the portal is not lazy.
+ * \retval LNET_MATCHMD_NONE Nothing matched, but the request is not a GET
+ *                           and the portal is lazy.
+ */
+int
+lnet_match_md(int index, int op_mask, lnet_process_id_t src,
+             unsigned int rlength, unsigned int roffset,
+             __u64 match_bits, lnet_msg_t *msg,
+             unsigned int *mlength_out, unsigned int *offset_out,
+             lnet_libmd_t **md_out)
+{
+       struct lnet_portal      *ptl;
+       cfs_list_t              *head;
+       lnet_me_t               *me;
+       lnet_me_t               *tmp;
+       lnet_libmd_t            *md;
+       int                     rc;
+
+       CDEBUG(D_NET, "Request from %s of length %d into portal %d "
+              "MB="LPX64"\n", libcfs_id2str(src), rlength, index, match_bits);
+
+       if (index < 0 || index >= the_lnet.ln_nportals) {
+               CERROR("Invalid portal %d not in [0-%d]\n",
+                      index, the_lnet.ln_nportals);
+               return LNET_MATCHMD_DROP;
+       }
+
+       /* index comes from the incoming request, so the portals table is
+        * only indexed once it is known to be in range */
+       ptl = the_lnet.ln_portals[index];
+
+       head = lnet_ptl_me_head(index, src, match_bits);
+       if (head == NULL) /* nobody posted anything on this portal */
+               goto out;
+
+       /* _safe iteration: a successful match may unlink the ME */
+       cfs_list_for_each_entry_safe(me, tmp, head, me_list) {
+               md = me->me_md;
+
+               /* ME attached but MD not attached yet */
+               if (md == NULL)
+                       continue;
+
+               LASSERT(me == md->md_me);
+
+               rc = lnet_try_match_md(index, op_mask, src, rlength,
+                                      roffset, match_bits, md, msg,
+                                      mlength_out, offset_out);
+               switch (rc) {
+               default:
+                       LBUG();
+
+               case LNET_MATCHMD_NONE:
+                       continue;
+
+               case LNET_MATCHMD_OK:
+                       *md_out = md;
+                       return LNET_MATCHMD_OK;
+
+               case LNET_MATCHMD_DROP:
+                       return LNET_MATCHMD_DROP;
+               }
+               /* not reached */
+       }
+
+ out:
+       /* only a PUT on a lazy portal may wait for a buffer */
+       if (op_mask == LNET_MD_OP_GET ||
+           !lnet_ptl_is_lazy(ptl))
+               return LNET_MATCHMD_DROP;
+
+       return LNET_MATCHMD_NONE;
+}
+
+/**
+ * Try to satisfy PUTs delayed on a lazy portal with the newly attached
+ * \a md.
+ *
+ * called with LNET_LOCK held.  The lock is released while the collected
+ * messages are dropped or delivered and is taken again before returning,
+ * so the caller gets it back held.
+ *
+ * Every message on the portal's ptl_msgq is offered to \a md with
+ * lnet_try_match_md(); matches are moved to a local list and delivered,
+ * messages that the MD rejected with LNET_MATCHMD_DROP are dropped.  The
+ * scan stops as soon as \a md is exhausted.
+ *
+ * \param md Brand new MD (md_refcount == 0) already linked to its ME.
+ */
+void
+lnet_match_blocked_msg(lnet_libmd_t *md)
+{
+       CFS_LIST_HEAD           (drops);
+       CFS_LIST_HEAD           (matches);
+       cfs_list_t              *tmp;
+       cfs_list_t              *entry;
+       lnet_msg_t              *msg;
+       struct lnet_portal      *ptl;
+       lnet_me_t               *me  = md->md_me;
+
+       LASSERT(me->me_portal < (unsigned int)the_lnet.ln_nportals);
+
+       ptl = the_lnet.ln_portals[me->me_portal];
+       /* messages are only ever queued on lazy portals */
+       if (!lnet_ptl_is_lazy(ptl)) {
+               LASSERT(cfs_list_empty(&ptl->ptl_msgq));
+               return;
+       }
+
+       LASSERT(md->md_refcount == 0); /* a brand new MD */
+
+       cfs_list_for_each_safe(entry, tmp, &ptl->ptl_msgq) {
+               int               rc;
+               int               index;
+               unsigned int      mlength;
+               unsigned int      offset;
+               lnet_hdr_t       *hdr;
+               lnet_process_id_t src;
+
+               msg = cfs_list_entry(entry, lnet_msg_t, msg_list);
+
+               LASSERT(msg->msg_delayed);
+
+               hdr   = &msg->msg_hdr;
+               index = hdr->msg.put.ptl_index;
+
+               src.nid = hdr->src_nid;
+               src.pid = hdr->src_pid;
+
+               rc = lnet_try_match_md(index, LNET_MD_OP_PUT, src,
+                                      hdr->payload_length,
+                                      hdr->msg.put.offset,
+                                      hdr->msg.put.match_bits,
+                                      md, msg, &mlength, &offset);
+
+               if (rc == LNET_MATCHMD_NONE)
+                       continue;
+
+               /* Hurrah! This _is_ a match */
+               cfs_list_del(&msg->msg_list);
+               /* record that the delayed-message queue changed */
+               ptl->ptl_msgq_version++;
+
+               if (rc == LNET_MATCHMD_OK) {
+                       cfs_list_add_tail(&msg->msg_list, &matches);
+
+                       /* NOTE(review): lnet_recv_delayed_msg_list() logs
+                        * the same "Resuming delayed PUT" line again for
+                        * each message on \a matches - confirm whether both
+                        * messages are wanted */
+                       CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
+                              "match "LPU64" offset %d length %d.\n",
+                              libcfs_id2str(src),
+                              hdr->msg.put.ptl_index,
+                              hdr->msg.put.match_bits,
+                              hdr->msg.put.offset,
+                              hdr->payload_length);
+               } else {
+                       LASSERT(rc == LNET_MATCHMD_DROP);
+
+                       cfs_list_add_tail(&msg->msg_list, &drops);
+               }
+
+               if (lnet_md_exhausted(md))
+                       break;
+       }
+
+       /* dropping and receiving are done without LNET_LOCK; the messages
+        * now sit on lists private to this function */
+       LNET_UNLOCK();
+
+       lnet_drop_delayed_msg_list(&drops, "Bad match");
+       lnet_recv_delayed_msg_list(&matches);
+
+       LNET_LOCK();
+}
+
+/**
+ * Release the match table of portal \a ptl and free any match entries
+ * still linked to it.
+ *
+ * A portal whose ptl_mhash is NULL was never (successfully) set up or has
+ * already been cleaned, and is left alone.  This check has to come before
+ * anything touches the list heads: lnet_portals_create() calls
+ * lnet_portals_destroy() when a setup fails, which walks portals whose
+ * list heads lnet_ptl_setup() never initialised.
+ *
+ * Leftover MEs are reported with CERROR and freed rather than asserted on,
+ * so that the cleanup loops below are actually reachable.
+ *
+ * \param ptl Portal to clean up.
+ */
+void
+lnet_ptl_cleanup(struct lnet_portal *ptl)
+{
+       lnet_me_t               *me;
+       int                     j;
+
+       if (ptl->ptl_mhash == NULL) /* uninitialized portal */
+               return;
+
+       /* no delayed message may be left on a portal being torn down */
+       LASSERT(cfs_list_empty(&ptl->ptl_msgq));
+
+       /* cleanup ME */
+       while (!cfs_list_empty(&ptl->ptl_mlist)) {
+               me = cfs_list_entry(ptl->ptl_mlist.next,
+                                   lnet_me_t, me_list);
+               CERROR("Active wildcard ME %p on exit\n", me);
+               cfs_list_del(&me->me_list);
+               lnet_me_free(me);
+       }
+
+       for (j = 0; j < LNET_PORTAL_HASH_SIZE; j++) {
+               while (!cfs_list_empty(&ptl->ptl_mhash[j])) {
+                       me = cfs_list_entry(ptl->ptl_mhash[j].next,
+                                      lnet_me_t, me_list);
+                       CERROR("Active unique ME %p on exit\n", me);
+                       cfs_list_del(&me->me_list);
+                       lnet_me_free(me);
+               }
+       }
+
+       LIBCFS_FREE(ptl->ptl_mhash,
+                   LNET_PORTAL_HASH_SIZE * sizeof(ptl->ptl_mhash[0]));
+       ptl->ptl_mhash = NULL; /* mark it as finalized */
+}
+
+/**
+ * Prepare \a ptl for use as portal number \a index: record the index,
+ * initialise its delayed-message and ME lists, and allocate a match
+ * hash-table of LNET_PORTAL_HASH_SIZE empty buckets.
+ *
+ * \param ptl   Portal to initialise.
+ * \param index Portal number stored in ptl_index.
+ *
+ * \retval 0       On success; ptl_mhash points at the new table.
+ * \retval -ENOMEM If the hash-table cannot be allocated; ptl_mhash is not
+ *                 written in that case.
+ */
+int
+lnet_ptl_setup(struct lnet_portal *ptl, int index)
+{
+       cfs_list_t              *table;
+       int                     slot;
+
+       CFS_INIT_LIST_HEAD(&ptl->ptl_mlist);
+       CFS_INIT_LIST_HEAD(&ptl->ptl_msgq);
+       ptl->ptl_index = index;
+
+       LIBCFS_ALLOC(table, LNET_PORTAL_HASH_SIZE * sizeof(table[0]));
+       if (table != NULL) {
+               /* every bucket starts out as an empty list */
+               slot = 0;
+               while (slot < LNET_PORTAL_HASH_SIZE) {
+                       CFS_INIT_LIST_HEAD(&table[slot]);
+                       slot++;
+               }
+
+               /* a non-NULL table marks the portal as initialised */
+               ptl->ptl_mhash = table;
+               return 0;
+       }
+
+       CERROR("Failed to create match table for portal %d\n", index);
+       return -ENOMEM;
+}
+
+/**
+ * Clean up every portal in the_lnet.ln_portals and free the table itself.
+ * Does nothing when no table is allocated; ln_portals is NULL afterwards.
+ */
+void
+lnet_portals_destroy(void)
+{
+       int     idx = 0;
+
+       if (the_lnet.ln_portals != NULL) {
+               while (idx < the_lnet.ln_nportals) {
+                       lnet_ptl_cleanup(the_lnet.ln_portals[idx]);
+                       idx++;
+               }
+
+               cfs_array_free(the_lnet.ln_portals);
+               the_lnet.ln_portals = NULL;
+       }
+}
+
+/**
+ * Allocate the portals table with MAX_PORTALS entries and set up each
+ * portal in turn.
+ *
+ * If any portal fails to set up, everything created so far is torn down
+ * with lnet_portals_destroy().
+ *
+ * \retval 0       On success.
+ * \retval -ENOMEM If the table or any portal cannot be allocated.
+ */
+int
+lnet_portals_create(void)
+{
+       int     elem_size = sizeof(struct lnet_portal);
+       int     idx;
+
+       the_lnet.ln_nportals = MAX_PORTALS;
+       the_lnet.ln_portals = cfs_array_alloc(the_lnet.ln_nportals,
+                                             elem_size);
+       if (the_lnet.ln_portals == NULL) {
+               CERROR("Failed to allocate portals table\n");
+               return -ENOMEM;
+       }
+
+       for (idx = 0; idx < the_lnet.ln_nportals; idx++) {
+               if (lnet_ptl_setup(the_lnet.ln_portals[idx], idx) == 0)
+                       continue;
+
+               /* undo the portals that were already set up */
+               lnet_portals_destroy();
+               return -ENOMEM;
+       }
+
+       return 0;
+}
+
+/**
+ * Turn on the lazy portal attribute. Use with caution!
+ *
+ * This portal attribute only affects incoming PUT requests to the portal,
+ * and is off by default. By default, if there's no matching MD for an
+ * incoming PUT request, it is simply dropped. With the lazy attribute on,
+ * such requests are queued indefinitely until either a matching MD is
+ * posted to the portal or the lazy attribute is turned off.
+ *
+ * It would prevent dropped requests; however, it should be regarded as the
+ * last line of defense - i.e. users must keep a close watch on active
+ * buffers on a lazy portal and once it becomes too low post more buffers as
+ * soon as possible. This is because delayed requests usually have detrimental
+ * effects on underlying network connections. A few delayed requests often
+ * suffice to bring an underlying connection to a complete halt, due to flow
+ * control mechanisms.
+ *
+ * There's also a DOS attack risk. If users don't post match-all MDs on a
+ * lazy portal, a malicious peer can easily stop a service by sending some
+ * PUT requests with match bits that won't match any MD. A routed server is
+ * especially vulnerable since the connections to its neighbor routers are
+ * shared among all clients.
+ *
+ * \param portal Index of the portal to enable the lazy attribute on.
+ *
+ * \retval 0       On success.
+ * \retval -EINVAL If \a portal is not a valid index.
+ */
+int
+LNetSetLazyPortal(int portal)
+{
+       struct lnet_portal *lazy_ptl;
+
+       /* accept only indices that fall inside the portals table */
+       if (!(portal >= 0 && portal < the_lnet.ln_nportals))
+               return -EINVAL;
+
+       lazy_ptl = the_lnet.ln_portals[portal];
+       CDEBUG(D_NET, "Setting portal %d lazy\n", portal);
+
+       /* the option flag is changed under LNET_LOCK */
+       LNET_LOCK();
+       lnet_ptl_setopt(lazy_ptl, LNET_PTL_LAZY);
+       LNET_UNLOCK();
+
+       return 0;
+}
+
+/**
+ * Turn off the lazy portal attribute. Delayed requests on the portal,
+ * if any, will be all dropped when this function returns.
+ *
+ * If the portal is not lazy, nothing is changed and 0 is returned.
+ *
+ * \param portal Index of the portal to disable the lazy attribute on.
+ *
+ * \retval 0       On success.
+ * \retval -EINVAL If \a portal is not a valid index.
+ */
+int
+LNetClearLazyPortal(int portal)
+{
+       struct lnet_portal      *ptl;
+       CFS_LIST_HEAD           (zombies);
+
+       if (portal < 0 || portal >= the_lnet.ln_nportals)
+               return -EINVAL;
+
+       /* the table is only indexed after the range check above */
+       ptl = the_lnet.ln_portals[portal];
+
+       LNET_LOCK();
+
+       if (!lnet_ptl_is_lazy(ptl)) {
+               LNET_UNLOCK();
+               return 0;
+       }
+
+       if (the_lnet.ln_shutdown)
+               CWARN("Active lazy portal %d on exit\n", portal);
+       else
+               CDEBUG(D_NET, "clearing portal %d lazy\n", portal);
+
+       /* grab all the blocked messages atomically */
+       cfs_list_splice_init(&ptl->ptl_msgq, &zombies);
+
+       /* record that the delayed-message queue changed */
+       ptl->ptl_msgq_version++;
+       lnet_ptl_unsetopt(ptl, LNET_PTL_LAZY);
+
+       LNET_UNLOCK();
+
+       /* the messages are now on a private list; drop them without the
+        * lock, since lnet_drop_delayed_msg_list() takes LNET_LOCK itself */
+       lnet_drop_delayed_msg_list(&zombies, "Clearing lazy portal attr");
+
+       return 0;
+}