Whamcloud - gitweb
LU-7888 obdclass: not hold global lock when lwp callback 34/19034/5
authorFan Yong <fan.yong@intel.com>
Wed, 13 Apr 2016 09:37:18 +0000 (17:37 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 16 May 2016 16:47:56 +0000 (16:47 +0000)
Originally, the lwp_notify_main() thread handled the every
registered lwp callback in turn with holding the global
lwp_register_list_lock. Because some lwp callback, such as
mdt_register_lwp_callback() may be blocked, then it will
cause others to be blocked when try to obtain such lock for
lustre_register_lwp_item(), and then block the mount.

This patch introduces reference count on the lwp_register_item
that will prevent the lwp item being freed during the callback,
so allow the lwp_notify_main() thread to do the real callback
without holding the global lock lwp_register_list_lock.

Intel-bug-Id: LDEV-323

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I1061205d5781e0699478d293e223567a0ebea286
Reviewed-on: http://review.whamcloud.com/19034
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/obd_class.h
lustre/obdclass/obd_mount_server.c

index f2c1af7..0b72e84 100644 (file)
@@ -1665,7 +1665,8 @@ struct lwp_register_item {
        struct obd_export **lri_exp;
        register_lwp_cb     lri_cb_func;
        void               *lri_cb_data;
        struct obd_export **lri_exp;
        register_lwp_cb     lri_cb_func;
        void               *lri_cb_data;
-       struct list_head            lri_list;
+       struct list_head    lri_list;
+       atomic_t            lri_ref;
        char                lri_name[MTI_NAME_MAXLEN];
 };
 
        char                lri_name[MTI_NAME_MAXLEN];
 };
 
index c8c90ec..9945ef0 100644 (file)
@@ -389,7 +389,18 @@ EXPORT_SYMBOL(tgt_name2lwp_name);
 
 static struct list_head lwp_register_list =
        LIST_HEAD_INIT(lwp_register_list);
 
 static struct list_head lwp_register_list =
        LIST_HEAD_INIT(lwp_register_list);
-static DEFINE_MUTEX(lwp_register_list_lock);
+static DEFINE_SPINLOCK(lwp_register_list_lock);
+
+static void lustre_put_lwp_item(struct lwp_register_item *lri)
+{
+       if (atomic_dec_and_test(&lri->lri_ref)) {
+               LASSERT(list_empty(&lri->lri_list));
+
+               if (*lri->lri_exp != NULL)
+                       class_export_put(*lri->lri_exp);
+               OBD_FREE_PTR(lri);
+       }
+}
 
 int lustre_register_lwp_item(const char *lwpname, struct obd_export **exp,
                             register_lwp_cb cb_func, void *cb_data)
 
 int lustre_register_lwp_item(const char *lwpname, struct obd_export **exp,
                             register_lwp_cb cb_func, void *cb_data)
