X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fquota%2Fqmt_lock.c;h=0e20f9c1cdc363b152c29b3f7cd0afb85d57756c;hp=a4086f6b52bcf1aff8b04ee70ebde50071ab688d;hb=546993d587c5fc380e9745eae98f863e02e68575;hpb=08aa217ce49aba1ded52e0f7adb8a607035123fd diff --git a/lustre/quota/qmt_lock.c b/lustre/quota/qmt_lock.c index a4086f6..0e20f9c 100644 --- a/lustre/quota/qmt_lock.c +++ b/lustre/quota/qmt_lock.c @@ -21,20 +21,19 @@ * GPL HEADER END */ /* - * Copyright (c) 2012, Intel Corporation. + * Copyright (c) 2012, 2017, Intel Corporation. * Use is subject to license terms. * * Author: Johann Lombardi * Author: Niu Yawei */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif - #define DEBUG_SUBSYSTEM S_LQUOTA +#include + #include +#include #include #include "qmt_internal.h" @@ -52,7 +51,8 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld, struct obd_uuid *uuid; struct lquota_lvb *lvb; struct ldlm_resource *res = (*lockp)->l_resource; - int rc; + struct ldlm_reply *ldlm_rep; + int rc, lvb_len; ENTRY; req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA); @@ -79,6 +79,10 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld, if (repbody == NULL) RETURN(err_serious(-EFAULT)); + ldlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); + if (ldlm_rep == NULL) + RETURN(err_serious(-EFAULT)); + uuid = &(*lockp)->l_export->exp_client_uuid; switch (it->opc) { @@ -128,17 +132,23 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld, break; default: - CERROR("%s: invalid intent opcode: "LPU64"\n", qmt->qmt_svname, + CERROR("%s: invalid intent opcode: %llu\n", qmt->qmt_svname, it->opc); - GOTO(out, rc = err_serious(-EINVAL)); + GOTO(out, rc = -EINVAL); } /* on success, pack lvb in reply */ lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB); - ldlm_lvbo_fill(*lockp, lvb, ldlm_lvbo_size(*lockp)); - EXIT; + lvb_len = ldlm_lvbo_size(*lockp); + lvb_len = ldlm_lvbo_fill(*lockp, lvb, &lvb_len); + if (lvb_len < 0) + GOTO(out, rc = lvb_len); + + req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len, RCL_SERVER); out: - return rc; + ldlm_rep->lock_policy_res2 = clear_serious(rc); + EXIT; + return ELDLM_OK; } /* @@ -150,7 +160,7 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res) struct lu_env *env; struct qmt_thread_info *qti; struct qmt_device *qmt = lu2qmt_dev(ld); - int pool_id, pool_type, qtype; + int pool_type, qtype; int rc; ENTRY; @@ -163,25 +173,17 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res) res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB) RETURN(0); - OBD_ALLOC_PTR(env); - if (env == NULL) - RETURN(-ENOMEM); - - /* initialize environment */ - rc = lu_env_init(env, LCT_MD_THREAD); - if (rc) { - OBD_FREE_PTR(env); - RETURN(rc); - } + env = lu_env_find(); + LASSERT(env); qti = qmt_info(env); /* extract global index FID and quota identifier */ - fid_extract_quota_resid(&res->lr_name, &qti->qti_fid, &qti->qti_id); + fid_extract_from_quota_res(&qti->qti_fid, &qti->qti_id, &res->lr_name); /* sanity check the global index FID */ - rc = lquota_extract_fid(&qti->qti_fid, &pool_id, &pool_type, &qtype); + rc = lquota_extract_fid(&qti->qti_fid, &pool_type, &qtype); if (rc) { - CERROR("can't extract pool information from FID "DFID"\n", + CERROR("can't extract glb index information from FID "DFID"\n", PFID(&qti->qti_fid)); GOTO(out, rc); } @@ -192,7 +194,7 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res) struct lquota_entry *lqe; /* Find the quota entry associated with the quota id */ - lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, pool_type, qtype, + lqe = qmt_pool_lqe_lookup(env, qmt, pool_type, qtype, &qti->qti_id); if (IS_ERR(lqe)) GOTO(out, rc = PTR_ERR(lqe)); @@ -208,7 +210,7 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res) if (IS_ERR(obj)) GOTO(out, rc = PTR_ERR(obj)); if (!dt_object_exists(obj)) { - lu_object_put(env, &obj->do_lu); + dt_object_put(env, obj); GOTO(out, rc = -ENOENT); } @@ -217,11 +219,9 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res) CDEBUG(D_QUOTA, DFID" initialized lvb\n", PFID(&qti->qti_fid)); } - res->lr_lvb_len = sizeof(struct lquota_lvb); + res->lr_lvb_len = sizeof(struct lquota_lvb); EXIT; out: - lu_env_fini(env); - OBD_FREE_PTR(env); return rc; } @@ -264,7 +264,7 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res, LASSERT(lqe != NULL); lqe_getref(lqe); - LQUOTA_DEBUG(lqe, "releasing:"LPU64" may release:"LPU64, + LQUOTA_DEBUG(lqe, "releasing:%llu may release:%llu", lvb->lvb_id_rel, lvb->lvb_id_may_rel); if (lvb->lvb_id_rel == 0) { @@ -272,18 +272,12 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res, if (lvb->lvb_id_may_rel != 0) /* but might still release later ... */ lqe->lqe_may_rel += lvb->lvb_id_may_rel; - GOTO(out_lqe, rc = 0); + GOTO(out, rc = 0); } /* allocate environement */ - OBD_ALLOC_PTR(env); - if (env == NULL) - GOTO(out_lqe, rc = -ENOMEM); - - /* initialize environment */ - rc = lu_env_init(env, LCT_MD_THREAD); - if (rc) - GOTO(out_env, rc); + env = lu_env_find(); + LASSERT(env); qti = qmt_info(env); /* The request is a glimpse callback which was sent via the @@ -297,14 +291,14 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res, if (IS_ERR(lock)) { CERROR("%s: failed to get lock from request!\n", qmt->qmt_svname); - GOTO(out_env_init, rc = PTR_ERR(lock)); + GOTO(out, rc = PTR_ERR(lock)); } exp = class_export_get(lock->l_export); if (exp == NULL) { CERROR("%s: failed to get export from lock!\n", qmt->qmt_svname); - GOTO(out_env_init, rc = -EFAULT); + GOTO(out, rc = -EFAULT); } /* release quota space */ @@ -312,17 +306,13 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res, QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, 0, &qti->qti_body); if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel) LQUOTA_ERROR(lqe, "failed to release quota space on glimpse " - LPU64"!="LPU64" rc:%d\n", qti->qti_body.qb_count, + "%llu!=%llu : rc = %d\n", qti->qti_body.qb_count, lvb->lvb_id_rel, rc); class_export_put(exp); if (rc) - GOTO(out_env_init, rc); + GOTO(out, rc); EXIT; -out_env_init: - lu_env_fini(env); -out_env: - OBD_FREE_PTR(env); -out_lqe: +out: lqe_putref(lqe); return rc; } @@ -367,26 +357,10 @@ int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb, lqe_putref(lqe); } else { /* global quota lock */ - struct lu_env *env; - int rc; - struct dt_object *obj = res->lr_lvb_data; - - OBD_ALLOC_PTR(env); - if (env == NULL) - RETURN(-ENOMEM); - - /* initialize environment */ - rc = lu_env_init(env, LCT_LOCAL); - if (rc) { - OBD_FREE_PTR(env); - RETURN(rc); - } + struct dt_object *obj = res->lr_lvb_data; /* return current version of global index */ - qlvb->lvb_glb_ver = dt_version_get(env, obj); - - lu_env_fini(env); - OBD_FREE_PTR(env); + qlvb->lvb_glb_ver = dt_version_get(lu_env_find(), obj); } RETURN(sizeof(struct lquota_lvb)); @@ -410,36 +384,103 @@ int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res) /* release lqe reference */ lqe_putref(lqe); } else { - struct dt_object *obj = res->lr_lvb_data; - struct lu_env *env; - int rc; + struct dt_object *obj = res->lr_lvb_data; + /* release object reference */ + dt_object_put(lu_env_find(), obj); + } - OBD_ALLOC_PTR(env); - if (env == NULL) - RETURN(-ENOMEM); + res->lr_lvb_data = NULL; + res->lr_lvb_len = 0; + + RETURN(0); +} + +typedef int (*qmt_glimpse_cb_t)(struct ldlm_lock *, void *); + +struct qmt_gl_lock_array { + unsigned long q_max; + unsigned long q_cnt; + struct ldlm_lock **q_locks; +}; + +static void qmt_free_lock_array(struct qmt_gl_lock_array *array) +{ + int i; + + if (array->q_max == 0) { + LASSERT(array->q_locks == NULL); + return; + } + + for (i = 0; i < array->q_cnt; i++) { + LASSERT(array->q_locks[i]); + LDLM_LOCK_RELEASE(array->q_locks[i]); + array->q_locks[i] = NULL; + } + array->q_cnt = 0; + OBD_FREE(array->q_locks, array->q_max * sizeof(*array->q_locks)); + array->q_locks = NULL; + array->q_max = 0; +} - /* initialize environment */ - rc = lu_env_init(env, LCT_LOCAL); - if (rc) { - OBD_FREE_PTR(env); - RETURN(rc); +static int qmt_alloc_lock_array(struct ldlm_resource *res, + struct qmt_gl_lock_array *array, + qmt_glimpse_cb_t cb, void *arg) +{ + struct list_head *pos; + unsigned long count = 0; + int fail_cnt = 0; + ENTRY; + + LASSERT(!array->q_max && !array->q_cnt && !array->q_locks); +again: + lock_res(res); + /* scan list of granted locks */ + list_for_each(pos, &res->lr_granted) { + struct ldlm_lock *lock; + int rc; + + lock = list_entry(pos, struct ldlm_lock, l_res_link); + LASSERT(lock->l_export); + + if (cb != NULL) { + rc = cb(lock, arg); + /* slave should not be notified */ + if (rc == 0) + continue; } - /* release object reference */ - lu_object_put(env, &obj->do_lu); - lu_env_fini(env); - OBD_FREE_PTR(env); + count++; + if (array->q_max != 0 && array->q_cnt < array->q_max) { + array->q_locks[array->q_cnt] = LDLM_LOCK_GET(lock); + array->q_cnt++; + } } + unlock_res(res); - res->lr_lvb_data = NULL; - res->lr_lvb_len = 0; + if (count > array->q_max) { + qmt_free_lock_array(array); + if (++fail_cnt > 5) + RETURN(-EAGAIN); + /* + * allocate more slots in case of more qualified locks are + * found during next loop + */ + array->q_max = count + count / 2 + 10; + count = 0; + LASSERT(array->q_locks == NULL && array->q_cnt == 0); + OBD_ALLOC(array->q_locks, + sizeof(*array->q_locks) * array->q_max); + if (array->q_locks == NULL) { + array->q_max = 0; + RETURN(-ENOMEM); + } + goto again; + } RETURN(0); } -typedef int (*qmt_glimpse_cb_t)(const struct lu_env *, struct qmt_device *, - struct obd_uuid *, union ldlm_gl_desc *, - void *); /* * Send glimpse callback to slaves holding a lock on resource \res. * This is used to notify slaves of new quota settings or to claim quota space @@ -457,51 +498,43 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt, struct ldlm_resource *res, union ldlm_gl_desc *desc, qmt_glimpse_cb_t cb, void *arg) { - cfs_list_t *tmp, *pos; - CFS_LIST_HEAD(gl_list); - int rc = 0; + struct list_head *tmp, *pos; + LIST_HEAD(gl_list); + struct qmt_gl_lock_array locks; + unsigned long i; + int rc = 0; ENTRY; - lock_res(res); - /* scan list of granted locks */ - cfs_list_for_each(pos, &res->lr_granted) { - struct ldlm_glimpse_work *work; - struct ldlm_lock *lock; - struct obd_uuid *uuid; - - lock = cfs_list_entry(pos, struct ldlm_lock, l_res_link); - LASSERT(lock->l_export); - uuid = &lock->l_export->exp_client_uuid; + memset(&locks, 0, sizeof(locks)); + rc = qmt_alloc_lock_array(res, &locks, cb, arg); + if (rc) { + CERROR("%s: failed to allocate glimpse lock array (%d)\n", + qmt->qmt_svname, rc); + RETURN(rc); + } - if (cb != NULL) { - rc = cb(env, qmt, uuid, desc, arg); - if (rc == 0) - /* slave should not be notified */ - continue; - if (rc < 0) - /* something wrong happened, we still notify */ - CERROR("%s: callback function failed to " - "determine whether slave %s should be " - "notified (%d)\n", qmt->qmt_svname, - obd_uuid2str(uuid), rc); - } + for (i = locks.q_cnt; i > 0; i--) { + struct ldlm_glimpse_work *work; OBD_ALLOC_PTR(work); if (work == NULL) { - CERROR("%s: failed to notify %s\n", qmt->qmt_svname, - obd_uuid2str(uuid)); + CERROR("%s: failed to notify a lock.\n", + qmt->qmt_svname); continue; } - cfs_list_add_tail(&work->gl_list, &gl_list); - work->gl_lock = LDLM_LOCK_GET(lock); + list_add_tail(&work->gl_list, &gl_list); + work->gl_lock = locks.q_locks[i - 1]; work->gl_flags = 0; work->gl_desc = desc; + locks.q_locks[i - 1] = NULL; + locks.q_cnt--; } - unlock_res(res); - if (cfs_list_empty(&gl_list)) { + qmt_free_lock_array(&locks); + + if (list_empty(&gl_list)) { CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname); RETURN(0); } @@ -509,12 +542,12 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt, /* issue glimpse callbacks to all connected slaves */ rc = ldlm_glimpse_locks(res, &gl_list); - cfs_list_for_each_safe(pos, tmp, &gl_list) { + list_for_each_safe(pos, tmp, &gl_list) { struct ldlm_glimpse_work *work; - work = cfs_list_entry(pos, struct ldlm_glimpse_work, gl_list); + work = list_entry(pos, struct ldlm_glimpse_work, gl_list); - cfs_list_del(&work->gl_list); + list_del(&work->gl_list); CERROR("%s: failed to notify %s of new quota settings\n", qmt->qmt_svname, obd_uuid2str(&work->gl_lock->l_export->exp_client_uuid)); @@ -539,25 +572,32 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe, struct qmt_thread_info *qti = qmt_info(env); struct qmt_pool_info *pool = lqe2qpi(lqe); struct ldlm_resource *res = NULL; - int rc; ENTRY; - lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff, - pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype); + lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, + lqe->lqe_site->lqs_qtype); /* send glimpse callback to notify slaves of new quota settings */ qti->qti_gl_desc.lquota_desc.gl_id = lqe->lqe_id; qti->qti_gl_desc.lquota_desc.gl_flags = 0; - qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit; - qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit; - qti->qti_gl_desc.lquota_desc.gl_time = lqe->lqe_gracetime; + if (lqe->lqe_is_default) { + qti->qti_gl_desc.lquota_desc.gl_hardlimit = 0; + qti->qti_gl_desc.lquota_desc.gl_softlimit = 0; + qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0, + LQUOTA_FLAG_DEFAULT); + + } else { + qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit; + qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit; + qti->qti_gl_desc.lquota_desc.gl_time = lqe->lqe_gracetime; + } qti->qti_gl_desc.lquota_desc.gl_ver = ver; /* look up ldlm resource associated with global index */ fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid); res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN, 0); - if (res == NULL) { + if (IS_ERR(res)) { /* this might happen if no slaves have enqueued global quota * locks yet */ LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource associated " @@ -565,19 +605,18 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe, RETURN_EXIT; } - rc = qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc, - NULL, NULL); + qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc, + NULL, NULL); ldlm_resource_putref(res); EXIT; } /* Callback function used to select locks that should be glimpsed when * broadcasting the new qunit value */ -static int qmt_id_lock_cb(const struct lu_env *env, struct qmt_device *qmt, - struct obd_uuid *uuid, union ldlm_gl_desc *desc, - void *arg) +static int qmt_id_lock_cb(struct ldlm_lock *lock, void *arg) { struct obd_uuid *slv_uuid = arg; + struct obd_uuid *uuid = &lock->l_export->exp_client_uuid; if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid)) RETURN(0); @@ -599,18 +638,17 @@ static void qmt_id_lock_glimpse(const struct lu_env *env, struct qmt_thread_info *qti = qmt_info(env); struct qmt_pool_info *pool = lqe2qpi(lqe); struct ldlm_resource *res = NULL; - int rc; ENTRY; if (!lqe->lqe_enforced) RETURN_EXIT; - lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff, - pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype); - fid_build_quota_resid(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid); + lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, + lqe->lqe_site->lqs_qtype); + fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid); res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN, 0); - if (res == NULL) { + if (IS_ERR(res)) { /* this might legitimately happens if slaves haven't had the * opportunity to enqueue quota lock yet. */ LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID " @@ -618,7 +656,7 @@ static void qmt_id_lock_glimpse(const struct lu_env *env, lqe_write_lock(lqe); if (lqe->lqe_revoke_time == 0 && lqe->lqe_qunit == pool->qpi_least_qunit) - lqe->lqe_revoke_time = cfs_time_current_64(); + lqe->lqe_revoke_time = ktime_get_seconds(); lqe_write_unlock(lqe); RETURN_EXIT; } @@ -649,15 +687,15 @@ static void qmt_id_lock_glimpse(const struct lu_env *env, lqe_write_unlock(lqe); /* issue glimpse callback to slaves */ - rc = qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc, - uuid ? qmt_id_lock_cb : NULL, (void *)uuid); + qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc, + uuid ? qmt_id_lock_cb : NULL, (void *)uuid); lqe_write_lock(lqe); if (lqe->lqe_revoke_time == 0 && qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit && lqe->lqe_qunit == pool->qpi_least_qunit) { - lqe->lqe_revoke_time = cfs_time_current_64(); - qmt_adjust_edquot(lqe, cfs_time_current_sec()); + lqe->lqe_revoke_time = ktime_get_seconds(); + qmt_adjust_edquot(lqe, ktime_get_real_seconds()); } LASSERT(lqe->lqe_gl); lqe->lqe_gl = false; @@ -681,14 +719,14 @@ void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe) lqe_getref(lqe); spin_lock(&qmt->qmt_reba_lock); - if (!qmt->qmt_stopping && cfs_list_empty(&lqe->lqe_link)) { - cfs_list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list); + if (!qmt->qmt_stopping && list_empty(&lqe->lqe_link)) { + list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list); added = true; } spin_unlock(&qmt->qmt_reba_lock); if (added) - cfs_waitq_signal(&qmt->qmt_reba_thread.t_ctl_waitq); + wake_up(&qmt->qmt_reba_thread.t_ctl_waitq); else lqe_putref(lqe); EXIT; @@ -708,39 +746,38 @@ static int qmt_reba_thread(void *arg) { struct qmt_device *qmt = (struct qmt_device *)arg; struct ptlrpc_thread *thread = &qmt->qmt_reba_thread; - struct l_wait_info lwi = { 0 }; struct lu_env *env; struct lquota_entry *lqe, *tmp; - char pname[MTI_NAME_MAXLEN]; int rc; ENTRY; OBD_ALLOC_PTR(env); - if (env == NULL) + if (env == NULL) { + thread_set_flags(thread, SVC_STOPPED); RETURN(-ENOMEM); + } rc = lu_env_init(env, LCT_MD_THREAD); if (rc) { CERROR("%s: failed to init env.", qmt->qmt_svname); - OBD_FREE_PTR(env); - RETURN(rc); + GOTO(out_env, rc); } - - snprintf(pname, MTI_NAME_MAXLEN, "qmt_reba_%s", qmt->qmt_svname); - cfs_daemonize(pname); + rc = lu_env_add(env); + if (rc) + GOTO(out_env_fini, rc); thread_set_flags(thread, SVC_RUNNING); - cfs_waitq_signal(&thread->t_ctl_waitq); + wake_up(&thread->t_ctl_waitq); while (1) { - l_wait_event(thread->t_ctl_waitq, - !cfs_list_empty(&qmt->qmt_reba_list) || - !thread_is_running(thread), &lwi); + wait_event_idle(thread->t_ctl_waitq, + !list_empty(&qmt->qmt_reba_list) || + !thread_is_running(thread)); spin_lock(&qmt->qmt_reba_lock); - cfs_list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list, - lqe_link) { - cfs_list_del_init(&lqe->lqe_link); + list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list, + lqe_link) { + list_del_init(&lqe->lqe_link); spin_unlock(&qmt->qmt_reba_lock); if (thread_is_running(thread)) @@ -754,10 +791,13 @@ static int qmt_reba_thread(void *arg) if (!thread_is_running(thread)) break; } + lu_env_remove(env); +out_env_fini: lu_env_fini(env); +out_env: OBD_FREE_PTR(env); thread_set_flags(thread, SVC_STOPPED); - cfs_waitq_signal(&thread->t_ctl_waitq); + wake_up(&thread->t_ctl_waitq); RETURN(rc); } @@ -767,21 +807,20 @@ static int qmt_reba_thread(void *arg) int qmt_start_reba_thread(struct qmt_device *qmt) { struct ptlrpc_thread *thread = &qmt->qmt_reba_thread; - struct l_wait_info lwi = { 0 }; - int rc; + struct task_struct *task; ENTRY; - rc = cfs_create_thread(qmt_reba_thread, (void *)qmt, 0); - if (rc < 0) { - CERROR("%s: failed to start rebalance thread (%d)\n", - qmt->qmt_svname, rc); + task = kthread_run(qmt_reba_thread, (void *)qmt, + "qmt_reba_%s", qmt->qmt_svname); + if (IS_ERR(task)) { + CERROR("%s: failed to start rebalance thread (%ld)\n", + qmt->qmt_svname, PTR_ERR(task)); thread_set_flags(thread, SVC_STOPPED); - RETURN(rc); + RETURN(PTR_ERR(task)); } - l_wait_event(thread->t_ctl_waitq, - thread_is_running(thread) || thread_is_stopped(thread), - &lwi); + wait_event_idle(thread->t_ctl_waitq, + thread_is_running(thread) || thread_is_stopped(thread)); RETURN(0); } @@ -794,13 +833,11 @@ void qmt_stop_reba_thread(struct qmt_device *qmt) struct ptlrpc_thread *thread = &qmt->qmt_reba_thread; if (!thread_is_stopped(thread)) { - struct l_wait_info lwi = { 0 }; thread_set_flags(thread, SVC_STOPPING); - cfs_waitq_signal(&thread->t_ctl_waitq); + wake_up(&thread->t_ctl_waitq); - l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread), - &lwi); + wait_event_idle(thread->t_ctl_waitq, thread_is_stopped(thread)); } - LASSERT(cfs_list_empty(&qmt->qmt_reba_list)); + LASSERT(list_empty(&qmt->qmt_reba_list)); }