From 222305972a5d38b0fe111966da775e98a0ec8269 Mon Sep 17 00:00:00 2001 From: tianzy Date: Tue, 23 Jun 2009 06:20:26 +0000 Subject: [PATCH] Branch HEAD part I 1. mainly fix the lqs_key of lustre_qunit_size. 2. clean quota_compute_lqs(). 3. only supply quota_search_lqs() to outside and make quota_create_lqs() internal. b=18616 i=johann i=panda --- lustre/include/lustre_quota.h | 9 +- lustre/quota/quota_adjust_qunit.c | 167 ++++++++++++++------------------------ lustre/quota/quota_context.c | 48 ++++++----- lustre/quota/quota_interface.c | 29 +++---- lustre/quota/quota_internal.h | 13 +-- lustre/quota/quota_master.c | 39 ++++----- 6 files changed, 132 insertions(+), 173 deletions(-) diff --git a/lustre/include/lustre_quota.h b/lustre/include/lustre_quota.h index 9c6cd5d..838c460 100644 --- a/lustre/include/lustre_quota.h +++ b/lustre/include/lustre_quota.h @@ -337,7 +337,7 @@ struct lustre_qunit_size { cfs_time_t lqs_last_bshrink; /** time of last block shrink */ cfs_time_t lqs_last_ishrink; /** time of last inode shrink */ spinlock_t lqs_lock; - struct quota_adjust_qunit lqs_key; /** hash key */ + unsigned long long lqs_key; /** hash key */ struct lustre_quota_ctxt *lqs_ctxt; /** quota ctxt */ }; @@ -349,6 +349,13 @@ struct lustre_qunit_size { #define LQS_SET_ADJBLK(lqs) ((lqs)->lqs_flags |= LQUOTA_FLAGS_ADJBLK) #define LQS_SET_ADJINO(lqs) ((lqs)->lqs_flags |= LQUOTA_FLAGS_ADJINO) +/* In the hash for lustre_qunit_size, the key is decided by + * grp_or_usr and uid/gid, in here, I combine these two values, + * which will make comparing easier and more efficient */ +#define LQS_KEY(is_grp, id) ((is_grp ? 1ULL << 32: 0) + id) +#define LQS_KEY_ID(key) (key & 0xffffffff) +#define LQS_KEY_GRP(key) (key >> 32) + static inline void lqs_getref(struct lustre_qunit_size *lqs) { atomic_inc(&lqs->lqs_refcount); diff --git a/lustre/quota/quota_adjust_qunit.c b/lustre/quota/quota_adjust_qunit.c index 1c20d86..a3ef7ef 100644 --- a/lustre/quota/quota_adjust_qunit.c +++ b/lustre/quota/quota_adjust_qunit.c @@ -75,109 +75,41 @@ * is_acq: whether it is acquiring; otherwise, it is releasing */ void quota_compute_lqs(struct qunit_data *qdata, struct lustre_qunit_size *lqs, - int is_chk, int is_acq) + int is_chk, int is_acq) { - int is_blk; + long long *rec; LASSERT(qdata && lqs); LASSERT_SPIN_LOCKED(&lqs->lqs_lock); - is_blk = QDATA_IS_BLK(qdata); - - if (is_chk) { - if (is_acq) { - if (is_blk) - lqs->lqs_blk_rec += qdata->qd_count; - else - lqs->lqs_ino_rec += qdata->qd_count; - } else { - if (is_blk) - lqs->lqs_blk_rec -= qdata->qd_count; - else - lqs->lqs_ino_rec -= qdata->qd_count; - } - } else { - if (is_acq) { - if (is_blk) - lqs->lqs_blk_rec -= qdata->qd_count; - else - lqs->lqs_ino_rec -= qdata->qd_count; - } else { - if (is_blk) - lqs->lqs_blk_rec += qdata->qd_count; - else - lqs->lqs_ino_rec += qdata->qd_count; - } - } -} - -void qdata_to_oqaq(struct qunit_data *qdata, struct quota_adjust_qunit *oqaq) -{ - LASSERT(qdata); - LASSERT(oqaq); - - oqaq->qaq_flags = qdata->qd_flags; - oqaq->qaq_id = qdata->qd_id; - if (QDATA_IS_ADJBLK(qdata)) - oqaq->qaq_bunit_sz = qdata->qd_qunit; - if (QDATA_IS_ADJINO(qdata)) - oqaq->qaq_iunit_sz = qdata->qd_qunit; -} - -int quota_search_lqs(struct qunit_data *qdata, struct quota_adjust_qunit *oqaq, - struct lustre_quota_ctxt *qctxt, - struct lustre_qunit_size **lqs_return) -{ - struct quota_adjust_qunit *oqaq_tmp = NULL; - ENTRY; - LASSERT(*lqs_return == NULL); - LASSERT(oqaq || qdata); + rec = QDATA_IS_BLK(qdata) ? &lqs->lqs_blk_rec : &lqs->lqs_ino_rec; - if (!oqaq) { - OBD_ALLOC_PTR(oqaq_tmp); - if (!oqaq_tmp) - RETURN(-ENOMEM); - qdata_to_oqaq(qdata, oqaq_tmp); - } else { - oqaq_tmp = oqaq; - } - - *lqs_return = lustre_hash_lookup(qctxt->lqc_lqs_hash, oqaq_tmp); - if (*lqs_return) - LQS_DEBUG((*lqs_return), "show lqs\n"); + if (!!is_chk + !!is_acq == 1) + *rec -= qdata->qd_count; + else + *rec += qdata->qd_count; - if (!oqaq) - OBD_FREE_PTR(oqaq_tmp); - RETURN(0); } -int quota_create_lqs(struct qunit_data *qdata, struct quota_adjust_qunit *oqaq, - struct lustre_quota_ctxt *qctxt, - struct lustre_qunit_size **lqs_return) +static struct lustre_qunit_size * +quota_create_lqs(unsigned long long lqs_key, struct lustre_quota_ctxt *qctxt) { struct lustre_qunit_size *lqs = NULL; int rc = 0; - ENTRY; - - LASSERT(*lqs_return == NULL); - LASSERT(oqaq || qdata); OBD_ALLOC_PTR(lqs); if (!lqs) GOTO(out, rc = -ENOMEM); - if (!oqaq) - qdata_to_oqaq(qdata, &lqs->lqs_key); - else - lqs->lqs_key = *oqaq; + lqs->lqs_key = lqs_key; spin_lock_init(&lqs->lqs_lock); lqs->lqs_bwrite_pending = 0; lqs->lqs_iwrite_pending = 0; lqs->lqs_ino_rec = 0; lqs->lqs_blk_rec = 0; - lqs->lqs_id = lqs->lqs_key.qaq_id; - lqs->lqs_flags = QAQ_IS_GRP(&lqs->lqs_key); + lqs->lqs_id = LQS_KEY_ID(lqs->lqs_key); + lqs->lqs_flags = LQS_KEY_GRP(lqs->lqs_key) ? LQUOTA_FLAGS_GRP : 0; lqs->lqs_bunit_sz = qctxt->lqc_bunit_sz; lqs->lqs_iunit_sz = qctxt->lqc_iunit_sz; lqs->lqs_btune_sz = qctxt->lqc_btune_sz; @@ -188,17 +120,50 @@ int quota_create_lqs(struct qunit_data *qdata, struct quota_adjust_qunit *oqaq, lqs->lqs_last_ishrink = 0; } lqs_initref(lqs); - rc = lustre_hash_add_unique(qctxt->lqc_lqs_hash, - &lqs->lqs_key, &lqs->lqs_hash); - LQS_DEBUG(lqs, "create lqs\n"); - if (!rc) { + rc = lustre_hash_add_unique(qctxt->lqc_lqs_hash, &lqs->lqs_key, + &lqs->lqs_hash); + if (!rc) lqs_getref(lqs); - *lqs_return = lqs; - } -out: + + out: if (rc && lqs) OBD_FREE_PTR(lqs); - RETURN(rc); + + if (rc) + return ERR_PTR(rc); + else + return lqs; +} + +struct lustre_qunit_size *quota_search_lqs(unsigned long long lqs_key, + struct lustre_quota_ctxt *qctxt, + int create) +{ + int rc = 0; + struct lustre_qunit_size *lqs; + + search_lqs: + lqs = lustre_hash_lookup(qctxt->lqc_lqs_hash, &lqs_key); + if (lqs == NULL && create) { + lqs = quota_create_lqs(lqs_key, qctxt); + if (IS_ERR(lqs)) + rc = PTR_ERR(lqs); + if (rc == -EALREADY) { + rc = 0; + goto search_lqs; + } + } + + if (lqs) + LQS_DEBUG(lqs, "%s\n", + (create == 1 ? "create lqs" : "search lqs")); + + if (rc == 0) { + return lqs; + } else { + CDEBUG(D_ERROR, "get lqs error(rc: %d)\n", rc); + return ERR_PTR(rc); + } } int quota_adjust_slave_lqs(struct quota_adjust_qunit *oqaq, @@ -212,30 +177,22 @@ int quota_adjust_slave_lqs(struct quota_adjust_qunit *oqaq, ENTRY; LASSERT(qctxt); -search_lqs: - rc = quota_search_lqs(NULL, oqaq, qctxt, &lqs); + lqs = quota_search_lqs(LQS_KEY(QAQ_IS_GRP(oqaq), oqaq->qaq_id), + qctxt, 1); + if (lqs == NULL || IS_ERR(lqs)) + RETURN(PTR_ERR(lqs)); /* deleting the lqs, because a user sets lfs quota 0 0 0 0 */ if (!oqaq->qaq_bunit_sz && !oqaq->qaq_iunit_sz && QAQ_IS_ADJBLK(oqaq) && QAQ_IS_ADJINO(oqaq)) { - if (lqs) { - LQS_DEBUG(lqs, "release lqs\n"); - /* this is for quota_search_lqs */ - lqs_putref(lqs); - /* kill lqs */ - lqs_putref(lqs); - } + LQS_DEBUG(lqs, "release lqs\n"); + /* this is for quota_search_lqs */ + lqs_putref(lqs); + /* kill lqs */ + lqs_putref(lqs); RETURN(rc); } - if (!lqs) { - rc = quota_create_lqs(NULL, oqaq, qctxt, &lqs); - if (rc == -EALREADY) - goto search_lqs; - if (rc < 0) - RETURN(rc); - } - lbunit = &lqs->lqs_bunit_sz; liunit = &lqs->lqs_iunit_sz; lbtune = &lqs->lqs_btune_sz; diff --git a/lustre/quota/quota_context.c b/lustre/quota/quota_context.c index 54a9de4..c96cbc2 100644 --- a/lustre/quota/quota_context.c +++ b/lustre/quota/quota_context.c @@ -288,18 +288,10 @@ check_cur_qunit(struct obd_device *obd, if (!limit) GOTO(out, ret = 0); - search_lqs: - quota_search_lqs(qdata, NULL, qctxt, &lqs); - if (!lqs) { - CDEBUG(D_QUOTA, "Can't find the lustre qunit size!\n"); - ret = quota_create_lqs(qdata, NULL, qctxt, &lqs); - if (ret == -EALREADY) { - ret = 0; - goto search_lqs; - } - if (ret < 0) - GOTO (out, ret); - } + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), + qctxt, 1); + if (IS_ERR(lqs)) + GOTO (out, ret = PTR_ERR(lqs)); spin_lock(&lqs->lqs_lock); if (QDATA_IS_BLK(qdata)) { @@ -488,10 +480,12 @@ insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit) static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit) { - struct lustre_qunit_size *lqs = NULL; + struct lustre_qunit_size *lqs; - quota_search_lqs(&qunit->lq_data, NULL, qunit->lq_ctxt, &lqs); - if (lqs) { + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(&qunit->lq_data), + qunit->lq_data.qd_id), + qunit->lq_ctxt, 0); + if (lqs && !IS_ERR(lqs)) { spin_lock(&lqs->lqs_lock); if (qunit->lq_opc == QUOTA_DQACQ) quota_compute_lqs(&qunit->lq_data, lqs, 0, 1); @@ -529,6 +523,20 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata, int opc, int wait, struct obd_trans_info *oti); +static inline void qdata_to_oqaq(struct qunit_data *qdata, + struct quota_adjust_qunit *oqaq) +{ + LASSERT(qdata); + LASSERT(oqaq); + + oqaq->qaq_flags = qdata->qd_flags; + oqaq->qaq_id = qdata->qd_id; + if (QDATA_IS_ADJBLK(qdata)) + oqaq->qaq_bunit_sz = qdata->qd_qunit; + if (QDATA_IS_ADJINO(qdata)) + oqaq->qaq_iunit_sz = qdata->qd_qunit; +} + static int dqacq_completion(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata, int rc, int opc) @@ -868,8 +876,9 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, insert_qunit_nolock(qctxt, qunit); spin_unlock(&qunit_hash_lock); - quota_search_lqs(qdata, NULL, qctxt, &lqs); - if (lqs) { + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), + qctxt, 0); + if (lqs && !IS_ERR(lqs)) { spin_lock(&lqs->lqs_lock); quota_compute_lqs(qdata, lqs, 1, (opc == QUOTA_DQACQ) ? 1 : 0); /* when this qdata returned from mds, it will call lqs_putref */ @@ -1380,18 +1389,15 @@ lqs_hash(lustre_hash_t *lh, void *key, unsigned mask) static int lqs_compare(void *key, struct hlist_node *hnode) { - struct quota_adjust_qunit *lqs_key; struct lustre_qunit_size *q; int rc; ENTRY; LASSERT(key); - lqs_key = (struct quota_adjust_qunit *)key; q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); spin_lock(&q->lqs_lock); - rc = ((lqs_key->qaq_id == q->lqs_id) && - (QAQ_IS_GRP(lqs_key) == LQS_IS_GRP(q))); + rc = (q->lqs_key == *((unsigned long long *)key)); spin_unlock(&q->lqs_lock); RETURN(rc); diff --git a/lustre/quota/quota_interface.c b/lustre/quota/quota_interface.c index dff4ce4..20471a1 100644 --- a/lustre/quota/quota_interface.c +++ b/lustre/quota/quota_interface.c @@ -177,6 +177,7 @@ static int filter_quota_enforce(struct obd_device *obd, unsigned int ignore) RETURN(0); } +#define GET_OA_ID(flag, oa) (flag == USRQUOTA ? oa->o_uid : oa->o_gid) static int filter_quota_getflag(struct obd_device *obd, struct obdo *oa) { struct obd_device_target *obt = &obd->u.obt; @@ -199,14 +200,14 @@ static int filter_quota_getflag(struct obd_device *obd, struct obdo *oa) oa->o_flags &= ~(OBD_FL_NO_USRQUOTA | OBD_FL_NO_GRPQUOTA); for (cnt = 0; cnt < MAXQUOTAS; cnt++) { - struct quota_adjust_qunit oqaq_tmp; struct lustre_qunit_size *lqs = NULL; - oqaq_tmp.qaq_flags = cnt; - oqaq_tmp.qaq_id = (cnt == USRQUOTA) ? oa->o_uid : oa->o_gid; - - quota_search_lqs(NULL, &oqaq_tmp, qctxt, &lqs); - if (lqs) { + lqs = quota_search_lqs(LQS_KEY(cnt, GET_OA_ID(cnt, oa)), + qctxt, 0); + if (lqs == NULL || IS_ERR(lqs)) { + rc = PTR_ERR(lqs); + break; + } else { spin_lock(&lqs->lqs_lock); if (lqs->lqs_bunit_sz <= qctxt->lqc_sync_blk) { oa->o_flags |= (cnt == USRQUOTA) ? @@ -287,8 +288,8 @@ static int quota_check_common(struct obd_device *obd, const unsigned int id[], if (qdata[i].qd_id == 0 && !QDATA_IS_GRP(&qdata[i])) continue; - quota_search_lqs(&qdata[i], NULL, qctxt, &lqs); - if (!lqs) + lqs = quota_search_lqs(LQS_KEY(i, id[i]), qctxt, 0); + if (lqs == NULL || IS_ERR(lqs)) continue; if (IS_ERR(lqs)) { @@ -523,12 +524,12 @@ static int quota_pending_commit(struct obd_device *obd, const unsigned int id[], if (qdata[i].qd_id == 0 && !QDATA_IS_GRP(&qdata[i])) continue; - quota_search_lqs(&qdata[i], NULL, qctxt, &lqs); + lqs = quota_search_lqs(LQS_KEY(i, qdata[i].qd_id), qctxt, 0); if (lqs == NULL || IS_ERR(lqs)) { CERROR("can not find lqs for pending_commit: " "[id %u] [%c] [pending %u] [isblk %d] (rc %ld), " "maybe cause unexpected lqs refcount error!\n", - id[i], i % 2 ? 'g': 'u', pending[i], isblk, + id[i], i ? 'g': 'u', pending[i], isblk, lqs ? PTR_ERR(lqs) : -1); continue; } @@ -551,10 +552,10 @@ static int quota_pending_commit(struct obd_device *obd, const unsigned int id[], lqs->lqs_iwrite_pending -= pending[i]; } - CDEBUG(D_QUOTA, "id: %u, %c, lqs pending: %lu, pending: %d, " - "isblk: %d.\n", id[i], i % 2 ? 'g' : 'u', - isblk ? lqs->lqs_bwrite_pending: lqs->lqs_iwrite_pending, - pending[i], isblk); + CDEBUG(D_QUOTA, "%s: lqs_pending=%lu pending[%d]=%d isblk=%d\n", + obd->obd_name, + isblk ? lqs->lqs_bwrite_pending : lqs->lqs_iwrite_pending, + i, pending[i], isblk); spin_unlock(&lqs->lqs_lock); /* for quota_search_lqs in pending_commit */ diff --git a/lustre/quota/quota_internal.h b/lustre/quota/quota_internal.h index 3d50ad5..e7a633d 100644 --- a/lustre/quota/quota_internal.h +++ b/lustre/quota/quota_internal.h @@ -154,17 +154,10 @@ int target_quota_check(struct obd_device *obd, struct obd_export *exp, int quota_adjust_slave_lqs(struct quota_adjust_qunit *oqaq, struct lustre_quota_ctxt *qctxt); -void qdata_to_oqaq(struct qunit_data *qdata, - struct quota_adjust_qunit *oqaq); #ifdef __KERNEL__ -int quota_search_lqs(struct qunit_data *qdata, - struct quota_adjust_qunit *oqaq, - struct lustre_quota_ctxt *qctxt, - struct lustre_qunit_size **lqs_return); -int quota_create_lqs(struct qunit_data *qdata, - struct quota_adjust_qunit *oqaq, - struct lustre_quota_ctxt *qctxt, - struct lustre_qunit_size **lqs_return); +struct lustre_qunit_size *quota_search_lqs(unsigned long long lqs_key, + struct lustre_quota_ctxt *qctxt, + int create); void quota_compute_lqs(struct qunit_data *qdata, struct lustre_qunit_size *lqs, int is_chk, int is_acq); diff --git a/lustre/quota/quota_master.c b/lustre/quota/quota_master.c index 2d50d8b..7753bff 100644 --- a/lustre/quota/quota_master.c +++ b/lustre/quota/quota_master.c @@ -235,8 +235,8 @@ static void init_oqaq(struct quota_adjust_qunit *oqaq, oqaq->qaq_id = id; oqaq->qaq_flags = type; - quota_search_lqs(NULL, oqaq, qctxt, &lqs); - if (lqs) { + lqs = quota_search_lqs(LQS_KEY(type, id), qctxt, 0); + if (lqs && !IS_ERR(lqs)) { spin_lock(&lqs->lqs_lock); oqaq->qaq_bunit_sz = lqs->lqs_bunit_sz; oqaq->qaq_iunit_sz = lqs->lqs_iunit_sz; @@ -441,29 +441,24 @@ out: dqacq_adjust_qunit_sz(obd, qdata->qd_id, QDATA_IS_GRP(qdata), QDATA_IS_BLK(qdata)); - quota_search_lqs(qdata, NULL, qctxt, &lqs); - if (QDATA_IS_BLK(qdata)) { - if (!lqs) { - CDEBUG(D_INFO, "Can't find the lustre qunit size!\n"); - qdata->qd_qunit = qctxt->lqc_bunit_sz; - } else { - spin_lock(&lqs->lqs_lock); - qdata->qd_qunit = lqs->lqs_bunit_sz; - spin_unlock(&lqs->lqs_lock); - } - QDATA_SET_ADJBLK(qdata); + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), + qctxt, 0); + if (lqs == NULL || IS_ERR(lqs)) { + CDEBUG(D_INFO, "Can't find the lustre qunit size!\n"); + qdata->qd_qunit = QDATA_IS_BLK(qdata) ? qctxt->lqc_bunit_sz : + qctxt->lqc_iunit_sz; } else { - if (!lqs) { - CDEBUG(D_INFO, "Can't find the lustre qunit size!\n"); - qdata->qd_qunit = qctxt->lqc_iunit_sz; - } else { - spin_lock(&lqs->lqs_lock); - qdata->qd_qunit = lqs->lqs_iunit_sz; - spin_unlock(&lqs->lqs_lock); - } - QDATA_SET_ADJINO(qdata); + spin_lock(&lqs->lqs_lock); + qdata->qd_qunit = QDATA_IS_BLK(qdata) ? lqs->lqs_bunit_sz : + lqs->lqs_iunit_sz; + spin_unlock(&lqs->lqs_lock); } + if (QDATA_IS_BLK(qdata)) + QDATA_SET_ADJBLK(qdata); + else + QDATA_SET_ADJINO(qdata); + QDATA_DEBUG(qdata, "alloc/release qunit in dqacq_handler\n"); if (lqs) lqs_putref(lqs); -- 1.8.3.1