X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fquota%2Fqmt_lock.c;h=f9818380bac8dd49f49c780429fe8998ccf3d445;hp=c28b0b27e7fc970927fd842463066e6b26145a6a;hb=HEAD;hpb=d01cae3ec8758fc27cae91266465b8d627ed55dd diff --git a/lustre/quota/qmt_lock.c b/lustre/quota/qmt_lock.c index c28b0b2..d847529 100644 --- a/lustre/quota/qmt_lock.c +++ b/lustre/quota/qmt_lock.c @@ -31,6 +31,7 @@ #define DEBUG_SUBSYSTEM S_LQUOTA #include +#include #include #include @@ -38,6 +39,8 @@ #include "qmt_internal.h" +struct workqueue_struct *qmt_lvbo_free_wq; + /* intent policy function called from mdt_intent_opc() when the intent is of * quota type */ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld, @@ -89,7 +92,7 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld, case IT_QUOTA_DQACQ: { struct lquota_entry *lqe; struct ldlm_lock *lock; - int idx; + int idx, stype; if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0) /* acquire on global lock? something is wrong ... */ @@ -104,8 +107,8 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld, GOTO(out, rc = -ENOLCK); LDLM_LOCK_PUT(lock); - rc = qmt_uuid2idx(uuid, &idx); - if (rc < 0) + stype = qmt_uuid2idx(uuid, &idx); + if (stype < 0) GOTO(out, rc = -EINVAL); /* TODO: it seems we don't need to get lqe from @@ -115,7 +118,7 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld, LASSERT(lqe != NULL); lqe_getref(lqe); - rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc, + rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), stype, lqe_qtype(lqe), &reqbody->qb_id, NULL, idx); if (rc) { @@ -126,7 +129,8 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld, /* acquire quota space */ rc = qmt_dqacq0(env, qmt, uuid, reqbody->qb_flags, reqbody->qb_count, - reqbody->qb_usage, repbody); + reqbody->qb_usage, repbody, + qmt_dom(lqe_rtype(lqe), stype) ? -1 : idx); lqe_putref(lqe); qti_lqes_fini(env); if (rc) @@ -270,47 +274,72 @@ out: */ static bool qmt_clear_lgeg_arr_nu(struct lquota_entry *lqe, int stype, int idx) { - unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit; - struct lqe_glbl_data *lgd = lqe->lqe_glbl_data; + unsigned long least = lqe2qpi(lqe)->qpi_least_qunit; + bool revoke = false; /* There is no array to store lge for the case of DOM. - * Ignore it until MDT pools will be ready. */ - if (!(lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)) { - lqe->lqe_glbl_data->lqeg_arr[idx].lge_qunit_nu = 0; - lqe->lqe_glbl_data->lqeg_arr[idx].lge_edquot_nu = 0; - - /* We shouldn't call revoke for DOM case, it will be updated - * at qmt_id_lock_glimpse. */ - return (lgd->lqeg_arr[idx].lge_qunit == least_qunit); + * Ignore it until MDT pools will be ready. + */ + if (!qmt_dom(lqe_rtype(lqe), stype)) { + struct lqe_glbl_data *lgd; + + mutex_lock(&lqe->lqe_glbl_data_lock); + lgd = lqe->lqe_glbl_data; + if (lgd) { + int lge_idx = qmt_map_lge_idx(lgd, idx); + + lgd->lqeg_arr[lge_idx].lge_qunit_nu = 0; + lgd->lqeg_arr[lge_idx].lge_edquot_nu = 0; + /* We shouldn't call revoke for DOM case, it will be + * updated at qmt_id_lock_glimpse. + */ + revoke = lgd->lqeg_arr[lge_idx].lge_qunit == least; + } + mutex_unlock(&lqe->lqe_glbl_data_lock); } - return false; + return revoke; } -static void qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe, +static bool qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe_gl, int stype, int idx) { - unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit; - struct lqe_glbl_data *lgd = lqe->lqe_glbl_data; - - if (lgd->lqeg_arr[idx].lge_qunit == least_qunit) { - int i; - - qti_lqes_write_lock(env); - for (i = 0; i < qti_lqes_cnt(env); i++) { - LQUOTA_DEBUG(qti_lqes(env)[i], - "idx %d lge_qunit %llu least_qunit %lu\n", - idx, lgd->lqeg_arr[idx].lge_qunit, - least_qunit); - if (qti_lqes(env)[i]->lqe_qunit == least_qunit) { - qti_lqes(env)[i]->lqe_revoke_time = + unsigned long least_qunit = lqe2qpi(lqe_gl)->qpi_least_qunit; + bool notify = false; + + if (qmt_dom(lqe_rtype(lqe_gl), stype)) + return false; + + qti_lqes_write_lock(env); + mutex_lock(&lqe_gl->lqe_glbl_data_lock); + if (lqe_gl->lqe_glbl_data) { + struct lqe_glbl_data *lgd = lqe_gl->lqe_glbl_data; + int lge_idx; + + lge_idx = qmt_map_lge_idx(lgd, idx); + if (lgd->lqeg_arr[lge_idx].lge_qunit == least_qunit) { + struct lquota_entry *lqe; + int i; + + for (i = 0; i < qti_lqes_cnt(env); i++) { + lqe = qti_lqes(env)[i]; + LQUOTA_DEBUG(lqe, + "lge_qunit %llu least_qunit %lu idx %d\n", + lgd->lqeg_arr[lge_idx].lge_qunit, + least_qunit, idx); + if (lqe->lqe_qunit == least_qunit) { + lqe->lqe_revoke_time = ktime_get_seconds(); - qmt_adjust_edquot(qti_lqes(env)[i], + notify |= qmt_adjust_edquot(lqe, ktime_get_real_seconds()); + } } } - qti_lqes_write_unlock(env); } + mutex_unlock(&lqe_gl->lqe_glbl_data_lock); + qti_lqes_write_unlock(env); + + return notify; } /* @@ -329,7 +358,7 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res, struct ldlm_lock *lock; struct obd_export *exp; bool need_revoke; - int rc = 0, idx; + int rc = 0, idx, stype; ENTRY; LASSERT(res != NULL); @@ -379,11 +408,11 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res, GOTO(out, rc = -EFAULT); } - rc = qmt_uuid2idx(&exp->exp_client_uuid, &idx); - if (rc < 0) - GOTO(out_exp, rc); + stype = qmt_uuid2idx(&exp->exp_client_uuid, &idx); + if (stype < 0) + GOTO(out_exp, rc = stype); - need_revoke = qmt_clear_lgeg_arr_nu(lqe, rc, idx); + need_revoke = qmt_clear_lgeg_arr_nu(lqe, stype, idx); if (lvb->lvb_id_rel == 0) { /* nothing to release */ if (lvb->lvb_id_may_rel != 0) @@ -394,13 +423,23 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res, if (!need_revoke && lvb->lvb_id_rel == 0) GOTO(out_exp, rc = 0); - rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc, lqe_qtype(lqe), - &lqe->lqe_id, NULL, idx); + rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), stype, + lqe_qtype(lqe), &lqe->lqe_id, NULL, idx); if (rc) GOTO(out_exp, rc); - if (need_revoke) - qmt_set_revoke(env, lqe, rc, idx); + if (need_revoke && qmt_set_revoke(env, lqe, stype, idx)) { + int notify = false; + + mutex_lock(&lqe->lqe_glbl_data_lock); + if (lqe->lqe_glbl_data) { + qmt_seed_glbe_edquot(env, lqe->lqe_glbl_data); + notify = true; + } + mutex_unlock(&lqe->lqe_glbl_data_lock); + if (notify) + qmt_id_lock_notify(qmt, lqe); + } if (lvb->lvb_id_rel) { LQUOTA_DEBUG(lqe, "releasing:%llu may release:%llu", @@ -409,7 +448,8 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res, /* release quota space */ rc = qmt_dqacq0(env, qmt, &exp->exp_client_uuid, QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, - 0, &qti->qti_body); + 0, &qti->qti_body, + qmt_dom(lqe_rtype(lqe), stype) ? -1 : idx); if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel) LQUOTA_ERROR(lqe, "failed to release quota space on glimpse %llu!=%llu : rc = %d\n", @@ -513,12 +553,8 @@ int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res) if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) { struct lquota_entry *lqe = res->lr_lvb_data; - struct lqe_glbl_data *lgd = lqe->lqe_glbl_data; - /* release lqe reference */ - lqe->lqe_glbl_data = NULL; - lqe_putref(lqe); - qmt_free_lqe_gd(lgd); + queue_work(qmt_lvbo_free_wq, &lqe->lqe_work); } else { struct dt_object *obj = res->lr_lvb_data; /* release object reference */ @@ -563,6 +599,7 @@ static int qmt_alloc_lock_array(struct ldlm_resource *res, struct qmt_gl_lock_array *array, qmt_glimpse_cb_t cb, void *arg) { + struct lquota_entry *lqe = arg; struct list_head *pos; unsigned long count = 0; int fail_cnt = 0; @@ -570,6 +607,8 @@ static int qmt_alloc_lock_array(struct ldlm_resource *res, LASSERT(!array->q_max && !array->q_cnt && !array->q_locks); again: + if (cb) + mutex_lock(&lqe->lqe_glbl_data_lock); lock_res(res); /* scan list of granted locks */ list_for_each(pos, &res->lr_granted) { @@ -593,6 +632,8 @@ again: } } unlock_res(res); + if (cb) + mutex_unlock(&lqe->lqe_glbl_data_lock); if (count > array->q_max) { qmt_free_lock_array(array); @@ -616,11 +657,10 @@ again: RETURN(0); } -void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc, - struct lquota_entry *lqe) +static void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc, + struct lquota_entry *lqe) { struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid; - struct lqe_glbl_data *lgd = lqe->lqe_glbl_data; int idx, stype; __u64 qunit; bool edquot; @@ -629,12 +669,24 @@ void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc, LASSERT(stype >= 0); /* DOM case - set global lqe settings */ - if (lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT) { + if (qmt_dom(lqe_rtype(lqe), stype)) { edquot = lqe->lqe_edquot; qunit = lqe->lqe_qunit; } else { - edquot = lgd->lqeg_arr[idx].lge_edquot; - qunit = lgd->lqeg_arr[idx].lge_qunit; + struct lqe_glbl_data *lgd; + int lge_idx; + + mutex_lock(&lqe->lqe_glbl_data_lock); + lgd = lqe->lqe_glbl_data; + if (lgd) { + lge_idx = qmt_map_lge_idx(lgd, idx); + edquot = lgd->lqeg_arr[lge_idx].lge_edquot; + qunit = lgd->lqeg_arr[lge_idx].lge_qunit; + } else { + edquot = lqe->lqe_edquot; + qunit = lqe->lqe_qunit; + } + mutex_unlock(&lqe->lqe_glbl_data_lock); } /* fill glimpse descriptor with lqe settings */ @@ -663,7 +715,6 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt, qmt_glimpse_cb_t cb, struct lquota_entry *lqe) { union ldlm_gl_desc *descs = NULL; - struct lqe_glbl_data *gld; struct list_head *tmp, *pos; LIST_HEAD(gl_list); struct qmt_gl_lock_array locks; @@ -671,7 +722,6 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt, int rc = 0; ENTRY; - gld = lqe ? lqe->lqe_glbl_data : NULL; memset(&locks, 0, sizeof(locks)); rc = qmt_alloc_lock_array(res, &locks, cb, lqe); if (rc) { @@ -688,7 +738,7 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt, locks_count = locks.q_cnt; /* Use one desc for all works, when called from qmt_glb_lock_notify */ - if (gld && locks.q_cnt > 1) { + if (cb && locks.q_cnt > 1) { /* TODO: think about to store this preallocated descs * in lqe_global in lqeg_arr as a part of lqe_glbl_entry. * The benefit is that we don't need to allocate/free @@ -715,7 +765,7 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt, continue; } - if (gld) { + if (cb) { if (descs) desc = &descs[i - 1]; qmt_setup_id_desc(locks.q_locks[i - 1], desc, lqe); @@ -788,6 +838,21 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe, qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0, LQUOTA_FLAG_DEFAULT); + } else if (lqe->lqe_is_deleted) { + qti->qti_gl_desc.lquota_desc.gl_hardlimit = 0; + qti->qti_gl_desc.lquota_desc.gl_softlimit = 0; + qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0, + LQUOTA_FLAG_DELETED); + } else if (lqe->lqe_is_reset) { + qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit; + qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit; + qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0, + LQUOTA_FLAG_RESET); + } else if (lqe->lqe_granted > lqe->lqe_hardlimit) { + qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit; + qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit; + qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0, + LQUOTA_FLAG_REVOKE); } else { qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit; qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit; @@ -797,7 +862,7 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe, /* look up ldlm resource associated with global index */ fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid); - res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, NULL, &qti->qti_resid, + res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, &qti->qti_resid, LDLM_PLAIN, 0); if (IS_ERR(res)) { /* this might happen if no slaves have enqueued global quota @@ -817,20 +882,32 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe, * broadcasting the new qunit value */ static int qmt_id_lock_cb(struct ldlm_lock *lock, struct lquota_entry *lqe) { - struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid; + struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid; struct lqe_glbl_data *lgd = lqe->lqe_glbl_data; int idx; int stype = qmt_uuid2idx(uuid, &idx); LASSERT(stype == QMT_STYPE_OST || stype == QMT_STYPE_MDT); + CDEBUG(D_QUOTA, "stype %d rtype %d idx %d uuid %s\n", + stype, lqe_rtype(lqe), idx, uuid->uuid); /* Quota pools support only OSTs, despite MDTs also could be registered * as LQUOTA_RES_DT devices(DOM). */ - if (lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT) + if (qmt_dom(lqe_rtype(lqe), stype)) return 1; - else - return lgd->lqeg_arr[idx].lge_edquot_nu || - lgd->lqeg_arr[idx].lge_qunit_nu; + + if (lgd) { + int lge_idx = qmt_map_lge_idx(lgd, idx); + + CDEBUG(D_QUOTA, + "tgt idx:%d lge_idx:%d edquot_nu:%d qunit_nu:%d\n", + idx, lge_idx, lgd->lqeg_arr[lge_idx].lge_edquot_nu, + lgd->lqeg_arr[lge_idx].lge_qunit_nu); + return lgd->lqeg_arr[lge_idx].lge_edquot_nu || + lgd->lqeg_arr[lge_idx].lge_qunit_nu; + } + + return 0; } @@ -856,8 +933,7 @@ static void qmt_id_lock_glimpse(const struct lu_env *env, lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, lqe_qtype(lqe)); fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid); - res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN, - 0); + res = ldlm_resource_get(qmt->qmt_ns, &qti->qti_resid, LDLM_PLAIN, 0); if (IS_ERR(res)) { /* this might legitimately happens if slaves haven't had the * opportunity to enqueue quota lock yet. */ @@ -872,6 +948,12 @@ static void qmt_id_lock_glimpse(const struct lu_env *env, } lqe_write_lock(lqe); + /* + * It is possible to add an lqe in a 2nd time while the same lqe + * from the 1st time is still sending glimpse + */ + if (lqe->lqe_gl) + GOTO(out, 0); /* The purpose of glimpse callback on per-ID lock is twofold: * - notify slaves of new qunit value and hope they will release some * spare quota space in return @@ -890,14 +972,13 @@ static void qmt_id_lock_glimpse(const struct lu_env *env, * replies if needed */ lqe->lqe_may_rel = 0; - /* The rebalance thread is the only thread which can issue glimpses */ - LASSERT(!lqe->lqe_gl); lqe->lqe_gl = true; lqe_write_unlock(lqe); /* issue glimpse callback to slaves */ - qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc, - qmt_id_lock_cb, lqe); + if (lqe->lqe_glbl_data) + qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc, + qmt_id_lock_cb, lqe); lqe_write_lock(lqe); if (lqe->lqe_revoke_time == 0 && @@ -907,8 +988,8 @@ static void qmt_id_lock_glimpse(const struct lu_env *env, } LASSERT(lqe->lqe_gl); lqe->lqe_gl = false; +out: lqe_write_unlock(lqe); - ldlm_resource_putref(res); EXIT; } @@ -980,7 +1061,11 @@ static int qmt_reba_thread(void *_args) list_del_init(&lqe->lqe_link); spin_unlock(&qmt->qmt_reba_lock); - if (!kthread_should_stop()) + /* lqe_ref == 1 means we hold the last ref, + * so no need to send glimpse callbacks. + */ + if (!kthread_should_stop() && + atomic_read(&lqe->lqe_ref) > 1) qmt_id_lock_glimpse(env, qmt, lqe, NULL); lqe_putref(lqe);