From: Niu Yawei Date: Mon, 15 Aug 2016 08:31:53 +0000 (-0400) Subject: LU-8491 quota: sleep while holding spinlock X-Git-Tag: 2.9.51~61 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=6cb38c3a863993f3bba8332194c5ee8c939ad25d LU-8491 quota: sleep while holding spinlock Revise the memory allocation code in qmt_glimpse_lock() to avoid sleep while holding spinlock. Signed-off-by: Niu Yawei Change-Id: I75d5751910906984c31454d4567f58d769af5d51 Reviewed-on: https://review.whamcloud.com/21923 Reviewed-by: Fan Yong Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Lai Siyao Reviewed-by: Oleg Drokin --- diff --git a/lustre/quota/qmt_lock.c b/lustre/quota/qmt_lock.c index be80dce..ff95837 100644 --- a/lustre/quota/qmt_lock.c +++ b/lustre/quota/qmt_lock.c @@ -446,9 +446,92 @@ int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res) RETURN(0); } -typedef int (*qmt_glimpse_cb_t)(const struct lu_env *, struct qmt_device *, - struct obd_uuid *, union ldlm_gl_desc *, - void *); +typedef int (*qmt_glimpse_cb_t)(struct ldlm_lock *, void *); + +struct qmt_gl_lock_array { + unsigned long q_max; + unsigned long q_cnt; + struct ldlm_lock **q_locks; +}; + +static void qmt_free_lock_array(struct qmt_gl_lock_array *array) +{ + int i; + + if (array->q_max == 0) { + LASSERT(array->q_locks == NULL); + return; + } + + for (i = 0; i < array->q_cnt; i++) { + LASSERT(array->q_locks[i]); + LDLM_LOCK_RELEASE(array->q_locks[i]); + array->q_locks[i] = NULL; + } + array->q_cnt = 0; + OBD_FREE(array->q_locks, array->q_max * sizeof(*array->q_locks)); + array->q_locks = NULL; + array->q_max = 0; +} + +static int qmt_alloc_lock_array(struct ldlm_resource *res, + struct qmt_gl_lock_array *array, + qmt_glimpse_cb_t cb, void *arg) +{ + struct list_head *pos; + unsigned long count = 0; + int fail_cnt = 0; + ENTRY; + + LASSERT(!array->q_max && !array->q_cnt && !array->q_locks); +again: + lock_res(res); + /* scan list of granted locks */ + list_for_each(pos, &res->lr_granted) { + struct ldlm_lock *lock; + int rc; + + lock = list_entry(pos, struct ldlm_lock, l_res_link); + LASSERT(lock->l_export); + + if (cb != NULL) { + rc = cb(lock, arg); + /* slave should not be notified */ + if (rc == 0) + continue; + } + + count++; + if (array->q_max != 0 && array->q_cnt < array->q_max) { + array->q_locks[array->q_cnt] = LDLM_LOCK_GET(lock); + array->q_cnt++; + } + } + unlock_res(res); + + if (count > array->q_max) { + qmt_free_lock_array(array); + if (++fail_cnt > 5) + RETURN(-EAGAIN); + /* + * allocate more slots in case of more qualified locks are + * found during next loop + */ + array->q_max = count + count / 2 + 10; + count = 0; + LASSERT(array->q_locks == NULL && array->q_cnt == 0); + OBD_ALLOC(array->q_locks, + sizeof(*array->q_locks) * array->q_max); + if (array->q_locks == NULL) { + array->q_max = 0; + RETURN(-ENOMEM); + } + + goto again; + } + RETURN(0); +} + /* * Send glimpse callback to slaves holding a lock on resource \res. * This is used to notify slaves of new quota settings or to claim quota space @@ -468,47 +551,39 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt, { struct list_head *tmp, *pos; struct list_head gl_list = LIST_HEAD_INIT(gl_list); - int rc = 0; + struct qmt_gl_lock_array locks; + unsigned long i; + int rc = 0; ENTRY; - lock_res(res); - /* scan list of granted locks */ - list_for_each(pos, &res->lr_granted) { - struct ldlm_glimpse_work *work; - struct ldlm_lock *lock; - struct obd_uuid *uuid; - - lock = list_entry(pos, struct ldlm_lock, l_res_link); - LASSERT(lock->l_export); - uuid = &lock->l_export->exp_client_uuid; + memset(&locks, 0, sizeof(locks)); + rc = qmt_alloc_lock_array(res, &locks, cb, arg); + if (rc) { + CERROR("%s: failed to allocate glimpse lock array (%d)\n", + qmt->qmt_svname, rc); + RETURN(rc); + } - if (cb != NULL) { - rc = cb(env, qmt, uuid, desc, arg); - if (rc == 0) - /* slave should not be notified */ - continue; - if (rc < 0) - /* something wrong happened, we still notify */ - CERROR("%s: callback function failed to " - "determine whether slave %s should be " - "notified (%d)\n", qmt->qmt_svname, - obd_uuid2str(uuid), rc); - } + for (i = locks.q_cnt; i > 0; i--) { + struct ldlm_glimpse_work *work; OBD_ALLOC_PTR(work); if (work == NULL) { - CERROR("%s: failed to notify %s\n", qmt->qmt_svname, - obd_uuid2str(uuid)); + CERROR("%s: failed to notify a lock.\n", + qmt->qmt_svname); continue; } list_add_tail(&work->gl_list, &gl_list); - work->gl_lock = LDLM_LOCK_GET(lock); + work->gl_lock = locks.q_locks[i - 1]; work->gl_flags = 0; work->gl_desc = desc; + locks.q_locks[i - 1] = NULL; + locks.q_cnt--; } - unlock_res(res); + + qmt_free_lock_array(&locks); if (list_empty(&gl_list)) { CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname); @@ -581,11 +656,10 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe, /* Callback function used to select locks that should be glimpsed when * broadcasting the new qunit value */ -static int qmt_id_lock_cb(const struct lu_env *env, struct qmt_device *qmt, - struct obd_uuid *uuid, union ldlm_gl_desc *desc, - void *arg) +static int qmt_id_lock_cb(struct ldlm_lock *lock, void *arg) { struct obd_uuid *slv_uuid = arg; + struct obd_uuid *uuid = &lock->l_export->exp_client_uuid; if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid)) RETURN(0);