Whamcloud - gitweb
LU-56 lnet: cleanup for LNet Event Queue
authorLiang Zhen <liang@whamcloud.com>
Mon, 28 May 2012 01:58:47 +0000 (09:58 +0800)
committerOleg Drokin <green@whamcloud.com>
Mon, 11 Jun 2012 13:09:01 +0000 (09:09 -0400)
This patch is just a cleanup patch for LNet Event Queue (EQ),
it's an intermediate patch for upcoming LNet SMP improvement
for EQ.

Signed-off-by: Liang Zhen <liang@whamcloud.com>
Change-Id: I2761c0c4118fa709a921b4060a7bd5ec6700a3f0
Reviewed-on: http://review.whamcloud.com/2924
Reviewed-by: Bobi Jam <bobijam@whamcloud.com>
Reviewed-by: Doug Oucharek <doug@whamcloud.com>
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-types.h
lnet/lnet/api-ni.c
lnet/lnet/lib-eq.c
lnet/lnet/lib-md.c
lnet/lnet/lib-me.c
lnet/lnet/lib-msg.c

index a3556e6..8a8b7a4 100644 (file)
@@ -680,7 +680,7 @@ int lnet_islocalnid(lnet_nid_t nid);
 int lnet_islocalnet(__u32 net);
 
 void lnet_build_unlink_event(lnet_libmd_t *md, lnet_event_t *ev);
 int lnet_islocalnet(__u32 net);
 
 void lnet_build_unlink_event(lnet_libmd_t *md, lnet_event_t *ev);
-void lnet_enq_event_locked(lnet_eq_t *eq, lnet_event_t *ev);
+void lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev);
 void lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
                     unsigned int offset, unsigned int len);
 int lnet_send(lnet_nid_t nid, lnet_msg_t *msg);
 void lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
                     unsigned int offset, unsigned int len);
 int lnet_send(lnet_nid_t nid, lnet_msg_t *msg);
index c56df4a..caec262 100644 (file)
@@ -582,19 +582,19 @@ typedef struct
 
 #ifdef __KERNEL__
         cfs_spinlock_t         ln_lock;
 
 #ifdef __KERNEL__
         cfs_spinlock_t         ln_lock;
-        cfs_waitq_t            ln_waitq;
         cfs_mutex_t            ln_api_mutex;
         cfs_mutex_t            ln_lnd_mutex;
         cfs_mutex_t            ln_api_mutex;
         cfs_mutex_t            ln_lnd_mutex;
+       cfs_waitq_t                     ln_eq_waitq;
 #else
 # ifndef HAVE_LIBPTHREAD
         int                    ln_lock;
         int                    ln_api_mutex;
         int                    ln_lnd_mutex;
 # else
 #else
 # ifndef HAVE_LIBPTHREAD
         int                    ln_lock;
         int                    ln_api_mutex;
         int                    ln_lnd_mutex;
 # else
-        pthread_cond_t         ln_cond;
         pthread_mutex_t        ln_lock;
         pthread_mutex_t        ln_api_mutex;
         pthread_mutex_t        ln_lnd_mutex;
         pthread_mutex_t        ln_lock;
         pthread_mutex_t        ln_api_mutex;
         pthread_mutex_t        ln_lnd_mutex;
+       pthread_cond_t                  ln_eq_cond;
 # endif
 #endif
        /* ME container  */
 # endif
 #endif
        /* ME container  */
@@ -614,7 +614,9 @@ typedef struct
 
         cfs_list_t             ln_nis;              /* LND instances */
         lnet_ni_t             *ln_loni;             /* the loopback NI */
 
         cfs_list_t             ln_nis;              /* LND instances */
         lnet_ni_t             *ln_loni;             /* the loopback NI */
-        lnet_ni_t             *ln_eqwaitni;         /* NI to wait for events in */
+       /* NI to wait for events in */
+       lnet_ni_t                       *ln_eq_waitni;
+
         cfs_list_t             ln_zombie_nis;       /* dying LND instances */
         int                    ln_nzombie_nis;      /* # of NIs to wait for */
 
         cfs_list_t             ln_zombie_nis;       /* dying LND instances */
         int                    ln_nzombie_nis;      /* # of NIs to wait for */
 
