Whamcloud - gitweb
LU-56 lnet: container for LNet message
authorLiang Zhen <liang@whamcloud.com>
Sun, 27 May 2012 07:58:14 +0000 (15:58 +0800)
committerOleg Drokin <green@whamcloud.com>
Thu, 7 Jun 2012 17:07:21 +0000 (13:07 -0400)
Adding a simple message container to LNet, and use it to replace
global message queues and message finalizers.
It's a work step of LNet SMP improvements, we will create instances
for each CPT in following patches.

Signed-off-by: Liang Zhen <liang@whamcloud.com>
Change-Id: I00f79d7598c4cd1274d7b72f7c4b482d9df75b7a
Reviewed-on: http://review.whamcloud.com/2922
Reviewed-by: Bobi Jam <bobijam@whamcloud.com>
Reviewed-by: Doug Oucharek <doug@whamcloud.com>
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-types.h
lnet/lnet/api-ni.c
lnet/lnet/lib-move.c
lnet/lnet/lib-msg.c

index 28b69e6..3f7489d 100644 (file)
@@ -261,29 +261,32 @@ lnet_me_free(lnet_me_t *me)
 static inline lnet_msg_t *
 lnet_msg_alloc (void)
 {
 static inline lnet_msg_t *
 lnet_msg_alloc (void)
 {
-        /* NEVER called with liblock held */
-        lnet_msg_t    *msg;
+       /* NEVER called with network lock held */
+       struct lnet_msg_container *msc = &the_lnet.ln_msg_container;
+       lnet_msg_t                *msg;
 
 
-        LNET_LOCK();
-        msg = (lnet_msg_t *)lnet_freelist_alloc(&the_lnet.ln_free_msgs);
-        LNET_UNLOCK();
+       LNET_LOCK();
+       msg = (lnet_msg_t *)lnet_freelist_alloc(&msc->msc_freelist);
+       LNET_UNLOCK();
 
 
-        if (msg != NULL) {
-                /* NULL pointers, clear flags etc */
-                memset (msg, 0, sizeof (*msg));
+       if (msg != NULL) {
+               /* NULL pointers, clear flags etc */
+               memset(msg, 0, sizeof(*msg));
 #ifdef CRAY_XT3
 #ifdef CRAY_XT3
-                msg->msg_ev.uid = LNET_UID_ANY;
+               msg->msg_ev.uid = LNET_UID_ANY;
 #endif
 #endif
-        }
-        return(msg);
+       }
+       return msg;
 }
 
 static inline void
 lnet_msg_free_locked(lnet_msg_t *msg)
 {
        /* ALWAYS called with network lock held */
 }
 
 static inline void
 lnet_msg_free_locked(lnet_msg_t *msg)
 {
        /* ALWAYS called with network lock held */
+       struct lnet_msg_container *msc = &the_lnet.ln_msg_container;
+
        LASSERT(!msg->msg_onactivelist);
        LASSERT(!msg->msg_onactivelist);
-       lnet_freelist_free(&the_lnet.ln_free_msgs, msg);
+       lnet_freelist_free(&msc->msc_freelist, msg);
 }
 
 static inline void
 }
 
 static inline void
@@ -693,6 +696,9 @@ lnet_msg_t *lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *get_msg);
 void lnet_set_reply_msg_len(lnet_ni_t *ni, lnet_msg_t *msg, unsigned int len);
 void lnet_finalize(lnet_ni_t *ni, lnet_msg_t *msg, int rc);
 
 void lnet_set_reply_msg_len(lnet_ni_t *ni, lnet_msg_t *msg, unsigned int len);
 void lnet_finalize(lnet_ni_t *ni, lnet_msg_t *msg, int rc);
 
+int lnet_msg_container_setup(struct lnet_msg_container *container);
+void lnet_msg_container_cleanup(struct lnet_msg_container *container);
+
 char *lnet_msgtyp2str (int type);
 void lnet_print_hdr (lnet_hdr_t * hdr);
 int lnet_fail_nid(lnet_nid_t nid, unsigned int threshold);
 char *lnet_msgtyp2str (int type);
 void lnet_print_hdr (lnet_hdr_t * hdr);
 int lnet_fail_nid(lnet_nid_t nid, unsigned int threshold);
index 0dd289e..58c916e 100644 (file)
@@ -536,6 +536,21 @@ struct lnet_res_container {
 #endif
 };
 
 #endif
 };
 
