Whamcloud - gitweb
LU-56 lnet: Granulate LNet lock
author: Liang Zhen <liang@whamcloud.com>
Thu, 7 Jun 2012 08:43:29 +0000 (16:43 +0800)
committer: Oleg Drokin <green@whamcloud.com>
Mon, 25 Jun 2012 14:55:03 +0000 (10:55 -0400)
LNet uses a global lock, LNET_LOCK, to serialize all operations
and event callbacks of LNet. This is a big performance issue on fat
SMP machines because of high lock contention.

We have submitted many changes to separate the critical logic of LNet,
and this patch is the key step toward finer-grained LNet locking.
This patch adds a new lock, "lnet_res_lock"; all operations on LNet
resources (ME, MD, EQ) are now protected by this lock. We still
keep LNET_LOCK for now, but it is only used to serialize
operations on NIs, peers, credits and routers.

This is still an intermediate patch for LNet SMP improvements: both
LNET_LOCK and lnet_res_lock are just spinlocks for now, and they will be
replaced by percpt locks in upcoming patches.

Signed-off-by: Liang Zhen <liang@whamcloud.com>
Change-Id: I313caffd21776ee3474c2a1391ea78f002b47790
Reviewed-on: http://review.whamcloud.com/3056
Reviewed-by: Doug Oucharek <doug@whamcloud.com>
Reviewed-by: Bobi Jam <bobijam@whamcloud.com>
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Lai Siyao <laisiyao@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-types.h
lnet/lnet/api-ni.c
lnet/lnet/lib-eq.c
lnet/lnet/lib-md.c
lnet/lnet/lib-me.c
lnet/lnet/lib-move.c
lnet/lnet/lib-msg.c
lnet/lnet/lib-ptl.c
lnet/lnet/router.c

index ab5dea5..bbddbe0 100644 (file)
@@ -87,6 +87,19 @@ static inline int lnet_md_unlinkable (lnet_libmd_t *md)
 }
 
 #ifdef __KERNEL__
+
+static inline void
+lnet_res_lock(void)
+{
+       cfs_spin_lock(&the_lnet.ln_res_lock);
+}
+
+static inline void
+lnet_res_unlock(void)
+{
+       cfs_spin_unlock(&the_lnet.ln_res_lock);
+}
+
 #define LNET_LOCK()        cfs_spin_lock(&the_lnet.ln_lock)
 #define LNET_UNLOCK()      cfs_spin_unlock(&the_lnet.ln_lock)
 #define LNET_MUTEX_LOCK(m)   cfs_mutex_lock(m)
@@ -105,15 +118,24 @@ do {                                            \
         (l) = 0;                                \
 } while (0)
 
-#define LNET_LOCK()        LNET_SINGLE_THREADED_LOCK(the_lnet.ln_lock)
-#define LNET_UNLOCK()      LNET_SINGLE_THREADED_UNLOCK(the_lnet.ln_lock)
-#define LNET_MUTEX_LOCK(m)     LNET_SINGLE_THREADED_LOCK(*(m))
-#define LNET_MUTEX_UNLOCK(m)   LNET_SINGLE_THREADED_UNLOCK(*(m))
+#define LNET_LOCK()            LNET_SINGLE_THREADED_LOCK(the_lnet.ln_lock)
+#define LNET_UNLOCK()          LNET_SINGLE_THREADED_UNLOCK(the_lnet.ln_lock)
+#define LNET_MUTEX_LOCK(m)     LNET_SINGLE_THREADED_LOCK(*(m))
+#define LNET_MUTEX_UNLOCK(m)   LNET_SINGLE_THREADED_UNLOCK(*(m))
+
+#define lnet_res_lock()                                \
+       LNET_SINGLE_THREADED_LOCK(the_lnet.ln_res_lock)
+#define lnet_res_unlock()                      \
+       LNET_SINGLE_THREADED_UNLOCK(the_lnet.ln_res_lock)
+
 # else
-#define LNET_LOCK()        pthread_mutex_lock(&the_lnet.ln_lock)
-#define LNET_UNLOCK()      pthread_mutex_unlock(&the_lnet.ln_lock)
-#define LNET_MUTEX_LOCK(m)     pthread_mutex_lock(m)
-#define LNET_MUTEX_UNLOCK(m)   pthread_mutex_unlock(m)
+#define LNET_LOCK()            pthread_mutex_lock(&the_lnet.ln_lock)
+#define LNET_UNLOCK()          pthread_mutex_unlock(&the_lnet.ln_lock)
+#define LNET_MUTEX_LOCK(m)     pthread_mutex_lock(m)
+#define LNET_MUTEX_UNLOCK(m)   pthread_mutex_unlock(m)
+#define lnet_res_lock()                pthread_mutex_lock(&the_lnet.ln_res_lock)
+#define lnet_res_unlock()      pthread_mutex_unlock(&the_lnet.ln_res_lock)
+
 # endif
 #endif
 
