Whamcloud - gitweb
LU-13005 lnet: eq: discard struct lnet_handle_eq 41/36841/5
authorMr NeilBrown <neilb@suse.de>
Wed, 20 Nov 2019 00:16:34 +0000 (11:16 +1100)
committerOleg Drokin <green@whamcloud.com>
Tue, 25 Feb 2020 05:51:08 +0000 (05:51 +0000)
The Portals API uses a cookie 'handle' to identify an EQ.  This is
appropriate for a user-space API for objects maintained by the kernel,
but it brings no value when the API client and implementation are both
in the kernel, as is the case with Lustre and LNet.

Instead of using a 'handle', a pointer to the 'struct lnet_eq` can be
used.  This object is not reference counted and is always freed
correctly, so there can be no case where the cookie becomes invalid
while it is still held.

So use 'struct lnet_eq *' directly instead of having indirection
through a 'struct lnet_handle_eq'.
Also:
 - have LNetEQAttach() return the pointer, using ERR_PTR() to return
   errors.
 - discard ln_eq_containers and don't store the me there-in.
   This means we don't free any eq that have not already been freed,
   but all eq that are allocated are properly freed, so that is not
   a problem.

Signed-off-by: Mr NeilBrown <neilb@suse.de>
Change-Id: I0d6e5b654e39e749b39d46f68d0fb3e47a3256e9
Reviewed-on: https://review.whamcloud.com/36841
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Chris Horn <chris.horn@hpe.com>
Reviewed-by: Amir Shehata <ashehata@whamcloud.com>
Reviewed-by: James Simmons <jsimmons@infradead.org>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
13 files changed:
lnet/include/lnet/api.h
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-types.h
lnet/include/uapi/linux/lnet/lnet-types.h
lnet/lnet/api-ni.c
lnet/lnet/lib-eq.c
lnet/lnet/lib-md.c
lnet/lnet/lib-move.c
lnet/lnet/peer.c
lnet/selftest/rpc.c
lustre/include/lustre_net.h
lustre/ptlrpc/events.c
lustre/ptlrpc/niobuf.c

index f84016a..d89294a 100644 (file)
@@ -140,7 +140,7 @@ int LNetMDUnlink(struct lnet_handle_md md_in);
  * associated with it. If an event handler exists, it will be run for each
  * event that is deposited into the EQ.
  *
  * associated with it. If an event handler exists, it will be run for each
  * event that is deposited into the EQ.
  *
- * In addition to the struct lnet_handle_eq, the LNet API defines two types
+ * In addition to the struct lnet_eq, the LNet API defines two types
  * associated with events: The ::lnet_event_kind defines the kinds of events
  * that can be stored in an EQ. The struct lnet_event defines a structure that
  * holds the information about with an event.
  * associated with events: The ::lnet_event_kind defines the kinds of events
  * that can be stored in an EQ. The struct lnet_event defines a structure that
  * holds the information about with an event.