+/* message container */
+struct lnet_msg_container {
+       int                     msc_init;       /* initialized or not */
+       /* max # threads finalizing */
+       int                     msc_nfinalizers;
+       /* msgs waiting to complete finalizing */
+       cfs_list_t              msc_finalizing;
+       cfs_list_t              msc_active;     /* active message list */
+       /* threads doing finalization */
+       void                    **msc_finalizers;
+#ifdef LNET_USE_LIB_FREELIST
+       lnet_freelist_t         msc_freelist;   /* freelist for messages */
+#endif
+};
+
 /* Router Checker states */
 #define LNET_RC_STATE_SHUTDOWN     0            /* not started */
 #define LNET_RC_STATE_RUNNING      1            /* started up OK */
 /* Router Checker states */
 #define LNET_RC_STATE_SHUTDOWN     0            /* not started */
 #define LNET_RC_STATE_RUNNING      1            /* started up OK */
@@ -610,15 +625,11 @@ typedef struct
 
         int                    ln_testprotocompat;  /* test protocol compatibility flags */
 
 
         int                    ln_testprotocompat;  /* test protocol compatibility flags */
 
-        cfs_list_t             ln_finalizeq;        /* msgs waiting to complete finalizing */
-#ifdef __KERNEL__
-        void                 **ln_finalizers;       /* threads doing finalization */
-        int                    ln_nfinalizers;      /* max # threads finalizing */
-#else
-        int                    ln_finalizing;
-#endif
         cfs_list_t             ln_test_peers;       /* failure simulation */
 
         cfs_list_t             ln_test_peers;       /* failure simulation */
 
+       /* message container */
+       struct lnet_msg_container       ln_msg_container;
+
         lnet_handle_md_t       ln_ping_target_md;
         lnet_handle_eq_t       ln_ping_target_eq;
         lnet_ping_info_t      *ln_ping_info;
         lnet_handle_md_t       ln_ping_target_md;
         lnet_handle_eq_t       ln_ping_target_eq;
         lnet_ping_info_t      *ln_ping_info;
@@ -630,10 +641,6 @@ typedef struct
         lnet_handle_eq_t       ln_rc_eqh;           /* router checker's event queue */
         lnet_handle_md_t       ln_rc_mdh;
         cfs_list_t             ln_zombie_rcd;
         lnet_handle_eq_t       ln_rc_eqh;           /* router checker's event queue */
         lnet_handle_md_t       ln_rc_mdh;
         cfs_list_t             ln_zombie_rcd;
-#ifdef LNET_USE_LIB_FREELIST
-        lnet_freelist_t        ln_free_msgs;
-#endif
-        cfs_list_t             ln_active_msgs;
 
         lnet_counters_t        ln_counters;
 
 
         lnet_counters_t        ln_counters;
 
index ee09400..1829896 100644 (file)
@@ -348,20 +348,7 @@ lnet_unregister_lnd (lnd_t *lnd)
         LNET_MUTEX_UNLOCK(&the_lnet.ln_lnd_mutex);
 }
 
         LNET_MUTEX_UNLOCK(&the_lnet.ln_lnd_mutex);
 }
 
-#ifndef LNET_USE_LIB_FREELIST
-
-int
-lnet_descriptor_setup (void)
-{
-        return 0;
-}
-
-void
-lnet_descriptor_cleanup (void)
-{
-}
-
-#else
+#ifdef LNET_USE_LIB_FREELIST
 
 int
 lnet_freelist_init (lnet_freelist_t *fl, int n, int size)
 
 int
 lnet_freelist_init (lnet_freelist_t *fl, int n, int size)
@@ -410,27 +397,6 @@ lnet_freelist_fini (lnet_freelist_t *fl)
         memset (fl, 0, sizeof (*fl));
 }
 
         memset (fl, 0, sizeof (*fl));
 }
 
-int
-lnet_descriptor_setup (void)
-{
-        /* NB on failure caller must still call lnet_descriptor_cleanup */
-        /*               ******                                         */
-        int        rc;
-
-        memset (&the_lnet.ln_free_msgs, 0, sizeof (the_lnet.ln_free_msgs));
-        rc = lnet_freelist_init(&the_lnet.ln_free_msgs,
-                               LNET_FL_MAX_MSGS, sizeof(lnet_msg_t));
-        if (rc != 0)
-                return (rc);
-        return (rc);
-}
-
-void
-lnet_descriptor_cleanup (void)
-{
-        lnet_freelist_fini (&the_lnet.ln_free_msgs);
-}
-
 #endif /* LNET_USE_LIB_FREELIST */
 
 __u64
 #endif /* LNET_USE_LIB_FREELIST */
 
 __u64