index a84e867..720682d 100644 (file)
@@ -91,10 +91,10 @@ lnet_get_networks(void)
 void
 lnet_init_locks(void)
 {
 void
 lnet_init_locks(void)
 {
-        cfs_spin_lock_init (&the_lnet.ln_lock);
-        cfs_waitq_init (&the_lnet.ln_waitq);
-        cfs_mutex_init(&the_lnet.ln_lnd_mutex);
-        cfs_mutex_init(&the_lnet.ln_api_mutex);
+       cfs_spin_lock_init (&the_lnet.ln_lock);
+       cfs_waitq_init(&the_lnet.ln_eq_waitq);
+       cfs_mutex_init(&the_lnet.ln_lnd_mutex);
+       cfs_mutex_init(&the_lnet.ln_api_mutex);
 }
 
 void
 }
 
 void
@@ -189,18 +189,18 @@ void lnet_fini_locks(void)
 
 void lnet_init_locks(void)
 {
 
 void lnet_init_locks(void)
 {
-        pthread_cond_init(&the_lnet.ln_cond, NULL);
-        pthread_mutex_init(&the_lnet.ln_lock, NULL);
-        pthread_mutex_init(&the_lnet.ln_lnd_mutex, NULL);
-        pthread_mutex_init(&the_lnet.ln_api_mutex, NULL);
+       pthread_cond_init(&the_lnet.ln_eq_cond, NULL);
+       pthread_mutex_init(&the_lnet.ln_lock, NULL);
+       pthread_mutex_init(&the_lnet.ln_lnd_mutex, NULL);
+       pthread_mutex_init(&the_lnet.ln_api_mutex, NULL);
 }
 
 void lnet_fini_locks(void)
 {
 }
 
 void lnet_fini_locks(void)
 {
-        pthread_mutex_destroy(&the_lnet.ln_api_mutex);
-        pthread_mutex_destroy(&the_lnet.ln_lnd_mutex);
-        pthread_mutex_destroy(&the_lnet.ln_lock);
-        pthread_cond_destroy(&the_lnet.ln_cond);
+       pthread_mutex_destroy(&the_lnet.ln_api_mutex);
+       pthread_mutex_destroy(&the_lnet.ln_lnd_mutex);
+       pthread_mutex_destroy(&the_lnet.ln_lock);
+       pthread_cond_destroy(&the_lnet.ln_eq_cond);
 }
 
 # endif
 }
 
 # endif
@@ -878,10 +878,10 @@ lnet_shutdown_lndnis (void)
         }
 
         /* Drop the cached eqwait NI. */
         }
 
         /* Drop the cached eqwait NI. */