@@ -162,9 +184,9 @@ lnet_eq_alloc (void)
        struct lnet_res_container *rec = &the_lnet.ln_eq_container;
        lnet_eq_t                 *eq;
 
-       LNET_LOCK();
+       lnet_res_lock();
        eq = (lnet_eq_t *)lnet_freelist_alloc(&rec->rec_freelist);
-       LNET_UNLOCK();
+       lnet_res_unlock();
 
        return eq;
 }
@@ -181,9 +203,9 @@ lnet_eq_free_locked(lnet_eq_t *eq)
 static inline void
 lnet_eq_free(lnet_eq_t *eq)
 {
-       LNET_LOCK();
+       lnet_res_lock();
        lnet_eq_free_locked(eq);
-       LNET_UNLOCK();
+       lnet_res_unlock();
 }
 
 static inline lnet_libmd_t *
@@ -193,9 +215,9 @@ lnet_md_alloc (lnet_md_t *umd)
        struct lnet_res_container *rec = &the_lnet.ln_md_container;
        lnet_libmd_t              *md;
 
-       LNET_LOCK();
+       lnet_res_lock();
        md = (lnet_libmd_t *)lnet_freelist_alloc(&rec->rec_freelist);
-       LNET_UNLOCK();
+       lnet_res_unlock();
 
        if (md != NULL)
                CFS_INIT_LIST_HEAD(&md->md_list);
@@ -215,9 +237,9 @@ lnet_md_free_locked(lnet_libmd_t *md)
 static inline void
 lnet_md_free(lnet_libmd_t *md)
 {
-       LNET_LOCK();
+       lnet_res_lock();
        lnet_md_free_locked(md);
-       LNET_UNLOCK();
+       lnet_res_unlock();
 }
 
 static inline lnet_me_t *
@@ -227,9 +249,9 @@ lnet_me_alloc(void)
        struct lnet_res_container *rec = &the_lnet.ln_me_container;
        lnet_me_t                 *me;
 
-       LNET_LOCK();
+       lnet_res_lock();
        me = (lnet_me_t *)lnet_freelist_alloc(&rec->rec_freelist);
-       LNET_UNLOCK();
+       lnet_res_unlock();
 
        return me;
 }
@@ -246,9 +268,9 @@ lnet_me_free_locked(lnet_me_t *me)
 static inline void
 lnet_me_free(lnet_me_t *me)
 {
-       LNET_LOCK();
+       lnet_res_lock();
        lnet_me_free_locked(me);
-       LNET_UNLOCK();
+       lnet_res_unlock();
 }
 
 static inline lnet_msg_t *
index a9f6322..cb2bd3b 100644 (file)
@@ -623,24 +623,27 @@ typedef struct
         int                    ln_niinit_self;      /* Have I called LNetNIInit myself? */
        /* shutdown in progress */
        int                             ln_shutdown;
-
-        cfs_list_t             ln_lnds;             /* registered LNDs */
+       /* registered LNDs */
+       cfs_list_t                      ln_lnds;
 
 #ifdef __KERNEL__
-        cfs_spinlock_t         ln_lock;
-        cfs_mutex_t            ln_api_mutex;
-        cfs_mutex_t            ln_lnd_mutex;
+       cfs_spinlock_t                  ln_lock;
+       cfs_mutex_t                     ln_api_mutex;
+       cfs_mutex_t                     ln_lnd_mutex;
        cfs_waitq_t                     ln_eq_waitq;
+       cfs_spinlock_t                  ln_res_lock;
 #else
 # ifndef HAVE_LIBPTHREAD
-        int                    ln_lock;
-        int                    ln_api_mutex;
-        int                    ln_lnd_mutex;
+       int                             ln_lock;
+       int                             ln_api_mutex;
+       int                             ln_lnd_mutex;
+       int                             ln_res_lock;
 # else
-        pthread_mutex_t        ln_lock;
-        pthread_mutex_t        ln_api_mutex;
-        pthread_mutex_t        ln_lnd_mutex;
+       pthread_mutex_t                 ln_lock;
+       pthread_mutex_t                 ln_api_mutex;
+       pthread_mutex_t                 ln_lnd_mutex;
        pthread_cond_t                  ln_eq_cond;
+       pthread_mutex_t                 ln_res_lock;
 # endif
 #endif
        /* ME container  */