@@ -150,17 +150,17 @@ int LNetMDUnlink(struct lnet_handle_md md_in);
  * releases these resources and free the EQ.  LNetEQPoll() can be used
  * to test or wait on multiple EQs.
  * @{ */
  * releases these resources and free the EQ.  LNetEQPoll() can be used
  * to test or wait on multiple EQs.
  * @{ */
-int LNetEQAlloc(unsigned int      count_in,
-               lnet_eq_handler_t  handler,
-               struct lnet_handle_eq *handle_out);
+struct lnet_eq *
+LNetEQAlloc(unsigned int count_in,
+           lnet_eq_handler_t handler);
 
 
-int LNetEQFree(struct lnet_handle_eq eventq_in);
+int LNetEQFree(struct lnet_eq *eventq_in);
 
 
-int LNetEQPoll(struct lnet_handle_eq *eventqs_in,
-              int               neq_in,
-              signed long       timeout,
+int LNetEQPoll(struct lnet_eq **eventqs_in,
+              int neq_in,
+              signed long timeout,
               struct lnet_event *event_out,
               struct lnet_event *event_out,
-              int              *which_eq_out);
+              int *which_eq_out);
 /** @} lnet_eq */
 
 /** \defgroup lnet_data Data movement operations
 /** @} lnet_eq */
 
 /** \defgroup lnet_data Data movement operations
index 059a5b4..96aa805 100644 (file)
@@ -308,30 +308,6 @@ lnet_res_lh_invalidate(struct lnet_libhandle *lh)
 }
 
 static inline void
 }
 
 static inline void
-lnet_eq2handle(struct lnet_handle_eq *handle, struct lnet_eq *eq)
-{
-       if (eq == NULL) {
-               LNetInvalidateEQHandle(handle);
-               return;
-       }
-
-       handle->cookie = eq->eq_lh.lh_cookie;
-}
-
-static inline struct lnet_eq *
-lnet_handle2eq(struct lnet_handle_eq *handle)
-{
-       /* ALWAYS called with resource lock held */
-       struct lnet_libhandle *lh;
-
-       lh = lnet_res_lh_lookup(&the_lnet.ln_eq_container, handle->cookie);
-       if (lh == NULL)
-               return NULL;
-
-       return lh_entry(lh, struct lnet_eq, eq_lh);
-}
-
-static inline void
 lnet_md2handle(struct lnet_handle_md *handle, struct lnet_libmd *md)
 {
        handle->cookie = md->md_lh.lh_cookie;
 lnet_md2handle(struct lnet_handle_md *handle, struct lnet_libmd *md)
 {
        handle->cookie = md->md_lh.lh_cookie;
@@ -617,7 +593,7 @@ void lnet_prep_send(struct lnet_msg *msg, int type,
                    unsigned int len);
 int lnet_send(lnet_nid_t nid, struct lnet_msg *msg, lnet_nid_t rtr_nid);
 int lnet_send_ping(lnet_nid_t dest_nid, struct lnet_handle_md *mdh, int nnis,
                    unsigned int len);
 int lnet_send(lnet_nid_t nid, struct lnet_msg *msg, lnet_nid_t rtr_nid);
 int lnet_send_ping(lnet_nid_t dest_nid, struct lnet_handle_md *mdh, int nnis,
-                  void *user_ptr, struct lnet_handle_eq eqh, bool recovery);
+                  void *user_ptr, struct lnet_eq *eq, bool recovery);
 void lnet_return_tx_credits_locked(struct lnet_msg *msg);
 void lnet_return_rx_credits_locked(struct lnet_msg *msg);
 void lnet_schedule_blocked_locked(struct lnet_rtrbufpool *rbp);
 void lnet_return_tx_credits_locked(struct lnet_msg *msg);
 void lnet_return_rx_credits_locked(struct lnet_msg *msg);
 void lnet_schedule_blocked_locked(struct lnet_rtrbufpool *rbp);
index 5b8b8d8..f6aa0d7 100644 (file)
@@ -183,8 +183,6 @@ struct lnet_libhandle {
        ((type *)((char *)(ptr)-(char *)(&((type *)0)->member)))
 
 struct lnet_eq {
        ((type *)((char *)(ptr)-(char *)(&((type *)0)->member)))
 
 struct lnet_eq {
-       struct list_head        eq_list;
-       struct lnet_libhandle   eq_lh;
        unsigned long           eq_enq_seq;
        unsigned long           eq_deq_seq;
        unsigned int            eq_size;
        unsigned long           eq_enq_seq;
        unsigned long           eq_deq_seq;
        unsigned int            eq_size;
@@ -1056,7 +1054,7 @@ struct lnet {
         * ln_api_mutex.
         */
        struct lnet_handle_md           ln_ping_target_md;
         * ln_api_mutex.
         */
        struct lnet_handle_md           ln_ping_target_md;
-       struct lnet_handle_eq           ln_ping_target_eq;
+       struct lnet_eq                  *ln_ping_target_eq;
        struct lnet_ping_buffer         *ln_ping_target;
        atomic_t                        ln_ping_target_seqno;
 
        struct lnet_ping_buffer         *ln_ping_target;
        atomic_t                        ln_ping_target_seqno;
 
@@ -1068,13 +1066,13 @@ struct lnet {
         * buffer may linger a while after it has been unlinked, in
         * which case the event handler cleans up.
         */
         * buffer may linger a while after it has been unlinked, in
         * which case the event handler cleans up.
         */
-       struct lnet_handle_eq           ln_push_target_eq;
+       struct lnet_eq                  *ln_push_target_eq;
        struct lnet_handle_md           ln_push_target_md;
        struct lnet_ping_buffer         *ln_push_target;
        int                             ln_push_target_nnis;
 
        /* discovery event queue handle */
        struct lnet_handle_md           ln_push_target_md;
        struct lnet_ping_buffer         *ln_push_target;
        int                             ln_push_target_nnis;
 
        /* discovery event queue handle */
-       struct lnet_handle_eq           ln_dc_eqh;
+       struct lnet_eq                  *ln_dc_eq;
        /* discovery requests */
        struct list_head                ln_dc_request;
        /* discovery working list */
        /* discovery requests */
        struct list_head                ln_dc_request;
        /* discovery working list */
@@ -1145,7 +1143,7 @@ struct lnet {
         */
        struct list_head                **ln_mt_zombie_rstqs;
        /* recovery eq handler */
         */
        struct list_head                **ln_mt_zombie_rstqs;
        /* recovery eq handler */
-       struct lnet_handle_eq           ln_mt_eqh;
+       struct lnet_eq                  *ln_mt_eq;
 
        /*
         * Completed when the discovery and monitor threads can enter their
 
        /*
         * Completed when the discovery and monitor threads can enter their
index 8a48f49..e6ee09f 100644 (file)
@@ -328,28 +328,6 @@ struct lnet_ping_info {
  */
 #define LNET_WIRE_HANDLE_COOKIE_NONE   (-1)
 
  */
 #define LNET_WIRE_HANDLE_COOKIE_NONE   (-1)
 
-struct lnet_handle_eq {
-       __u64   cookie;
-};
-
-/**
- * Invalidate eq handle \a h.
- */
-static inline void LNetInvalidateEQHandle(struct lnet_handle_eq *h)
-{
-       h->cookie = LNET_WIRE_HANDLE_COOKIE_NONE;
-}
-
-/**
- * Check whether eq handle \a h is invalid.
- *
- * \return 1 if handle is invalid, 0 if valid.
- */
-static inline int LNetEQHandleIsInvalid(struct lnet_handle_eq h)
-{
-       return (LNET_WIRE_HANDLE_COOKIE_NONE == h.cookie);
-}
-
 struct lnet_handle_md {
        __u64   cookie;
 };
 struct lnet_handle_md {
        __u64   cookie;
 };
@@ -510,11 +488,10 @@ struct lnet_md {
        void            *user_ptr;
        /**
         * A handle for the event queue used to log the operations performed on
        void            *user_ptr;
        /**
         * A handle for the event queue used to log the operations performed on
-        * the memory region. If this argument is a NULL handle (i.e. nullified
-        * by LNetInvalidateHandle()), operations performed on this memory
-        * descriptor are not logged.
+        * the memory region. If this argument is a NULL handle operations
+        * performed on this memory descriptor are not logged.
         */
         */
-       struct lnet_handle_eq eq_handle;
+       struct lnet_eq *eq_handle;
        /**
         * The bulk MD handle which was registered to describe the buffers
         * either to be used to transfer data to the peer or receive data
        /**
         * The bulk MD handle which was registered to describe the buffers
         * either to be used to transfer data to the peer or receive data
index f580a89..2ee5970 100644 (file)
@@ -979,10 +979,7 @@ lnet_res_container_cleanup(struct lnet_res_container *rec)
                struct list_head *e = rec->rec_active.next;
 
                list_del_init(e);
                struct list_head *e = rec->rec_active.next;
 
                list_del_init(e);
-               if (rec->rec_type == LNET_COOKIE_TYPE_EQ) {
-                       lnet_eq_free(list_entry(e, struct lnet_eq, eq_list));
-
-               } else if (rec->rec_type == LNET_COOKIE_TYPE_MD) {
+               if (rec->rec_type == LNET_COOKIE_TYPE_MD) {
                        lnet_md_free(list_entry(e, struct lnet_libmd, md_list));
 
                } else { /* NB: Active MEs should be attached on portals */
                        lnet_md_free(list_entry(e, struct lnet_libmd, md_list));
 
                } else { /* NB: Active MEs should be attached on portals */
@@ -1170,7 +1167,7 @@ lnet_prepare(lnet_pid_t requested_pid)
        INIT_LIST_HEAD(&the_lnet.ln_mt_localNIRecovq);
        INIT_LIST_HEAD(&the_lnet.ln_mt_peerNIRecovq);
        init_waitqueue_head(&the_lnet.ln_dc_waitq);
        INIT_LIST_HEAD(&the_lnet.ln_mt_localNIRecovq);
        INIT_LIST_HEAD(&the_lnet.ln_mt_peerNIRecovq);
        init_waitqueue_head(&the_lnet.ln_dc_waitq);
-       LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
+       the_lnet.ln_mt_eq = NULL;
        init_completion(&the_lnet.ln_started);
 
        rc = lnet_slab_setup();
        init_completion(&the_lnet.ln_started);
 
        rc = lnet_slab_setup();
@@ -1256,9 +1253,9 @@ lnet_unprepare (void)
                the_lnet.ln_mt_zombie_rstqs = NULL;
        }
 
                the_lnet.ln_mt_zombie_rstqs = NULL;
        }
 
-       if (!LNetEQHandleIsInvalid(the_lnet.ln_mt_eqh)) {
-               rc = LNetEQFree(the_lnet.ln_mt_eqh);
-               LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
+       if (the_lnet.ln_mt_eq) {
+               rc = LNetEQFree(the_lnet.ln_mt_eq);
+               the_lnet.ln_mt_eq = NULL;
                LASSERT(rc == 0);
        }
 
                LASSERT(rc == 0);
        }
 
@@ -1683,9 +1680,10 @@ lnet_ping_target_setup(struct lnet_ping_buffer **ppbuf,
        int rc, rc2;
 
        if (set_eq) {
        int rc, rc2;
 
        if (set_eq) {
-               rc = LNetEQAlloc(0, lnet_ping_target_event_handler,
-                                &the_lnet.ln_ping_target_eq);
-               if (rc != 0) {
+               the_lnet.ln_ping_target_eq =
+                       LNetEQAlloc(0, lnet_ping_target_event_handler);
+               if (IS_ERR(the_lnet.ln_ping_target_eq)) {
+                       rc = PTR_ERR(the_lnet.ln_ping_target_eq);
                        CERROR("Can't allocate ping buffer EQ: %d\n", rc);
                        return rc;
                }
                        CERROR("Can't allocate ping buffer EQ: %d\n", rc);
                        return rc;
                }
@@ -1948,9 +1946,10 @@ static int lnet_push_target_init(void)
        if (the_lnet.ln_push_target)
                return -EALREADY;
 
        if (the_lnet.ln_push_target)
                return -EALREADY;
 
-       rc = LNetEQAlloc(0, lnet_push_target_event_handler,
-                        &the_lnet.ln_push_target_eq);
-       if (rc) {
+       the_lnet.ln_push_target_eq =
+               LNetEQAlloc(0, lnet_push_target_event_handler);
+       if (IS_ERR(the_lnet.ln_push_target_eq)) {
+               rc = PTR_ERR(the_lnet.ln_push_target_eq);
                CERROR("Can't allocated push target EQ: %d\n", rc);
                return rc;
        }
                CERROR("Can't allocated push target EQ: %d\n", rc);
                return rc;
        }
@@ -1962,7 +1961,7 @@ static int lnet_push_target_init(void)
 
        if (rc) {
                LNetEQFree(the_lnet.ln_push_target_eq);
 
        if (rc) {
                LNetEQFree(the_lnet.ln_push_target_eq);
-               LNetInvalidateEQHandle(&the_lnet.ln_push_target_eq);
+               the_lnet.ln_push_target_eq = NULL;
        }
 
        return rc;
        }
 
        return rc;
@@ -1989,7 +1988,7 @@ static void lnet_push_target_fini(void)
        the_lnet.ln_push_target_nnis = 0;
 
        LNetEQFree(the_lnet.ln_push_target_eq);
        the_lnet.ln_push_target_nnis = 0;
 
        LNetEQFree(the_lnet.ln_push_target_eq);
-       LNetInvalidateEQHandle(&the_lnet.ln_push_target_eq);
+       the_lnet.ln_push_target_eq = NULL;
 }
 
 static int
 }
 
 static int
@@ -2649,8 +2648,9 @@ LNetNIInit(lnet_pid_t requested_pid)
 
        lnet_ping_target_update(pbuf, ping_mdh);
 
 
        lnet_ping_target_update(pbuf, ping_mdh);
 
-       rc = LNetEQAlloc(0, lnet_mt_event_handler, &the_lnet.ln_mt_eqh);
-       if (rc != 0) {
+       the_lnet.ln_mt_eq = LNetEQAlloc(0, lnet_mt_event_handler);
+       if (IS_ERR(the_lnet.ln_mt_eq)) {
+               rc = PTR_ERR(the_lnet.ln_mt_eq);
                CERROR("Can't allocate monitor thread EQ: %d\n", rc);
                goto err_stop_ping;
        }
                CERROR("Can't allocate monitor thread EQ: %d\n", rc);
                goto err_stop_ping;
        }
@@ -4082,7 +4082,7 @@ EXPORT_SYMBOL(LNetGetId);
 static int lnet_ping(struct lnet_process_id id, signed long timeout,
                     struct lnet_process_id __user *ids, int n_ids)
 {
 static int lnet_ping(struct lnet_process_id id, signed long timeout,
                     struct lnet_process_id __user *ids, int n_ids)
 {
-       struct lnet_handle_eq eqh;
+       struct lnet_eq *eq;
        struct lnet_handle_md mdh;
        struct lnet_event event;
        struct lnet_md md = { NULL };
        struct lnet_handle_md mdh;
        struct lnet_event event;
        struct lnet_md md = { NULL };
@@ -4117,8 +4117,9 @@ static int lnet_ping(struct lnet_process_id id, signed long timeout,
                return -ENOMEM;
 
        /* NB 2 events max (including any unlink event) */
                return -ENOMEM;
 
        /* NB 2 events max (including any unlink event) */
-       rc = LNetEQAlloc(2, LNET_EQ_HANDLER_NONE, &eqh);
-       if (rc != 0) {
+       eq = LNetEQAlloc(2, LNET_EQ_HANDLER_NONE);
+       if (IS_ERR(eq)) {
+               rc = PTR_ERR(eq);
                CERROR("Can't allocate EQ: %d\n", rc);
                goto fail_ping_buffer_decref;
        }
                CERROR("Can't allocate EQ: %d\n", rc);
                goto fail_ping_buffer_decref;
        }
@@ -4130,7 +4131,7 @@ static int lnet_ping(struct lnet_process_id id, signed long timeout,
        md.max_size  = 0;
        md.options   = LNET_MD_TRUNCATE;
        md.user_ptr  = NULL;
        md.max_size  = 0;
        md.options   = LNET_MD_TRUNCATE;
        md.user_ptr  = NULL;
-       md.eq_handle = eqh;
+       md.eq_handle = eq;
 
        rc = LNetMDBind(md, LNET_UNLINK, &mdh);
        if (rc != 0) {
 
        rc = LNetMDBind(md, LNET_UNLINK, &mdh);
        if (rc != 0) {
@@ -4161,7 +4162,7 @@ static int lnet_ping(struct lnet_process_id id, signed long timeout,
                        sigprocmask(SIG_SETMASK, &set, &blocked);
                }
 
                        sigprocmask(SIG_SETMASK, &set, &blocked);
                }
 
-               rc2 = LNetEQPoll(&eqh, 1, timeout, &event, &which);
+               rc2 = LNetEQPoll(&eq, 1, timeout, &event, &which);
 
                if (unlinked)
                        cfs_restore_sigs(blocked);
 
                if (unlinked)
                        cfs_restore_sigs(blocked);
@@ -4259,7 +4260,7 @@ static int lnet_ping(struct lnet_process_id id, signed long timeout,
        rc = pbuf->pb_info.pi_nnis;
 
  fail_free_eq:
        rc = pbuf->pb_info.pi_nnis;
 
  fail_free_eq:
-       rc2 = LNetEQFree(eqh);
+       rc2 = LNetEQFree(eq);
        if (rc2 != 0)
                CERROR("rc2 %d\n", rc2);
        LASSERT(rc2 == 0);
        if (rc2 != 0)
                CERROR("rc2 %d\n", rc2);
        LASSERT(rc2 == 0);
index 6a68e5f..db6bd52 100644 (file)
  * \param callback A handler function that runs when an event is deposited
  * into the EQ. The constant value LNET_EQ_HANDLER_NONE can be used to
  * indicate that no event handler is desired.
  * \param callback A handler function that runs when an event is deposited
  * into the EQ. The constant value LNET_EQ_HANDLER_NONE can be used to
  * indicate that no event handler is desired.
- * \param handle On successful return, this location will hold a handle for
- * the newly created EQ.
  *
  *
- * \retval 0      On success.
+ * \retval eq     On successful return, the newly created EQ is returned.
+ *                On failure, an error code encoded with ERR_PTR() is returned.
  * \retval -EINVAL If an parameter is not valid.
  * \retval -ENOMEM If memory for the EQ can't be allocated.
  *
  * \see lnet_eq_handler_t for the discussion on EQ handler semantics.
  */
  * \retval -EINVAL If an parameter is not valid.
  * \retval -ENOMEM If memory for the EQ can't be allocated.
  *
  * \see lnet_eq_handler_t for the discussion on EQ handler semantics.
  */
-int
-LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback,
-           struct lnet_handle_eq *handle)
+struct lnet_eq *
+LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback)
 {
        struct lnet_eq *eq;
 
 {
        struct lnet_eq *eq;
 
@@ -87,11 +85,11 @@ LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback,
        /* count can be 0 if only need callback, we can eliminate
         * overhead of enqueue event */
        if (count == 0 && callback == LNET_EQ_HANDLER_NONE)
        /* count can be 0 if only need callback, we can eliminate
         * overhead of enqueue event */
        if (count == 0 && callback == LNET_EQ_HANDLER_NONE)
-               return -EINVAL;
+               return ERR_PTR(-EINVAL);
 
        eq = lnet_eq_alloc();
        if (eq == NULL)
 
        eq = lnet_eq_alloc();
        if (eq == NULL)
-               return -ENOMEM;
+               return ERR_PTR(-ENOMEM);
 
        if (count != 0) {
                LIBCFS_ALLOC(eq->eq_events, count * sizeof(*eq->eq_events));
 
        if (count != 0) {
                LIBCFS_ALLOC(eq->eq_events, count * sizeof(*eq->eq_events));
@@ -111,20 +109,7 @@ LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback,
        if (eq->eq_refs == NULL)
                goto failed;
 
        if (eq->eq_refs == NULL)
                goto failed;
 
-       /* MUST hold both exclusive lnet_res_lock */
-       lnet_res_lock(LNET_LOCK_EX);
-       /* NB: hold lnet_eq_wait_lock for EQ link/unlink, so we can do
-        * both EQ lookup and poll event with only lnet_eq_wait_lock */
-       lnet_eq_wait_lock();
-
-       lnet_res_lh_initialize(&the_lnet.ln_eq_container, &eq->eq_lh);
-       list_add(&eq->eq_list, &the_lnet.ln_eq_container.rec_active);
-
-       lnet_eq_wait_unlock();
-       lnet_res_unlock(LNET_LOCK_EX);
-
-       lnet_eq2handle(handle, eq);
-       return 0;
+       return eq;
 
 failed:
        if (eq->eq_events != NULL)
 
 failed:
        if (eq->eq_events != NULL)
@@ -134,7 +119,7 @@ failed:
                cfs_percpt_free(eq->eq_refs);
 
        lnet_eq_free(eq);
                cfs_percpt_free(eq->eq_refs);
 
        lnet_eq_free(eq);
-       return -ENOMEM;
+       return ERR_PTR(-ENOMEM);
 }
 EXPORT_SYMBOL(LNetEQAlloc);
 
 }
 EXPORT_SYMBOL(LNetEQAlloc);
 
@@ -142,16 +127,14 @@ EXPORT_SYMBOL(LNetEQAlloc);
  * Release the resources associated with an event queue if it's idle;
  * otherwise do nothing and it's up to the user to try again.
  *
  * Release the resources associated with an event queue if it's idle;
  * otherwise do nothing and it's up to the user to try again.
  *
- * \param eqh A handle for the event queue to be released.
+ * \param eq The event queue to be released.
  *
  * \retval 0 If the EQ is not in use and freed.
  *
  * \retval 0 If the EQ is not in use and freed.
- * \retval -ENOENT If \a eqh does not point to a valid EQ.
  * \retval -EBUSY  If the EQ is still in use by some MDs.
  */
 int
  * \retval -EBUSY  If the EQ is still in use by some MDs.
  */
 int
-LNetEQFree(struct lnet_handle_eq eqh)
+LNetEQFree(struct lnet_eq *eq)
 {
 {
-       struct lnet_eq  *eq;
        struct lnet_event       *events = NULL;
        int             **refs = NULL;
        int             *ref;
        struct lnet_event       *events = NULL;
        int             **refs = NULL;
        int             *ref;
@@ -164,12 +147,6 @@ LNetEQFree(struct lnet_handle_eq eqh)
         * both EQ lookup and poll event with only lnet_eq_wait_lock */
        lnet_eq_wait_lock();
 
         * both EQ lookup and poll event with only lnet_eq_wait_lock */
        lnet_eq_wait_lock();
 
-       eq = lnet_handle2eq(&eqh);
-       if (eq == NULL) {
-               rc = -ENOENT;
-               goto out;
-       }
-
        cfs_percpt_for_each(ref, i, eq->eq_refs) {
                LASSERT(*ref >= 0);
                if (*ref == 0)
        cfs_percpt_for_each(ref, i, eq->eq_refs) {
                LASSERT(*ref >= 0);
                if (*ref == 0)
@@ -186,8 +163,6 @@ LNetEQFree(struct lnet_handle_eq eqh)
        size    = eq->eq_size;
        refs    = eq->eq_refs;
 
        size    = eq->eq_size;
        refs    = eq->eq_refs;
 
-       lnet_res_lh_invalidate(&eq->eq_lh);
-       list_del(&eq->eq_list);
        lnet_eq_free(eq);
  out:
        lnet_eq_wait_unlock();
        lnet_eq_free(eq);
  out:
        lnet_eq_wait_unlock();
@@ -301,7 +276,7 @@ __must_hold(&the_lnet.ln_eq_wait_lock)
  * LNetEQPoll() provides a timeout to allow applications to poll, block for a
  * fixed period, or block indefinitely.
  *
  * LNetEQPoll() provides a timeout to allow applications to poll, block for a
  * fixed period, or block indefinitely.
  *
- * \param eventqs,neq An array of EQ handles, and size of the array.
+ * \param eventqs,neq An array of lnet_eq, and size of the array.
  * \param timeout Time in jiffies to wait for an event to occur on
  * one of the EQs. The constant MAX_SCHEDULE_TIMEOUT can be used to indicate an
  * infinite timeout.
  * \param timeout Time in jiffies to wait for an event to occur on
  * one of the EQs. The constant MAX_SCHEDULE_TIMEOUT can be used to indicate an
  * infinite timeout.
@@ -317,7 +292,7 @@ __must_hold(&the_lnet.ln_eq_wait_lock)
  * \retval -ENOENT    If there's an invalid handle in \a eventqs.
  */
 int
  * \retval -ENOENT    If there's an invalid handle in \a eventqs.
  */
 int
-LNetEQPoll(struct lnet_handle_eq *eventqs, int neq, signed long timeout,
+LNetEQPoll(struct lnet_eq **eventqs, int neq, signed long timeout,
           struct lnet_event *event, int *which)
 {
        int     wait = 1;
           struct lnet_event *event, int *which)
 {
        int     wait = 1;
@@ -334,7 +309,7 @@ LNetEQPoll(struct lnet_handle_eq *eventqs, int neq, signed long timeout,
 
        for (;;) {
                for (i = 0; i < neq; i++) {
 
        for (;;) {
                for (i = 0; i < neq; i++) {
-                       struct lnet_eq *eq = lnet_handle2eq(&eventqs[i]);
+                       struct lnet_eq *eq = eventqs[i];
 
                        if (eq == NULL) {
                                lnet_eq_wait_unlock();
 
                        if (eq == NULL) {
                                lnet_eq_wait_unlock();
index f13868b..99b1305 100644 (file)
@@ -269,7 +269,7 @@ lnet_md_build(struct lnet_libmd *lmd, struct lnet_md *umd, int unlink)
 
 /* must be called with resource lock held */
 static int
 
 /* must be called with resource lock held */
 static int
-lnet_md_link(struct lnet_libmd *md, struct lnet_handle_eq eq_handle, int cpt)
+lnet_md_link(struct lnet_libmd *md, struct lnet_eq *eq, int cpt)
 {
        struct lnet_res_container *container = the_lnet.ln_md_containers[cpt];
 
 {
        struct lnet_res_container *container = the_lnet.ln_md_containers[cpt];
 
@@ -285,12 +285,8 @@ lnet_md_link(struct lnet_libmd *md, struct lnet_handle_eq eq_handle, int cpt)
         * maybe there we shouldn't even allow LNET_EQ_NONE!)
         * LASSERT (eq == NULL);
         */
         * maybe there we shouldn't even allow LNET_EQ_NONE!)
         * LASSERT (eq == NULL);
         */
-       if (!LNetEQHandleIsInvalid(eq_handle)) {
-               md->md_eq = lnet_handle2eq(&eq_handle);
-
-               if (md->md_eq == NULL)
-                       return -ENOENT;
-
+       if (eq) {
+               md->md_eq = eq;
                (*md->md_eq->eq_refs[cpt])++;
        }
 
                (*md->md_eq->eq_refs[cpt])++;
        }
 
@@ -318,7 +314,6 @@ lnet_md_deconstruct(struct lnet_libmd *lmd, struct lnet_md *umd)
        umd->max_size = lmd->md_max_size;
        umd->options = lmd->md_options;
        umd->user_ptr = lmd->md_user_ptr;
        umd->max_size = lmd->md_max_size;
        umd->options = lmd->md_options;
        umd->user_ptr = lmd->md_user_ptr;
-       lnet_eq2handle(&umd->eq_handle, lmd->md_eq);
 }
 
 static int
 }
 
 static int
index 2a39b3c..d93374b 100644 (file)
@@ -3197,7 +3197,7 @@ lnet_recover_local_nis(void)
                        ev_info->mt_type = MT_TYPE_LOCAL_NI;
                        ev_info->mt_nid = nid;
                        rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
                        ev_info->mt_type = MT_TYPE_LOCAL_NI;
                        ev_info->mt_nid = nid;
                        rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
-                                           ev_info, the_lnet.ln_mt_eqh, true);
+                                           ev_info, the_lnet.ln_mt_eq, true);
                        /* lookup the nid again */
                        lnet_net_lock(0);
                        ni = lnet_nid2ni_locked(nid, 0);
                        /* lookup the nid again */
                        lnet_net_lock(0);
                        ni = lnet_nid2ni_locked(nid, 0);
@@ -3430,7 +3430,7 @@ lnet_recover_peer_nis(void)
                        ev_info->mt_type = MT_TYPE_PEER_NI;
                        ev_info->mt_nid = nid;
                        rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
                        ev_info->mt_type = MT_TYPE_PEER_NI;
                        ev_info->mt_nid = nid;
                        rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
-                                           ev_info, the_lnet.ln_mt_eqh, true);
+                                           ev_info, the_lnet.ln_mt_eq, true);
                        lnet_net_lock(0);
                        /*
                         * lnet_find_peer_ni_locked() grabs a refcount for
                        lnet_net_lock(0);
                        /*
                         * lnet_find_peer_ni_locked() grabs a refcount for
@@ -3560,7 +3560,7 @@ lnet_monitor_thread(void *arg)
 int
 lnet_send_ping(lnet_nid_t dest_nid,
               struct lnet_handle_md *mdh, int nnis,
 int
 lnet_send_ping(lnet_nid_t dest_nid,
               struct lnet_handle_md *mdh, int nnis,
-              void *user_data, struct lnet_handle_eq eqh, bool recovery)
+              void *user_data, struct lnet_eq *eq, bool recovery)
 {
        struct lnet_md md = { NULL };
        struct lnet_process_id id;
 {
        struct lnet_md md = { NULL };
        struct lnet_process_id id;
@@ -3585,7 +3585,7 @@ lnet_send_ping(lnet_nid_t dest_nid,
        md.max_size  = 0;
        md.options   = LNET_MD_TRUNCATE;
        md.user_ptr  = user_data;
        md.max_size  = 0;
        md.options   = LNET_MD_TRUNCATE;
        md.user_ptr  = user_data;
-       md.eq_handle = eqh;
+       md.eq_handle = eq;
 
        rc = LNetMDBind(md, LNET_UNLINK, mdh);
        if (rc) {
 
        rc = LNetMDBind(md, LNET_UNLINK, mdh);
        if (rc) {
@@ -3780,7 +3780,7 @@ clean_thread:
        lnet_clean_local_ni_recoveryq();
        lnet_clean_peer_ni_recoveryq();
        lnet_clean_resendqs();
        lnet_clean_local_ni_recoveryq();
        lnet_clean_peer_ni_recoveryq();
        lnet_clean_resendqs();
-       LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
+       the_lnet.ln_mt_eq = NULL;
        return rc;
 clean_queues:
        lnet_rsp_tracker_clean();
        return rc;
 clean_queues:
        lnet_rsp_tracker_clean();
index d5adfc3..d9cda71 100644 (file)
@@ -2979,7 +2979,7 @@ __must_hold(&lp->lp_lock)
        nnis = max(lp->lp_data_nnis, LNET_INTERFACES_MIN);
 
        rc = lnet_send_ping(pnid, &lp->lp_ping_mdh, nnis, lp,
        nnis = max(lp->lp_data_nnis, LNET_INTERFACES_MIN);
 
        rc = lnet_send_ping(pnid, &lp->lp_ping_mdh, nnis, lp,
-                           the_lnet.ln_dc_eqh, false);
+                           the_lnet.ln_dc_eq, false);
 
        /*
         * if LNetMDBind in lnet_send_ping fails we need to decrement the
 
        /*
         * if LNetMDBind in lnet_send_ping fails we need to decrement the
@@ -3071,7 +3071,7 @@ __must_hold(&lp->lp_lock)
        md.threshold = 2; /* Put/Ack */
        md.max_size  = 0;
        md.options   = 0;
        md.threshold = 2; /* Put/Ack */
        md.max_size  = 0;
        md.options   = 0;
-       md.eq_handle = the_lnet.ln_dc_eqh;
+       md.eq_handle = the_lnet.ln_dc_eq;
        md.user_ptr  = lp;
 
        rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_push_mdh);
        md.user_ptr  = lp;
 
        rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_push_mdh);
@@ -3399,8 +3399,8 @@ static int lnet_peer_discovery(void *arg)
        }
        lnet_net_unlock(LNET_LOCK_EX);
 
        }
        lnet_net_unlock(LNET_LOCK_EX);
 
-       LNetEQFree(the_lnet.ln_dc_eqh);
-       LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
+       LNetEQFree(the_lnet.ln_dc_eq);
+       the_lnet.ln_dc_eq = NULL;
 
        the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
        wake_up(&the_lnet.ln_dc_waitq);
 
        the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
        wake_up(&the_lnet.ln_dc_waitq);
@@ -3414,13 +3414,14 @@ static int lnet_peer_discovery(void *arg)
 int lnet_peer_discovery_start(void)
 {
        struct task_struct *task;
 int lnet_peer_discovery_start(void)
 {
        struct task_struct *task;
-       int rc;
+       int rc = 0;
 
        if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN)
                return -EALREADY;
 
 
        if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN)
                return -EALREADY;
 
-       rc = LNetEQAlloc(0, lnet_discovery_event_handler, &the_lnet.ln_dc_eqh);
-       if (rc != 0) {
+       the_lnet.ln_dc_eq = LNetEQAlloc(0, lnet_discovery_event_handler);
+       if (IS_ERR(the_lnet.ln_dc_eq)) {
+               rc = PTR_ERR(the_lnet.ln_dc_eq);
                CERROR("Can't allocate discovery EQ: %d\n", rc);
                return rc;
        }
                CERROR("Can't allocate discovery EQ: %d\n", rc);
                return rc;
        }
@@ -3431,8 +3432,8 @@ int lnet_peer_discovery_start(void)
                rc = PTR_ERR(task);
                CERROR("Can't start peer discovery thread: %d\n", rc);
 
                rc = PTR_ERR(task);
                CERROR("Can't start peer discovery thread: %d\n", rc);
 
-               LNetEQFree(the_lnet.ln_dc_eqh);
-               LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
+               LNetEQFree(the_lnet.ln_dc_eq);
+               the_lnet.ln_dc_eq = NULL;
 
                the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
        }
 
                the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
        }
index b3ef75a..3c7490a 100644 (file)
@@ -53,7 +53,7 @@ enum srpc_state {
 static struct smoketest_rpc {
        spinlock_t       rpc_glock;     /* global lock */
        struct srpc_service     *rpc_services[SRPC_SERVICE_MAX_ID + 1];
 static struct smoketest_rpc {
        spinlock_t       rpc_glock;     /* global lock */
        struct srpc_service     *rpc_services[SRPC_SERVICE_MAX_ID + 1];
-       struct lnet_handle_eq    rpc_lnet_eq;   /* _the_ LNet event queue */
+       struct lnet_eq          *rpc_lnet_eq;   /* _the_ LNet event queue */
        enum srpc_state          rpc_state;
        struct srpc_counters     rpc_counters;
        __u64                    rpc_matchbits; /* matchbits counter */
        enum srpc_state          rpc_state;
        struct srpc_counters     rpc_counters;
        __u64                    rpc_matchbits; /* matchbits counter */
@@ -1621,9 +1621,9 @@ srpc_startup (void)
 
        srpc_data.rpc_state = SRPC_STATE_NI_INIT;
 
 
        srpc_data.rpc_state = SRPC_STATE_NI_INIT;
 
-       LNetInvalidateEQHandle(&srpc_data.rpc_lnet_eq);
-       rc = LNetEQAlloc(0, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);
-       if (rc != 0) {
+       srpc_data.rpc_lnet_eq = LNetEQAlloc(0, srpc_lnet_ev_handler);
+       if (IS_ERR(srpc_data.rpc_lnet_eq)) {
+               rc = PTR_ERR(srpc_data.rpc_lnet_eq);
                CERROR("LNetEQAlloc() has failed: %d\n", rc);
                goto bail;
        }
                CERROR("LNetEQAlloc() has failed: %d\n", rc);
                goto bail;
        }
index 1c7072c..f3f5b47 100644 (file)
@@ -1973,7 +1973,7 @@ static inline bool nrs_policy_compat_one(const struct ptlrpc_service *svc,
 /** @} nrs */
 
 /* ptlrpc/events.c */
 /** @} nrs */
 
 /* ptlrpc/events.c */
-extern struct lnet_handle_eq ptlrpc_eq_h;
+extern struct lnet_eq *ptlrpc_eq;
 extern int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
                               struct lnet_process_id *peer, lnet_nid_t *self);
 /**
 extern int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
                               struct lnet_process_id *peer, lnet_nid_t *self);
 /**
index 660a05c..f7a4c88 100644 (file)
@@ -40,7 +40,7 @@
 #include <lustre_sec.h>
 #include "ptlrpc_internal.h"
 
 #include <lustre_sec.h>
 #include "ptlrpc_internal.h"
 
-struct lnet_handle_eq ptlrpc_eq_h;
+struct lnet_eq *ptlrpc_eq;
 
 /*
  *  Client's outgoing request callback
 
 /*
  *  Client's outgoing request callback
@@ -556,7 +556,7 @@ void ptlrpc_ni_fini(void)
         * replies */
 
        for (retries = 0;; retries++) {
         * replies */
 
        for (retries = 0;; retries++) {
-               rc = LNetEQFree(ptlrpc_eq_h);
+               rc = LNetEQFree(ptlrpc_eq);
                switch (rc) {
                default:
                        LBUG();
                switch (rc) {
                default:
                        LBUG();
@@ -603,17 +603,17 @@ int ptlrpc_ni_init(void)
         * because we are guaranteed to get every event via callback,
         * so we just set EQ size to 0 to avoid overhread of serializing
         * enqueue/dequeue operations in LNet. */
         * because we are guaranteed to get every event via callback,
         * so we just set EQ size to 0 to avoid overhread of serializing
         * enqueue/dequeue operations in LNet. */
-       rc = LNetEQAlloc(0, ptlrpc_master_callback, &ptlrpc_eq_h);
-        if (rc == 0)
-                return 0;
+       ptlrpc_eq = LNetEQAlloc(0, ptlrpc_master_callback);
+       if (!IS_ERR(ptlrpc_eq))
+               return 0;
 
 
-        CERROR ("Failed to allocate event queue: %d\n", rc);
-        LNetNIFini();
+       rc = PTR_ERR(ptlrpc_eq);
+       CERROR("Failed to allocate event queue: %d\n", rc);
+       LNetNIFini();
 
        return rc;
 }
 
 
        return rc;
 }
 
-
 int ptlrpc_init_portals(void)
 {
         int   rc = ptlrpc_ni_init();
 int ptlrpc_init_portals(void)
 {
         int   rc = ptlrpc_ni_init();
index c19fecd..7af98c4 100644 (file)
@@ -61,7 +61,7 @@ static int ptl_send_buf(struct lnet_handle_md *mdh, void *base, int len,
        md.threshold = (ack == LNET_ACK_REQ) ? 2 : 1;
        md.options   = PTLRPC_MD_OPTIONS;
        md.user_ptr  = cbid;
        md.threshold = (ack == LNET_ACK_REQ) ? 2 : 1;
        md.options   = PTLRPC_MD_OPTIONS;
        md.user_ptr  = cbid;
-       md.eq_handle = ptlrpc_eq_h;
+       md.eq_handle = ptlrpc_eq;
        LNetInvalidateMDHandle(&md.bulk_handle);
 
        if (bulk_cookie) {
        LNetInvalidateMDHandle(&md.bulk_handle);
 
        if (bulk_cookie) {
@@ -195,7 +195,7 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
        desc->bd_failure = 0;
 
        md.user_ptr = &desc->bd_cbid;
        desc->bd_failure = 0;
 
        md.user_ptr = &desc->bd_cbid;
-       md.eq_handle = ptlrpc_eq_h;
+       md.eq_handle = ptlrpc_eq;
        md.threshold = 2; /* SENT and ACK/REPLY */
 
        for (posted_md = 0; posted_md < total_md; mbits++) {
        md.threshold = 2; /* SENT and ACK/REPLY */
 
        for (posted_md = 0; posted_md < total_md; mbits++) {
@@ -362,7 +362,7 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
        desc->bd_last_mbits = mbits;
        desc->bd_md_count = total_md;
        md.user_ptr = &desc->bd_cbid;
        desc->bd_last_mbits = mbits;
        desc->bd_md_count = total_md;
        md.user_ptr = &desc->bd_cbid;
-       md.eq_handle = ptlrpc_eq_h;
+       md.eq_handle = ptlrpc_eq;
        md.threshold = 1;                       /* PUT or GET */
 
        for (posted_md = 0; posted_md < total_md; posted_md++, mbits++) {
        md.threshold = 1;                       /* PUT or GET */
 
        for (posted_md = 0; posted_md < total_md; posted_md++, mbits++) {
@@ -850,8 +850,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
                 reply_md.options   = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT |
                         LNET_MD_MANAGE_REMOTE |
                         LNET_MD_TRUNCATE; /* allow to make EOVERFLOW error */;
                 reply_md.options   = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT |
                         LNET_MD_MANAGE_REMOTE |
                         LNET_MD_TRUNCATE; /* allow to make EOVERFLOW error */;
-                reply_md.user_ptr  = &request->rq_reply_cbid;
-                reply_md.eq_handle = ptlrpc_eq_h;
+               reply_md.user_ptr  = &request->rq_reply_cbid;
+               reply_md.eq_handle = ptlrpc_eq;
 
                /* We must see the unlink callback to set rq_reply_unlinked,
                 * so we can't auto-unlink */
 
                /* We must see the unlink callback to set rq_reply_unlinked,
                 * so we can't auto-unlink */
@@ -967,16 +967,16 @@ int ptlrpc_register_rqbd(struct ptlrpc_request_buffer_desc *rqbd)
                return -ENOMEM;
        }
 
                return -ENOMEM;
        }
 
-        LASSERT(rqbd->rqbd_refcount == 0);
-        rqbd->rqbd_refcount = 1;
+       LASSERT(rqbd->rqbd_refcount == 0);
+       rqbd->rqbd_refcount = 1;
 
 
-        md.start     = rqbd->rqbd_buffer;
-        md.length    = service->srv_buf_size;
-        md.max_size  = service->srv_max_req_size;
-        md.threshold = LNET_MD_THRESH_INF;
-        md.options   = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT | LNET_MD_MAX_SIZE;
-        md.user_ptr  = &rqbd->rqbd_cbid;
-        md.eq_handle = ptlrpc_eq_h;
+       md.start     = rqbd->rqbd_buffer;
+       md.length    = service->srv_buf_size;
+       md.max_size  = service->srv_max_req_size;
+       md.threshold = LNET_MD_THRESH_INF;
+       md.options   = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT | LNET_MD_MAX_SIZE;
+       md.user_ptr  = &rqbd->rqbd_cbid;
+       md.eq_handle = ptlrpc_eq;
 
        rc = LNetMDAttach(me, md, LNET_UNLINK, &rqbd->rqbd_md_h);
        if (rc == 0)
 
        rc = LNetMDAttach(me, md, LNET_UNLINK, &rqbd->rqbd_md_h);
        if (rc == 0)