-        if (the_lnet.ln_eqwaitni != NULL) {
-                lnet_ni_decref_locked(the_lnet.ln_eqwaitni);
-                the_lnet.ln_eqwaitni = NULL;
-        }
+       if (the_lnet.ln_eq_waitni != NULL) {
+               lnet_ni_decref_locked(the_lnet.ln_eq_waitni);
+               the_lnet.ln_eq_waitni = NULL;
+       }
 
         /* Drop the cached loopback NI. */
         if (the_lnet.ln_loni != NULL) {
 
         /* Drop the cached loopback NI. */
         if (the_lnet.ln_loni != NULL) {
@@ -1058,10 +1058,10 @@ lnet_startup_lndnis (void)
 
 #ifndef __KERNEL__
                 if (lnd->lnd_wait != NULL) {
 
 #ifndef __KERNEL__
                 if (lnd->lnd_wait != NULL) {
-                        if (the_lnet.ln_eqwaitni == NULL) {
-                                lnet_ni_addref(ni);
-                                the_lnet.ln_eqwaitni = ni;
-                        }
+                       if (the_lnet.ln_eq_waitni == NULL) {
+                               lnet_ni_addref(ni);
+                               the_lnet.ln_eq_waitni = ni;
+                       }
                 } else {
 # ifndef HAVE_LIBPTHREAD
                         LCONSOLE_ERROR_MSG(0x106, "LND %s not supported in a "
                 } else {
 # ifndef HAVE_LIBPTHREAD
                         LCONSOLE_ERROR_MSG(0x106, "LND %s not supported in a "
@@ -1090,8 +1090,8 @@ lnet_startup_lndnis (void)
                 nicount++;
         }
 
                 nicount++;
         }
 
-        if (the_lnet.ln_eqwaitni != NULL && nicount > 1) {
-                lnd_type = the_lnet.ln_eqwaitni->ni_lnd->lnd_type;
+       if (the_lnet.ln_eq_waitni != NULL && nicount > 1) {
+               lnd_type = the_lnet.ln_eq_waitni->ni_lnd->lnd_type;
                 LCONSOLE_ERROR_MSG(0x109, "LND %s can only run single-network"
                                    "\n",
                                    libcfs_lnd2str(lnd_type));
                 LCONSOLE_ERROR_MSG(0x109, "LND %s can only run single-network"
                                    "\n",
                                    libcfs_lnd2str(lnd_type));
index 4f2e3f8..87317ac 100644 (file)
@@ -170,17 +170,51 @@ LNetEQFree(lnet_handle_eq_t eqh)
         return 0;
 }
 
         return 0;
 }
 
+void
+lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev)
+{
+       lnet_event_t    *eq_slot;
+
+       /* Allocate the next queue slot */
+       ev->sequence = eq->eq_enq_seq++;
+
+       /* size must be a power of 2 to handle sequence # overflow */
+       LASSERT(eq->eq_size != 0 &&
+               eq->eq_size == LOWEST_BIT_SET(eq->eq_size));
+       eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
+
+       /* There is no race since both event consumers and event producers
+        * take the LNET_LOCK, so we don't screw around with memory
+        * barriers, setting the sequence number last or weird structure
+        * layout assertions. */
+       *eq_slot = *ev;
+
+       /* Call the callback handler (if any) */
+       if (eq->eq_callback != NULL)
+               eq->eq_callback(eq_slot);
+
+#ifdef __KERNEL__
+       /* Wake anyone waiting in LNetEQPoll() */
+       if (cfs_waitq_active(&the_lnet.ln_eq_waitq))
+               cfs_waitq_broadcast(&the_lnet.ln_eq_waitq);
+#else
+# ifndef HAVE_LIBPTHREAD
+       /* LNetEQPoll() calls into _the_ LND to wait for action */
+# else
+       /* Wake anyone waiting in LNetEQPoll() */
+       pthread_cond_broadcast(&the_lnet.ln_eq_cond);
+# endif
+#endif
+}
+
 int
 int
-lib_get_event (lnet_eq_t *eq, lnet_event_t *ev)
+lnet_eq_dequeue_event(lnet_eq_t *eq, lnet_event_t *ev)
 {
         int           new_index = eq->eq_deq_seq & (eq->eq_size - 1);
         lnet_event_t *new_event = &eq->eq_events[new_index];
         int           rc;
         ENTRY;
 
 {
         int           new_index = eq->eq_deq_seq & (eq->eq_size - 1);
         lnet_event_t *new_event = &eq->eq_events[new_index];
         int           rc;
         ENTRY;
 
-        CDEBUG(D_INFO, "event: %p, sequence: %lu, eq->size: %u\n",
-               new_event, eq->eq_deq_seq, eq->eq_size);
-
         if (LNET_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
                 RETURN(0);
         }
         if (LNET_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
                 RETURN(0);
         }
@@ -188,6 +222,9 @@ lib_get_event (lnet_eq_t *eq, lnet_event_t *ev)
         /* We've got a new event... */
         *ev = *new_event;
 
         /* We've got a new event... */
         *ev = *new_event;
 
+       CDEBUG(D_INFO, "event: %p, sequence: %lu, eq->size: %u\n",
+              new_event, eq->eq_deq_seq, eq->eq_size);
+
         /* ...but did it overwrite an event we've not seen yet? */
         if (eq->eq_deq_seq == new_event->sequence) {
                 rc = 1;
         /* ...but did it overwrite an event we've not seen yet? */
         if (eq->eq_deq_seq == new_event->sequence) {
                 rc = 1;
@@ -253,6 +290,144 @@ LNetEQWait (lnet_handle_eq_t eventq, lnet_event_t *event)
                          event, &which);
 }
 
                          event, &which);
 }
 
+#ifdef __KERNEL__
+
+static int
+lnet_eq_wait_locked(int *timeout_ms)
+{
+       int              tms = *timeout_ms;
+       int              wait;
+       cfs_waitlink_t   wl;
+       cfs_time_t       now;
+
+       if (tms == 0)
+               return -1; /* don't want to wait and no new event */
+
+       cfs_waitlink_init(&wl);
+       cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
+       cfs_waitq_add(&the_lnet.ln_eq_waitq, &wl);
+
+       LNET_UNLOCK();
+
+       if (tms < 0) {
+               cfs_waitq_wait(&wl, CFS_TASK_INTERRUPTIBLE);
+
+       } else {
+               struct timeval tv;
+
+               now = cfs_time_current();
+               cfs_waitq_timedwait(&wl, CFS_TASK_INTERRUPTIBLE,
+                                   cfs_time_seconds(tms) / 1000);
+               cfs_duration_usec(cfs_time_sub(cfs_time_current(), now), &tv);
+               tms -= (int)(tv.tv_sec * 1000 + tv.tv_usec / 1000);
+               if (tms < 0) /* no more wait but may have new event */
+                       tms = 0;
+       }
+
+       wait = tms != 0; /* might need to call here again */
+       *timeout_ms = tms;
+
+       LNET_LOCK();
+       cfs_waitq_del(&the_lnet.ln_eq_waitq, &wl);
+
+       return wait;
+}
+
+#else /* !__KERNEL__ */
+
+# ifdef HAVE_LIBPTHREAD
+static void
+lnet_eq_cond_wait(struct timespec *ts)
+{
+       if (ts == NULL) {
+               pthread_cond_wait(&the_lnet.ln_eq_cond, &the_lnet.ln_lock);
+       } else {
+               pthread_cond_timedwait(&the_lnet.ln_eq_cond,
+                                      &the_lnet.ln_lock, ts);
+       }
+}
+# endif
+
+static int
+lnet_eq_wait_locked(int *timeout_ms)
+{
+       lnet_ni_t         *eq_waitni = NULL;
+       int                tms = *timeout_ms;
+       int                wait;
+       struct timeval     then;
+       struct timeval     now;
+
+       if (the_lnet.ln_eq_waitni != NULL) {
+               /* I have a single NI that I have to call into, to get
+                * events queued, or to block. */
+               eq_waitni = the_lnet.ln_eq_waitni;
+               lnet_ni_addref_locked(eq_waitni);
+
+               LNET_UNLOCK();
+
+               if (tms <= 0) { /* even for tms == 0 */
+                       (eq_waitni->ni_lnd->lnd_wait)(eq_waitni, tms);
+
+               } else {
+                       gettimeofday(&then, NULL);
+
+                       (eq_waitni->ni_lnd->lnd_wait)(eq_waitni, tms);
+
+                       gettimeofday(&now, NULL);
+                       tms -= (now.tv_sec - then.tv_sec) * 1000 +
+                              (now.tv_usec - then.tv_usec) / 1000;
+                       if (tms < 0)
+                               tms = 0;
+               }
+
+               LNET_LOCK();
+               lnet_ni_decref_locked(eq_waitni);
+       } else { /* w/o eq_waitni */
+# ifndef HAVE_LIBPTHREAD
+               /* If I'm single-threaded, LNET fails at startup if it can't
+                * set the_lnet.ln_eqwaitni correctly.  */
+               LBUG();
+# else /* HAVE_LIBPTHREAD */
+               struct timespec  ts;
+
+               if (tms == 0) /* don't want to wait and new event */
+                       return -1;
+
+               if (tms < 0) {
+                       lnet_eq_cond_wait(NULL);
+
+               } else {
+
+                       gettimeofday(&then, NULL);
+
+                       ts.tv_sec = then.tv_sec + tms / 1000;
+                       ts.tv_nsec = then.tv_usec * 1000 +
+                                    (tms % 1000) * 1000000;
+                       if (ts.tv_nsec >= 1000000000) {
+                               ts.tv_sec++;
+                               ts.tv_nsec -= 1000000000;
+                       }
+
+                       lnet_eq_cond_wait(&ts);
+
+                       gettimeofday(&now, NULL);
+                       tms -= (now.tv_sec - then.tv_sec) * 1000 +
+                              (now.tv_usec - then.tv_usec) / 1000;
+                       if (tms < 0)
+                               tms = 0;
+               }
+# endif /* HAVE_LIBPTHREAD */
+       }
+
+       wait = tms != 0;
+       *timeout_ms = tms;
+
+       return wait;
+}
+
+#endif /* __KERNEL__ */
+
+
 /**
  * Block the calling process until there's an event from a set of EQs or
  * timeout happens.
 /**
  * Block the calling process until there's an event from a set of EQs or
  * timeout happens.
@@ -283,19 +458,9 @@ int
 LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms,
             lnet_event_t *event, int *which)
 {
 LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms,
             lnet_event_t *event, int *which)
 {
-        int              i;
-        int              rc;
-#ifdef __KERNEL__
-        cfs_waitlink_t   wl;
-        cfs_time_t       now;
-#else
-        struct timeval   then;
-        struct timeval   now;
-# ifdef HAVE_LIBPTHREAD
-        struct timespec  ts;
-# endif
-        lnet_ni_t       *eqwaitni = the_lnet.ln_eqwaitni;
-#endif
+       int     wait = 1;
+       int     rc;
+       int     i;
         ENTRY;
 
         LASSERT (the_lnet.ln_init);
         ENTRY;
 
         LASSERT (the_lnet.ln_init);
@@ -325,7 +490,7 @@ LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms,
                                 RETURN(-ENOENT);
                         }
 
                                 RETURN(-ENOENT);
                         }
 
-                        rc = lib_get_event (eq, event);
+                       rc = lnet_eq_dequeue_event(eq, event);
                         if (rc != 0) {
                                 LNET_UNLOCK();
                                 *which = i;
                         if (rc != 0) {
                                 LNET_UNLOCK();
                                 *which = i;
@@ -333,102 +498,21 @@ LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms,
                         }
                 }
 
                         }
                 }
 
-#ifdef __KERNEL__
-                if (timeout_ms == 0) {
-                        LNET_UNLOCK();
-                        RETURN (0);
-                }
-
-                cfs_waitlink_init(&wl);
-                cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
-                cfs_waitq_add(&the_lnet.ln_waitq, &wl);
-
-                LNET_UNLOCK();
-
-                if (timeout_ms < 0) {
-                        cfs_waitq_wait (&wl, CFS_TASK_INTERRUPTIBLE);
-                } else {
-                        struct timeval tv;
-
-                        now = cfs_time_current();
-                        cfs_waitq_timedwait(&wl, CFS_TASK_INTERRUPTIBLE,
-                                            cfs_time_seconds(timeout_ms)/1000);
-                        cfs_duration_usec(cfs_time_sub(cfs_time_current(), now),
-                                            &tv);
-                        timeout_ms -= (int)(tv.tv_sec * 1000 + tv.tv_usec / 1000);
-                        if (timeout_ms < 0)
-                                timeout_ms = 0;
-                }
-
-                LNET_LOCK();
-                cfs_waitq_del(&the_lnet.ln_waitq, &wl);
-#else
-                if (eqwaitni != NULL) {
-                        /* I have a single NI that I have to call into, to get
-                         * events queued, or to block. */
-                        lnet_ni_addref_locked(eqwaitni);
-                        LNET_UNLOCK();
-
-                        if (timeout_ms <= 0) {
-                                (eqwaitni->ni_lnd->lnd_wait)(eqwaitni, timeout_ms);
-                        } else {
-                                gettimeofday(&then, NULL);
-
-                                (eqwaitni->ni_lnd->lnd_wait)(eqwaitni, timeout_ms);
-
-                                gettimeofday(&now, NULL);
-                                timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
-                                              (now.tv_usec - then.tv_usec) / 1000;
-                                if (timeout_ms < 0)
-                                        timeout_ms = 0;
-                        }
-
-                        LNET_LOCK();
-                        lnet_ni_decref_locked(eqwaitni);
-
-                        /* don't call into eqwaitni again if timeout has
-                         * expired */
-                        if (timeout_ms == 0)
-                                eqwaitni = NULL;
-
-                        continue;               /* go back and check for events */
-                }
-
-                if (timeout_ms == 0) {
-                        LNET_UNLOCK();
-                        RETURN (0);
-                }
-
-# ifndef HAVE_LIBPTHREAD
-                /* If I'm single-threaded, LNET fails at startup if it can't
-                 * set the_lnet.ln_eqwaitni correctly.  */
-                LBUG();
-# else
-                if (timeout_ms < 0) {
-                        pthread_cond_wait(&the_lnet.ln_cond,
-                                          &the_lnet.ln_lock);
-                } else {
-                        gettimeofday(&then, NULL);
-
-                        ts.tv_sec = then.tv_sec + timeout_ms/1000;
-                        ts.tv_nsec = then.tv_usec * 1000 +
-                                     (timeout_ms%1000) * 1000000;
-                        if (ts.tv_nsec >= 1000000000) {
-                                ts.tv_sec++;
-                                ts.tv_nsec -= 1000000000;
-                        }
-
-                        pthread_cond_timedwait(&the_lnet.ln_cond,
-                                               &the_lnet.ln_lock, &ts);
-
-                        gettimeofday(&now, NULL);
-                        timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
-                                      (now.tv_usec - then.tv_usec) / 1000;
-
-                        if (timeout_ms < 0)
-                                timeout_ms = 0;
-                }
-# endif
-#endif
-        }
+               if (wait == 0)
+                       break;
+
+               /*
+                * return value of lnet_eq_wait_locked:
+                * -1 : did nothing and it's sure no new event
+                *  1 : sleep inside and wait until new event
+                *  0 : don't want to wait anymore, but might have new event
+                *      so need to call dequeue again
+                */
+               wait = lnet_eq_wait_locked(&timeout_ms);
+               if (wait < 0) /* no new event */
+                       break;
+       }
+
+       LNET_UNLOCK();
+       RETURN(0);
 }
 }