@@ -623,50 +589,6 @@ lnet_portal_mhash_free(cfs_list_t *mhash)
         LIBCFS_FREE(mhash, sizeof(cfs_list_t) * LNET_PORTAL_HASH_SIZE);
 }
 
         LIBCFS_FREE(mhash, sizeof(cfs_list_t) * LNET_PORTAL_HASH_SIZE);
 }
 
-int
-lnet_init_finalizers(void)
-{
-#ifdef __KERNEL__
-        int    i;
-
-        the_lnet.ln_nfinalizers = (int) cfs_num_online_cpus();
-
-        LIBCFS_ALLOC(the_lnet.ln_finalizers,
-                     the_lnet.ln_nfinalizers *
-                     sizeof(*the_lnet.ln_finalizers));
-        if (the_lnet.ln_finalizers == NULL) {
-                CERROR("Can't allocate ln_finalizers\n");
-                return -ENOMEM;
-        }
-
-        for (i = 0; i < the_lnet.ln_nfinalizers; i++)
-                the_lnet.ln_finalizers[i] = NULL;
-#else
-        the_lnet.ln_finalizing = 0;
-#endif
-
-        CFS_INIT_LIST_HEAD(&the_lnet.ln_finalizeq);
-        return 0;
-}
-
-void
-lnet_fini_finalizers(void)
-{
-#ifdef __KERNEL__
-        int    i;
-
-        for (i = 0; i < the_lnet.ln_nfinalizers; i++)
-                LASSERT (the_lnet.ln_finalizers[i] == NULL);
-
-        LIBCFS_FREE(the_lnet.ln_finalizers,
-                    the_lnet.ln_nfinalizers *
-                    sizeof(*the_lnet.ln_finalizers));
-#else
-        LASSERT (!the_lnet.ln_finalizing);
-#endif
-        LASSERT (cfs_list_empty(&the_lnet.ln_finalizeq));
-}
-
 #ifndef __KERNEL__
 /**
  * Reserved API - do not use.
 #ifndef __KERNEL__
 /**
  * Reserved API - do not use.
@@ -709,14 +631,9 @@ lnet_prepare(lnet_pid_t requested_pid)
         }
 #endif
 
         }
 #endif
 
-        rc = lnet_descriptor_setup();
-        if (rc != 0)
-               return -ENOMEM;
-
         memset(&the_lnet.ln_counters, 0,
                sizeof(the_lnet.ln_counters));
 
         memset(&the_lnet.ln_counters, 0,
                sizeof(the_lnet.ln_counters));
 
-        CFS_INIT_LIST_HEAD (&the_lnet.ln_active_msgs);
         CFS_INIT_LIST_HEAD (&the_lnet.ln_test_peers);
         CFS_INIT_LIST_HEAD (&the_lnet.ln_nis);
         CFS_INIT_LIST_HEAD (&the_lnet.ln_zombie_nis);
         CFS_INIT_LIST_HEAD (&the_lnet.ln_test_peers);
         CFS_INIT_LIST_HEAD (&the_lnet.ln_nis);
         CFS_INIT_LIST_HEAD (&the_lnet.ln_zombie_nis);
@@ -731,7 +648,8 @@ lnet_prepare(lnet_pid_t requested_pid)
         if (rc != 0)
                goto failed0;
 
         if (rc != 0)
                goto failed0;
 
-       rc = lnet_init_finalizers();
+       /* NB: we will have instance of message container per CPT soon */
+       rc = lnet_msg_container_setup(&the_lnet.ln_msg_container);
        if (rc != 0)
                goto failed1;
 
        if (rc != 0)
                goto failed1;
 
@@ -785,11 +703,10 @@ lnet_prepare(lnet_pid_t requested_pid)
        lnet_res_container_cleanup(&the_lnet.ln_me_container);
        lnet_res_container_cleanup(&the_lnet.ln_eq_container);
  failed2:
        lnet_res_container_cleanup(&the_lnet.ln_me_container);
        lnet_res_container_cleanup(&the_lnet.ln_eq_container);
  failed2:
-       lnet_fini_finalizers();
+       lnet_msg_container_cleanup(&the_lnet.ln_msg_container);
  failed1:
        lnet_destroy_peer_table();
  failed0:
  failed1:
        lnet_destroy_peer_table();
  failed0:
-       lnet_descriptor_cleanup();
        return rc;
 }
 
        return rc;
 }
 
@@ -834,26 +751,14 @@ lnet_unprepare (void)
        lnet_res_container_cleanup(&the_lnet.ln_me_container);
        lnet_res_container_cleanup(&the_lnet.ln_eq_container);
 
        lnet_res_container_cleanup(&the_lnet.ln_me_container);
        lnet_res_container_cleanup(&the_lnet.ln_eq_container);
 
-        while (!cfs_list_empty (&the_lnet.ln_active_msgs)) {
-                lnet_msg_t *msg = cfs_list_entry (the_lnet.ln_active_msgs.next,
-                                                  lnet_msg_t, msg_activelist);
+       LIBCFS_FREE(the_lnet.ln_portals,
+                   the_lnet.ln_nportals * sizeof(*the_lnet.ln_portals));
 
 
-                CERROR ("Active msg %p on exit\n", msg);
-                LASSERT (msg->msg_onactivelist);
-                msg->msg_onactivelist = 0;
-                cfs_list_del (&msg->msg_activelist);
-                lnet_msg_free (msg);
-        }
-
-        LIBCFS_FREE(the_lnet.ln_portals,  
-                    the_lnet.ln_nportals * sizeof(*the_lnet.ln_portals));
-
-        lnet_free_rtrpools();
-        lnet_fini_finalizers();
-        lnet_destroy_peer_table();
-        lnet_descriptor_cleanup();
+       lnet_free_rtrpools();
+       lnet_msg_container_cleanup(&the_lnet.ln_msg_container);
+       lnet_destroy_peer_table();
 
 
-        return (0);
+       return 0;
 }
 
 lnet_ni_t  *
 }
 
 lnet_ni_t  *
index 8e949a4..9661ec3 100644 (file)
@@ -1105,7 +1105,8 @@ lnet_commit_routedmsg (lnet_msg_t *msg)
 
         LASSERT (!msg->msg_onactivelist);
         msg->msg_onactivelist = 1;
 
         LASSERT (!msg->msg_onactivelist);
         msg->msg_onactivelist = 1;
-        cfs_list_add (&msg->msg_activelist, &the_lnet.ln_active_msgs);
+       cfs_list_add(&msg->msg_activelist,
+                    &the_lnet.ln_msg_container.msc_active);
 }
 
 lnet_rtrbufpool_t *
 }
 
 lnet_rtrbufpool_t *
@@ -1526,7 +1527,8 @@ lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg)
 
         LASSERT (!msg->msg_onactivelist);
         msg->msg_onactivelist = 1;
 
         LASSERT (!msg->msg_onactivelist);
         msg->msg_onactivelist = 1;
-        cfs_list_add (&msg->msg_activelist, &the_lnet.ln_active_msgs);
+       cfs_list_add(&msg->msg_activelist,
+                    &the_lnet.ln_msg_container.msc_active);
 }
 
 static void
 }
 
 static void
index 9f1ad48..a25fae2 100644 (file)
@@ -154,11 +154,10 @@ lnet_complete_msg_locked(lnet_msg_t *msg)
 void
 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
 {
 void
 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
 {
-#ifdef __KERNEL__
-        int                i;
-        int                my_slot;
-#endif
-        lnet_libmd_t      *md;
+       struct lnet_msg_container       *container;
+       lnet_libmd_t                    *md;
+       int                             my_slot;
+       int                             i;
 
         LASSERT (!cfs_in_interrupt ());
 
 
         LASSERT (!cfs_in_interrupt ());
 
@@ -208,32 +207,37 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
                 msg->msg_md = NULL;
         }
 
                 msg->msg_md = NULL;
         }
 
-        cfs_list_add_tail (&msg->msg_list, &the_lnet.ln_finalizeq);
+       container = &the_lnet.ln_msg_container;
+       cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
 
 
-        /* Recursion breaker.  Don't complete the message here if I am (or
-         * enough other threads are) already completing messages */
+       /* Recursion breaker.  Don't complete the message here if I am (or
+        * enough other threads are) already completing messages */
 
 #ifdef __KERNEL__
 
 #ifdef __KERNEL__