index f2b4b2b..b88704e 100644 (file)
@@ -91,7 +91,8 @@ lnet_get_networks(void)
 void
 lnet_init_locks(void)
 {
-       cfs_spin_lock_init (&the_lnet.ln_lock);
+       cfs_spin_lock_init(&the_lnet.ln_lock);
+       cfs_spin_lock_init(&the_lnet.ln_res_lock);
        cfs_waitq_init(&the_lnet.ln_eq_waitq);
        cfs_mutex_init(&the_lnet.ln_lnd_mutex);
        cfs_mutex_init(&the_lnet.ln_api_mutex);
@@ -173,16 +174,18 @@ lnet_get_networks (void)
 
 void lnet_init_locks(void)
 {
-        the_lnet.ln_lock = 0;
-        the_lnet.ln_lnd_mutex = 0;
-        the_lnet.ln_api_mutex = 0;
+       the_lnet.ln_lock = 0;
+       the_lnet.ln_res_lock = 0;
+       the_lnet.ln_lnd_mutex = 0;
+       the_lnet.ln_api_mutex = 0;
 }
 
 void lnet_fini_locks(void)
 {
-        LASSERT (the_lnet.ln_api_mutex == 0);
-        LASSERT (the_lnet.ln_lnd_mutex == 0);
-        LASSERT (the_lnet.ln_lock == 0);
+       LASSERT(the_lnet.ln_api_mutex == 0);
+       LASSERT(the_lnet.ln_lnd_mutex == 0);
+       LASSERT(the_lnet.ln_lock == 0);
+       LASSERT(the_lnet.ln_res_lock == 0);
 }
 
 # else
@@ -191,6 +194,7 @@ void lnet_init_locks(void)
 {
        pthread_cond_init(&the_lnet.ln_eq_cond, NULL);
        pthread_mutex_init(&the_lnet.ln_lock, NULL);
+       pthread_mutex_init(&the_lnet.ln_res_lock, NULL);
        pthread_mutex_init(&the_lnet.ln_lnd_mutex, NULL);
        pthread_mutex_init(&the_lnet.ln_api_mutex, NULL);
 }
@@ -200,6 +204,7 @@ void lnet_fini_locks(void)
        pthread_mutex_destroy(&the_lnet.ln_api_mutex);
        pthread_mutex_destroy(&the_lnet.ln_lnd_mutex);
        pthread_mutex_destroy(&the_lnet.ln_lock);
+       pthread_mutex_destroy(&the_lnet.ln_res_lock);
        pthread_cond_destroy(&the_lnet.ln_eq_cond);
 }
 
index 09fbdef..059afa2 100644 (file)
@@ -111,15 +111,15 @@ LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback,
         eq->eq_refcount = 0;
         eq->eq_callback = callback;
 
-        LNET_LOCK();
+       lnet_res_lock();
 
        lnet_res_lh_initialize(&the_lnet.ln_eq_container, &eq->eq_lh);
        cfs_list_add(&eq->eq_list, &the_lnet.ln_eq_container.rec_active);
 
-        LNET_UNLOCK();
+       lnet_res_unlock();
 
-        lnet_eq2handle(handle, eq);
-        return (0);
+       lnet_eq2handle(handle, eq);
+       return 0;
 }
 
 /**
@@ -142,30 +142,30 @@ LNetEQFree(lnet_handle_eq_t eqh)
         LASSERT (the_lnet.ln_init);
         LASSERT (the_lnet.ln_refcount > 0);
 
-        LNET_LOCK();
+       lnet_res_lock();
 
-        eq = lnet_handle2eq(&eqh);
-        if (eq == NULL) {
-                LNET_UNLOCK();
-                return (-ENOENT);
-        }
+       eq = lnet_handle2eq(&eqh);
+       if (eq == NULL) {
+               lnet_res_unlock();
+               return -ENOENT;
+       }
 
-        if (eq->eq_refcount != 0) {
-                CDEBUG(D_NET, "Event queue (%d) busy on destroy.\n",
-                       eq->eq_refcount);
-                LNET_UNLOCK();
-                return (-EBUSY);
-        }
+       if (eq->eq_refcount != 0) {
+               CDEBUG(D_NET, "Event queue (%d) busy on destroy.\n",
+                      eq->eq_refcount);
+               lnet_res_unlock();
+               return -EBUSY;
+       }
 
-        /* stash for free after lock dropped */
-        events  = eq->eq_events;
-        size    = eq->eq_size;
+       /* stash for free after lock dropped */
+       events  = eq->eq_events;
+       size    = eq->eq_size;
 
        lnet_res_lh_invalidate(&eq->eq_lh);
-        cfs_list_del (&eq->eq_list);
+       cfs_list_del(&eq->eq_list);
        lnet_eq_free_locked(eq);
 
