X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fquota%2Fqmt_lock.c;h=f9818380bac8dd49f49c780429fe8998ccf3d445;hp=05c60b0ce7a49b685c1c5f9634dc790e66ff5f73;hb=HEAD;hpb=df66537682aef976102a5b986ec2830f8d00b0c6 diff --git a/lustre/quota/qmt_lock.c b/lustre/quota/qmt_lock.c index 05c60b0..d847529 100644 --- a/lustre/quota/qmt_lock.c +++ b/lustre/quota/qmt_lock.c @@ -21,24 +21,26 @@ * GPL HEADER END */ /* - * Copyright (c) 2012, Intel Corporation. + * Copyright (c) 2012, 2017, Intel Corporation. * Use is subject to license terms. * * Author: Johann Lombardi * Author: Niu Yawei */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif - #define DEBUG_SUBSYSTEM S_LQUOTA +#include +#include + #include +#include #include #include "qmt_internal.h" +struct workqueue_struct *qmt_lvbo_free_wq; + /* intent policy function called from mdt_intent_opc() when the intent is of * quota type */ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld, @@ -52,6 +54,7 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld, struct obd_uuid *uuid; struct lquota_lvb *lvb; struct ldlm_resource *res = (*lockp)->l_resource; + struct ldlm_reply *ldlm_rep; int rc, lvb_len; ENTRY; @@ -79,12 +82,17 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld, if (repbody == NULL) RETURN(err_serious(-EFAULT)); + ldlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); + if (ldlm_rep == NULL) + RETURN(err_serious(-EFAULT)); + uuid = &(*lockp)->l_export->exp_client_uuid; switch (it->opc) { case IT_QUOTA_DQACQ: { struct lquota_entry *lqe; struct ldlm_lock *lock; + int idx, stype; if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0) /* acquire on global lock? something is wrong ... */ @@ -99,15 +107,32 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld, GOTO(out, rc = -ENOLCK); LDLM_LOCK_PUT(lock); + stype = qmt_uuid2idx(uuid, &idx); + if (stype < 0) + GOTO(out, rc = -EINVAL); + + /* TODO: it seems we don't need to get lqe from + * lq_lvb_data anymore ... And do extra get + * and put on it */ lqe = res->lr_lvb_data; LASSERT(lqe != NULL); lqe_getref(lqe); + rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), stype, + lqe_qtype(lqe), &reqbody->qb_id, + NULL, idx); + if (rc) { + lqe_putref(lqe); + GOTO(out, rc); + } + /* acquire quota space */ - rc = qmt_dqacq0(env, lqe, qmt, uuid, reqbody->qb_flags, - reqbody->qb_count, reqbody->qb_usage, - repbody); + rc = qmt_dqacq0(env, qmt, uuid, + reqbody->qb_flags, reqbody->qb_count, + reqbody->qb_usage, repbody, + qmt_dom(lqe_rtype(lqe), stype) ? -1 : idx); lqe_putref(lqe); + qti_lqes_fini(env); if (rc) GOTO(out, rc); break; @@ -128,19 +153,23 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld, break; default: - CERROR("%s: invalid intent opcode: "LPU64"\n", qmt->qmt_svname, + CERROR("%s: invalid intent opcode: %llu\n", qmt->qmt_svname, it->opc); - GOTO(out, rc = err_serious(-EINVAL)); + GOTO(out, rc = -EINVAL); } /* on success, pack lvb in reply */ lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB); lvb_len = ldlm_lvbo_size(*lockp); - lvb_len = ldlm_lvbo_fill(*lockp, lvb, lvb_len); + lvb_len = ldlm_lvbo_fill(*lockp, lvb, &lvb_len); + if (lvb_len < 0) + GOTO(out, rc = lvb_len); + req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len, RCL_SERVER); - EXIT; out: - return rc; + ldlm_rep->lock_policy_res2 = clear_serious(rc); + EXIT; + return ELDLM_OK; } /* @@ -152,7 +181,7 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res) struct lu_env *env; struct qmt_thread_info *qti; struct qmt_device *qmt = lu2qmt_dev(ld); - int pool_id, pool_type, qtype; + int pool_type, qtype; int rc; ENTRY; @@ -165,25 +194,17 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res) res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB) RETURN(0); - OBD_ALLOC_PTR(env); - if (env == NULL) - RETURN(-ENOMEM); - - /* initialize environment */ - rc = lu_env_init(env, LCT_MD_THREAD); - if (rc) { - OBD_FREE_PTR(env); - RETURN(rc); - } + env = lu_env_find(); + LASSERT(env); qti = qmt_info(env); /* extract global index FID and quota identifier */ - fid_extract_quota_resid(&res->lr_name, &qti->qti_fid, &qti->qti_id); + fid_extract_from_quota_res(&qti->qti_fid, &qti->qti_id, &res->lr_name); /* sanity check the global index FID */ - rc = lquota_extract_fid(&qti->qti_fid, &pool_id, &pool_type, &qtype); + rc = lquota_extract_fid(&qti->qti_fid, &pool_type, &qtype); if (rc) { - CERROR("can't extract pool information from FID "DFID"\n", + CERROR("can't extract glb index information from FID "DFID"\n", PFID(&qti->qti_fid)); GOTO(out, rc); } @@ -191,16 +212,36 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res) if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) { /* no ID quota lock associated with UID/GID 0 or with a seq 0, * we are thus dealing with an ID lock. */ + struct qmt_pool_info *pool; struct lquota_entry *lqe; + struct lqe_glbl_data *lgd; + + pool = qmt_pool_lookup_glb(env, qmt, pool_type); + if (IS_ERR(pool)) + GOTO(out, rc = -ENOMEM); /* Find the quota entry associated with the quota id */ - lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, pool_type, qtype, - &qti->qti_id); - if (IS_ERR(lqe)) + lqe = qmt_pool_lqe_lookup(env, qmt, pool_type, qtype, + &qti->qti_id, NULL); + if (IS_ERR(lqe)) { + qpi_putref(env, pool); GOTO(out, rc = PTR_ERR(lqe)); + } + + /* TODO: need something like qmt_extend_lqe_gd that has + * to be calledeach time when qpi_slv_nr is incremented */ + lgd = qmt_alloc_lqe_gd(pool, qtype); + if (!lgd) { + lqe_putref(lqe); + qpi_putref(env, pool); + GOTO(out, rc = -ENOMEM); + } + + qmt_setup_lqe_gd(env, qmt, lqe, lgd, pool_type); /* store reference to lqe in lr_lvb_data */ res->lr_lvb_data = lqe; + qpi_putref(env, pool); LQUOTA_DEBUG(lqe, "initialized res lvb"); } else { struct dt_object *obj; @@ -210,7 +251,7 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res) if (IS_ERR(obj)) GOTO(out, rc = PTR_ERR(obj)); if (!dt_object_exists(obj)) { - lu_object_put(env, &obj->do_lu); + dt_object_put(env, obj); GOTO(out, rc = -ENOENT); } @@ -219,14 +260,88 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res) CDEBUG(D_QUOTA, DFID" initialized lvb\n", PFID(&qti->qti_fid)); } - res->lr_lvb_len = sizeof(struct lquota_lvb); + res->lr_lvb_len = sizeof(struct lquota_lvb); EXIT; out: - lu_env_fini(env); - OBD_FREE_PTR(env); return rc; } +/* clear lge_qunit/edquot_nu flags - + * slave recieved new qunit and edquot. + * + * \retval true if revoke is needed - qunit + * for this slave reaches least_qunit + */ +static bool qmt_clear_lgeg_arr_nu(struct lquota_entry *lqe, int stype, int idx) +{ + unsigned long least = lqe2qpi(lqe)->qpi_least_qunit; + bool revoke = false; + + /* There is no array to store lge for the case of DOM. + * Ignore it until MDT pools will be ready. + */ + if (!qmt_dom(lqe_rtype(lqe), stype)) { + struct lqe_glbl_data *lgd; + + mutex_lock(&lqe->lqe_glbl_data_lock); + lgd = lqe->lqe_glbl_data; + if (lgd) { + int lge_idx = qmt_map_lge_idx(lgd, idx); + + lgd->lqeg_arr[lge_idx].lge_qunit_nu = 0; + lgd->lqeg_arr[lge_idx].lge_edquot_nu = 0; + /* We shouldn't call revoke for DOM case, it will be + * updated at qmt_id_lock_glimpse. + */ + revoke = lgd->lqeg_arr[lge_idx].lge_qunit == least; + } + mutex_unlock(&lqe->lqe_glbl_data_lock); + } + + return revoke; +} + +static bool qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe_gl, + int stype, int idx) +{ + unsigned long least_qunit = lqe2qpi(lqe_gl)->qpi_least_qunit; + bool notify = false; + + if (qmt_dom(lqe_rtype(lqe_gl), stype)) + return false; + + qti_lqes_write_lock(env); + mutex_lock(&lqe_gl->lqe_glbl_data_lock); + if (lqe_gl->lqe_glbl_data) { + struct lqe_glbl_data *lgd = lqe_gl->lqe_glbl_data; + int lge_idx; + + lge_idx = qmt_map_lge_idx(lgd, idx); + if (lgd->lqeg_arr[lge_idx].lge_qunit == least_qunit) { + struct lquota_entry *lqe; + int i; + + for (i = 0; i < qti_lqes_cnt(env); i++) { + lqe = qti_lqes(env)[i]; + LQUOTA_DEBUG(lqe, + "lge_qunit %llu least_qunit %lu idx %d\n", + lgd->lqeg_arr[lge_idx].lge_qunit, + least_qunit, idx); + if (lqe->lqe_qunit == least_qunit) { + lqe->lqe_revoke_time = + ktime_get_seconds(); + notify |= qmt_adjust_edquot(lqe, + ktime_get_real_seconds()); + } + } + } + } + mutex_unlock(&lqe_gl->lqe_glbl_data_lock); + qti_lqes_write_unlock(env); + + return notify; +} + /* * Update LVB associated with the global quota index. * This function is called from the DLM itself after a glimpse callback, in this @@ -242,7 +357,8 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res, struct lquota_lvb *lvb; struct ldlm_lock *lock; struct obd_export *exp; - int rc = 0; + bool need_revoke; + int rc = 0, idx, stype; ENTRY; LASSERT(res != NULL); @@ -266,26 +382,9 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res, LASSERT(lqe != NULL); lqe_getref(lqe); - LQUOTA_DEBUG(lqe, "releasing:"LPU64" may release:"LPU64, - lvb->lvb_id_rel, lvb->lvb_id_may_rel); - - if (lvb->lvb_id_rel == 0) { - /* nothing to release */ - if (lvb->lvb_id_may_rel != 0) - /* but might still release later ... */ - lqe->lqe_may_rel += lvb->lvb_id_may_rel; - GOTO(out_lqe, rc = 0); - } - /* allocate environement */ - OBD_ALLOC_PTR(env); - if (env == NULL) - GOTO(out_lqe, rc = -ENOMEM); - - /* initialize environment */ - rc = lu_env_init(env, LCT_MD_THREAD); - if (rc) - GOTO(out_env, rc); + env = lu_env_find(); + LASSERT(env); qti = qmt_info(env); /* The request is a glimpse callback which was sent via the @@ -299,32 +398,71 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res, if (IS_ERR(lock)) { CERROR("%s: failed to get lock from request!\n", qmt->qmt_svname); - GOTO(out_env_init, rc = PTR_ERR(lock)); + GOTO(out, rc = PTR_ERR(lock)); } exp = class_export_get(lock->l_export); if (exp == NULL) { CERROR("%s: failed to get export from lock!\n", qmt->qmt_svname); - GOTO(out_env_init, rc = -EFAULT); + GOTO(out, rc = -EFAULT); } - /* release quota space */ - rc = qmt_dqacq0(env, lqe, qmt, &exp->exp_client_uuid, - QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, 0, &qti->qti_body); - if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel) - LQUOTA_ERROR(lqe, "failed to release quota space on glimpse " - LPU64"!="LPU64" rc:%d\n", qti->qti_body.qb_count, - lvb->lvb_id_rel, rc); - class_export_put(exp); + stype = qmt_uuid2idx(&exp->exp_client_uuid, &idx); + if (stype < 0) + GOTO(out_exp, rc = stype); + + need_revoke = qmt_clear_lgeg_arr_nu(lqe, stype, idx); + if (lvb->lvb_id_rel == 0) { + /* nothing to release */ + if (lvb->lvb_id_may_rel != 0) + /* but might still release later ... */ + lqe->lqe_may_rel += lvb->lvb_id_may_rel; + } + + if (!need_revoke && lvb->lvb_id_rel == 0) + GOTO(out_exp, rc = 0); + + rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), stype, + lqe_qtype(lqe), &lqe->lqe_id, NULL, idx); if (rc) - GOTO(out_env_init, rc); + GOTO(out_exp, rc); + + if (need_revoke && qmt_set_revoke(env, lqe, stype, idx)) { + int notify = false; + + mutex_lock(&lqe->lqe_glbl_data_lock); + if (lqe->lqe_glbl_data) { + qmt_seed_glbe_edquot(env, lqe->lqe_glbl_data); + notify = true; + } + mutex_unlock(&lqe->lqe_glbl_data_lock); + if (notify) + qmt_id_lock_notify(qmt, lqe); + } + + if (lvb->lvb_id_rel) { + LQUOTA_DEBUG(lqe, "releasing:%llu may release:%llu", + lvb->lvb_id_rel, lvb->lvb_id_may_rel); + + /* release quota space */ + rc = qmt_dqacq0(env, qmt, &exp->exp_client_uuid, + QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, + 0, &qti->qti_body, + qmt_dom(lqe_rtype(lqe), stype) ? -1 : idx); + if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel) + LQUOTA_ERROR(lqe, + "failed to release quota space on glimpse %llu!=%llu : rc = %d\n", + qti->qti_body.qb_count, + lvb->lvb_id_rel, rc); + } + qti_lqes_fini(env); + if (rc) + GOTO(out_exp, rc); EXIT; -out_env_init: - lu_env_fini(env); -out_env: - OBD_FREE_PTR(env); -out_lqe: +out_exp: + class_export_put(exp); +out: lqe_putref(lqe); return rc; } @@ -345,53 +483,60 @@ int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock) int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb, int lvblen) { - struct ldlm_resource *res = lock->l_resource; - struct lquota_lvb *qlvb = lvb; + struct ldlm_resource *res = lock->l_resource; + struct lquota_lvb *qlvb = lvb; + struct lu_env *env; + int rc; ENTRY; LASSERT(res != NULL); + rc = 0; if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL || res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB) RETURN(-EINVAL); + env = lu_env_find(); + LASSERT(env); + if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) { /* no ID quota lock associated with UID/GID 0 or with a seq 0, * we are thus dealing with an ID lock. */ - struct lquota_entry *lqe = res->lr_lvb_data; - + struct lquota_entry *lqe = res->lr_lvb_data; + struct qmt_device *qmt; + struct obd_uuid *uuid; + int idx; + + uuid = &(lock)->l_export->exp_client_uuid; + rc = qmt_uuid2idx(uuid, &idx); + if (rc < 0) + RETURN(rc); + qmt = lu2qmt_dev(ld); /* return current qunit value & edquot flags in lvb */ lqe_getref(lqe); - qlvb->lvb_id_qunit = lqe->lqe_qunit; - qlvb->lvb_flags = 0; - if (lqe->lqe_edquot) - qlvb->lvb_flags = LQUOTA_FL_EDQUOT; + rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc, + lqe_qtype(lqe), &lqe->lqe_id, + NULL, idx); + if (!rc) { + qlvb->lvb_id_qunit = qti_lqes_min_qunit(env); + qlvb->lvb_flags = 0; + if (qti_lqes_edquot(env)) + qlvb->lvb_flags = LQUOTA_FL_EDQUOT; + qti_lqes_fini(env); + } + CDEBUG(D_QUOTA, "uuid %s lqe_id %lu, edquot %llu qunit %llu\n", + (char *)uuid, (unsigned long)lqe->lqe_id.qid_uid, + qlvb->lvb_flags, qlvb->lvb_id_qunit); lqe_putref(lqe); } else { /* global quota lock */ - struct lu_env *env; - int rc; - struct dt_object *obj = res->lr_lvb_data; - - OBD_ALLOC_PTR(env); - if (env == NULL) - RETURN(-ENOMEM); - - /* initialize environment */ - rc = lu_env_init(env, LCT_LOCAL); - if (rc) { - OBD_FREE_PTR(env); - RETURN(rc); - } + struct dt_object *obj = res->lr_lvb_data; /* return current version of global index */ qlvb->lvb_glb_ver = dt_version_get(env, obj); - - lu_env_fini(env); - OBD_FREE_PTR(env); } - RETURN(sizeof(struct lquota_lvb)); + RETURN(rc = rc ?: sizeof(struct lquota_lvb)); } /* @@ -407,30 +552,13 @@ int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res) RETURN(0); if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) { - struct lquota_entry *lqe = res->lr_lvb_data; + struct lquota_entry *lqe = res->lr_lvb_data; - /* release lqe reference */ - lqe_putref(lqe); + queue_work(qmt_lvbo_free_wq, &lqe->lqe_work); } else { - struct dt_object *obj = res->lr_lvb_data; - struct lu_env *env; - int rc; - - OBD_ALLOC_PTR(env); - if (env == NULL) - RETURN(-ENOMEM); - - /* initialize environment */ - rc = lu_env_init(env, LCT_LOCAL); - if (rc) { - OBD_FREE_PTR(env); - RETURN(rc); - } - + struct dt_object *obj = res->lr_lvb_data; /* release object reference */ - lu_object_put(env, &obj->do_lu); - lu_env_fini(env); - OBD_FREE_PTR(env); + dt_object_put(lu_env_find(), obj); } res->lr_lvb_data = NULL; @@ -439,9 +567,136 @@ int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res) RETURN(0); } -typedef int (*qmt_glimpse_cb_t)(const struct lu_env *, struct qmt_device *, - struct obd_uuid *, union ldlm_gl_desc *, - void *); +typedef int (*qmt_glimpse_cb_t)(struct ldlm_lock *, struct lquota_entry *); + +struct qmt_gl_lock_array { + unsigned long q_max; + unsigned long q_cnt; + struct ldlm_lock **q_locks; +}; + +static void qmt_free_lock_array(struct qmt_gl_lock_array *array) +{ + int i; + + if (array->q_max == 0) { + LASSERT(array->q_locks == NULL); + return; + } + + for (i = 0; i < array->q_cnt; i++) { + LASSERT(array->q_locks[i]); + LDLM_LOCK_RELEASE(array->q_locks[i]); + array->q_locks[i] = NULL; + } + array->q_cnt = 0; + OBD_FREE_PTR_ARRAY(array->q_locks, array->q_max); + array->q_locks = NULL; + array->q_max = 0; +} + +static int qmt_alloc_lock_array(struct ldlm_resource *res, + struct qmt_gl_lock_array *array, + qmt_glimpse_cb_t cb, void *arg) +{ + struct lquota_entry *lqe = arg; + struct list_head *pos; + unsigned long count = 0; + int fail_cnt = 0; + ENTRY; + + LASSERT(!array->q_max && !array->q_cnt && !array->q_locks); +again: + if (cb) + mutex_lock(&lqe->lqe_glbl_data_lock); + lock_res(res); + /* scan list of granted locks */ + list_for_each(pos, &res->lr_granted) { + struct ldlm_lock *lock; + int rc; + + lock = list_entry(pos, struct ldlm_lock, l_res_link); + LASSERT(lock->l_export); + + if (cb != NULL) { + rc = cb(lock, arg); + /* slave should not be notified */ + if (rc == 0) + continue; + } + + count++; + if (array->q_max != 0 && array->q_cnt < array->q_max) { + array->q_locks[array->q_cnt] = LDLM_LOCK_GET(lock); + array->q_cnt++; + } + } + unlock_res(res); + if (cb) + mutex_unlock(&lqe->lqe_glbl_data_lock); + + if (count > array->q_max) { + qmt_free_lock_array(array); + if (++fail_cnt > 5) + RETURN(-EAGAIN); + /* + * allocate more slots in case of more qualified locks are + * found during next loop + */ + array->q_max = count + count / 2 + 10; + count = 0; + LASSERT(array->q_locks == NULL && array->q_cnt == 0); + OBD_ALLOC_PTR_ARRAY(array->q_locks, array->q_max); + if (array->q_locks == NULL) { + array->q_max = 0; + RETURN(-ENOMEM); + } + + goto again; + } + RETURN(0); +} + +static void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc, + struct lquota_entry *lqe) +{ + struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid; + int idx, stype; + __u64 qunit; + bool edquot; + + stype = qmt_uuid2idx(uuid, &idx); + LASSERT(stype >= 0); + + /* DOM case - set global lqe settings */ + if (qmt_dom(lqe_rtype(lqe), stype)) { + edquot = lqe->lqe_edquot; + qunit = lqe->lqe_qunit; + } else { + struct lqe_glbl_data *lgd; + int lge_idx; + + mutex_lock(&lqe->lqe_glbl_data_lock); + lgd = lqe->lqe_glbl_data; + if (lgd) { + lge_idx = qmt_map_lge_idx(lgd, idx); + edquot = lgd->lqeg_arr[lge_idx].lge_edquot; + qunit = lgd->lqeg_arr[lge_idx].lge_qunit; + } else { + edquot = lqe->lqe_edquot; + qunit = lqe->lqe_qunit; + } + mutex_unlock(&lqe->lqe_glbl_data_lock); + } + + /* fill glimpse descriptor with lqe settings */ + desc->lquota_desc.gl_flags = edquot ? LQUOTA_FL_EDQUOT : 0; + desc->lquota_desc.gl_qunit = qunit; + CDEBUG(D_QUOTA, "setup desc: stype %d idx %d, edquot %llu qunit %llu\n", + stype, idx, desc->lquota_desc.gl_flags, + desc->lquota_desc.gl_qunit); +} + /* * Send glimpse callback to slaves holding a lock on resource \res. * This is used to notify slaves of new quota settings or to claim quota space @@ -457,72 +712,101 @@ typedef int (*qmt_glimpse_cb_t)(const struct lu_env *, struct qmt_device *, */ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt, struct ldlm_resource *res, union ldlm_gl_desc *desc, - qmt_glimpse_cb_t cb, void *arg) + qmt_glimpse_cb_t cb, struct lquota_entry *lqe) { - cfs_list_t *tmp, *pos; - CFS_LIST_HEAD(gl_list); - int rc = 0; + union ldlm_gl_desc *descs = NULL; + struct list_head *tmp, *pos; + LIST_HEAD(gl_list); + struct qmt_gl_lock_array locks; + unsigned long i, locks_count; + int rc = 0; ENTRY; - lock_res(res); - /* scan list of granted locks */ - cfs_list_for_each(pos, &res->lr_granted) { - struct ldlm_glimpse_work *work; - struct ldlm_lock *lock; - struct obd_uuid *uuid; - - lock = cfs_list_entry(pos, struct ldlm_lock, l_res_link); - LASSERT(lock->l_export); - uuid = &lock->l_export->exp_client_uuid; - - if (cb != NULL) { - rc = cb(env, qmt, uuid, desc, arg); - if (rc == 0) - /* slave should not be notified */ - continue; - if (rc < 0) - /* something wrong happened, we still notify */ - CERROR("%s: callback function failed to " - "determine whether slave %s should be " - "notified (%d)\n", qmt->qmt_svname, - obd_uuid2str(uuid), rc); + memset(&locks, 0, sizeof(locks)); + rc = qmt_alloc_lock_array(res, &locks, cb, lqe); + if (rc) { + CERROR("%s: failed to allocate glimpse lock array (%d)\n", + qmt->qmt_svname, rc); + RETURN(rc); + } + if (!locks.q_cnt) { + CDEBUG(D_QUOTA, "%s: no granted locks to send glimpse\n", + qmt->qmt_svname); + RETURN(0); + } + CDEBUG(D_QUOTA, "found granted locks %lu\n", locks.q_cnt); + locks_count = locks.q_cnt; + + /* Use one desc for all works, when called from qmt_glb_lock_notify */ + if (cb && locks.q_cnt > 1) { + /* TODO: think about to store this preallocated descs + * in lqe_global in lqeg_arr as a part of lqe_glbl_entry. + * The benefit is that we don't need to allocate/free + * and setup this descs each time. But the drawback is + * memory use (sizeof ldlm_gl_desc * OST_COUNT * user_number). + * for examfple it could be 88 * 256 * 10 000 about 225 MB. */ + OBD_ALLOC(descs, + sizeof(struct ldlm_gl_lquota_desc) * locks.q_cnt); + if (!descs) { + CERROR("%s: alloc glimpse lock array failed: rc = %d\n", + qmt->qmt_svname, rc); + qmt_free_lock_array(&locks); + RETURN(-ENOMEM); } + } + + for (i = locks.q_cnt; i > 0; i--) { + struct ldlm_glimpse_work *work; OBD_ALLOC_PTR(work); if (work == NULL) { - CERROR("%s: failed to notify %s\n", qmt->qmt_svname, - obd_uuid2str(uuid)); + CERROR("%s: failed to notify a lock.\n", + qmt->qmt_svname); continue; } - cfs_list_add_tail(&work->gl_list, &gl_list); - work->gl_lock = LDLM_LOCK_GET(lock); + if (cb) { + if (descs) + desc = &descs[i - 1]; + qmt_setup_id_desc(locks.q_locks[i - 1], desc, lqe); + work->gl_interpret_data = lqe; + } + + list_add_tail(&work->gl_list, &gl_list); + work->gl_lock = locks.q_locks[i - 1]; work->gl_flags = 0; work->gl_desc = desc; + locks.q_locks[i - 1] = NULL; + locks.q_cnt--; } - unlock_res(res); - if (cfs_list_empty(&gl_list)) { + qmt_free_lock_array(&locks); + + if (list_empty(&gl_list)) { CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname); - RETURN(0); + GOTO(out, rc = 0); } /* issue glimpse callbacks to all connected slaves */ rc = ldlm_glimpse_locks(res, &gl_list); - cfs_list_for_each_safe(pos, tmp, &gl_list) { + list_for_each_safe(pos, tmp, &gl_list) { struct ldlm_glimpse_work *work; - work = cfs_list_entry(pos, struct ldlm_glimpse_work, gl_list); + work = list_entry(pos, struct ldlm_glimpse_work, gl_list); - cfs_list_del(&work->gl_list); + list_del(&work->gl_list); CERROR("%s: failed to notify %s of new quota settings\n", qmt->qmt_svname, obd_uuid2str(&work->gl_lock->l_export->exp_client_uuid)); LDLM_LOCK_RELEASE(work->gl_lock); OBD_FREE_PTR(work); } +out: + if (descs) + OBD_FREE(descs, + sizeof(struct ldlm_gl_lquota_desc) * locks_count); RETURN(rc); } @@ -541,25 +825,46 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe, struct qmt_thread_info *qti = qmt_info(env); struct qmt_pool_info *pool = lqe2qpi(lqe); struct ldlm_resource *res = NULL; - int rc; ENTRY; - lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff, - pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype); + lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, lqe_qtype(lqe)); /* send glimpse callback to notify slaves of new quota settings */ qti->qti_gl_desc.lquota_desc.gl_id = lqe->lqe_id; qti->qti_gl_desc.lquota_desc.gl_flags = 0; - qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit; - qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit; - qti->qti_gl_desc.lquota_desc.gl_time = lqe->lqe_gracetime; + if (lqe->lqe_is_default) { + qti->qti_gl_desc.lquota_desc.gl_hardlimit = 0; + qti->qti_gl_desc.lquota_desc.gl_softlimit = 0; + qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0, + LQUOTA_FLAG_DEFAULT); + + } else if (lqe->lqe_is_deleted) { + qti->qti_gl_desc.lquota_desc.gl_hardlimit = 0; + qti->qti_gl_desc.lquota_desc.gl_softlimit = 0; + qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0, + LQUOTA_FLAG_DELETED); + } else if (lqe->lqe_is_reset) { + qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit; + qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit; + qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0, + LQUOTA_FLAG_RESET); + } else if (lqe->lqe_granted > lqe->lqe_hardlimit) { + qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit; + qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit; + qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0, + LQUOTA_FLAG_REVOKE); + } else { + qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit; + qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit; + qti->qti_gl_desc.lquota_desc.gl_time = lqe->lqe_gracetime; + } qti->qti_gl_desc.lquota_desc.gl_ver = ver; /* look up ldlm resource associated with global index */ fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid); - res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, NULL, &qti->qti_resid, + res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, &qti->qti_resid, LDLM_PLAIN, 0); - if (res == NULL) { + if (IS_ERR(res)) { /* this might happen if no slaves have enqueued global quota * locks yet */ LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource associated " @@ -567,25 +872,45 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe, RETURN_EXIT; } - rc = qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc, - NULL, NULL); + qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc, + NULL, NULL); ldlm_resource_putref(res); EXIT; } /* Callback function used to select locks that should be glimpsed when * broadcasting the new qunit value */ -static int qmt_id_lock_cb(const struct lu_env *env, struct qmt_device *qmt, - struct obd_uuid *uuid, union ldlm_gl_desc *desc, - void *arg) +static int qmt_id_lock_cb(struct ldlm_lock *lock, struct lquota_entry *lqe) { - struct obd_uuid *slv_uuid = arg; + struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid; + struct lqe_glbl_data *lgd = lqe->lqe_glbl_data; + int idx; + int stype = qmt_uuid2idx(uuid, &idx); + + LASSERT(stype == QMT_STYPE_OST || stype == QMT_STYPE_MDT); + + CDEBUG(D_QUOTA, "stype %d rtype %d idx %d uuid %s\n", + stype, lqe_rtype(lqe), idx, uuid->uuid); + /* Quota pools support only OSTs, despite MDTs also could be registered + * as LQUOTA_RES_DT devices(DOM). */ + if (qmt_dom(lqe_rtype(lqe), stype)) + return 1; + + if (lgd) { + int lge_idx = qmt_map_lge_idx(lgd, idx); + + CDEBUG(D_QUOTA, + "tgt idx:%d lge_idx:%d edquot_nu:%d qunit_nu:%d\n", + idx, lge_idx, lgd->lqeg_arr[lge_idx].lge_edquot_nu, + lgd->lqeg_arr[lge_idx].lge_qunit_nu); + return lgd->lqeg_arr[lge_idx].lge_edquot_nu || + lgd->lqeg_arr[lge_idx].lge_qunit_nu; + } - if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid)) - RETURN(0); - RETURN(+1); + return 0; } + /* * Send glimpse request on per-ID lock to push new qunit value to slave. * @@ -601,18 +926,15 @@ static void qmt_id_lock_glimpse(const struct lu_env *env, struct qmt_thread_info *qti = qmt_info(env); struct qmt_pool_info *pool = lqe2qpi(lqe); struct ldlm_resource *res = NULL; - int rc; ENTRY; if (!lqe->lqe_enforced) RETURN_EXIT; - lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff, - pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype); - fid_build_quota_resid(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid); - res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN, - 0); - if (res == NULL) { + lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, lqe_qtype(lqe)); + fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid); + res = ldlm_resource_get(qmt->qmt_ns, &qti->qti_resid, LDLM_PLAIN, 0); + if (IS_ERR(res)) { /* this might legitimately happens if slaves haven't had the * opportunity to enqueue quota lock yet. */ LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID " @@ -620,51 +942,54 @@ static void qmt_id_lock_glimpse(const struct lu_env *env, lqe_write_lock(lqe); if (lqe->lqe_revoke_time == 0 && lqe->lqe_qunit == pool->qpi_least_qunit) - lqe->lqe_revoke_time = cfs_time_current_64(); + lqe->lqe_revoke_time = ktime_get_seconds(); lqe_write_unlock(lqe); RETURN_EXIT; } lqe_write_lock(lqe); + /* + * It is possible to add an lqe in a 2nd time while the same lqe + * from the 1st time is still sending glimpse + */ + if (lqe->lqe_gl) + GOTO(out, 0); /* The purpose of glimpse callback on per-ID lock is twofold: * - notify slaves of new qunit value and hope they will release some * spare quota space in return * - notify slaves that master ran out of quota space and there is no * need to send acquire request any more until further notice */ - /* fill glimpse descriptor with lqe settings */ - if (lqe->lqe_edquot) - qti->qti_gl_desc.lquota_desc.gl_flags = LQUOTA_FL_EDQUOT; - else - qti->qti_gl_desc.lquota_desc.gl_flags = 0; - qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit; - + /* TODO: it is not clear how to implement below case for all lqes + * from where slaves will be notified in qmt_glimpse_lock. Because + * here we have just global lqe with an array of OSTs that should + * be notified. Theoretically we can find all lqes that includes + * these OSTs, but it is not trivial. So I would propose to move + * this case to another place ... */ if (lqe->lqe_revoke_time == 0 && - qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit) + lqe->lqe_qunit == pool->qpi_least_qunit) /* reset lqe_may_rel, it will be updated on glimpse callback * replies if needed */ lqe->lqe_may_rel = 0; - /* The rebalance thread is the only thread which can issue glimpses */ - LASSERT(!lqe->lqe_gl); lqe->lqe_gl = true; lqe_write_unlock(lqe); /* issue glimpse callback to slaves */ - rc = qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc, - uuid ? qmt_id_lock_cb : NULL, (void *)uuid); + if (lqe->lqe_glbl_data) + qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc, + qmt_id_lock_cb, lqe); lqe_write_lock(lqe); if (lqe->lqe_revoke_time == 0 && - qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit && lqe->lqe_qunit == pool->qpi_least_qunit) { - lqe->lqe_revoke_time = cfs_time_current_64(); - qmt_adjust_edquot(lqe, cfs_time_current_sec()); + lqe->lqe_revoke_time = ktime_get_seconds(); + qmt_adjust_edquot(lqe, ktime_get_real_seconds()); } LASSERT(lqe->lqe_gl); lqe->lqe_gl = false; +out: lqe_write_unlock(lqe); - ldlm_resource_putref(res); EXIT; } @@ -681,21 +1006,32 @@ void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe) bool added = false; ENTRY; + LASSERT(lqe->lqe_is_global); lqe_getref(lqe); spin_lock(&qmt->qmt_reba_lock); - if (!qmt->qmt_stopping && cfs_list_empty(&lqe->lqe_link)) { - cfs_list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list); + if (!qmt->qmt_stopping && list_empty(&lqe->lqe_link)) { + list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list); added = true; + if (qmt->qmt_reba_task) + wake_up_process(qmt->qmt_reba_task); } spin_unlock(&qmt->qmt_reba_lock); - if (added) - cfs_waitq_signal(&qmt->qmt_reba_thread.t_ctl_waitq); - else + if (!added) lqe_putref(lqe); EXIT; } +struct qmt_reba_args { + struct qmt_device *qra_dev; + struct lu_env qra_env; + struct completion *qra_started; +}; + +#ifndef TASK_IDLE +#define TASK_IDLE TASK_INTERRUPTIBLE +#endif + /* * The rebalance thread is in charge of sending glimpse callbacks on per-ID * quota locks owned by slaves in order to notify them of: @@ -706,61 +1042,44 @@ void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe) * try to acquire quota from the master since this latter has already * distributed all the space. */ -static int qmt_reba_thread(void *arg) +static int qmt_reba_thread(void *_args) { - struct qmt_device *qmt = (struct qmt_device *)arg; - struct ptlrpc_thread *thread = &qmt->qmt_reba_thread; - struct l_wait_info lwi = { 0 }; - struct lu_env *env; + struct qmt_reba_args *args = _args; + struct qmt_device *qmt = args->qra_dev; + struct lu_env *env = &args->qra_env; struct lquota_entry *lqe, *tmp; - char pname[MTI_NAME_MAXLEN]; - int rc; ENTRY; - OBD_ALLOC_PTR(env); - if (env == NULL) - RETURN(-ENOMEM); - - rc = lu_env_init(env, LCT_MD_THREAD); - if (rc) { - CERROR("%s: failed to init env.", qmt->qmt_svname); - OBD_FREE_PTR(env); - RETURN(rc); - } - - snprintf(pname, MTI_NAME_MAXLEN, "qmt_reba_%s", qmt->qmt_svname); - cfs_daemonize(pname); - - thread_set_flags(thread, SVC_RUNNING); - cfs_waitq_signal(&thread->t_ctl_waitq); - - while (1) { - l_wait_event(thread->t_ctl_waitq, - !cfs_list_empty(&qmt->qmt_reba_list) || - !thread_is_running(thread), &lwi); + complete(args->qra_started); + while (({set_current_state(TASK_IDLE); + !kthread_should_stop(); })) { spin_lock(&qmt->qmt_reba_lock); - cfs_list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list, - lqe_link) { - cfs_list_del_init(&lqe->lqe_link); + list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list, + lqe_link) { + __set_current_state(TASK_RUNNING); + list_del_init(&lqe->lqe_link); spin_unlock(&qmt->qmt_reba_lock); - if (thread_is_running(thread)) + /* lqe_ref == 1 means we hold the last ref, + * so no need to send glimpse callbacks. + */ + if (!kthread_should_stop() && + atomic_read(&lqe->lqe_ref) > 1) qmt_id_lock_glimpse(env, qmt, lqe, NULL); lqe_putref(lqe); spin_lock(&qmt->qmt_reba_lock); } spin_unlock(&qmt->qmt_reba_lock); - - if (!thread_is_running(thread)) - break; + schedule(); } + __set_current_state(TASK_RUNNING); + + lu_env_remove(env); lu_env_fini(env); - OBD_FREE_PTR(env); - thread_set_flags(thread, SVC_STOPPED); - cfs_waitq_signal(&thread->t_ctl_waitq); - RETURN(rc); + OBD_FREE_PTR(args); + RETURN(0); } /* @@ -768,24 +1087,47 @@ static int qmt_reba_thread(void *arg) */ int qmt_start_reba_thread(struct qmt_device *qmt) { - struct ptlrpc_thread *thread = &qmt->qmt_reba_thread; - struct l_wait_info lwi = { 0 }; - int rc; + struct task_struct *task; + struct qmt_reba_args *args; + DECLARE_COMPLETION_ONSTACK(started); + int rc; ENTRY; - rc = cfs_create_thread(qmt_reba_thread, (void *)qmt, 0); - if (rc < 0) { - CERROR("%s: failed to start rebalance thread (%d)\n", - qmt->qmt_svname, rc); - thread_set_flags(thread, SVC_STOPPED); - RETURN(rc); + OBD_ALLOC_PTR(args); + if (args == NULL) + RETURN(-ENOMEM); + args->qra_dev = qmt; + args->qra_started = &started; + + rc = lu_env_init(&args->qra_env, LCT_MD_THREAD); + if (rc) { + CERROR("%s: failed to init env.\n", qmt->qmt_svname); + GOTO(out_env, rc); } - l_wait_event(thread->t_ctl_waitq, - thread_is_running(thread) || thread_is_stopped(thread), - &lwi); + task = kthread_create(qmt_reba_thread, args, + "qmt_reba_%s", qmt->qmt_svname); + if (IS_ERR(task)) { + CERROR("%s: failed to start rebalance thread (%ld)\n", + qmt->qmt_svname, PTR_ERR(task)); + GOTO(out_env_fini, rc = PTR_ERR(task)); + } + + rc = lu_env_add_task(&args->qra_env, task); + if (rc) { + kthread_stop(task); + GOTO(out_env_fini, rc); + } + qmt->qmt_reba_task = task; + wake_up_process(task); + wait_for_completion(&started); RETURN(0); +out_env_fini: + lu_env_fini(&args->qra_env); +out_env: + OBD_FREE_PTR(args); + RETURN(rc); } /* @@ -793,16 +1135,15 @@ int qmt_start_reba_thread(struct qmt_device *qmt) */ void qmt_stop_reba_thread(struct qmt_device *qmt) { - struct ptlrpc_thread *thread = &qmt->qmt_reba_thread; + struct task_struct *task; - if (!thread_is_stopped(thread)) { - struct l_wait_info lwi = { 0 }; + spin_lock(&qmt->qmt_reba_lock); + task = qmt->qmt_reba_task; + qmt->qmt_reba_task = NULL; + spin_unlock(&qmt->qmt_reba_lock); - thread_set_flags(thread, SVC_STOPPING); - cfs_waitq_signal(&thread->t_ctl_waitq); + if (task) + kthread_stop(task); - l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread), - &lwi); - } - LASSERT(cfs_list_empty(&qmt->qmt_reba_list)); + LASSERT(list_empty(&qmt->qmt_reba_list)); }