X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fquota%2Fquota_context.c;h=181474b9a4630727e0ef236b01279f06a98a34eb;hp=6e86c6f124bd0fb94ac90c6c18c1e85dc7afa3a2;hb=d24dce9f96591cf63d7c1ab22c36b9aed16a4ef2;hpb=6869932b552ac705f411de3362f01bd50c1f6f7d diff --git a/lustre/quota/quota_context.c b/lustre/quota/quota_context.c index 6e86c6f..181474b 100644 --- a/lustre/quota/quota_context.c +++ b/lustre/quota/quota_context.c @@ -26,7 +26,7 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. */ /* @@ -44,7 +44,7 @@ # define EXPORT_SYMTAB #endif -#define DEBUG_SUBSYSTEM S_MDS +#define DEBUG_SUBSYSTEM S_LQUOTA #include #include @@ -57,33 +57,99 @@ #include #include #include +#include #include "quota_internal.h" -unsigned long default_bunit_sz = 100 * 1024 * 1024; /* 100M bytes */ -unsigned long default_btune_ratio = 50; /* 50 percentage */ -unsigned long default_iunit_sz = 5000; /* 5000 inodes */ -unsigned long default_itune_ratio = 50; /* 50 percentage */ +static int hash_lqs_cur_bits = HASH_LQS_CUR_BITS; +CFS_MODULE_PARM(hash_lqs_cur_bits, "i", int, 0444, + "the current bits of lqs hash"); + +#ifdef HAVE_QUOTA_SUPPORT + +static cfs_hash_ops_t lqs_hash_ops; + +unsigned long default_bunit_sz = 128 * 1024 * 1024; /* 128M bytes */ +unsigned long default_btune_ratio = 50; /* 50 percentage */ +unsigned long default_iunit_sz = 5120; /* 5120 inodes */ +unsigned long default_itune_ratio = 50; /* 50 percentage */ cfs_mem_cache_t *qunit_cachep = NULL; -struct list_head qunit_hash[NR_DQHASH]; -spinlock_t qunit_hash_lock = SPIN_LOCK_UNLOCKED; +cfs_list_t qunit_hash[NR_DQHASH]; +cfs_spinlock_t qunit_hash_lock = CFS_SPIN_LOCK_UNLOCKED; + +/* please sync qunit_state with qunit_state_names */ +enum qunit_state { + /** + * a qunit is created + */ + QUNIT_CREATED = 0, + /** + * a qunit is added into qunit hash, that means + * a quota req will be sent or is flying + */ + QUNIT_IN_HASH = 1, + /** + * a qunit is removed from qunit hash, that + * means a quota req is handled and comes back + */ + QUNIT_RM_FROM_HASH = 2, + /** + * qunit can wake up all threads waiting for it + */ + QUNIT_FINISHED = 3, +}; + +static const char *qunit_state_names[] = { + [QUNIT_CREATED] = "CREATED", + [QUNIT_IN_HASH] = "IN_HASH", + [QUNIT_RM_FROM_HASH] = "RM_FROM_HASH", + [QUNIT_FINISHED] = "FINISHED", +}; struct lustre_qunit { - struct list_head lq_hash; /* Hash list in memory */ - atomic_t lq_refcnt; /* Use count */ - struct lustre_quota_ctxt *lq_ctxt; /* Quota context this applies to */ - struct qunit_data lq_data; /* See qunit_data */ - unsigned int lq_opc; /* QUOTA_DQACQ, QUOTA_DQREL */ - struct list_head lq_waiters; /* All write threads waiting for this qunit */ + cfs_list_t lq_hash; /** Hash list in memory */ + cfs_atomic_t lq_refcnt; /** Use count */ + struct lustre_quota_ctxt *lq_ctxt; /** Quota context this applies to */ + struct qunit_data lq_data; /** See qunit_data */ + unsigned int lq_opc; /** QUOTA_DQACQ, QUOTA_DQREL */ + cfs_waitq_t lq_waitq; /** Threads waiting for this qunit */ + cfs_spinlock_t lq_lock; /** Protect the whole structure */ + enum qunit_state lq_state; /** Present the status of qunit */ + int lq_rc; /** The rc of lq_data */ + pid_t lq_owner; }; +#define QUNIT_SET_STATE(qunit, state) \ +do { \ + cfs_spin_lock(&qunit->lq_lock); \ + QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), " \ + "lq_rc(%d), 
lq_owner(%d)\n", \ + qunit, qunit_state_names[qunit->lq_state], \ + qunit_state_names[state], qunit->lq_rc, \ + qunit->lq_owner); \ + qunit->lq_state = state; \ + cfs_spin_unlock(&qunit->lq_lock); \ +} while(0) + +#define QUNIT_SET_STATE_AND_RC(qunit, state, rc) \ +do { \ + cfs_spin_lock(&qunit->lq_lock); \ + qunit->lq_rc = rc; \ + QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), " \ + "lq_rc(%d), lq_owner(%d)\n", \ + qunit, qunit_state_names[qunit->lq_state], \ + qunit_state_names[state], qunit->lq_rc, \ + qunit->lq_owner); \ + qunit->lq_state = state; \ + cfs_spin_unlock(&qunit->lq_lock); \ +} while(0) + int should_translate_quota (struct obd_import *imp) { ENTRY; LASSERT(imp); - if ((imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_QUOTA64) && - !OBD_FAIL_CHECK(OBD_FAIL_QUOTA_QD_COUNT_32BIT)) + if (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_QUOTA64) RETURN(0); else RETURN(1); @@ -94,15 +160,15 @@ void qunit_cache_cleanup(void) int i; ENTRY; - spin_lock(&qunit_hash_lock); + cfs_spin_lock(&qunit_hash_lock); for (i = 0; i < NR_DQHASH; i++) - LASSERT(list_empty(qunit_hash + i)); - spin_unlock(&qunit_hash_lock); + LASSERT(cfs_list_empty(qunit_hash + i)); + cfs_spin_unlock(&qunit_hash_lock); if (qunit_cachep) { int rc; rc = cfs_mem_cache_destroy(qunit_cachep); - LASSERTF(rc == 0, "couldn't destory qunit_cache slab\n"); + LASSERTF(rc == 0, "couldn't destroy qunit_cache slab\n"); qunit_cachep = NULL; } EXIT; @@ -120,10 +186,10 @@ int qunit_cache_init(void) if (!qunit_cachep) RETURN(-ENOMEM); - spin_lock(&qunit_hash_lock); + cfs_spin_lock(&qunit_hash_lock); for (i = 0; i < NR_DQHASH; i++) CFS_INIT_LIST_HEAD(qunit_hash + i); - spin_unlock(&qunit_hash_lock); + cfs_spin_unlock(&qunit_hash_lock); RETURN(0); } @@ -135,66 +201,13 @@ static inline int qunit_hashfn(struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata) { unsigned int id = qdata->qd_id; - unsigned int type = qdata->qd_flags & QUOTA_IS_GRP; + unsigned int type = QDATA_IS_GRP(qdata); unsigned long tmp = ((unsigned long)qctxt >> L1_CACHE_SHIFT) ^ id; tmp = (tmp * (MAXQUOTAS - type)) % NR_DQHASH; return tmp; } -/* compute the remaining quota for certain gid or uid b=11693 */ -int compute_remquota(struct obd_device *obd, - struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata) -{ - struct super_block *sb = qctxt->lqc_sb; - __u64 usage, limit; - struct obd_quotactl *qctl; - int ret = QUOTA_RET_OK; - __u32 qdata_type = qdata->qd_flags & QUOTA_IS_GRP; - ENTRY; - - if (!sb_any_quota_enabled(sb)) - RETURN(QUOTA_RET_NOQUOTA); - - /* ignore root user */ - if (qdata->qd_id == 0 && qdata_type == USRQUOTA) - RETURN(QUOTA_RET_NOLIMIT); - - OBD_ALLOC_PTR(qctl); - if (qctl == NULL) - RETURN(-ENOMEM); - - /* get fs quota usage & limit */ - qctl->qc_cmd = Q_GETQUOTA; - qctl->qc_id = qdata->qd_id; - qctl->qc_type = qdata_type; - ret = fsfilt_quotactl(obd, sb, qctl); - if (ret) { - if (ret == -ESRCH) /* no limit */ - ret = QUOTA_RET_NOLIMIT; - else - CDEBUG(D_QUOTA, "can't get fs quota usage! 
(rc:%d)", - ret); - GOTO(out, ret); - } - - usage = qctl->qc_dqblk.dqb_curspace; - limit = qctl->qc_dqblk.dqb_bhardlimit << QUOTABLOCK_BITS; - if (!limit){ /* no limit */ - ret = QUOTA_RET_NOLIMIT; - GOTO(out, ret); - } - - if (limit >= usage) - qdata->qd_count = limit - usage; - else - qdata->qd_count = 0; - EXIT; -out: - OBD_FREE_PTR(qctl); - return ret; -} - /* caller must hold qunit_hash_lock */ static inline struct lustre_qunit *find_qunit(unsigned int hashent, struct lustre_quota_ctxt *qctxt, @@ -204,10 +217,12 @@ static inline struct lustre_qunit *find_qunit(unsigned int hashent, struct qunit_data *tmp; LASSERT_SPIN_LOCKED(&qunit_hash_lock); - list_for_each_entry(qunit, qunit_hash + hashent, lq_hash) { + cfs_list_for_each_entry(qunit, qunit_hash + hashent, lq_hash) { tmp = &qunit->lq_data; if (qunit->lq_ctxt == qctxt && - qdata->qd_id == tmp->qd_id && qdata->qd_flags == tmp->qd_flags) + qdata->qd_id == tmp->qd_id && + (qdata->qd_flags & LQUOTA_QUNIT_FLAGS) == + (tmp->qd_flags & LQUOTA_QUNIT_FLAGS)) return qunit; } return NULL; @@ -218,9 +233,9 @@ static inline struct lustre_qunit *find_qunit(unsigned int hashent, * @qdata: the type of quota unit to be checked * * return: 1 - need acquire qunit; - * 2 - need release qunit; - * 0 - need do nothing. - * < 0 - error. + * 2 - need release qunit; + * 0 - need do nothing. + * < 0 - error. */ static int check_cur_qunit(struct obd_device *obd, @@ -228,16 +243,23 @@ check_cur_qunit(struct obd_device *obd, { struct super_block *sb = qctxt->lqc_sb; unsigned long qunit_sz, tune_sz; - __u64 usage, limit; + __u64 usage, limit, limit_org, pending_write = 0; + long long record = 0; struct obd_quotactl *qctl; + struct lustre_qunit_size *lqs = NULL; int ret = 0; - __u32 qdata_type = qdata->qd_flags & QUOTA_IS_GRP; - __u32 is_blk = (qdata->qd_flags & QUOTA_IS_BLOCK) >> 1; ENTRY; - if (!sb_any_quota_enabled(sb)) + if (!ll_sb_any_quota_active(sb)) RETURN(0); + cfs_spin_lock(&qctxt->lqc_lock); + if (!qctxt->lqc_valid){ + cfs_spin_unlock(&qctxt->lqc_lock); + RETURN(0); + } + cfs_spin_unlock(&qctxt->lqc_lock); + OBD_ALLOC_PTR(qctl); if (qctl == NULL) RETURN(-ENOMEM); @@ -245,7 +267,7 @@ check_cur_qunit(struct obd_device *obd, /* get fs quota usage & limit */ qctl->qc_cmd = Q_GETQUOTA; qctl->qc_id = qdata->qd_id; - qctl->qc_type = qdata_type; + qctl->qc_type = QDATA_IS_GRP(qdata); ret = fsfilt_quotactl(obd, sb, qctl); if (ret) { if (ret == -ESRCH) /* no limit */ @@ -255,57 +277,151 @@ check_cur_qunit(struct obd_device *obd, GOTO(out, ret); } - if (is_blk) { + if (QDATA_IS_BLK(qdata)) { usage = qctl->qc_dqblk.dqb_curspace; limit = qctl->qc_dqblk.dqb_bhardlimit << QUOTABLOCK_BITS; - qunit_sz = qctxt->lqc_bunit_sz; - tune_sz = qctxt->lqc_btune_sz; - - LASSERT(!(qunit_sz % QUOTABLOCK_SIZE)); } else { usage = qctl->qc_dqblk.dqb_curinodes; limit = qctl->qc_dqblk.dqb_ihardlimit; - qunit_sz = qctxt->lqc_iunit_sz; - tune_sz = qctxt->lqc_itune_sz; } - /* ignore the no quota limit case */ + /* ignore the no quota limit case; and it can avoid creating + * unnecessary lqs for uid/gid */ if (!limit) GOTO(out, ret = 0); + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), + qctxt, 0); + if (IS_ERR(lqs) || lqs == NULL) { + CERROR("fail to find a lqs for %sid: %u)!\n", + QDATA_IS_GRP(qdata) ? 
"g" : "u", qdata->qd_id); + GOTO (out, ret = 0); + } + cfs_spin_lock(&lqs->lqs_lock); + + if (QDATA_IS_BLK(qdata)) { + qunit_sz = lqs->lqs_bunit_sz; + tune_sz = lqs->lqs_btune_sz; + pending_write = lqs->lqs_bwrite_pending; + record = lqs->lqs_blk_rec; + LASSERT(!(qunit_sz % QUOTABLOCK_SIZE)); + } else { + /* we didn't need change inode qunit size now */ + qunit_sz = lqs->lqs_iunit_sz; + tune_sz = lqs->lqs_itune_sz; + pending_write = lqs->lqs_iwrite_pending; + record = lqs->lqs_ino_rec; + } + /* we don't count the MIN_QLIMIT */ - if ((limit == MIN_QLIMIT && !is_blk) || - (toqb(limit) == MIN_QLIMIT && is_blk)) + if ((limit == MIN_QLIMIT && !QDATA_IS_BLK(qdata)) || + (toqb(limit) == MIN_QLIMIT && QDATA_IS_BLK(qdata))) limit = 0; + usage += pending_write; + limit_org = limit; + /* when a releasing quota req is sent, before it returned + limit is assigned a small value. limit will overflow */ + if (limit + record < 0) + usage -= record; + else + limit += record; + LASSERT(qdata->qd_count == 0); if (limit <= usage + tune_sz) { - while (qdata->qd_count + limit <= usage + tune_sz) + while (qdata->qd_count + limit <= + usage + tune_sz) qdata->qd_count += qunit_sz; ret = 1; - } else if (limit > usage + qunit_sz + tune_sz) { - while (limit - qdata->qd_count > usage + qunit_sz + tune_sz) + } else if (limit > usage + qunit_sz + tune_sz && + limit_org > qdata->qd_count + qunit_sz) { + while (limit - qdata->qd_count > usage + qunit_sz + tune_sz && + limit_org > qdata->qd_count + qunit_sz) qdata->qd_count += qunit_sz; ret = 2; + /* if there are other pending writes for this uid/gid, releasing + * quota is put off until the last pending write b=16645 */ + /* if there is an ongoing quota request, a releasing request is aborted. + * That ongoing quota request will call this function again when + * it returned b=18630 */ + if (pending_write || record) { + CDEBUG(D_QUOTA, "delay quota release\n"); + ret = 0; + } } + if (ret > 0) + quota_compute_lqs(qdata, lqs, 1, (ret == 1) ? 1 : 0); + + CDEBUG(D_QUOTA, "type: %c, limit: "LPU64", usage: "LPU64 + ", pending_write: "LPU64", record: %lld" + ", qunit_sz: %lu, tune_sz: %lu, ret: %d.\n", + QDATA_IS_BLK(qdata) ? 
'b' : 'i', limit, usage, pending_write, + record, qunit_sz, tune_sz, ret); LASSERT(ret == 0 || qdata->qd_count); + + cfs_spin_unlock(&lqs->lqs_lock); + lqs_putref(lqs); + EXIT; -out: + out: OBD_FREE_PTR(qctl); return ret; } -/* caller must hold qunit_hash_lock */ -static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt, - struct qunit_data *qdata) +/** + * Compute the remaining quota for certain gid or uid b=11693 + */ +int compute_remquota(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, + struct qunit_data *qdata, int isblk) { - unsigned int hashent = qunit_hashfn(qctxt, qdata); - struct lustre_qunit *qunit; + struct super_block *sb = qctxt->lqc_sb; + __u64 usage, limit; + struct obd_quotactl *qctl; + int ret = QUOTA_RET_OK; ENTRY; - LASSERT_SPIN_LOCKED(&qunit_hash_lock); - qunit = find_qunit(hashent, qctxt, qdata); - RETURN(qunit); + if (!ll_sb_any_quota_active(sb)) + RETURN(QUOTA_RET_NOQUOTA); + + /* ignore root user */ + if (qdata->qd_id == 0 && QDATA_IS_GRP(qdata) == USRQUOTA) + RETURN(QUOTA_RET_NOLIMIT); + + OBD_ALLOC_PTR(qctl); + if (qctl == NULL) + RETURN(-ENOMEM); + + /* get fs quota usage & limit */ + qctl->qc_cmd = Q_GETQUOTA; + qctl->qc_id = qdata->qd_id; + qctl->qc_type = QDATA_IS_GRP(qdata); + ret = fsfilt_quotactl(obd, sb, qctl); + if (ret) { + if (ret == -ESRCH) /* no limit */ + ret = QUOTA_RET_NOLIMIT; + else + CDEBUG(D_QUOTA, "can't get fs quota usage! (rc:%d)", + ret); + GOTO(out, ret); + } + + usage = isblk ? qctl->qc_dqblk.dqb_curspace : + qctl->qc_dqblk.dqb_curinodes; + limit = isblk ? qctl->qc_dqblk.dqb_bhardlimit << QUOTABLOCK_BITS : + qctl->qc_dqblk.dqb_ihardlimit; + if (!limit){ /* no limit */ + ret = QUOTA_RET_NOLIMIT; + GOTO(out, ret); + } + + if (limit >= usage) + qdata->qd_count = limit - usage; + else + qdata->qd_count = 0; + EXIT; +out: + OBD_FREE_PTR(qctl); + return ret; } static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt, @@ -314,17 +430,19 @@ static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit = NULL; ENTRY; - OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit)); + OBD_SLAB_ALLOC_PTR_GFP(qunit, qunit_cachep, CFS_ALLOC_IO); if (qunit == NULL) RETURN(NULL); CFS_INIT_LIST_HEAD(&qunit->lq_hash); - CFS_INIT_LIST_HEAD(&qunit->lq_waiters); - atomic_set(&qunit->lq_refcnt, 1); + cfs_waitq_init(&qunit->lq_waitq); + cfs_atomic_set(&qunit->lq_refcnt, 1); qunit->lq_ctxt = qctxt; memcpy(&qunit->lq_data, qdata, sizeof(*qdata)); qunit->lq_opc = opc; - + qunit->lq_lock = CFS_SPIN_LOCK_UNLOCKED; + QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0); + qunit->lq_owner = cfs_curproc_pid(); RETURN(qunit); } @@ -335,112 +453,189 @@ static inline void free_qunit(struct lustre_qunit *qunit) static inline void qunit_get(struct lustre_qunit *qunit) { - atomic_inc(&qunit->lq_refcnt); + cfs_atomic_inc(&qunit->lq_refcnt); } static void qunit_put(struct lustre_qunit *qunit) { - LASSERT(atomic_read(&qunit->lq_refcnt)); - if (atomic_dec_and_test(&qunit->lq_refcnt)) + LASSERT(cfs_atomic_read(&qunit->lq_refcnt)); + if (cfs_atomic_dec_and_test(&qunit->lq_refcnt)) free_qunit(qunit); } +/* caller must hold qunit_hash_lock and release ref of qunit after using it */ +static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt, + struct qunit_data *qdata) +{ + unsigned int hashent = qunit_hashfn(qctxt, qdata); + struct lustre_qunit *qunit; + ENTRY; + + LASSERT_SPIN_LOCKED(&qunit_hash_lock); + qunit = find_qunit(hashent, qctxt, qdata); + if (qunit) + qunit_get(qunit); + 
RETURN(qunit); +} + static void insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit) { - struct list_head *head; + cfs_list_t *head; - LASSERT(list_empty(&qunit->lq_hash)); + LASSERT(cfs_list_empty(&qunit->lq_hash)); + qunit_get(qunit); head = qunit_hash + qunit_hashfn(qctxt, &qunit->lq_data); - list_add(&qunit->lq_hash, head); + cfs_list_add(&qunit->lq_hash, head); + QUNIT_SET_STATE(qunit, QUNIT_IN_HASH); +} + +static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit) +{ + struct lustre_qunit_size *lqs; + + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(&qunit->lq_data), + qunit->lq_data.qd_id), + qunit->lq_ctxt, 0); + if (lqs && !IS_ERR(lqs)) { + cfs_spin_lock(&lqs->lqs_lock); + if (qunit->lq_opc == QUOTA_DQACQ) + quota_compute_lqs(&qunit->lq_data, lqs, 0, 1); + if (qunit->lq_opc == QUOTA_DQREL) + quota_compute_lqs(&qunit->lq_data, lqs, 0, 0); + cfs_spin_unlock(&lqs->lqs_lock); + /* this is for quota_search_lqs */ + lqs_putref(lqs); + /* this is for schedule_dqacq */ + lqs_putref(lqs); + } } static void remove_qunit_nolock(struct lustre_qunit *qunit) { - LASSERT(!list_empty(&qunit->lq_hash)); - list_del_init(&qunit->lq_hash); + LASSERT(!cfs_list_empty(&qunit->lq_hash)); + LASSERT_SPIN_LOCKED(&qunit_hash_lock); + + cfs_list_del_init(&qunit->lq_hash); + QUNIT_SET_STATE(qunit, QUNIT_RM_FROM_HASH); + qunit_put(qunit); } -struct qunit_waiter { - struct list_head qw_entry; - cfs_waitq_t qw_waitq; - int qw_rc; -}; +void* quota_barrier(struct lustre_quota_ctxt *qctxt, + struct obd_quotactl *oqctl, int isblk) +{ + struct lustre_qunit *qunit, *find_qunit; + int cycle = 1; + + OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit)); + if (qunit == NULL) { + CERROR("locating %sunit failed for %sid %u\n", + isblk ? "b" : "i", oqctl->qc_type ? "g" : "u", + oqctl->qc_id); + qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id, + oqctl->qc_type, isblk); + return NULL; + } + + CFS_INIT_LIST_HEAD(&qunit->lq_hash); + qunit->lq_lock = CFS_SPIN_LOCK_UNLOCKED; + cfs_waitq_init(&qunit->lq_waitq); + cfs_atomic_set(&qunit->lq_refcnt, 1); + qunit->lq_ctxt = qctxt; + qunit->lq_data.qd_id = oqctl->qc_id; + qunit->lq_data.qd_flags = oqctl->qc_type; + if (isblk) + QDATA_SET_BLK(&qunit->lq_data); + QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0); + /* it means it is only an invalid qunit for barrier */ + qunit->lq_opc = QUOTA_LAST_OPC; + + while (1) { + cfs_spin_lock(&qunit_hash_lock); + find_qunit = dqacq_in_flight(qctxt, &qunit->lq_data); + if (find_qunit) { + cfs_spin_unlock(&qunit_hash_lock); + qunit_put(find_qunit); + qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id, + oqctl->qc_type, isblk); + CDEBUG(D_QUOTA, "cycle=%d\n", cycle++); + continue; + } + break; + } + insert_qunit_nolock(qctxt, qunit); + cfs_spin_unlock(&qunit_hash_lock); + return qunit; +} + +void quota_unbarrier(void *handle) +{ + struct lustre_qunit *qunit = (struct lustre_qunit *)handle; + + if (qunit == NULL) { + CERROR("handle is NULL\n"); + return; + } + + LASSERT(qunit->lq_opc == QUOTA_LAST_OPC); + cfs_spin_lock(&qunit_hash_lock); + remove_qunit_nolock(qunit); + cfs_spin_unlock(&qunit_hash_lock); + QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, QUOTA_REQ_RETURNED); + cfs_waitq_signal(&qunit->lq_waitq); + qunit_put(qunit); +} #define INC_QLIMIT(limit, count) (limit == MIN_QLIMIT) ? 
\ (limit = count) : (limit += count) -/* FIXME check if this mds is the master of specified id */ -static int -is_master(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, - unsigned int id, int type) +static inline int is_master(struct lustre_quota_ctxt *qctxt) { return qctxt->lqc_handler ? 1 : 0; } static int schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, - struct qunit_data *qdata, int opc, int wait); + struct qunit_data *qdata, int opc, int wait, + struct obd_trans_info *oti); -static int split_before_schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, - struct qunit_data *qdata, int opc, int wait) +static inline void qdata_to_oqaq(struct qunit_data *qdata, + struct quota_adjust_qunit *oqaq) { - int rc = 0; - unsigned long factor; - struct qunit_data tmp_qdata; - ENTRY; - - LASSERT(qdata && qdata->qd_count); - QDATA_DEBUG(qdata, "%s quota split.\n", - (qdata->qd_flags & QUOTA_IS_BLOCK) ? "block" : "inode"); - if (qdata->qd_flags & QUOTA_IS_BLOCK) - factor = MAX_QUOTA_COUNT32 / qctxt->lqc_bunit_sz * - qctxt->lqc_bunit_sz; - else - factor = MAX_QUOTA_COUNT32 / qctxt->lqc_iunit_sz * - qctxt->lqc_iunit_sz; - - if (qctxt->lqc_import && should_translate_quota(qctxt->lqc_import) && - qdata->qd_count > factor) { - tmp_qdata = *qdata; - tmp_qdata.qd_count = factor; - qdata->qd_count -= tmp_qdata.qd_count; - QDATA_DEBUG((&tmp_qdata), "be split.\n"); - rc = schedule_dqacq(obd, qctxt, &tmp_qdata, opc, wait); - } else{ - QDATA_DEBUG(qdata, "don't be split.\n"); - rc = schedule_dqacq(obd, qctxt, qdata, opc, wait); - } - - RETURN(rc); + LASSERT(qdata); + LASSERT(oqaq); + + oqaq->qaq_flags = qdata->qd_flags; + oqaq->qaq_id = qdata->qd_id; + if (QDATA_IS_ADJBLK(qdata)) + oqaq->qaq_bunit_sz = qdata->qd_qunit; + if (QDATA_IS_ADJINO(qdata)) + oqaq->qaq_iunit_sz = qdata->qd_qunit; } static int -dqacq_completion(struct obd_device *obd, - struct lustre_quota_ctxt *qctxt, +dqacq_completion(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata, int rc, int opc) { struct lustre_qunit *qunit = NULL; struct super_block *sb = qctxt->lqc_sb; - unsigned long qunit_sz; - struct qunit_waiter *qw, *tmp; int err = 0; - __u32 qdata_type = qdata->qd_flags & QUOTA_IS_GRP; - __u32 is_blk = (qdata->qd_flags & QUOTA_IS_BLOCK) >> 1; - __u64 qd_tmp = qdata->qd_count; - unsigned long div_r; + struct quota_adjust_qunit *oqaq = NULL; + int rc1 = 0; ENTRY; LASSERT(qdata); - qunit_sz = is_blk ? qctxt->lqc_bunit_sz : qctxt->lqc_iunit_sz; - div_r = do_div(qd_tmp, qunit_sz); - LASSERTF(!div_r, "qunit_sz: %lu, return qunit_sz: "LPU64"\n", - qunit_sz, qd_tmp); + QDATA_DEBUG(qdata, "obd(%s): complete %s quota req\n", + obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel"); + + /* do it only when a releasing quota req more than 5MB b=18491 */ + if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880) + OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_REL, 5); /* update local operational quota file */ if (rc == 0) { - __u32 count = QUSG(qdata->qd_count, is_blk); + __u64 count = QUSG(qdata->qd_count, QDATA_IS_BLK(qdata)); struct obd_quotactl *qctl; __u64 *hardlimit; @@ -453,14 +648,14 @@ dqacq_completion(struct obd_device *obd, * set fs quota limit */ qctl->qc_cmd = Q_GETQUOTA; qctl->qc_id = qdata->qd_id; - qctl->qc_type = qdata_type; + qctl->qc_type = QDATA_IS_GRP(qdata); err = fsfilt_quotactl(obd, sb, qctl); if (err) { CERROR("error get quota fs limit! 
(rc:%d)\n", err); GOTO(out_mem, err); } - if (is_blk) { + if (QDATA_IS_BLK(qdata)) { qctl->qc_dqblk.dqb_valid = QIF_BLIMITS; hardlimit = &qctl->qc_dqblk.dqb_bhardlimit; } else { @@ -468,20 +663,24 @@ dqacq_completion(struct obd_device *obd, hardlimit = &qctl->qc_dqblk.dqb_ihardlimit; } + CDEBUG(D_QUOTA, "hardlimt: "LPU64"\n", *hardlimit); + + if (*hardlimit == 0) + goto out_mem; + switch (opc) { case QUOTA_DQACQ: - CDEBUG(D_QUOTA, "%s(acq):count: %d, hardlimt: "LPU64 - ",type: %s.\n", obd->obd_name, count, *hardlimit, - qdata_type ? "grp": "usr"); INC_QLIMIT(*hardlimit, count); break; case QUOTA_DQREL: - CDEBUG(D_QUOTA, "%s(rel):count: %d, hardlimt: "LPU64 - ",type: %s.\n", obd->obd_name, count, *hardlimit, - qdata_type ? "grp": "usr"); - LASSERTF(count < *hardlimit, - "count: %d, hardlimit: "LPU64".\n", - count, *hardlimit); + LASSERTF(count < *hardlimit, + "id(%u) flag(%u) type(%c) isblk(%c) " + "count("LPU64") qd_qunit("LPU64") " + "hardlimit("LPU64").\n", + qdata->qd_id, qdata->qd_flags, + QDATA_IS_GRP(qdata) ? 'g' : 'u', + QDATA_IS_BLK(qdata) ? 'b': 'i', + qdata->qd_count, qdata->qd_qunit, *hardlimit); *hardlimit -= count; break; default: @@ -510,47 +709,73 @@ out_mem: } out: /* remove the qunit from hash */ - spin_lock(&qunit_hash_lock); + cfs_spin_lock(&qunit_hash_lock); qunit = dqacq_in_flight(qctxt, qdata); /* this qunit has been removed by qctxt_cleanup() */ if (!qunit) { - spin_unlock(&qunit_hash_lock); + cfs_spin_unlock(&qunit_hash_lock); + QDATA_DEBUG(qdata, "%s is discarded because qunit isn't found\n", + opc == QUOTA_DQACQ ? "DQACQ" : "DQREL"); RETURN(err); } LASSERT(opc == qunit->lq_opc); + /* remove this qunit from lq_hash so that new processes cannot be added + * to qunit->lq_waiters */ remove_qunit_nolock(qunit); + cfs_spin_unlock(&qunit_hash_lock); - /* wake up all waiters */ - list_for_each_entry_safe(qw, tmp, &qunit->lq_waiters, qw_entry) { - list_del_init(&qw->qw_entry); - qw->qw_rc = rc; - wake_up(&qw->qw_waitq); - } + compute_lqs_after_removing_qunit(qunit); - spin_unlock(&qunit_hash_lock); + if (rc == 0) + rc = QUOTA_REQ_RETURNED; + QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, rc); + /* wake up all waiters */ + cfs_waitq_broadcast(&qunit->lq_waitq); + /* this is for dqacq_in_flight() */ qunit_put(qunit); + if (rc < 0 && rc != -EDQUOT) + GOTO(out1, err); /* don't reschedule in such cases: - * - acq/rel failure, but not for quota recovery. + * - acq/rel failure and qunit isn't changed, + * but not for quota recovery. * - local dqacq/dqrel. * - local disk io failure. */ - if (err || (rc && rc != -EBUSY) || - is_master(obd, qctxt, qdata->qd_id, qdata_type)) - RETURN(err); + OBD_ALLOC_PTR(oqaq); + if (!oqaq) + GOTO(out1, err = -ENOMEM); + qdata_to_oqaq(qdata, oqaq); + /* adjust the qunit size in slaves */ + rc1 = quota_adjust_slave_lqs(oqaq, qctxt); + OBD_FREE_PTR(oqaq); + if (rc1 < 0) { + CERROR("adjust slave's qunit size failed!(rc:%d)\n", rc1); + GOTO(out1, err = rc1); + } + if (err || (rc < 0 && rc != -EBUSY && rc1 == 0) || is_master(qctxt)) + GOTO(out1, err); + + if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880 && + OBD_FAIL_CHECK(OBD_FAIL_QUOTA_DELAY_REL)) + GOTO(out1, err); /* reschedule another dqacq/dqrel if needed */ qdata->qd_count = 0; - rc = check_cur_qunit(obd, qctxt, qdata); - if (rc > 0) { + qdata->qd_flags &= LQUOTA_QUNIT_FLAGS; + rc1 = check_cur_qunit(obd, qctxt, qdata); + if (rc1 > 0) { int opc; - opc = rc == 1 ? 
QUOTA_DQACQ : QUOTA_DQREL; - rc = split_before_schedule_dqacq(obd, qctxt, qdata, opc, 0); - QDATA_DEBUG(qdata, "reschedudle opc(%d) rc(%d)\n", opc, rc); + opc = rc1 == 1 ? QUOTA_DQACQ : QUOTA_DQREL; + rc1 = schedule_dqacq(obd, qctxt, qdata, opc, 0, NULL); + QDATA_DEBUG(qdata, "reschedudle opc(%d) rc(%d)\n", opc, rc1); } + out1: + /* this is for alloc_qunit() */ + qunit_put(qunit); RETURN(err); } @@ -559,213 +784,390 @@ struct dqacq_async_args { struct lustre_qunit *aa_qunit; }; -static int dqacq_interpret(struct ptlrpc_request *req, void *data, int rc) +static int dqacq_interpret(const struct lu_env *env, + struct ptlrpc_request *req, void *data, int rc) { struct dqacq_async_args *aa = (struct dqacq_async_args *)data; struct lustre_quota_ctxt *qctxt = aa->aa_ctxt; + struct obd_device_target *obt = qctxt->lqc_obt; struct lustre_qunit *qunit = aa->aa_qunit; struct obd_device *obd = req->rq_import->imp_obd; struct qunit_data *qdata = NULL; - struct qunit_data_old *qdata_old = NULL; ENTRY; LASSERT(req); LASSERT(req->rq_import); - if ((req->rq_import->imp_connect_data.ocd_connect_flags & - OBD_CONNECT_QUOTA64) && - !OBD_FAIL_CHECK(OBD_FAIL_QUOTA_QD_COUNT_32BIT)) { - CDEBUG(D_QUOTA, "qd_count is 64bit!\n"); + cfs_down_read(&obt->obt_rwsem); + /* if a quota req timeouts or is dropped, we should update quota + * statistics which will be handled in dqacq_completion. And in + * this situation we should get qdata from request instead of + * reply */ + qdata = quota_get_qdata(req, (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY, + QUOTA_IMPORT); + if (IS_ERR(qdata)) { + rc = PTR_ERR(qdata); + DEBUG_REQ(D_ERROR, req, + "error unpacking qunit_data(rc: %ld)\n", + PTR_ERR(qdata)); + qdata = &qunit->lq_data; + } - qdata = req_capsule_server_swab_get(&req->rq_pill, - &RMF_QUNIT_DATA, - (void*)lustre_swab_qdata); - } else { - CDEBUG(D_QUOTA, "qd_count is 32bit!\n"); + QDATA_DEBUG(qdata, "qdata: interpret rc(%d).\n", rc); + QDATA_DEBUG((&qunit->lq_data), "lq_data: \n"); - qdata = req_capsule_server_swab_get(&req->rq_pill, - &RMF_QUNIT_DATA, - (void*)lustre_swab_qdata_old); - qdata = lustre_quota_old_to_new(qdata_old); + if (qdata->qd_id != qunit->lq_data.qd_id || + OBD_FAIL_CHECK(OBD_FAIL_QUOTA_RET_QDATA)) { + CERROR("the returned qd_id isn't expected!" + "(qdata: %u, lq_data: %u)\n", qdata->qd_id, + qunit->lq_data.qd_id); + qdata->qd_id = qunit->lq_data.qd_id; + rc = -EPROTO; } - if (qdata == NULL) { - DEBUG_REQ(D_ERROR, req, "error unpacking qunit_data"); - RETURN(-EPROTO); + if (QDATA_IS_GRP(qdata) != QDATA_IS_GRP(&qunit->lq_data)) { + CERROR("the returned grp/usr isn't expected!" + "(qdata: %u, lq_data: %u)\n", qdata->qd_flags, + qunit->lq_data.qd_flags); + if (QDATA_IS_GRP(&qunit->lq_data)) + QDATA_SET_GRP(qdata); + else + QDATA_CLR_GRP(qdata); + rc = -EPROTO; + } + if (qdata->qd_count > qunit->lq_data.qd_count) { + CERROR("the returned qd_count isn't expected!" + "(qdata: "LPU64", lq_data: "LPU64")\n", qdata->qd_count, + qunit->lq_data.qd_count); + rc = -EPROTO; } - LASSERT(qdata->qd_id == qunit->lq_data.qd_id && - (qdata->qd_flags & QUOTA_IS_GRP) == - (qunit->lq_data.qd_flags & QUOTA_IS_GRP) && - (qdata->qd_count == qunit->lq_data.qd_count || - qdata->qd_count == 0)); - - QDATA_DEBUG(qdata, "%s interpret rc(%d).\n", - lustre_msg_get_opc(req->rq_reqmsg) == QUOTA_DQACQ ? - "DQACQ" : "DQREL", rc); + if (unlikely(rc == -ESRCH)) + CERROR("quota for %s has been enabled by master, but disabled " + "by slave.\n", QDATA_IS_GRP(qdata) ? 
"group" : "user"); rc = dqacq_completion(obd, qctxt, qdata, rc, lustre_msg_get_opc(req->rq_reqmsg)); + cfs_up_read(&obt->obt_rwsem); RETURN(rc); } -static int got_qunit(struct qunit_waiter *waiter) +/** + * check if quota master is online + */ +int check_qm(struct lustre_quota_ctxt *qctxt) +{ + int rc; + ENTRY; + + cfs_spin_lock(&qctxt->lqc_lock); + /* quit waiting when mds is back or qctxt is cleaned up */ + rc = qctxt->lqc_import || !qctxt->lqc_valid; + cfs_spin_unlock(&qctxt->lqc_lock); + + RETURN(rc); +} + +/* wake up all waiting threads when lqc_import is NULL */ +void dqacq_interrupt(struct lustre_quota_ctxt *qctxt) +{ + struct lustre_qunit *qunit, *tmp; + int i; + ENTRY; + + cfs_spin_lock(&qunit_hash_lock); + for (i = 0; i < NR_DQHASH; i++) { + cfs_list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], + lq_hash) { + if (qunit->lq_ctxt != qctxt) + continue; + + /* Wake up all waiters. Do not change lq_state. + * The waiters will check lq_rc which is kept as 0 + * if no others change it, then the waiters will return + * -EAGAIN to caller who can perform related quota + * acq/rel if necessary. */ + cfs_waitq_broadcast(&qunit->lq_waitq); + } + } + cfs_spin_unlock(&qunit_hash_lock); + EXIT; +} + +static int got_qunit(struct lustre_qunit *qunit, int is_master) { + struct lustre_quota_ctxt *qctxt = qunit->lq_ctxt; int rc = 0; ENTRY; - spin_lock(&qunit_hash_lock); - rc = list_empty(&waiter->qw_entry); - spin_unlock(&qunit_hash_lock); + + cfs_spin_lock(&qunit->lq_lock); + switch (qunit->lq_state) { + case QUNIT_IN_HASH: + case QUNIT_RM_FROM_HASH: + break; + case QUNIT_FINISHED: + rc = 1; + break; + default: + CERROR("invalid qunit state %d\n", qunit->lq_state); + } + cfs_spin_unlock(&qunit->lq_lock); + + if (!rc) { + cfs_spin_lock(&qctxt->lqc_lock); + rc = !qctxt->lqc_valid; + if (!is_master) + rc |= !qctxt->lqc_import; + cfs_spin_unlock(&qctxt->lqc_lock); + } + RETURN(rc); } +static inline void +revoke_lqs_rec(struct lustre_qunit_size *lqs, struct qunit_data *qdata, int opc) +{ + /* revoke lqs_xxx_rec which is computed in check_cur_qunit + * b=18630 */ + cfs_spin_lock(&lqs->lqs_lock); + quota_compute_lqs(qdata, lqs, 0, (opc == QUOTA_DQACQ) ? 
1 : 0); + cfs_spin_unlock(&lqs->lqs_lock); +} + static int -schedule_dqacq(struct obd_device *obd, - struct lustre_quota_ctxt *qctxt, - struct qunit_data *qdata, int opc, int wait) +schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, + struct qunit_data *qdata, int opc, int wait, + struct obd_trans_info *oti) { struct lustre_qunit *qunit, *empty; - struct qunit_waiter qw; struct l_wait_info lwi = { 0 }; struct ptlrpc_request *req; - struct qunit_data *reqdata; struct dqacq_async_args *aa; - unsigned long factor; + struct obd_import *imp = NULL; + struct lustre_qunit_size *lqs = NULL; + struct timeval work_start; + struct timeval work_end; + long timediff; int rc = 0; ENTRY; - CFS_INIT_LIST_HEAD(&qw.qw_entry); - init_waitqueue_head(&qw.qw_waitq); - qw.qw_rc = 0; + LASSERT(opc == QUOTA_DQACQ || opc == QUOTA_DQREL); + cfs_gettimeofday(&work_start); - if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL) + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), + qctxt, 0); + if (lqs == NULL || IS_ERR(lqs)) { + CERROR("Can't find the lustre qunit size!\n"); + RETURN(-EPERM); + } + + if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL) { + revoke_lqs_rec(lqs, qdata, opc); + /* this is for quota_search_lqs */ + lqs_putref(lqs); RETURN(-ENOMEM); + } - spin_lock(&qunit_hash_lock); + OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_SD, 5); + cfs_spin_lock(&qunit_hash_lock); qunit = dqacq_in_flight(qctxt, qdata); if (qunit) { - if (wait) - list_add_tail(&qw.qw_entry, &qunit->lq_waiters); - spin_unlock(&qunit_hash_lock); + cfs_spin_unlock(&qunit_hash_lock); + qunit_put(empty); - free_qunit(empty); + revoke_lqs_rec(lqs, qdata, opc); + /* this is for quota_search_lqs */ + lqs_putref(lqs); goto wait_completion; } qunit = empty; + qunit_get(qunit); insert_qunit_nolock(qctxt, qunit); - if (wait) - list_add_tail(&qw.qw_entry, &qunit->lq_waiters); - spin_unlock(&qunit_hash_lock); + cfs_spin_unlock(&qunit_hash_lock); - LASSERT(qunit); + /* From here, the quota request will be sent anyway. + * When this qdata request returned or is cancelled, + * lqs_putref will be called at that time */ + lqs_getref(lqs); + /* this is for quota_search_lqs */ + lqs_putref(lqs); + QDATA_DEBUG(qdata, "obd(%s): send %s quota req\n", + obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel"); /* master is going to dqacq/dqrel from itself */ - if (is_master(obd, qctxt, qdata->qd_id, qdata->qd_flags & QUOTA_IS_GRP)) - { + if (is_master(qctxt)) { int rc2; QDATA_DEBUG(qdata, "local %s.\n", opc == QUOTA_DQACQ ? "DQACQ" : "DQREL"); + QDATA_SET_CHANGE_QS(qdata); rc = qctxt->lqc_handler(obd, qdata, opc); rc2 = dqacq_completion(obd, qctxt, qdata, rc, opc); - RETURN((rc && rc != -EDQUOT) ? rc : rc2); + /* this is for qunit_get() */ + qunit_put(qunit); + + cfs_gettimeofday(&work_end); + timediff = cfs_timeval_sub(&work_end, &work_start, NULL); + if (opc == QUOTA_DQACQ) + lprocfs_counter_add(qctxt->lqc_stats, + wait ? LQUOTA_SYNC_ACQ : LQUOTA_ASYNC_ACQ, + timediff); + else + lprocfs_counter_add(qctxt->lqc_stats, + wait ? LQUOTA_SYNC_REL : LQUOTA_ASYNC_REL, + timediff); + RETURN(rc ? 
rc : rc2); } + cfs_spin_lock(&qctxt->lqc_lock); + if (!qctxt->lqc_import) { + cfs_spin_unlock(&qctxt->lqc_lock); + QDATA_DEBUG(qdata, "lqc_import is invalid.\n"); + + cfs_spin_lock(&qunit_hash_lock); + remove_qunit_nolock(qunit); + cfs_spin_unlock(&qunit_hash_lock); + + compute_lqs_after_removing_qunit(qunit); + + QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, -EAGAIN); + cfs_waitq_broadcast(&qunit->lq_waitq); + + /* this is for qunit_get() */ + qunit_put(qunit); + /* this for alloc_qunit() */ + qunit_put(qunit); + cfs_spin_lock(&qctxt->lqc_lock); + if (wait && !qctxt->lqc_import) { + cfs_spin_unlock(&qctxt->lqc_lock); + LASSERT(oti && oti->oti_thread); + /* The recovery thread doesn't have watchdog + * attached. LU-369 */ + if (oti->oti_thread->t_watchdog) + lc_watchdog_disable(oti->oti_thread->\ + t_watchdog); + CDEBUG(D_QUOTA, "sleep for quota master\n"); + l_wait_event(qctxt->lqc_wait_for_qmaster, + check_qm(qctxt), &lwi); + CDEBUG(D_QUOTA, "wake up when quota master is back\n"); + if (oti->oti_thread->t_watchdog) + lc_watchdog_touch(oti->oti_thread->t_watchdog, + CFS_GET_TIMEOUT(oti->oti_thread->t_svc)); + } else { + cfs_spin_unlock(&qctxt->lqc_lock); + } + + RETURN(-EAGAIN); + } + imp = class_import_get(qctxt->lqc_import); + cfs_spin_unlock(&qctxt->lqc_lock); + /* build dqacq/dqrel request */ - LASSERT(qctxt->lqc_import); + LASSERT(imp); - req = ptlrpc_request_alloc_pack(qctxt->lqc_import, &RQF_MDS_QUOTA_DQACQ, + req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_QUOTA_DQACQ, LUSTRE_MDS_VERSION, opc); + class_import_put(imp); if (req == NULL) { + CERROR("Can't alloc request\n"); dqacq_completion(obd, qctxt, qdata, -ENOMEM, opc); + /* this is for qunit_get() */ + qunit_put(qunit); RETURN(-ENOMEM); } - if (qdata->qd_flags & QUOTA_IS_BLOCK) - factor = MAX_QUOTA_COUNT32 / qctxt->lqc_bunit_sz * - qctxt->lqc_bunit_sz; - else - factor = MAX_QUOTA_COUNT32 / qctxt->lqc_iunit_sz * - qctxt->lqc_iunit_sz; - - LASSERT(!should_translate_quota(qctxt->lqc_import) || - qdata->qd_count <= factor); - if (should_translate_quota(qctxt->lqc_import)) - { - struct qunit_data_old *reqdata_old, *tmp; - - reqdata_old = req_capsule_client_get(&req->rq_pill, - &RMF_QUNIT_DATA); - - tmp = lustre_quota_new_to_old(qdata); - *reqdata_old = *tmp; - req_capsule_set_size(&req->rq_pill, &RMF_QUNIT_DATA, RCL_SERVER, - sizeof(*reqdata_old)); - CDEBUG(D_QUOTA, "qd_count is 32bit!\n"); - } else { - reqdata = req_capsule_client_get(&req->rq_pill, - &RMF_QUNIT_DATA); - - *reqdata = *qdata; - req_capsule_set_size(&req->rq_pill, &RMF_QUNIT_DATA, RCL_SERVER, - sizeof(*reqdata)); - CDEBUG(D_QUOTA, "qd_count is 64bit!\n"); - } ptlrpc_request_set_replen(req); + req->rq_no_resend = req->rq_no_delay = 1; + rc = quota_copy_qdata(req, qdata, QUOTA_REQUEST, QUOTA_IMPORT); + if (rc < 0) { + CERROR("Can't pack qunit_data(rc: %d)\n", rc); + ptlrpc_req_finished(req); + dqacq_completion(obd, qctxt, qdata, -EPROTO, opc); + /* this is for qunit_get() */ + qunit_put(qunit); + RETURN(rc); + } CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = (struct dqacq_async_args *)&req->rq_async_args; + aa = ptlrpc_req_async_args(req); aa->aa_ctxt = qctxt; aa->aa_qunit = qunit; req->rq_interpret_reply = dqacq_interpret; - ptlrpcd_add_req(req); + ptlrpcd_add_req(req, PSCOPE_OTHER); QDATA_DEBUG(qdata, "%s scheduled.\n", opc == QUOTA_DQACQ ? 
"DQACQ" : "DQREL"); wait_completion: if (wait && qunit) { struct qunit_data *p = &qunit->lq_data; - QDATA_DEBUG(p, "wait for dqacq.\n"); - - l_wait_event(qw.qw_waitq, got_qunit(&qw), &lwi); - if (qw.qw_rc == 0) - rc = -EAGAIN; - CDEBUG(D_QUOTA, "wait dqacq done. (rc:%d)\n", qw.qw_rc); + QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit); + l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)), + &lwi); + /* rc = -EAGAIN, it means the quota master isn't ready yet + * rc = QUOTA_REQ_RETURNED, it means a quota req is finished; + * rc = -EDQUOT, it means out of quota + * rc = -EBUSY, it means recovery is happening + * other rc < 0, it means real errors, functions who call + * schedule_dqacq should take care of this */ + cfs_spin_lock(&qunit->lq_lock); + rc = qunit->lq_rc; + cfs_spin_unlock(&qunit->lq_lock); + CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: id(%u) flag(%u) " + "rc(%d) owner(%d)\n", qunit, qunit->lq_data.qd_id, + qunit->lq_data.qd_flags, rc, qunit->lq_owner); } + + qunit_put(qunit); + cfs_gettimeofday(&work_end); + timediff = cfs_timeval_sub(&work_end, &work_start, NULL); + if (opc == QUOTA_DQACQ) + lprocfs_counter_add(qctxt->lqc_stats, + wait ? LQUOTA_SYNC_ACQ : LQUOTA_ASYNC_ACQ, + timediff); + else + lprocfs_counter_add(qctxt->lqc_stats, + wait ? LQUOTA_SYNC_REL : LQUOTA_ASYNC_REL, + timediff); + RETURN(rc); } int qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, - uid_t uid, gid_t gid, __u32 isblk, int wait) + const unsigned int id[], __u32 isblk, int wait, + struct obd_trans_info *oti) { - int ret, rc = 0, i = USRQUOTA; - __u32 id[MAXQUOTAS] = { uid, gid }; + int rc = 0, i = USRQUOTA; struct qunit_data qdata[MAXQUOTAS]; ENTRY; - CLASSERT(MAXQUOTAS < 4); - if (!sb_any_quota_enabled(qctxt->lqc_sb)) + if (quota_is_set(obd, id, isblk ? QB_SET : QI_SET) == 0) RETURN(0); for (i = 0; i < MAXQUOTAS; i++) { qdata[i].qd_id = id[i]; - qdata[i].qd_flags = 0; - qdata[i].qd_flags |= i; - qdata[i].qd_flags |= isblk ? QUOTA_IS_BLOCK : 0; + qdata[i].qd_flags = i; + if (isblk) + QDATA_SET_BLK(&qdata[i]); qdata[i].qd_count = 0; - ret = check_cur_qunit(obd, qctxt, &qdata[i]); - if (ret > 0) { + rc = check_cur_qunit(obd, qctxt, &qdata[i]); + if (rc > 0) { int opc; /* need acquire or release */ - opc = ret == 1 ? QUOTA_DQACQ : QUOTA_DQREL; - ret = split_before_schedule_dqacq(obd, qctxt, &qdata[i], - opc, wait); - if (!rc) - rc = ret; + opc = rc == 1 ? QUOTA_DQACQ : QUOTA_DQREL; + rc = schedule_dqacq(obd, qctxt, &qdata[i], opc, + wait,oti); + if (rc < 0) + RETURN(rc); + } else if (wait == 1) { + /* when wait equates 1, that means mds_quota_acquire + * or filter_quota_acquire is calling it. */ + rc = qctxt_wait_pending_dqacq(qctxt, id[i], i, isblk); + if (rc < 0) + RETURN(rc); } } @@ -777,100 +1179,207 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id, unsigned short type, int isblk) { struct lustre_qunit *qunit = NULL; - struct qunit_waiter qw; struct qunit_data qdata; + struct timeval work_start; + struct timeval work_end; + long timediff; struct l_wait_info lwi = { 0 }; + int rc = 0; ENTRY; - CFS_INIT_LIST_HEAD(&qw.qw_entry); - init_waitqueue_head(&qw.qw_waitq); - qw.qw_rc = 0; - + cfs_gettimeofday(&work_start); qdata.qd_id = id; - qdata.qd_flags = 0; - qdata.qd_flags |= type; - qdata.qd_flags |= isblk ? 
QUOTA_IS_BLOCK : 0; + qdata.qd_flags = type; + if (isblk) + QDATA_SET_BLK(&qdata); qdata.qd_count = 0; - spin_lock(&qunit_hash_lock); - + cfs_spin_lock(&qunit_hash_lock); qunit = dqacq_in_flight(qctxt, &qdata); - if (qunit) - list_add_tail(&qw.qw_entry, &qunit->lq_waiters); - - spin_unlock(&qunit_hash_lock); + cfs_spin_unlock(&qunit_hash_lock); if (qunit) { - struct qunit_data *p = &qdata; - QDATA_DEBUG(p, "wait for dqacq completion.\n"); - l_wait_event(qw.qw_waitq, got_qunit(&qw), &lwi); - QDATA_DEBUG(p, "wait dqacq done. (rc:%d)\n", qw.qw_rc); + struct qunit_data *p = &qunit->lq_data; + + QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit); + l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)), + &lwi); + CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: rc(%d) " + "owner(%d)\n", qunit, qunit->lq_rc, qunit->lq_owner); + /* keep same as schedule_dqacq() b=17030 */ + cfs_spin_lock(&qunit->lq_lock); + rc = qunit->lq_rc; + cfs_spin_unlock(&qunit->lq_lock); + /* this is for dqacq_in_flight() */ + qunit_put(qunit); + cfs_gettimeofday(&work_end); + timediff = cfs_timeval_sub(&work_end, &work_start, NULL); + lprocfs_counter_add(qctxt->lqc_stats, + isblk ? LQUOTA_WAIT_PENDING_BLK_QUOTA : + LQUOTA_WAIT_PENDING_INO_QUOTA, + timediff); + } else { + cfs_gettimeofday(&work_end); + timediff = cfs_timeval_sub(&work_end, &work_start, NULL); + lprocfs_counter_add(qctxt->lqc_stats, + isblk ? LQUOTA_NOWAIT_PENDING_BLK_QUOTA : + LQUOTA_NOWAIT_PENDING_INO_QUOTA, + timediff); } - RETURN(0); + + RETURN(rc); } int -qctxt_init(struct lustre_quota_ctxt *qctxt, struct super_block *sb, - dqacq_handler_t handler) +qctxt_init(struct obd_device *obd, dqacq_handler_t handler) { + struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt; + struct obd_device_target *obt = &obd->u.obt; + struct super_block *sb = obt->obt_sb; int rc = 0; ENTRY; + LASSERT(qctxt); + rc = ptlrpcd_addref(); if (rc) RETURN(rc); + cfs_waitq_init(&qctxt->lqc_wait_for_qmaster); + cfs_waitq_init(&qctxt->lqc_lqs_waitq); + cfs_atomic_set(&qctxt->lqc_lqs, 0); + cfs_spin_lock_init(&qctxt->lqc_lock); + cfs_spin_lock(&qctxt->lqc_lock); qctxt->lqc_handler = handler; qctxt->lqc_sb = sb; + qctxt->lqc_obt = obt; qctxt->lqc_import = NULL; qctxt->lqc_recovery = 0; - qctxt->lqc_atype = 0; - qctxt->lqc_status= 0; + qctxt->lqc_switch_qs = 1; /* Change qunit size in default setting */ + qctxt->lqc_valid = 1; + qctxt->lqc_cqs_boundary_factor = 4; + qctxt->lqc_cqs_least_bunit = PTLRPC_MAX_BRW_SIZE; + qctxt->lqc_cqs_least_iunit = 2; + qctxt->lqc_cqs_qs_factor = 2; + qctxt->lqc_flags = 0; + QUOTA_MASTER_UNREADY(qctxt); qctxt->lqc_bunit_sz = default_bunit_sz; qctxt->lqc_btune_sz = default_bunit_sz / 100 * default_btune_ratio; qctxt->lqc_iunit_sz = default_iunit_sz; qctxt->lqc_itune_sz = default_iunit_sz * default_itune_ratio / 100; + qctxt->lqc_switch_seconds = 300; /* enlarging will wait 5 minutes + * after the last shrinking */ + qctxt->lqc_sync_blk = 0; + cfs_spin_unlock(&qctxt->lqc_lock); + + qctxt->lqc_lqs_hash = cfs_hash_create("LQS_HASH", + hash_lqs_cur_bits, + HASH_LQS_MAX_BITS, + min(hash_lqs_cur_bits, + HASH_LQS_BKT_BITS), + 0, CFS_HASH_MIN_THETA, + CFS_HASH_MAX_THETA, + &lqs_hash_ops, CFS_HASH_DEFAULT); + if (!qctxt->lqc_lqs_hash) { + CERROR("initialize hash lqs for %s error!\n", obd->obd_name); + RETURN(-ENOMEM); + } - RETURN(0); +#ifdef LPROCFS + rc = lquota_proc_setup(obd, is_master(qctxt)); + if (rc) + CERROR("initialize proc for %s error!\n", obd->obd_name); +#endif + + RETURN(rc); +} + +static int check_lqs(struct lustre_quota_ctxt *qctxt) 
+{ + int rc; + ENTRY; + + rc = !cfs_atomic_read(&qctxt->lqc_lqs); + + RETURN(rc); +} + +int qctxt_del_lqs(cfs_hash_t *hs, cfs_hash_bd_t *bd, + cfs_hlist_node_t *hnode, void *data) +{ + /* remove from hash and -1 refcount */ + cfs_hash_bd_del_locked(hs, bd, hnode); + return 0; } void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) { struct lustre_qunit *qunit, *tmp; - struct qunit_waiter *qw, *tmp2; + cfs_list_t tmp_list; + struct l_wait_info lwi = { 0 }; + struct obd_device_target *obt = qctxt->lqc_obt; int i; ENTRY; - spin_lock(&qunit_hash_lock); + CFS_INIT_LIST_HEAD(&tmp_list); + cfs_spin_lock(&qctxt->lqc_lock); + qctxt->lqc_valid = 0; + cfs_spin_unlock(&qctxt->lqc_lock); + + cfs_spin_lock(&qunit_hash_lock); for (i = 0; i < NR_DQHASH; i++) { - list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], lq_hash) { + cfs_list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], + lq_hash) { if (qunit->lq_ctxt != qctxt) continue; - remove_qunit_nolock(qunit); - /* wake up all waiters */ - list_for_each_entry_safe(qw, tmp2, &qunit->lq_waiters, - qw_entry) { - list_del_init(&qw->qw_entry); - qw->qw_rc = 0; - wake_up(&qw->qw_waitq); - } - qunit_put(qunit); + cfs_list_add(&qunit->lq_hash, &tmp_list); } } + cfs_spin_unlock(&qunit_hash_lock); + + cfs_list_for_each_entry_safe(qunit, tmp, &tmp_list, lq_hash) { + cfs_list_del_init(&qunit->lq_hash); + compute_lqs_after_removing_qunit(qunit); + + /* wake up all waiters */ + QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, 0); + cfs_waitq_broadcast(&qunit->lq_waitq); + qunit_put(qunit); + } + + /* after qctxt_cleanup, qctxt might be freed, then check_qm() is + * unpredicted. So we must wait until lqc_wait_for_qmaster is empty */ + while (cfs_waitq_active(&qctxt->lqc_wait_for_qmaster)) { + cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster); + cfs_schedule_timeout_and_set_state(CFS_TASK_INTERRUPTIBLE, + cfs_time_seconds(1)); + } - spin_unlock(&qunit_hash_lock); + /* release refcount on lustre_qunit_size holding by lqs_hash */ + cfs_hash_for_each_safe(qctxt->lqc_lqs_hash, qctxt_del_lqs, NULL); + + l_wait_event(qctxt->lqc_lqs_waitq, check_lqs(qctxt), &lwi); + cfs_down_write(&obt->obt_rwsem); + cfs_hash_putref(qctxt->lqc_lqs_hash); + qctxt->lqc_lqs_hash = NULL; + cfs_up_write(&obt->obt_rwsem); ptlrpcd_decref(); +#ifdef LPROCFS + if (lquota_proc_cleanup(qctxt)) + CERROR("cleanup proc error!\n"); +#endif + EXIT; } struct qslave_recov_thread_data { struct obd_device *obd; struct lustre_quota_ctxt *qctxt; - struct completion comp; + cfs_completion_t comp; }; /* FIXME only recovery block quota by now */ @@ -883,30 +1392,39 @@ static int qslave_recovery_main(void *arg) int rc = 0; ENTRY; - ptlrpc_daemonize("qslave_recovd"); + cfs_daemonize_ctxt("qslave_recovd"); - complete(&data->comp); + /* for obdfilter */ + class_incref(obd, "qslave_recovd_filter", obd); - if (qctxt->lqc_recovery) + cfs_complete(&data->comp); + + cfs_spin_lock(&qctxt->lqc_lock); + if (qctxt->lqc_recovery) { + cfs_spin_unlock(&qctxt->lqc_lock); + class_decref(obd, "qslave_recovd_filter", obd); RETURN(0); - qctxt->lqc_recovery = 1; + } else { + qctxt->lqc_recovery = 1; + cfs_spin_unlock(&qctxt->lqc_lock); + } for (type = USRQUOTA; type < MAXQUOTAS; type++) { struct qunit_data qdata; struct quota_info *dqopt = sb_dqopt(qctxt->lqc_sb); - struct list_head id_list; + cfs_list_t id_list; struct dquot_id *dqid, *tmp; int ret; LOCK_DQONOFF_MUTEX(dqopt); - if (!sb_has_quota_enabled(qctxt->lqc_sb, type)) { + if (!ll_sb_has_quota_active(qctxt->lqc_sb, type)) { UNLOCK_DQONOFF_MUTEX(dqopt); break; } 
LASSERT(dqopt->files[type] != NULL); CFS_INIT_LIST_HEAD(&id_list); -#ifndef KERNEL_SUPPORTS_QUOTA_READ +#ifndef KERNEL_SUPPORTS_QUOTA_READ rc = fsfilt_qids(obd, dqopt->files[type], NULL, type, &id_list); #else rc = fsfilt_qids(obd, NULL, dqopt->files[type], type, &id_list); @@ -915,38 +1433,43 @@ static int qslave_recovery_main(void *arg) if (rc) CERROR("Get ids from quota file failed. (rc:%d)\n", rc); - list_for_each_entry_safe(dqid, tmp, &id_list, di_link) { - list_del_init(&dqid->di_link); + cfs_list_for_each_entry_safe(dqid, tmp, &id_list, di_link) { + cfs_list_del_init(&dqid->di_link); /* skip slave recovery on itself */ - if (is_master(obd, qctxt, dqid->di_id, type)) + if (is_master(qctxt)) goto free; if (rc && rc != -EBUSY) goto free; qdata.qd_id = dqid->di_id; - qdata.qd_flags = 0; - qdata.qd_flags |= type; - qdata.qd_flags |= QUOTA_IS_BLOCK; + qdata.qd_flags = type; + QDATA_SET_BLK(&qdata); qdata.qd_count = 0; ret = check_cur_qunit(obd, qctxt, &qdata); if (ret > 0) { int opc; opc = ret == 1 ? QUOTA_DQACQ : QUOTA_DQREL; - rc = split_before_schedule_dqacq(obd, qctxt, &qdata, opc, 0); - } else + rc = schedule_dqacq(obd, qctxt, &qdata, opc, + 0, NULL); + if (rc == -EDQUOT) + rc = 0; + } else { rc = 0; + } - if (rc) - CDEBUG(rc == -EBUSY ? D_QUOTA : D_ERROR, - "qslave recovery failed! (id:%d type:%d " + if (rc && rc != -EBUSY) + CERROR("qslave recovery failed! (id:%d type:%d " " rc:%d)\n", dqid->di_id, type, rc); free: - kfree(dqid); + OBD_FREE_PTR(dqid); } } + cfs_spin_lock(&qctxt->lqc_lock); qctxt->lqc_recovery = 0; + cfs_spin_unlock(&qctxt->lqc_lock); + class_decref(obd, "qslave_recovd_filter", obd); RETURN(rc); } @@ -957,19 +1480,170 @@ qslave_start_recovery(struct obd_device *obd, struct lustre_quota_ctxt *qctxt) int rc; ENTRY; - if (!sb_any_quota_enabled(qctxt->lqc_sb)) + if (!ll_sb_any_quota_active(qctxt->lqc_sb)) goto exit; data.obd = obd; data.qctxt = qctxt; - init_completion(&data.comp); + cfs_init_completion(&data.comp); - rc = kernel_thread(qslave_recovery_main, &data, CLONE_VM|CLONE_FILES); + rc = cfs_create_thread(qslave_recovery_main, &data, + CFS_DAEMON_FLAGS); if (rc < 0) { CERROR("Cannot start quota recovery thread: rc %d\n", rc); goto exit; } - wait_for_completion(&data.comp); + cfs_wait_for_completion(&data.comp); exit: EXIT; } + +inline int quota_is_on(struct lustre_quota_ctxt *qctxt, + struct obd_quotactl *oqctl) +{ + return ((qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type)) == + UGQUOTA2LQC(oqctl->qc_type)); +} + +inline int quota_is_off(struct lustre_quota_ctxt *qctxt, + struct obd_quotactl *oqctl) +{ + return !(qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type)); +} + +/** + * When quotaon, build a lqs for every uid/gid who has been set limitation + * for quota. After quota_search_lqs, it will hold one ref for the lqs. + * It will be released when qctxt_cleanup() is executed b=18574 + * + * Should be called with obt->obt_quotachecking held. 
b=20152 + */ +void build_lqs(struct obd_device *obd) +{ + struct obd_device_target *obt = &obd->u.obt; + struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt; + cfs_list_t id_list; + int i, rc; + + LASSERT_SEM_LOCKED(&obt->obt_quotachecking); + CFS_INIT_LIST_HEAD(&id_list); + for (i = 0; i < MAXQUOTAS; i++) { + struct dquot_id *dqid, *tmp; + + if (sb_dqopt(qctxt->lqc_sb)->files[i] == NULL) + continue; + +#ifndef KERNEL_SUPPORTS_QUOTA_READ + rc = fsfilt_qids(obd, sb_dqopt(qctxt->lqc_sb)->files[i], NULL, + i, &id_list); +#else + rc = fsfilt_qids(obd, NULL, sb_dqopt(qctxt->lqc_sb)->files[i], + i, &id_list); +#endif + if (rc) { + CERROR("%s: failed to get %s qids!\n", obd->obd_name, + i ? "group" : "user"); + continue; + } + + cfs_list_for_each_entry_safe(dqid, tmp, &id_list, + di_link) { + struct lustre_qunit_size *lqs; + + cfs_list_del_init(&dqid->di_link); + lqs = quota_search_lqs(LQS_KEY(i, dqid->di_id), + qctxt, 1); + if (lqs && !IS_ERR(lqs)) { + lqs->lqs_flags |= dqid->di_flag; + lqs_putref(lqs); + } else { + CERROR("%s: failed to create a lqs for %sid %u" + "\n", obd->obd_name, i ? "g" : "u", + dqid->di_id); + } + + OBD_FREE_PTR(dqid); + } + } +} + +/** + * lqs<->qctxt hash operations + */ + +/** + * string hashing using djb2 hash algorithm + */ +static unsigned +lqs_hash(cfs_hash_t *hs, const void *key, unsigned mask) +{ + unsigned long long id; + unsigned hash; + ENTRY; + + LASSERT(key); + id = *((unsigned long long *)key); + hash = (LQS_KEY_GRP(id) ? 5381 : 5387) * (unsigned)LQS_KEY_ID(id); + + RETURN(hash & mask); +} + +static void * +lqs_key(cfs_hlist_node_t *hnode) +{ + struct lustre_qunit_size *lqs; + ENTRY; + + lqs = cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); + RETURN(&lqs->lqs_key); +} + +static int +lqs_keycmp(const void *key, cfs_hlist_node_t *hnode) +{ + struct lustre_qunit_size *q = + cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); + + RETURN(q->lqs_key == *((unsigned long long *)key)); +} + +static void * +lqs_object(cfs_hlist_node_t *hnode) +{ + return cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); +} + +static void +lqs_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + struct lustre_qunit_size *q = + cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); + + lqs_getref(q); +} + +static void +lqs_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + struct lustre_qunit_size *q = + cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); + + lqs_putref(q); +} + +static void +lqs_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + CERROR("It should not have any item left to be handled by this!"); +} + +static cfs_hash_ops_t lqs_hash_ops = { + .hs_hash = lqs_hash, + .hs_key = lqs_key, + .hs_keycmp = lqs_keycmp, + .hs_object = lqs_object, + .hs_get = lqs_get, + .hs_put_locked = lqs_put_locked, + .hs_exit = lqs_exit +}; +#endif /* HAVE_QUOTA_SUPPORT */
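The qunit_state enum and the QUNIT_SET_STATE*() macros introduced above form a small per-request state machine: every transition is made under lq_lock, dqacq_completion() moves the qunit to QUNIT_FINISHED with the request's rc, and only then broadcasts lq_waitq, so got_qunit() is safe to use as the l_wait_event() predicate in schedule_dqacq() and qctxt_wait_pending_dqacq(). Below is a minimal standalone sketch of that handshake, with pthread primitives standing in for cfs_spinlock_t/cfs_waitq_t and all names hypothetical:

/* A sketch of the qunit lifecycle (CREATED -> IN_HASH -> RM_FROM_HASH ->
 * FINISHED) and the waiter handshake, with pthread primitives standing in
 * for cfs_spinlock_t / cfs_waitq_t.  All names here are hypothetical. */
#include <pthread.h>
#include <stdio.h>

enum qunit_state { QUNIT_CREATED, QUNIT_IN_HASH,
                   QUNIT_RM_FROM_HASH, QUNIT_FINISHED };

struct qunit {
        pthread_mutex_t  lock;    /* plays the role of lq_lock */
        pthread_cond_t   waitq;   /* plays the role of lq_waitq */
        enum qunit_state state;   /* lq_state */
        int              rc;      /* lq_rc */
};

/* like QUNIT_SET_STATE_AND_RC(): state and rc only change under the lock */
static void qunit_set_state_and_rc(struct qunit *q, enum qunit_state state,
                                   int rc)
{
        pthread_mutex_lock(&q->lock);
        q->state = state;
        q->rc = rc;
        pthread_mutex_unlock(&q->lock);
        if (state == QUNIT_FINISHED)          /* cfs_waitq_broadcast() */
                pthread_cond_broadcast(&q->waitq);
}

/* like l_wait_event(lq_waitq, got_qunit(...)): recheck under the lock */
static int qunit_wait(struct qunit *q)
{
        int rc;

        pthread_mutex_lock(&q->lock);
        while (q->state != QUNIT_FINISHED)
                pthread_cond_wait(&q->waitq, &q->lock);
        rc = q->rc;
        pthread_mutex_unlock(&q->lock);
        return rc;
}

/* plays the role of dqacq_completion() for one in-flight request */
static void *completion_thread(void *arg)
{
        struct qunit *q = arg;

        qunit_set_state_and_rc(q, QUNIT_RM_FROM_HASH, 0);
        qunit_set_state_and_rc(q, QUNIT_FINISHED,
                               42 /* stands in for QUOTA_REQ_RETURNED */);
        return NULL;
}

int main(void)
{
        struct qunit q = { PTHREAD_MUTEX_INITIALIZER,
                           PTHREAD_COND_INITIALIZER, QUNIT_IN_HASH, 0 };
        pthread_t t;

        pthread_create(&t, NULL, completion_thread, &q);
        printf("waiter got rc=%d\n", qunit_wait(&q));
        pthread_join(&t, NULL);
        return 0;
}

quota_barrier()/quota_unbarrier() reuse the same shape: a sentinel qunit with lq_opc == QUOTA_LAST_OPC is parked in the hash so that later requests for the same id find it in dqacq_in_flight() and wait, and the unbarrier side finishes it with QUOTA_REQ_RETURNED and drops the reference.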
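check_cur_qunit() decides between acquiring more quota (return 1) and releasing surplus (return 2) by comparing the hard limit against current usage plus the tune watermark, accumulating qd_count in whole qunit_sz steps either way. The sketch below reproduces only that core arithmetic; it omits the lqs pending-write and lqs_blk_rec/lqs_ino_rec corrections (so limit_org == limit) and the MIN_QLIMIT special case, and the function and driver names are illustrative rather than Lustre API:

/* A sketch of the acquire/release decision in check_cur_qunit(), under the
 * simplifying assumption that there are no pending writes and no lqs_*_rec
 * correction.  Not Lustre API. */
#include <stdio.h>

/* returns 1 = acquire, 2 = release, 0 = do nothing; *count is the amount */
static int qunit_decision(unsigned long long limit, unsigned long long usage,
                          unsigned long qunit_sz, unsigned long tune_sz,
                          unsigned long long *count)
{
        unsigned long long qd_count = 0;

        *count = 0;
        if (limit == 0)                     /* no limit set: nothing to do */
                return 0;

        if (limit <= usage + tune_sz) {
                /* running low: acquire whole qunits until the limit clears
                 * the usage + tune watermark again */
                while (qd_count + limit <= usage + tune_sz)
                        qd_count += qunit_sz;
                *count = qd_count;
                return 1;
        }

        if (limit > usage + qunit_sz + tune_sz) {
                /* holding surplus: release whole qunits but always keep at
                 * least one qunit above the watermark */
                while (limit - qd_count > usage + qunit_sz + tune_sz &&
                       limit > qd_count + qunit_sz)
                        qd_count += qunit_sz;
                *count = qd_count;
                return 2;
        }

        return 0;                           /* inside the dead band */
}

int main(void)
{
        unsigned long long count;
        /* 128MB qunit with a 50% (64MB) tune size, as in the new defaults */
        int rc = qunit_decision(100ULL << 20, 90ULL << 20,
                                128UL << 20, 64UL << 20, &count);
        printf("rc=%d count=%lluMB\n", rc, count >> 20);  /* acquire 128MB */
        return 0;
}

With the new 128MB default bunit and 50% btune, a 100MB limit against 90MB of usage acquires one 128MB qunit, which is exactly the ret = 1 path the patched while-loop takes before quota_compute_lqs() records the pending acquisition.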
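lqs_hash() at the bottom of the file is the bucket function for the new lqc_lqs_hash table: the 32-bit id is multiplied by one of two odd prime seeds, the classic djb2 constant 5381 for group keys and 5387 for user keys, so a uid and a gid with the same numeric value usually land in different buckets. A standalone sketch follows; the LQS_KEY bit layout here (group flag at bit 32, id in the low 32 bits) is an assumption chosen to match how LQS_KEY_GRP()/LQS_KEY_ID() must unpack the key, not a quote of the Lustre header:

/* A sketch of lqs_hash().  The LQS_KEY packing below is assumed, not
 * quoted from the Lustre headers. */
#include <stdio.h>

#define LQS_KEY(is_grp, id)  (((unsigned long long)(is_grp) << 32) | (id))
#define LQS_KEY_GRP(key)     ((unsigned)((key) >> 32))
#define LQS_KEY_ID(key)      ((unsigned)((key) & 0xffffffffu))

static unsigned lqs_hash(unsigned long long key, unsigned mask)
{
        /* 5381 is the classic djb2 seed; 5387 is a second odd prime so a
         * uid and a gid with the same value usually hash differently */
        unsigned hash = (LQS_KEY_GRP(key) ? 5381 : 5387) *
                        LQS_KEY_ID(key);

        return hash & mask;  /* mask comes from the cfs_hash bucket count */
}

int main(void)
{
        unsigned mask = (1u << 7) - 1;      /* e.g. a 128-bucket table */

        printf("uid 500 -> bucket %u\n", lqs_hash(LQS_KEY(0, 500), mask));
        printf("gid 500 -> bucket %u\n", lqs_hash(LQS_KEY(1, 500), mask));
        return 0;
}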