-        LNET_UNLOCK();
+       lnet_res_unlock();
 
        if (events != NULL)
                LIBCFS_FREE(events, size * sizeof(lnet_event_t));
@@ -309,7 +309,7 @@ lnet_eq_wait_locked(int *timeout_ms)
        cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
        cfs_waitq_add(&the_lnet.ln_eq_waitq, &wl);
 
-       LNET_UNLOCK();
+       lnet_res_unlock();
 
        if (tms < 0) {
                cfs_waitq_wait(&wl, CFS_TASK_INTERRUPTIBLE);
@@ -329,7 +329,7 @@ lnet_eq_wait_locked(int *timeout_ms)
        wait = tms != 0; /* might need to call here again */
        *timeout_ms = tms;
 
-       LNET_LOCK();
+       lnet_res_lock();
        cfs_waitq_del(&the_lnet.ln_eq_waitq, &wl);
 
        return wait;
@@ -342,10 +342,10 @@ static void
 lnet_eq_cond_wait(struct timespec *ts)
 {
        if (ts == NULL) {
-               pthread_cond_wait(&the_lnet.ln_eq_cond, &the_lnet.ln_lock);
+               pthread_cond_wait(&the_lnet.ln_eq_cond, &the_lnet.ln_res_lock);
        } else {
                pthread_cond_timedwait(&the_lnet.ln_eq_cond,
-                                      &the_lnet.ln_lock, ts);
+                                      &the_lnet.ln_res_lock, ts);
        }
 }
 # endif
@@ -362,9 +362,18 @@ lnet_eq_wait_locked(int *timeout_ms)
        if (the_lnet.ln_eq_waitni != NULL) {
                /* I have a single NI that I have to call into, to get
                 * events queued, or to block. */
+               lnet_res_unlock();
+
+               LNET_LOCK();
                eq_waitni = the_lnet.ln_eq_waitni;
-               lnet_ni_addref_locked(eq_waitni);
+               if (unlikely(eq_waitni == NULL)) {
+                       LNET_UNLOCK();
 
+                       lnet_res_lock();
+                       return -1;
+               }
+
+               lnet_ni_addref_locked(eq_waitni);
                LNET_UNLOCK();
 
                if (tms <= 0) { /* even for tms == 0 */
@@ -382,8 +391,8 @@ lnet_eq_wait_locked(int *timeout_ms)
                                tms = 0;
                }
 
-               LNET_LOCK();
-               lnet_ni_decref_locked(eq_waitni);
+               lnet_ni_decref(eq_waitni);
+               lnet_res_lock();
        } else { /* w/o eq_waitni */
 # ifndef HAVE_LIBPTHREAD
                /* If I'm single-threaded, LNET fails at startup if it can't
@@ -471,34 +480,34 @@ LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms,
         if (neq < 1)
                 RETURN(-ENOENT);
 
-        LNET_LOCK();
+       lnet_res_lock();
 
         for (;;) {
 #ifndef __KERNEL__
-                LNET_UNLOCK();
+               lnet_res_unlock();
 
-                /* Recursion breaker */
-                if (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING &&
-                    !LNetHandleIsEqual(eventqs[0], the_lnet.ln_rc_eqh))
-                        lnet_router_checker();
+               /* Recursion breaker */
+               if (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING &&
+                   !LNetHandleIsEqual(eventqs[0], the_lnet.ln_rc_eqh))
+                       lnet_router_checker();
 
-                LNET_LOCK();
+               lnet_res_lock();
 #endif
-                for (i = 0; i < neq; i++) {
-                        lnet_eq_t *eq = lnet_handle2eq(&eventqs[i]);
+               for (i = 0; i < neq; i++) {
+                       lnet_eq_t *eq = lnet_handle2eq(&eventqs[i]);
 
-                        if (eq == NULL) {
-                                LNET_UNLOCK();
-                                RETURN(-ENOENT);
-                        }
+                       if (eq == NULL) {
+                               lnet_res_unlock();
+                               RETURN(-ENOENT);
+                       }
 
                        rc = lnet_eq_dequeue_event(eq, event);
-                        if (rc != 0) {
-                                LNET_UNLOCK();
-                                *which = i;
-                                RETURN(rc);
-                        }
-                }
+                       if (rc != 0) {
+                               lnet_res_unlock();
+                               *which = i;
+                               RETURN(rc);
+                       }
+               }
 
                if (wait == 0)
                        break;
@@ -515,6 +524,6 @@ LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms,
                        break;
        }
 
-       LNET_UNLOCK();
+       lnet_res_unlock();
        RETURN(0);
 }
index c189fb9..dbb007e 100644 (file)
@@ -40,7 +40,7 @@
 
 #include <lnet/lib-lnet.h>
 