index be5b9e1..821178d 100644 (file)
@@ -415,7 +415,7 @@ LNetMDUnlink (lnet_handle_md_t mdh)
         if (md->md_eq != NULL &&
             md->md_refcount == 0) {
                 lnet_build_unlink_event(md, &ev);
         if (md->md_eq != NULL &&
             md->md_refcount == 0) {
                 lnet_build_unlink_event(md, &ev);
-                lnet_enq_event_locked(md->md_eq, &ev);
+               lnet_eq_enqueue_event(md->md_eq, &ev);
         }
 
         lnet_md_unlink(md);
         }
 
         lnet_md_unlink(md);
index 2ea5309..31a8575 100644 (file)
@@ -289,7 +289,7 @@ LNetMEUnlink(lnet_handle_me_t meh)
             md->md_eq != NULL &&
             md->md_refcount == 0) {
                 lnet_build_unlink_event(md, &ev);
             md->md_eq != NULL &&
             md->md_refcount == 0) {
                 lnet_build_unlink_event(md, &ev);
-                lnet_enq_event_locked(md->md_eq, &ev);
+               lnet_eq_enqueue_event(md->md_eq, &ev);
         }
 
         lnet_me_unlink(me);
         }
 
         lnet_me_unlink(me);
index a25fae2..481b3bc 100644 (file)
@@ -56,43 +56,6 @@ lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
 }
 
 void
 }
 
 void