@@ -406,15 +417,12 @@ int lustre_register_lwp_item(const char *lwpname, struct obd_export **exp,
        if (lri == NULL)
                RETURN(-ENOMEM);
 
        if (lri == NULL)
                RETURN(-ENOMEM);
 
-       mutex_lock(&lwp_register_list_lock);
-
        lwp = class_name2obd(lwpname);
        if (lwp != NULL && lwp->obd_set_up == 1) {
                struct obd_uuid *uuid;
 
                OBD_ALLOC_PTR(uuid);
                if (uuid == NULL) {
        lwp = class_name2obd(lwpname);
        if (lwp != NULL && lwp->obd_set_up == 1) {
                struct obd_uuid *uuid;
 
                OBD_ALLOC_PTR(uuid);
                if (uuid == NULL) {
-                       mutex_unlock(&lwp_register_list_lock);
                        OBD_FREE_PTR(lri);
                        RETURN(-ENOMEM);
                }
                        OBD_FREE_PTR(lri);
                        RETURN(-ENOMEM);
                }
@@ -428,31 +436,40 @@ int lustre_register_lwp_item(const char *lwpname, struct obd_export **exp,
        lri->lri_cb_func = cb_func;
        lri->lri_cb_data = cb_data;
        INIT_LIST_HEAD(&lri->lri_list);
        lri->lri_cb_func = cb_func;
        lri->lri_cb_data = cb_data;
        INIT_LIST_HEAD(&lri->lri_list);
+       /*
+        * Initialize the lri_ref at 2, one will be released before
+        * current function returned via lustre_put_lwp_item(), the
+        * other will be released in lustre_deregister_lwp_item().
+        */
+       atomic_set(&lri->lri_ref, 2);
+
+       spin_lock(&lwp_register_list_lock);
        list_add(&lri->lri_list, &lwp_register_list);
        list_add(&lri->lri_list, &lwp_register_list);
+       spin_unlock(&lwp_register_list_lock);
 
        if (*exp != NULL && cb_func != NULL)
                cb_func(cb_data);
 
        if (*exp != NULL && cb_func != NULL)
                cb_func(cb_data);
+       lustre_put_lwp_item(lri);
 
 
-       mutex_unlock(&lwp_register_list_lock);
        RETURN(0);
 }
 EXPORT_SYMBOL(lustre_register_lwp_item);
 
 void lustre_deregister_lwp_item(struct obd_export **exp)
 {
        RETURN(0);
 }
 EXPORT_SYMBOL(lustre_register_lwp_item);
 
 void lustre_deregister_lwp_item(struct obd_export **exp)
 {
-       struct lwp_register_item *lri, *tmp;
+       struct lwp_register_item *lri;
 
 
-       mutex_lock(&lwp_register_list_lock);
-       list_for_each_entry_safe(lri, tmp, &lwp_register_list, lri_list) {
+       spin_lock(&lwp_register_list_lock);
+       list_for_each_entry(lri, &lwp_register_list, lri_list) {
                if (exp == lri->lri_exp) {
                if (exp == lri->lri_exp) {
-                       if (*exp)
-                               class_export_put(*exp);
-                       list_del(&lri->lri_list);
-                       OBD_FREE_PTR(lri);
-                       break;
+                       list_del_init(&lri->lri_list);
+                       spin_unlock(&lwp_register_list_lock);
+
+                       lustre_put_lwp_item(lri);
+                       return;
                }
        }
                }
        }
-       mutex_unlock(&lwp_register_list_lock);
+       spin_unlock(&lwp_register_list_lock);
 }
 EXPORT_SYMBOL(lustre_deregister_lwp_item);
 
 }
 EXPORT_SYMBOL(lustre_deregister_lwp_item);
 
@@ -499,20 +516,32 @@ EXPORT_SYMBOL(lustre_find_lwp_by_index);
 
 void lustre_notify_lwp_list(struct obd_export *exp)
 {
 
 void lustre_notify_lwp_list(struct obd_export *exp)
 {
-       struct lwp_register_item *lri, *tmp;
+       struct lwp_register_item *lri;
        LASSERT(exp != NULL);
 
        LASSERT(exp != NULL);
 
-       mutex_lock(&lwp_register_list_lock);
-       list_for_each_entry_safe(lri, tmp, &lwp_register_list, lri_list) {
+again:
+       spin_lock(&lwp_register_list_lock);
+       list_for_each_entry(lri, &lwp_register_list, lri_list) {
                if (strcmp(exp->exp_obd->obd_name, lri->lri_name))
                        continue;
                if (*lri->lri_exp != NULL)
                        continue;
                *lri->lri_exp = class_export_get(exp);
                if (strcmp(exp->exp_obd->obd_name, lri->lri_name))
                        continue;
                if (*lri->lri_exp != NULL)
                        continue;
                *lri->lri_exp = class_export_get(exp);
+               atomic_inc(&lri->lri_ref);
+               spin_unlock(&lwp_register_list_lock);
+
                if (lri->lri_cb_func != NULL)
                        lri->lri_cb_func(lri->lri_cb_data);
                if (lri->lri_cb_func != NULL)
                        lri->lri_cb_func(lri->lri_cb_data);
+               lustre_put_lwp_item(lri);
+
+               /* Others may have changed the list after we unlock, we have
+                * to rescan the list from the beginning. Usually, the list
+                * 'lwp_register_list' is very short, and there is 'guard'
+                * lri::lri_exp that will prevent the callback to be done
+                * repeatedly. So rescanning the list has no problem. */
+               goto again;
        }
        }
-       mutex_unlock(&lwp_register_list_lock);
+       spin_unlock(&lwp_register_list_lock);
 }
 EXPORT_SYMBOL(lustre_notify_lwp_list);
 
 }
 EXPORT_SYMBOL(lustre_notify_lwp_list);