-/* must be called with LNET_LOCK held */
+/* must be called with lnet_res_lock held */
 void
 lnet_md_unlink(lnet_libmd_t *md)
 {
@@ -196,7 +196,7 @@ lnet_md_link(lnet_libmd_t *md, lnet_handle_eq_t eq_handle)
        return 0;
 }
 
-/* must be called with LNET_LOCK held */
+/* must be called with lnet_res_lock held */
 void
 lnet_md_deconstruct(lnet_libmd_t *lmd, lnet_md_t *umd)
 {
@@ -286,7 +286,7 @@ LNetMDAttach(lnet_handle_me_t meh, lnet_md_t umd,
 
        rc = lnet_md_build(md, &umd, unlink);
 
-       LNET_LOCK();
+       lnet_res_lock();
        if (rc != 0)
                goto failed;
 
@@ -307,7 +307,7 @@ LNetMDAttach(lnet_handle_me_t meh, lnet_md_t umd,
 
        lnet_md2handle(handle, md);
 
-       LNET_UNLOCK();
+       lnet_res_unlock();
 
        lnet_drop_delayed_msg_list(&drops, "Bad match");
        lnet_recv_delayed_msg_list(&matches);
@@ -317,7 +317,7 @@ LNetMDAttach(lnet_handle_me_t meh, lnet_md_t umd,
  failed:
        lnet_md_free_locked(md);
 
-       LNET_UNLOCK();
+       lnet_res_unlock();
        return rc;
 }
 
@@ -360,7 +360,7 @@ LNetMDBind(lnet_md_t umd, lnet_unlink_t unlink, lnet_handle_md_t *handle)
 
        rc = lnet_md_build(md, &umd, unlink);
 
-       LNET_LOCK();
+       lnet_res_lock();
        if (rc != 0)
                goto failed;
 
@@ -370,13 +370,13 @@ LNetMDBind(lnet_md_t umd, lnet_unlink_t unlink, lnet_handle_md_t *handle)
 
        lnet_md2handle(handle, md);
 
-       LNET_UNLOCK();
+       lnet_res_unlock();
        return 0;
 
  failed:
        lnet_md_free_locked(md);
 
-       LNET_UNLOCK();
+       lnet_res_unlock();
        return rc;
 }
 
@@ -418,11 +418,11 @@ LNetMDUnlink (lnet_handle_md_t mdh)
         LASSERT (the_lnet.ln_init);
         LASSERT (the_lnet.ln_refcount > 0);
 
-        LNET_LOCK();
+       lnet_res_lock();
 
-        md = lnet_handle2md(&mdh);
-        if (md == NULL) {
-                LNET_UNLOCK();
+       md = lnet_handle2md(&mdh);
+       if (md == NULL) {
+               lnet_res_unlock();
                 return -ENOENT;
         }
 
@@ -438,6 +438,6 @@ LNetMDUnlink (lnet_handle_md_t mdh)
 
         lnet_md_unlink(md);
 
-        LNET_UNLOCK();
-        return 0;
+       lnet_res_unlock();
+       return 0;
 }
index 72d4514..d4f325d 100644 (file)
@@ -96,7 +96,7 @@ LNetMEAttach(unsigned int portal,
         if (me == NULL)
                 return -ENOMEM;
 
-        LNET_LOCK();
+       lnet_res_lock();
 
         me->me_portal = portal;
         me->me_match_id = match_id;
@@ -116,7 +116,7 @@ LNetMEAttach(unsigned int portal,
 
         lnet_me2handle(handle, me);
 
-        LNET_UNLOCK();
+       lnet_res_unlock();
 
         return 0;
 }
@@ -156,23 +156,23 @@ LNetMEInsert(lnet_handle_me_t current_meh,
         if (new_me == NULL)
                 return -ENOMEM;
 
-        LNET_LOCK();
+       lnet_res_lock();
 
-        current_me = lnet_handle2me(&current_meh);
-        if (current_me == NULL) {
+       current_me = lnet_handle2me(&current_meh);
+       if (current_me == NULL) {
                lnet_me_free_locked(new_me);
 
-                LNET_UNLOCK();
-                return -ENOENT;
-        }
+               lnet_res_unlock();
+               return -ENOENT;
+       }
 
-        LASSERT (current_me->me_portal < the_lnet.ln_nportals);
+       LASSERT(current_me->me_portal < the_lnet.ln_nportals);
 
        ptl = the_lnet.ln_portals[current_me->me_portal];
        if (lnet_ptl_is_unique(ptl)) {
                 /* nosense to insertion on unique portal */
                lnet_me_free_locked(new_me);
-                LNET_UNLOCK();
+               lnet_res_unlock();
                 return -EPERM;
         }
 
@@ -192,9 +192,9 @@ LNetMEInsert(lnet_handle_me_t current_meh,
 
         lnet_me2handle(handle, new_me);
 
-        LNET_UNLOCK();
+       lnet_res_unlock();
 
-        return 0;
+       return 0;
 }
 
 /**
@@ -221,11 +221,11 @@ LNetMEUnlink(lnet_handle_me_t meh)
         LASSERT (the_lnet.ln_init);
         LASSERT (the_lnet.ln_refcount > 0);
 
-        LNET_LOCK();
+       lnet_res_lock();
 
         me = lnet_handle2me(&meh);
         if (me == NULL) {
-                LNET_UNLOCK();
+               lnet_res_unlock();
                 return -ENOENT;
         }
 
@@ -239,11 +239,11 @@ LNetMEUnlink(lnet_handle_me_t meh)
 
         lnet_me_unlink(me);
 
-        LNET_UNLOCK();
+       lnet_res_unlock();
         return 0;
 }
 
-/* call with LNET_LOCK please */
+/* call with lnet_res_lock please */
 void
 lnet_me_unlink(lnet_me_t *me)
 {
index 0917b76..953507d 100644 (file)
@@ -1486,7 +1486,7 @@ lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
         int               rlength;
         int               mlength;
 
-        LNET_LOCK();
+       lnet_res_lock();
 
         src.nid = hdr->src_nid;
         src.pid = hdr->src_pid;
@@ -1504,7 +1504,7 @@ lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
                         CERROR("REPLY MD also attached to portal %d\n",
                                md->md_me->me_portal);
 
-                LNET_UNLOCK();
+               lnet_res_unlock();
                 return ENOENT;                  /* +ve: OK but no match */
         }
 
@@ -1520,7 +1520,7 @@ lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
                         libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
                         rlength, hdr->msg.reply.dst_wmd.wh_object_cookie,
                         mlength);
-                LNET_UNLOCK();
+               lnet_res_unlock();
                 return ENOENT;          /* +ve: OK but no match */
         }
 
@@ -1533,7 +1533,7 @@ lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
         if (mlength != 0)
                 lnet_setpayloadbuffer(msg);
 
-        LNET_UNLOCK();
+       lnet_res_unlock();
 
        lnet_build_msg_event(msg, LNET_EVENT_REPLY);
 
@@ -1555,7 +1555,7 @@ lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
         hdr->msg.ack.match_bits = le64_to_cpu(hdr->msg.ack.match_bits);
         hdr->msg.ack.mlength = le32_to_cpu(hdr->msg.ack.mlength);
 
-        LNET_LOCK();
+       lnet_res_lock();
 
         /* NB handles only looked up by creator (no flips) */
         md = lnet_wire_handle2md(&hdr->msg.ack.dst_wmd);
@@ -1571,7 +1571,7 @@ lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
                         CERROR("Source MD also attached to portal %d\n",
                                md->md_me->me_portal);
 
-                LNET_UNLOCK();
+               lnet_res_unlock();
                 return ENOENT;                  /* +ve! */
         }
 
@@ -1581,12 +1581,12 @@ lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
 
        lnet_msg_attach_md(msg, md, 0, 0);
 
-       LNET_UNLOCK();
+       lnet_res_unlock();
 
        lnet_build_msg_event(msg, LNET_EVENT_ACK);
 
-        lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
-        return 0;
+       lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
+       return 0;
 }
 
 static int