-lnet_enq_event_locked (lnet_eq_t *eq, lnet_event_t *ev)
-{
-        lnet_event_t  *eq_slot;
-
-        /* Allocate the next queue slot */
-        ev->sequence = eq->eq_enq_seq++;
-
-        /* size must be a power of 2 to handle sequence # overflow */
-        LASSERT (eq->eq_size != 0 &&
-                 eq->eq_size == LOWEST_BIT_SET (eq->eq_size));
-        eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
-
-        /* There is no race since both event consumers and event producers
-         * take the LNET_LOCK, so we don't screw around with memory
-         * barriers, setting the sequence number last or weird structure
-         * layout assertions. */
-        *eq_slot = *ev;
-
-        /* Call the callback handler (if any) */
-        if (eq->eq_callback != NULL)
-                eq->eq_callback (eq_slot);
-
-#ifdef __KERNEL__
-        /* Wake anyone waiting in LNetEQPoll() */
-        if (cfs_waitq_active(&the_lnet.ln_waitq))
-                cfs_waitq_broadcast(&the_lnet.ln_waitq);
-#else
-# ifndef HAVE_LIBPTHREAD
-        /* LNetEQPoll() calls into _the_ LND to wait for action */
-# else
-        /* Wake anyone waiting in LNetEQPoll() */
-        pthread_cond_broadcast(&the_lnet.ln_cond);
-# endif
-#endif
-}
-
-void
 lnet_complete_msg_locked(lnet_msg_t *msg)
 {
         lnet_handle_wire_t ack_wmd;
 lnet_complete_msg_locked(lnet_msg_t *msg)
 {
         lnet_handle_wire_t ack_wmd;
@@ -199,7 +162,7 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
                 msg->msg_ev.unlinked = unlink;
 
                 if (md->md_eq != NULL)
                 msg->msg_ev.unlinked = unlink;
 
                 if (md->md_eq != NULL)
-                        lnet_enq_event_locked(md->md_eq, &msg->msg_ev);
+                       lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
 
                 if (unlink)
                         lnet_md_unlink(md);
 
                 if (unlink)
                         lnet_md_unlink(md);