Originally, the lwp_notify_main() thread handled every
registered lwp callback in turn while holding the global
lwp_register_list_lock. Because some lwp callbacks, such as
mdt_register_lwp_callback(), may block, other threads would
then be blocked when trying to obtain that lock in
lustre_register_lwp_item(), which in turn blocked the mount.
This patch introduces a reference count on the lwp_register_item
that prevents the lwp item from being freed while its callback is
running, allowing the lwp_notify_main() thread to invoke the real
callback without holding the global lwp_register_list_lock.
Intel-bug-Id: LDEV-323
Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I1061205d5781e0699478d293e223567a0ebea286
Reviewed-on: http://review.whamcloud.com/19034
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
struct obd_export **lri_exp;
register_lwp_cb lri_cb_func;
void *lri_cb_data;
struct obd_export **lri_exp;
register_lwp_cb lri_cb_func;
void *lri_cb_data;
- struct list_head lri_list;
+ struct list_head lri_list;
+ atomic_t lri_ref;
char lri_name[MTI_NAME_MAXLEN];
};
char lri_name[MTI_NAME_MAXLEN];
};
static struct list_head lwp_register_list =
LIST_HEAD_INIT(lwp_register_list);
static struct list_head lwp_register_list =
LIST_HEAD_INIT(lwp_register_list);
-static DEFINE_MUTEX(lwp_register_list_lock);
+static DEFINE_SPINLOCK(lwp_register_list_lock);
+
+static void lustre_put_lwp_item(struct lwp_register_item *lri)
+{
+ if (atomic_dec_and_test(&lri->lri_ref)) {
+ LASSERT(list_empty(&lri->lri_list));
+
+ if (*lri->lri_exp != NULL)
+ class_export_put(*lri->lri_exp);
+ OBD_FREE_PTR(lri);
+ }
+}
int lustre_register_lwp_item(const char *lwpname, struct obd_export **exp,
register_lwp_cb cb_func, void *cb_data)
int lustre_register_lwp_item(const char *lwpname, struct obd_export **exp,
register_lwp_cb cb_func, void *cb_data)
if (lri == NULL)
RETURN(-ENOMEM);
if (lri == NULL)
RETURN(-ENOMEM);
- mutex_lock(&lwp_register_list_lock);
-
lwp = class_name2obd(lwpname);
if (lwp != NULL && lwp->obd_set_up == 1) {
struct obd_uuid *uuid;
OBD_ALLOC_PTR(uuid);
if (uuid == NULL) {
lwp = class_name2obd(lwpname);
if (lwp != NULL && lwp->obd_set_up == 1) {
struct obd_uuid *uuid;
OBD_ALLOC_PTR(uuid);
if (uuid == NULL) {
- mutex_unlock(&lwp_register_list_lock);
OBD_FREE_PTR(lri);
RETURN(-ENOMEM);
}
OBD_FREE_PTR(lri);
RETURN(-ENOMEM);
}
lri->lri_cb_func = cb_func;
lri->lri_cb_data = cb_data;
INIT_LIST_HEAD(&lri->lri_list);
lri->lri_cb_func = cb_func;
lri->lri_cb_data = cb_data;
INIT_LIST_HEAD(&lri->lri_list);
+ /*
+ * Initialize the lri_ref at 2, one will be released before
+ * current function returned via lustre_put_lwp_item(), the
+ * other will be released in lustre_deregister_lwp_item().
+ */
+ atomic_set(&lri->lri_ref, 2);
+
+ spin_lock(&lwp_register_list_lock);
list_add(&lri->lri_list, &lwp_register_list);
list_add(&lri->lri_list, &lwp_register_list);
+ spin_unlock(&lwp_register_list_lock);
if (*exp != NULL && cb_func != NULL)
cb_func(cb_data);
if (*exp != NULL && cb_func != NULL)
cb_func(cb_data);
+ lustre_put_lwp_item(lri);
- mutex_unlock(&lwp_register_list_lock);
RETURN(0);
}
EXPORT_SYMBOL(lustre_register_lwp_item);
void lustre_deregister_lwp_item(struct obd_export **exp)
{
RETURN(0);
}
EXPORT_SYMBOL(lustre_register_lwp_item);
void lustre_deregister_lwp_item(struct obd_export **exp)
{
- struct lwp_register_item *lri, *tmp;
+ struct lwp_register_item *lri;
- mutex_lock(&lwp_register_list_lock);
- list_for_each_entry_safe(lri, tmp, &lwp_register_list, lri_list) {
+ spin_lock(&lwp_register_list_lock);
+ list_for_each_entry(lri, &lwp_register_list, lri_list) {
if (exp == lri->lri_exp) {
if (exp == lri->lri_exp) {
- if (*exp)
- class_export_put(*exp);
- list_del(&lri->lri_list);
- OBD_FREE_PTR(lri);
- break;
+ list_del_init(&lri->lri_list);
+ spin_unlock(&lwp_register_list_lock);
+
+ lustre_put_lwp_item(lri);
+ return;
- mutex_unlock(&lwp_register_list_lock);
+ spin_unlock(&lwp_register_list_lock);
}
EXPORT_SYMBOL(lustre_deregister_lwp_item);
}
EXPORT_SYMBOL(lustre_deregister_lwp_item);
void lustre_notify_lwp_list(struct obd_export *exp)
{
void lustre_notify_lwp_list(struct obd_export *exp)
{
- struct lwp_register_item *lri, *tmp;
+ struct lwp_register_item *lri;
- mutex_lock(&lwp_register_list_lock);
- list_for_each_entry_safe(lri, tmp, &lwp_register_list, lri_list) {
+again:
+ spin_lock(&lwp_register_list_lock);
+ list_for_each_entry(lri, &lwp_register_list, lri_list) {
if (strcmp(exp->exp_obd->obd_name, lri->lri_name))
continue;
if (*lri->lri_exp != NULL)
continue;
*lri->lri_exp = class_export_get(exp);
if (strcmp(exp->exp_obd->obd_name, lri->lri_name))
continue;
if (*lri->lri_exp != NULL)
continue;
*lri->lri_exp = class_export_get(exp);
+ atomic_inc(&lri->lri_ref);
+ spin_unlock(&lwp_register_list_lock);
+
if (lri->lri_cb_func != NULL)
lri->lri_cb_func(lri->lri_cb_data);
if (lri->lri_cb_func != NULL)
lri->lri_cb_func(lri->lri_cb_data);
+ lustre_put_lwp_item(lri);
+
+ /* Others may have changed the list after we dropped the lock,
+ * so we have to rescan it from the beginning. Usually the
+ * 'lwp_register_list' is very short, and the 'guard'
+ * lri::lri_exp prevents the callback from being invoked
+ * repeatedly, so rescanning the list is not a problem. */
+ goto again;
- mutex_unlock(&lwp_register_list_lock);
+ spin_unlock(&lwp_register_list_lock);
}
EXPORT_SYMBOL(lustre_notify_lwp_list);
}
EXPORT_SYMBOL(lustre_notify_lwp_list);