@@ -2061,12 +2061,10 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
         }
         msg->msg_vmflush = !!cfs_memory_pressure_get();
 
-        LNET_LOCK();
+       lnet_res_lock();
 
         md = lnet_handle2md(&mdh);
         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
-               lnet_msg_free_locked(msg);
-
                 CERROR("Dropping PUT ("LPU64":%d:%s): MD (%d) invalid\n",
                        match_bits, portal, libcfs_id2str(target),
                        md == NULL ? -1 : md->md_threshold);
@@ -2074,7 +2072,10 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
                         CERROR("Source MD also attached to portal %d\n",
                                md->md_me->me_portal);
 
-                LNET_UNLOCK();
+               lnet_res_unlock();
+
+               lnet_msg_free(msg);
+
                 return -ENOENT;
         }
 
@@ -2102,7 +2103,7 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
                         LNET_WIRE_HANDLE_COOKIE_NONE;
         }
 
-        LNET_UNLOCK();
+       lnet_res_unlock();
 
        lnet_build_msg_event(msg, LNET_EVENT_SEND);
 
@@ -2134,7 +2135,7 @@ lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *getmsg)
         LASSERT (!getmsg->msg_target_is_router);
         LASSERT (!getmsg->msg_routing);
 
-        LNET_LOCK();
+       lnet_res_lock();
 
         LASSERT (getmd->md_refcount > 0);
 
