From 2ae193aa5f7186b1ad1cd4b46fd19cea1b35ed34 Mon Sep 17 00:00:00 2001 From: tianzy Date: Tue, 5 May 2009 10:57:38 +0000 Subject: [PATCH] Branch b1_8 part I 1. mainly fix the lqs_key of lustre_qunit_size. 2. clean quota_compute_lqs(). 3. only supply quota_search_lqs() to outside and make quota_create_lqs() internal. b=18616 i=johann i=panda --- lustre/include/lustre_quota.h | 9 ++- lustre/quota/quota_adjust_qunit.c | 162 ++++++++++++++------------------------ lustre/quota/quota_context.c | 48 ++++++----- lustre/quota/quota_interface.c | 73 +++++++++-------- lustre/quota/quota_internal.h | 13 +-- lustre/quota/quota_master.c | 39 ++++----- 6 files changed, 150 insertions(+), 194 deletions(-) diff --git a/lustre/include/lustre_quota.h b/lustre/include/lustre_quota.h index be073c4..2fba4f0 100644 --- a/lustre/include/lustre_quota.h +++ b/lustre/include/lustre_quota.h @@ -263,7 +263,7 @@ struct lustre_qunit_size { cfs_time_t lqs_last_bshrink; /* time of last block shrink */ cfs_time_t lqs_last_ishrink; /* time of last inode shrink */ spinlock_t lqs_lock; - struct quota_adjust_qunit lqs_key; /* hash key */ + unsigned long long lqs_key; /* hash key */ struct lustre_quota_ctxt *lqs_ctxt; /* quota ctxt */ }; @@ -275,6 +275,13 @@ struct lustre_qunit_size { #define LQS_SET_ADJBLK(lqs) ((lqs)->lqs_flags |= LQUOTA_FLAGS_ADJBLK) #define LQS_SET_ADJINO(lqs) ((lqs)->lqs_flags |= LQUOTA_FLAGS_ADJINO) +/* In the hash for lustre_qunit_size, the key is decided by + * grp_or_usr and uid/gid, in here, I combine these two values, + * which will make comparing easier and more efficient */ +#define LQS_KEY(is_grp, id) ((is_grp ? 1ULL << 32: 0) + id) +#define LQS_KEY_ID(key) (key & 0xffffffff) +#define LQS_KEY_GRP(key) (key >> 32) + static inline void lqs_getref(struct lustre_qunit_size *lqs) { if (atomic_inc_return(&lqs->lqs_refcount) == 2) /* quota_create_lqs */ diff --git a/lustre/quota/quota_adjust_qunit.c b/lustre/quota/quota_adjust_qunit.c index 05d6fe4..992efe5 100644 --- a/lustre/quota/quota_adjust_qunit.c +++ b/lustre/quota/quota_adjust_qunit.c @@ -74,110 +74,41 @@ * is_acq: whether it is acquiring; otherwise, it is releasing */ void quota_compute_lqs(struct qunit_data *qdata, struct lustre_qunit_size *lqs, - int is_chk, int is_acq) + int is_chk, int is_acq) { - int is_blk; + long long *rec; LASSERT(qdata && lqs); LASSERT_SPIN_LOCKED(&lqs->lqs_lock); - is_blk = QDATA_IS_BLK(qdata); - - if (is_chk) { - if (is_acq) { - if (is_blk) - lqs->lqs_blk_rec += qdata->qd_count; - else - lqs->lqs_ino_rec += qdata->qd_count; - } else { - if (is_blk) - lqs->lqs_blk_rec -= qdata->qd_count; - else - lqs->lqs_ino_rec -= qdata->qd_count; - } - } else { - if (is_acq) { - if (is_blk) - lqs->lqs_blk_rec -= qdata->qd_count; - else - lqs->lqs_ino_rec -= qdata->qd_count; - } else { - if (is_blk) - lqs->lqs_blk_rec += qdata->qd_count; - else - lqs->lqs_ino_rec += qdata->qd_count; - } - } -} - -void qdata_to_oqaq(struct qunit_data *qdata, struct quota_adjust_qunit *oqaq) -{ - LASSERT(qdata); - LASSERT(oqaq); - - oqaq->qaq_flags = qdata->qd_flags; - oqaq->qaq_id = qdata->qd_id; - if (QDATA_IS_ADJBLK(qdata)) - oqaq->qaq_bunit_sz = qdata->qd_qunit; - if (QDATA_IS_ADJINO(qdata)) - oqaq->qaq_iunit_sz = qdata->qd_qunit; -} - -int quota_search_lqs(struct qunit_data *qdata, struct quota_adjust_qunit *oqaq, - struct lustre_quota_ctxt *qctxt, - struct lustre_qunit_size **lqs_return) -{ - struct quota_adjust_qunit *oqaq_tmp = NULL; - ENTRY; - LASSERT(*lqs_return == NULL); - LASSERT(oqaq || qdata); + rec = QDATA_IS_BLK(qdata) ? &lqs->lqs_blk_rec : &lqs->lqs_ino_rec; - if (!oqaq) { - OBD_ALLOC_PTR(oqaq_tmp); - if (!oqaq_tmp) - RETURN(-ENOMEM); - qdata_to_oqaq(qdata, oqaq_tmp); - } else { - oqaq_tmp = oqaq; - } - - *lqs_return = lustre_hash_lookup(qctxt->lqc_lqs_hash, oqaq_tmp); - if (*lqs_return) - LQS_DEBUG((*lqs_return), "show lqs\n"); + if (!!is_chk + !!is_acq == 1) + *rec -= qdata->qd_count; + else + *rec += qdata->qd_count; - if (!oqaq) - OBD_FREE_PTR(oqaq_tmp); - RETURN(0); } -int quota_create_lqs(struct qunit_data *qdata, struct quota_adjust_qunit *oqaq, - struct lustre_quota_ctxt *qctxt, - struct lustre_qunit_size **lqs_return) +static struct lustre_qunit_size * +quota_create_lqs(unsigned long long lqs_key, struct lustre_quota_ctxt *qctxt) { struct lustre_qunit_size *lqs = NULL; int rc = 0; - ENTRY; - - LASSERT(*lqs_return == NULL); - LASSERT(oqaq || qdata); OBD_ALLOC_PTR(lqs); if (!lqs) GOTO(out, rc = -ENOMEM); - if (!oqaq) { - qdata_to_oqaq(qdata, &lqs->lqs_key); - } else { - lqs->lqs_key = *oqaq; - } + lqs->lqs_key = lqs_key; spin_lock_init(&lqs->lqs_lock); lqs->lqs_bwrite_pending = 0; lqs->lqs_iwrite_pending = 0; lqs->lqs_ino_rec = 0; lqs->lqs_blk_rec = 0; - lqs->lqs_id = lqs->lqs_key.qaq_id; - lqs->lqs_flags = QAQ_IS_GRP(&lqs->lqs_key); + lqs->lqs_id = LQS_KEY_ID(lqs->lqs_key); + lqs->lqs_flags = LQS_KEY_GRP(lqs->lqs_key) ? LQUOTA_FLAGS_GRP : 0; lqs->lqs_bunit_sz = qctxt->lqc_bunit_sz; lqs->lqs_iunit_sz = qctxt->lqc_iunit_sz; lqs->lqs_btune_sz = qctxt->lqc_btune_sz; @@ -197,15 +128,48 @@ int quota_create_lqs(struct qunit_data *qdata, struct quota_adjust_qunit *oqaq, &lqs->lqs_key, &lqs->lqs_hash); spin_unlock(&qctxt->lqc_lock); - LQS_DEBUG(lqs, "create lqs\n"); - if (!rc) { + if (!rc) lqs_getref(lqs); - *lqs_return = lqs; - } + out: if (rc && lqs) OBD_FREE_PTR(lqs); - RETURN(rc); + + if (rc) + return ERR_PTR(rc); + else + return lqs; +} + +struct lustre_qunit_size *quota_search_lqs(unsigned long long lqs_key, + struct lustre_quota_ctxt *qctxt, + int create) +{ + int rc = 0; + struct lustre_qunit_size *lqs; + + search_lqs: + lqs = lustre_hash_lookup(qctxt->lqc_lqs_hash, &lqs_key); + if (lqs == NULL && create) { + lqs = quota_create_lqs(lqs_key, qctxt); + if (IS_ERR(lqs)) + rc = PTR_ERR(lqs); + if (rc == -EALREADY) { + rc = 0; + goto search_lqs; + } + } + + if (lqs) + LQS_DEBUG(lqs, "%s\n", + (create == 1 ? "create lqs" : "search lqs")); + + if (rc == 0) { + return lqs; + } else { + CDEBUG(D_ERROR, "get lqs error(rc: %d)\n", rc); + return ERR_PTR(rc); + } } int quota_adjust_slave_lqs(struct quota_adjust_qunit *oqaq, @@ -222,30 +186,22 @@ int quota_adjust_slave_lqs(struct quota_adjust_qunit *oqaq, RETURN(0); LASSERT(qctxt); -search_lqs: - rc = quota_search_lqs(NULL, oqaq, qctxt, &lqs); + lqs = quota_search_lqs(LQS_KEY(QAQ_IS_GRP(oqaq), oqaq->qaq_id), + qctxt, 1); + if (lqs == NULL || IS_ERR(lqs)) + RETURN(PTR_ERR(lqs)); /* deleting the lqs, because a user sets lfs quota 0 0 0 0 */ if (!oqaq->qaq_bunit_sz && !oqaq->qaq_iunit_sz && QAQ_IS_ADJBLK(oqaq) && QAQ_IS_ADJINO(oqaq)) { - if (lqs) { - LQS_DEBUG(lqs, "release lqs\n"); - /* this is for quota_search_lqs */ - lqs_putref(lqs); - /* kill lqs */ - lqs_putref(lqs); - } + LQS_DEBUG(lqs, "release lqs\n"); + /* this is for quota_search_lqs */ + lqs_putref(lqs); + /* kill lqs */ + lqs_putref(lqs); RETURN(rc); } - if (!lqs) { - rc = quota_create_lqs(NULL, oqaq, qctxt, &lqs); - if (rc == -EALREADY) - goto search_lqs; - if (rc < 0) - RETURN(rc); - } - lbunit = &lqs->lqs_bunit_sz; liunit = &lqs->lqs_iunit_sz; lbtune = &lqs->lqs_btune_sz; diff --git a/lustre/quota/quota_context.c b/lustre/quota/quota_context.c index 95e0c23..b8009b5 100644 --- a/lustre/quota/quota_context.c +++ b/lustre/quota/quota_context.c @@ -275,18 +275,10 @@ check_cur_qunit(struct obd_device *obd, if (!limit) GOTO(out, ret = 0); - search_lqs: - quota_search_lqs(qdata, NULL, qctxt, &lqs); - if (!lqs) { - CDEBUG(D_QUOTA, "Can't find the lustre qunit size!\n"); - ret = quota_create_lqs(qdata, NULL, qctxt, &lqs); - if (ret == -EALREADY) { - ret = 0; - goto search_lqs; - } - if (ret < 0) - GOTO (out, ret); - } + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), + qctxt, 1); + if (IS_ERR(lqs)) + GOTO (out, ret = PTR_ERR(lqs)); spin_lock(&lqs->lqs_lock); if (QDATA_IS_BLK(qdata)) { @@ -473,10 +465,12 @@ insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit) static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit) { - struct lustre_qunit_size *lqs = NULL; + struct lustre_qunit_size *lqs; - quota_search_lqs(&qunit->lq_data, NULL, qunit->lq_ctxt, &lqs); - if (lqs) { + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(&qunit->lq_data), + qunit->lq_data.qd_id), + qunit->lq_ctxt, 0); + if (lqs && !IS_ERR(lqs)) { spin_lock(&lqs->lqs_lock); if (qunit->lq_opc == QUOTA_DQACQ) quota_compute_lqs(&qunit->lq_data, lqs, 0, 1); @@ -517,6 +511,20 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata, int opc, int wait, struct obd_trans_info *oti); +static inline void qdata_to_oqaq(struct qunit_data *qdata, + struct quota_adjust_qunit *oqaq) +{ + LASSERT(qdata); + LASSERT(oqaq); + + oqaq->qaq_flags = qdata->qd_flags; + oqaq->qaq_id = qdata->qd_id; + if (QDATA_IS_ADJBLK(qdata)) + oqaq->qaq_bunit_sz = qdata->qd_qunit; + if (QDATA_IS_ADJINO(qdata)) + oqaq->qaq_iunit_sz = qdata->qd_qunit; +} + static int dqacq_completion(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata, int rc, int opc) @@ -859,8 +867,9 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, insert_qunit_nolock(qctxt, qunit); spin_unlock(&qunit_hash_lock); - quota_search_lqs(qdata, NULL, qctxt, &lqs); - if (lqs) { + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), + qctxt, 0); + if (lqs && !IS_ERR(lqs)) { spin_lock(&lqs->lqs_lock); quota_compute_lqs(qdata, lqs, 1, (opc == QUOTA_DQACQ) ? 1 : 0); /* when this qdata returned from mds, it will call lqs_putref */ @@ -1362,18 +1371,15 @@ lqs_hash(lustre_hash_t *lh, void *key, unsigned mask) static int lqs_compare(void *key, struct hlist_node *hnode) { - struct quota_adjust_qunit *lqs_key; struct lustre_qunit_size *q; int rc; ENTRY; LASSERT(key); - lqs_key = (struct quota_adjust_qunit *)key; q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); spin_lock(&q->lqs_lock); - rc = ((lqs_key->qaq_id == q->lqs_id) && - (QAQ_IS_GRP(lqs_key) == LQS_IS_GRP(q))); + rc = (q->lqs_key == *((unsigned long long *)key)); spin_unlock(&q->lqs_lock); RETURN(rc); diff --git a/lustre/quota/quota_interface.c b/lustre/quota/quota_interface.c index fb1dd26..5e94307 100644 --- a/lustre/quota/quota_interface.c +++ b/lustre/quota/quota_interface.c @@ -163,6 +163,7 @@ static int filter_quota_enforce(struct obd_device *obd, unsigned int ignore) RETURN(0); } +#define GET_OA_ID(flag, oa) (flag == USRQUOTA ? oa->o_uid : oa->o_gid) static int filter_quota_getflag(struct obd_device *obd, struct obdo *oa) { struct obd_device_target *obt = &obd->u.obt; @@ -185,14 +186,14 @@ static int filter_quota_getflag(struct obd_device *obd, struct obdo *oa) oa->o_flags &= ~(OBD_FL_NO_USRQUOTA | OBD_FL_NO_GRPQUOTA); for (cnt = 0; cnt < MAXQUOTAS; cnt++) { - struct quota_adjust_qunit oqaq_tmp; struct lustre_qunit_size *lqs = NULL; - oqaq_tmp.qaq_flags = cnt; - oqaq_tmp.qaq_id = (cnt == USRQUOTA) ? oa->o_uid : oa->o_gid; - - quota_search_lqs(NULL, &oqaq_tmp, qctxt, &lqs); - if (lqs) { + lqs = quota_search_lqs(LQS_KEY(cnt, GET_OA_ID(cnt, oa)), + qctxt, 0); + if (lqs == NULL || IS_ERR(lqs)) { + rc = PTR_ERR(lqs); + break; + } else { spin_lock(&lqs->lqs_lock); if (lqs->lqs_bunit_sz <= qctxt->lqc_sync_blk) { oa->o_flags |= (cnt == USRQUOTA) ? @@ -283,8 +284,8 @@ static int quota_check_common(struct obd_device *obd, unsigned int uid, if (qdata[i].qd_id == 0 && !QDATA_IS_GRP(&qdata[i])) continue; - quota_search_lqs(&qdata[i], NULL, qctxt, &lqs); - if (!lqs) + lqs = quota_search_lqs(LQS_KEY(i, id[i]), qctxt, 0); + if (lqs == NULL || IS_ERR(lqs)) continue; rc2[i] = compute_remquota(obd, qctxt, &qdata[i], isblk); @@ -499,6 +500,7 @@ static int quota_pending_commit(struct obd_device *obd, unsigned int uid, do_gettimeofday(&work_start); for (i = 0; i < MAXQUOTAS; i++) { struct lustre_qunit_size *lqs = NULL; + int flag = 0; qdata[i].qd_id = id[i]; qdata[i].qd_flags = i; @@ -509,39 +511,36 @@ static int quota_pending_commit(struct obd_device *obd, unsigned int uid, if (qdata[i].qd_id == 0 && !QDATA_IS_GRP(&qdata[i])) continue; - quota_search_lqs(&qdata[i], NULL, qctxt, &lqs); - if (lqs) { - int flag = 0; - spin_lock(&lqs->lqs_lock); - if (isblk) { - if (lqs->lqs_bwrite_pending >= pending) { - lqs->lqs_bwrite_pending -= pending; - flag = 1; - } else { - CDEBUG(D_ERROR, - "there are too many blocks!\n"); - } + lqs = quota_search_lqs(LQS_KEY(i, qdata[i].qd_id), qctxt, 0); + if (lqs == NULL || IS_ERR(lqs)) + continue; + + spin_lock(&lqs->lqs_lock); + if (isblk) { + if (lqs->lqs_bwrite_pending >= pending) { + lqs->lqs_bwrite_pending -= pending; + flag = 1; } else { - if (lqs->lqs_iwrite_pending >= pending) { - lqs->lqs_iwrite_pending -= pending; - flag = 1; - } else { - CDEBUG(D_ERROR, - "there are too many files!\n"); - } + CDEBUG(D_ERROR, "there are too many blocks!\n"); } - CDEBUG(D_QUOTA, "lqs pending: %lu, pending: %d, " - "isblk: %d.\n", - isblk ? lqs->lqs_bwrite_pending : - lqs->lqs_iwrite_pending, pending, isblk); + } else { + if (lqs->lqs_iwrite_pending >= pending) { + lqs->lqs_iwrite_pending -= pending; + flag = 1; + } else { + CDEBUG(D_ERROR, "there are too many files!\n"); + } + } + CDEBUG(D_QUOTA, "lqs pending: %lu, pending: %d, isblk: %d.\n", + isblk ? lqs->lqs_bwrite_pending : + lqs->lqs_iwrite_pending, pending, isblk); - spin_unlock(&lqs->lqs_lock); + spin_unlock(&lqs->lqs_lock); + lqs_putref(lqs); + /* When lqs_*_pening is changed back, we'll putref lqs + * here b=14784 */ + if (flag) lqs_putref(lqs); - /* When lqs_*_pening is changed back, we'll putref lqs - * here b=14784 */ - if (flag) - lqs_putref(lqs); - } } do_gettimeofday(&work_end); timediff = cfs_timeval_sub(&work_end, &work_start, NULL); diff --git a/lustre/quota/quota_internal.h b/lustre/quota/quota_internal.h index aa0f0ae..6be30e4 100644 --- a/lustre/quota/quota_internal.h +++ b/lustre/quota/quota_internal.h @@ -153,17 +153,10 @@ int target_quota_check(struct obd_export *exp, struct obd_quotactl *oqctl); int quota_adjust_slave_lqs(struct quota_adjust_qunit *oqaq, struct lustre_quota_ctxt *qctxt); -void qdata_to_oqaq(struct qunit_data *qdata, - struct quota_adjust_qunit *oqaq); #ifdef __KERNEL__ -int quota_search_lqs(struct qunit_data *qdata, - struct quota_adjust_qunit *oqaq, - struct lustre_quota_ctxt *qctxt, - struct lustre_qunit_size **lqs_return); -int quota_create_lqs(struct qunit_data *qdata, - struct quota_adjust_qunit *oqaq, - struct lustre_quota_ctxt *qctxt, - struct lustre_qunit_size **lqs_return); +struct lustre_qunit_size *quota_search_lqs(unsigned long long lqs_key, + struct lustre_quota_ctxt *qctxt, + int create); void quota_compute_lqs(struct qunit_data *qdata, struct lustre_qunit_size *lqs, int is_chk, int is_acq); diff --git a/lustre/quota/quota_master.c b/lustre/quota/quota_master.c index 0e7c3ed..5249a87 100644 --- a/lustre/quota/quota_master.c +++ b/lustre/quota/quota_master.c @@ -235,8 +235,8 @@ static void init_oqaq(struct quota_adjust_qunit *oqaq, oqaq->qaq_id = id; oqaq->qaq_flags = type; - quota_search_lqs(NULL, oqaq, qctxt, &lqs); - if (lqs) { + lqs = quota_search_lqs(LQS_KEY(type, id), qctxt, 0); + if (lqs && !IS_ERR(lqs)) { spin_lock(&lqs->lqs_lock); oqaq->qaq_bunit_sz = lqs->lqs_bunit_sz; oqaq->qaq_iunit_sz = lqs->lqs_iunit_sz; @@ -440,29 +440,24 @@ out: dqacq_adjust_qunit_sz(obd, qdata->qd_id, QDATA_IS_GRP(qdata), QDATA_IS_BLK(qdata)); - quota_search_lqs(qdata, NULL, qctxt, &lqs); - if (QDATA_IS_BLK(qdata)) { - if (!lqs) { - CDEBUG(D_INFO, "Can't find the lustre qunit size!\n"); - qdata->qd_qunit = qctxt->lqc_bunit_sz; - } else { - spin_lock(&lqs->lqs_lock); - qdata->qd_qunit = lqs->lqs_bunit_sz; - spin_unlock(&lqs->lqs_lock); - } - QDATA_SET_ADJBLK(qdata); + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), + qctxt, 0); + if (lqs == NULL || IS_ERR(lqs)) { + CDEBUG(D_INFO, "Can't find the lustre qunit size!\n"); + qdata->qd_qunit = QDATA_IS_BLK(qdata) ? qctxt->lqc_bunit_sz : + qctxt->lqc_iunit_sz; } else { - if (!lqs) { - CDEBUG(D_INFO, "Can't find the lustre qunit size!\n"); - qdata->qd_qunit = qctxt->lqc_iunit_sz; - } else { - spin_lock(&lqs->lqs_lock); - qdata->qd_qunit = lqs->lqs_iunit_sz; - spin_unlock(&lqs->lqs_lock); - } - QDATA_SET_ADJINO(qdata); + spin_lock(&lqs->lqs_lock); + qdata->qd_qunit = QDATA_IS_BLK(qdata) ? lqs->lqs_bunit_sz : + lqs->lqs_iunit_sz; + spin_unlock(&lqs->lqs_lock); } + if (QDATA_IS_BLK(qdata)) + QDATA_SET_ADJBLK(qdata); + else + QDATA_SET_ADJINO(qdata); + QDATA_DEBUG(qdata, "alloc/release qunit in dqacq_handler\n"); if (lqs) lqs_putref(lqs); -- 1.8.3.1