-        my_slot = -1;
-        for (i = 0; i < the_lnet.ln_nfinalizers; i++) {
-                if (the_lnet.ln_finalizers[i] == cfs_current())
-                        goto out;
-                if (my_slot < 0 && the_lnet.ln_finalizers[i] == NULL)
-                        my_slot = i;
-        }
-        if (my_slot < 0)
-                goto out;
+       my_slot = -1;
+       for (i = 0; i < container->msc_nfinalizers; i++) {
+               if (container->msc_finalizers[i] == cfs_current())
+                       goto out;
 
 
-        the_lnet.ln_finalizers[my_slot] = cfs_current();
+               if (my_slot < 0 && container->msc_finalizers[i] == NULL)
+                       my_slot = i;
+       }
+
+       if (my_slot < 0)
+               goto out;
+
+       container->msc_finalizers[my_slot] = cfs_current();
 #else
 #else
-        if (the_lnet.ln_finalizing)
-                goto out;
+       LASSERT(container->msc_nfinalizers == 1);
+       if (container->msc_finalizers[0] != NULL)
+               goto out;
 
 
-        the_lnet.ln_finalizing = 1;
+       my_slot = i = 0;
+       container->msc_finalizers[0] = (struct lnet_msg_container *)1;
 #endif
 
 #endif
 
-        while (!cfs_list_empty(&the_lnet.ln_finalizeq)) {
-                msg = cfs_list_entry(the_lnet.ln_finalizeq.next,
+       while (!cfs_list_empty(&container->msc_finalizing)) {
+               msg = cfs_list_entry(container->msc_finalizing.next,
                                      lnet_msg_t, msg_list);
 
                 cfs_list_del(&msg->msg_list);
                                      lnet_msg_t, msg_list);
 
                 cfs_list_del(&msg->msg_list);
@@ -243,12 +247,80 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
                 lnet_complete_msg_locked(msg);
         }
 
                 lnet_complete_msg_locked(msg);
         }
 
-#ifdef __KERNEL__
-        the_lnet.ln_finalizers[my_slot] = NULL;
-#else
-        the_lnet.ln_finalizing = 0;
+       container->msc_finalizers[my_slot] = NULL;
+ out:
+       LNET_UNLOCK();
+}
+
+void
+lnet_msg_container_cleanup(struct lnet_msg_container *container)
+{
+       int     count = 0;
+
+       if (container->msc_init == 0)
+               return;
+
+       while (!cfs_list_empty(&container->msc_active)) {
+               lnet_msg_t *msg = cfs_list_entry(container->msc_active.next,
+                                                lnet_msg_t, msg_activelist);
+
+               LASSERT(msg->msg_onactivelist);
+               msg->msg_onactivelist = 0;
+               cfs_list_del(&msg->msg_activelist);
+               lnet_msg_free(msg);
+               count++;
+       }
+
+       if (count > 0)
+               CERROR("%d active msg on exit\n", count);
+
+       if (container->msc_finalizers != NULL) {
+               LIBCFS_FREE(container->msc_finalizers,
+                           container->msc_nfinalizers *
+                           sizeof(*container->msc_finalizers));
+               container->msc_finalizers = NULL;
+       }
+#ifdef LNET_USE_LIB_FREELIST
+       lnet_freelist_fini(&container->msc_freelist);
 #endif
 #endif
+       container->msc_init = 0;
+}
 
 
- out:
-        LNET_UNLOCK();
+int
+lnet_msg_container_setup(struct lnet_msg_container *container)
+{
+       int     rc;
+
+       container->msc_init = 1;
+
+       CFS_INIT_LIST_HEAD(&container->msc_active);
+       CFS_INIT_LIST_HEAD(&container->msc_finalizing);
+
+#ifdef LNET_USE_LIB_FREELIST
+       memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));
+
+       rc = lnet_freelist_init(&container->msc_freelist,
+                               LNET_FL_MAX_MSGS, sizeof(lnet_msg_t));
+       if (rc != 0) {
+               CERROR("Failed to init freelist for message container\n");
+               lnet_msg_container_cleanup(container);
+               return rc;
+       }
+#else
+       rc = 0;
+#endif
+       /* number of CPUs */
+       container->msc_nfinalizers = cfs_cpt_weight(cfs_cpt_table,
+                                                   CFS_CPT_ANY);
+       LIBCFS_ALLOC(container->msc_finalizers,
+                    container->msc_nfinalizers *
+                    sizeof(*container->msc_finalizers));
+
+       if (container->msc_finalizers == NULL) {
+               CERROR("Failed to allocate message finalizers\n");
+               lnet_msg_container_cleanup(container);
+               return -ENOMEM;
+       }
+
+       return 0;
 }
 }