@@ -2148,7 +2149,8 @@ lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *getmsg)
                 CERROR ("%s: Dropping REPLY from %s for inactive MD %p\n",
                         libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), 
                         getmd);
-                goto drop_msg;
+               lnet_res_unlock();
+               goto drop;
         }
 
         LASSERT (getmd->md_offset == 0);
@@ -2164,22 +2166,25 @@ lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *getmsg)
        msg->msg_receiving = 1; /* required by lnet_msg_attach_md */
 
        lnet_msg_attach_md(msg, getmd, getmd->md_offset, getmd->md_length);
-       lnet_msg_commit(msg, 0);
+       lnet_res_unlock();
 
+       LNET_LOCK();
+       lnet_msg_commit(msg, 0);
        LNET_UNLOCK();
 
        lnet_build_msg_event(msg, LNET_EVENT_REPLY);
 
        return msg;
 
- drop_msg:
-       lnet_msg_free_locked(msg);
  drop:
+       LNET_LOCK();
        the_lnet.ln_counters.drop_count++;
        the_lnet.ln_counters.drop_length += getmd->md_length;
-
        LNET_UNLOCK ();
 
+       if (msg != NULL)
+               lnet_msg_free(msg);
+
        return NULL;
 }
 
@@ -2246,12 +2251,10 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
                 return -ENOMEM;
         }
 
-        LNET_LOCK();
+       lnet_res_lock();
 
         md = lnet_handle2md(&mdh);
         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
-               lnet_msg_free_locked(msg);
-
                 CERROR("Dropping GET ("LPU64":%d:%s): MD (%d) invalid\n",
                        match_bits, portal, libcfs_id2str(target),
                        md == NULL ? -1 : md->md_threshold);
@@ -2259,7 +2262,10 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
                         CERROR("REPLY MD also attached to portal %d\n",
                                md->md_me->me_portal);
 
-                LNET_UNLOCK();
+               lnet_res_unlock();
+
+               lnet_msg_free(msg);
+
                 return -ENOENT;
         }
 
@@ -2280,7 +2286,7 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
         msg->msg_hdr.msg.get.return_wmd.wh_object_cookie = 
                 md->md_lh.lh_cookie;
 
-        LNET_UNLOCK();
+       lnet_res_unlock();
 
        lnet_build_msg_event(msg, LNET_EVENT_SEND);
 
index ca8201e..3c16bf7 100644 (file)
@@ -311,9 +311,6 @@ lnet_msg_detach_md(lnet_msg_t *msg, int status)
        lnet_libmd_t    *md = msg->msg_md;
        int             unlink;
 
-       if (md == NULL)
-               return;
-
        /* Now it's safe to drop my caller's ref */
        md->md_refcount--;
        LASSERT(md->md_refcount >= 0);
@@ -413,23 +410,25 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
                msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
                msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
 #endif
-        LNET_LOCK();
 
         LASSERT (msg->msg_onactivelist);
 
         msg->msg_ev.status = status;
 
-       if (msg->msg_md != NULL)
+       if (msg->msg_md != NULL) {
+               lnet_res_lock();
                lnet_msg_detach_md(msg, status);
+               lnet_res_unlock();
+       }
 
        if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
-               LNET_UNLOCK();
                /* not commited to network yet */
                LASSERT(!msg->msg_onactivelist);
                lnet_msg_free(msg);
                return;
        }
 
+       LNET_LOCK();
        container = &the_lnet.ln_msg_container;
        cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
 
index ca24586..1baadd4 100644 (file)
@@ -56,10 +56,10 @@ lnet_ptl_match_type(unsigned int index, lnet_process_id_t match_id,
                goto match;
 
        /* unset, new portal */
-       LNET_LOCK();
+       lnet_res_lock();
        /* check again with lock */
        if (unlikely(lnet_ptl_is_unique(ptl) || lnet_ptl_is_wildcard(ptl))) {
-               LNET_UNLOCK();
+               lnet_res_unlock();
                goto match;
        }
 
@@ -69,7 +69,7 @@ lnet_ptl_match_type(unsigned int index, lnet_process_id_t match_id,
        else
                lnet_ptl_setopt(ptl, LNET_PTL_MATCH_WILDCARD);
 
-       LNET_UNLOCK();
+       lnet_res_unlock();
 
        return 1;
 
@@ -85,7 +85,7 @@ lnet_try_match_md(int index, int op_mask, lnet_process_id_t src,
                  unsigned int rlength, unsigned int roffset,
                  __u64 match_bits, lnet_libmd_t *md, lnet_msg_t *msg)
 {
-       /* ALWAYS called holding the LNET_LOCK, and can't LNET_UNLOCK;
+       /* ALWAYS called holding the lnet_res_lock, and can't lnet_res_unlock;
         * lnet_match_blocked_msg() relies on this to avoid races */
        unsigned int    offset;
        unsigned int    mlength;
@@ -292,7 +292,7 @@ lnet_ptl_match_md(unsigned int index, int op_mask, lnet_process_id_t src,
        }
 
        ptl = the_lnet.ln_portals[index];
-       LNET_LOCK();
+       lnet_res_lock();
 
        if (the_lnet.ln_shutdown) {
                rc =  LNET_MATCHMD_DROP;
@@ -316,7 +316,7 @@ lnet_ptl_match_md(unsigned int index, int op_mask, lnet_process_id_t src,
               op_mask == LNET_MD_OP_PUT ? "PUT" : "GET",
               libcfs_id2str(src), index, match_bits, roffset, rlength);
  out:
-       LNET_UNLOCK();
+       lnet_res_unlock();
        return rc;
 }
 
@@ -329,7 +329,7 @@ lnet_ptl_detach_md(lnet_me_t *me, lnet_libmd_t *md)
        md->md_me = NULL;
 }
 
-/* called with LNET_LOCK held */
+/* called with lnet_res_lock held */
 void
 lnet_ptl_attach_md(lnet_me_t *me, lnet_libmd_t *md,
                   cfs_list_t *matches, cfs_list_t *drops)
@@ -554,9 +554,9 @@ LNetSetLazyPortal(int portal)
        CDEBUG(D_NET, "Setting portal %d lazy\n", portal);
        ptl = the_lnet.ln_portals[portal];
 
-       LNET_LOCK();
+       lnet_res_lock();
        lnet_ptl_setopt(ptl, LNET_PTL_LAZY);
-       LNET_UNLOCK();
+       lnet_res_unlock();
 
        return 0;
 }
@@ -581,10 +581,10 @@ LNetClearLazyPortal(int portal)
 
        ptl = the_lnet.ln_portals[portal];
 
-       LNET_LOCK();
+       lnet_res_lock();
 
        if (!lnet_ptl_is_lazy(ptl)) {
-               LNET_UNLOCK();
+               lnet_res_unlock();
                return 0;
        }
 
@@ -598,7 +598,7 @@ LNetClearLazyPortal(int portal)
 
        lnet_ptl_unsetopt(ptl, LNET_PTL_LAZY);
 
-       LNET_UNLOCK();
+       lnet_res_unlock();
 
        lnet_drop_delayed_msg_list(&zombies, "Clearing lazy portal attr");
 
index 86d24ad..4cdce48 100644 (file)
@@ -659,7 +659,6 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
 static void
 lnet_router_checker_event(lnet_event_t *event)
 {
-       /* CAVEAT EMPTOR: I'm called with lnet_res_locked */
        lnet_rc_data_t          *rcd = event->md.user_ptr;
        struct lnet_peer        *lp;
 
@@ -676,13 +675,16 @@ lnet_router_checker_event(lnet_event_t *event)
        lp = rcd->rcd_gateway;
        LASSERT(lp != NULL);
 
-       if (!lnet_isrouter(lp)) /* ignore if no longer a router */
-               return;
+       LNET_LOCK();
+       if (!lnet_isrouter(lp) || lp->lp_rcd != rcd) {
+               /* ignore if no longer a router or rcd is replaced */
+               goto out;
+       }
 
        if (event->type == LNET_EVENT_SEND) {
-               lp->lp_ping_notsent = 0; /* NB: re-enable another ping */
+               lp->lp_ping_notsent = 0;
                if (event->status == 0)
-                       return;
+                       goto out;
        }
 
        /* LNET_EVENT_REPLY */
@@ -699,6 +701,9 @@ lnet_router_checker_event(lnet_event_t *event)
 
        if (avoid_asym_router_failure && event->status == 0)
                lnet_parse_rc_info(rcd);
+
+ out:
+       LNET_UNLOCK();
 }
 
 void
@@ -839,6 +844,8 @@ lnet_create_rc_data_locked(lnet_peer_t *gateway)
        lnet_peer_addref_locked(gateway);
        rcd->rcd_gateway = gateway;
        gateway->lp_rcd = rcd;
+       gateway->lp_ping_notsent = 0;
+
        return rcd;
 
  out:
@@ -1549,9 +1556,7 @@ lnet_router_checker (void)
 
                 LASSERT (rc == 1);
 
-                LNET_LOCK();
                 lnet_router_checker_event(&ev);
-                LNET_UNLOCK();
         }
 
        if (the_lnet.ln_rc_state == LNET_RC_STATE_STOPPING) {