X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fquota%2Fquota_context.c;h=9e4cac281ab1b33e2756002fdb4df31efd732e90;hp=4af5414ce2c0e5576d686017aafe2bdb1d767c58;hb=e2af7fb3c91dfb13d34d8e1b2f2df8c09621f768;hpb=3b4c006b28c9d6a7c3b00535cd3a6292178fa4c6 diff --git a/lustre/quota/quota_context.c b/lustre/quota/quota_context.c index 4af5414..9e4cac2 100644 --- a/lustre/quota/quota_context.c +++ b/lustre/quota/quota_context.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -26,8 +24,10 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011, 2012, Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -40,10 +40,6 @@ * Author: Niu YaWei */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif - #define DEBUG_SUBSYSTEM S_LQUOTA #include @@ -60,7 +56,9 @@ #include #include "quota_internal.h" -#ifdef HAVE_QUOTA_SUPPORT +static int hash_lqs_cur_bits = HASH_LQS_CUR_BITS; +CFS_MODULE_PARM(hash_lqs_cur_bits, "i", int, 0444, + "the current bits of lqs hash"); static cfs_hash_ops_t lqs_hash_ops; @@ -70,8 +68,8 @@ unsigned long default_iunit_sz = 5120; /* 5120 inodes */ unsigned long default_itune_ratio = 50; /* 50 percentage */ cfs_mem_cache_t *qunit_cachep = NULL; -struct list_head qunit_hash[NR_DQHASH]; -spinlock_t qunit_hash_lock = SPIN_LOCK_UNLOCKED; +cfs_list_t qunit_hash[NR_DQHASH]; +cfs_spinlock_t qunit_hash_lock = CFS_SPIN_LOCK_UNLOCKED; /* please sync qunit_state with qunit_state_names */ enum qunit_state { @@ -103,13 +101,13 @@ static const char *qunit_state_names[] = { }; struct lustre_qunit { - struct list_head lq_hash; /** Hash list in memory */ - atomic_t lq_refcnt; /** Use count */ + cfs_list_t lq_hash; /** Hash list in memory */ + cfs_atomic_t lq_refcnt; /** Use count */ struct lustre_quota_ctxt *lq_ctxt; /** Quota context this applies to */ struct qunit_data lq_data; /** See qunit_data */ unsigned int lq_opc; /** QUOTA_DQACQ, QUOTA_DQREL */ cfs_waitq_t lq_waitq; /** Threads waiting for this qunit */ - spinlock_t lq_lock; /** Protect the whole structure */ + cfs_spinlock_t lq_lock; /** Protect the whole structure */ enum qunit_state lq_state; /** Present the status of qunit */ int lq_rc; /** The rc of lq_data */ pid_t lq_owner; @@ -117,19 +115,19 @@ struct lustre_qunit { #define QUNIT_SET_STATE(qunit, state) \ do { \ - spin_lock(&qunit->lq_lock); \ + cfs_spin_lock(&qunit->lq_lock); \ QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), " \ "lq_rc(%d), lq_owner(%d)\n", \ qunit, qunit_state_names[qunit->lq_state], \ qunit_state_names[state], qunit->lq_rc, \ qunit->lq_owner); \ qunit->lq_state = state; \ - spin_unlock(&qunit->lq_lock); \ + cfs_spin_unlock(&qunit->lq_lock); \ } while(0) #define QUNIT_SET_STATE_AND_RC(qunit, state, rc) \ do { \ - spin_lock(&qunit->lq_lock); \ + cfs_spin_lock(&qunit->lq_lock); \ qunit->lq_rc = rc; \ QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), " \ "lq_rc(%d), lq_owner(%d)\n", \ @@ -137,7 +135,7 @@ do { \ qunit_state_names[state], qunit->lq_rc, \ qunit->lq_owner); \ qunit->lq_state = state; \ - spin_unlock(&qunit->lq_lock); \ + cfs_spin_unlock(&qunit->lq_lock); \ } while(0) int should_translate_quota (struct obd_import *imp) @@ -156,10 
+154,10 @@ void qunit_cache_cleanup(void) int i; ENTRY; - spin_lock(&qunit_hash_lock); + cfs_spin_lock(&qunit_hash_lock); for (i = 0; i < NR_DQHASH; i++) - LASSERT(list_empty(qunit_hash + i)); - spin_unlock(&qunit_hash_lock); + LASSERT(cfs_list_empty(qunit_hash + i)); + cfs_spin_unlock(&qunit_hash_lock); if (qunit_cachep) { int rc; @@ -182,10 +180,10 @@ int qunit_cache_init(void) if (!qunit_cachep) RETURN(-ENOMEM); - spin_lock(&qunit_hash_lock); + cfs_spin_lock(&qunit_hash_lock); for (i = 0; i < NR_DQHASH; i++) CFS_INIT_LIST_HEAD(qunit_hash + i); - spin_unlock(&qunit_hash_lock); + cfs_spin_unlock(&qunit_hash_lock); RETURN(0); } @@ -213,7 +211,7 @@ static inline struct lustre_qunit *find_qunit(unsigned int hashent, struct qunit_data *tmp; LASSERT_SPIN_LOCKED(&qunit_hash_lock); - list_for_each_entry(qunit, qunit_hash + hashent, lq_hash) { + cfs_list_for_each_entry(qunit, qunit_hash + hashent, lq_hash) { tmp = &qunit->lq_data; if (qunit->lq_ctxt == qctxt && qdata->qd_id == tmp->qd_id && @@ -249,12 +247,12 @@ check_cur_qunit(struct obd_device *obd, if (!ll_sb_any_quota_active(sb)) RETURN(0); - spin_lock(&qctxt->lqc_lock); + cfs_spin_lock(&qctxt->lqc_lock); if (!qctxt->lqc_valid){ - spin_unlock(&qctxt->lqc_lock); + cfs_spin_unlock(&qctxt->lqc_lock); RETURN(0); } - spin_unlock(&qctxt->lqc_lock); + cfs_spin_unlock(&qctxt->lqc_lock); OBD_ALLOC_PTR(qctl); if (qctl == NULL) @@ -289,11 +287,11 @@ check_cur_qunit(struct obd_device *obd, lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), qctxt, 0); if (IS_ERR(lqs) || lqs == NULL) { - CDEBUG(D_ERROR, "fail to find a lqs(%s id: %u)!\n", - QDATA_IS_GRP(qdata) ? "group" : "user", qdata->qd_id); + CERROR("fail to find a lqs for %sid: %u)!\n", + QDATA_IS_GRP(qdata) ? "g" : "u", qdata->qd_id); GOTO (out, ret = 0); } - spin_lock(&lqs->lqs_lock); + cfs_spin_lock(&lqs->lqs_lock); if (QDATA_IS_BLK(qdata)) { qunit_sz = lqs->lqs_bunit_sz; @@ -318,7 +316,7 @@ check_cur_qunit(struct obd_device *obd, limit_org = limit; /* when a releasing quota req is sent, before it returned limit is assigned a small value. limit will overflow */ - if (limit + record < 0) + if (record < 0) usage -= record; else limit += record; @@ -337,19 +335,25 @@ check_cur_qunit(struct obd_device *obd, ret = 2; /* if there are other pending writes for this uid/gid, releasing * quota is put off until the last pending write b=16645 */ - if (ret == 2 && pending_write) { + /* if there is an ongoing quota request, a releasing request is aborted. + * That ongoing quota request will call this function again when + * it returned b=18630 */ + if (pending_write || record) { CDEBUG(D_QUOTA, "delay quota release\n"); ret = 0; } } + if (ret > 0) + quota_compute_lqs(qdata, lqs, 1, (ret == 1) ? 1 : 0); + CDEBUG(D_QUOTA, "type: %c, limit: "LPU64", usage: "LPU64 - ", pending_write: "LPU64", record: "LPD64 + ", pending_write: "LPU64", record: %lld" ", qunit_sz: %lu, tune_sz: %lu, ret: %d.\n", QDATA_IS_BLK(qdata) ? 
'b' : 'i', limit, usage, pending_write, record, qunit_sz, tune_sz, ret); LASSERT(ret == 0 || qdata->qd_count); - spin_unlock(&lqs->lqs_lock); + cfs_spin_unlock(&lqs->lqs_lock); lqs_putref(lqs); EXIT; @@ -425,12 +429,12 @@ static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt, RETURN(NULL); CFS_INIT_LIST_HEAD(&qunit->lq_hash); - init_waitqueue_head(&qunit->lq_waitq); - atomic_set(&qunit->lq_refcnt, 1); + cfs_waitq_init(&qunit->lq_waitq); + cfs_atomic_set(&qunit->lq_refcnt, 1); qunit->lq_ctxt = qctxt; memcpy(&qunit->lq_data, qdata, sizeof(*qdata)); qunit->lq_opc = opc; - qunit->lq_lock = SPIN_LOCK_UNLOCKED; + qunit->lq_lock = CFS_SPIN_LOCK_UNLOCKED; QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0); qunit->lq_owner = cfs_curproc_pid(); RETURN(qunit); @@ -443,13 +447,13 @@ static inline void free_qunit(struct lustre_qunit *qunit) static inline void qunit_get(struct lustre_qunit *qunit) { - atomic_inc(&qunit->lq_refcnt); + cfs_atomic_inc(&qunit->lq_refcnt); } static void qunit_put(struct lustre_qunit *qunit) { - LASSERT(atomic_read(&qunit->lq_refcnt)); - if (atomic_dec_and_test(&qunit->lq_refcnt)) + LASSERT(cfs_atomic_read(&qunit->lq_refcnt)); + if (cfs_atomic_dec_and_test(&qunit->lq_refcnt)) free_qunit(qunit); } @@ -471,12 +475,12 @@ static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt, static void insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit) { - struct list_head *head; + cfs_list_t *head; - LASSERT(list_empty(&qunit->lq_hash)); + LASSERT(cfs_list_empty(&qunit->lq_hash)); qunit_get(qunit); head = qunit_hash + qunit_hashfn(qctxt, &qunit->lq_data); - list_add(&qunit->lq_hash, head); + cfs_list_add(&qunit->lq_hash, head); QUNIT_SET_STATE(qunit, QUNIT_IN_HASH); } @@ -488,12 +492,12 @@ static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit) qunit->lq_data.qd_id), qunit->lq_ctxt, 0); if (lqs && !IS_ERR(lqs)) { - spin_lock(&lqs->lqs_lock); + cfs_spin_lock(&lqs->lqs_lock); if (qunit->lq_opc == QUOTA_DQACQ) quota_compute_lqs(&qunit->lq_data, lqs, 0, 1); if (qunit->lq_opc == QUOTA_DQREL) quota_compute_lqs(&qunit->lq_data, lqs, 0, 0); - spin_unlock(&lqs->lqs_lock); + cfs_spin_unlock(&lqs->lqs_lock); /* this is for quota_search_lqs */ lqs_putref(lqs); /* this is for schedule_dqacq */ @@ -503,10 +507,10 @@ static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit) static void remove_qunit_nolock(struct lustre_qunit *qunit) { - LASSERT(!list_empty(&qunit->lq_hash)); + LASSERT(!cfs_list_empty(&qunit->lq_hash)); LASSERT_SPIN_LOCKED(&qunit_hash_lock); - list_del_init(&qunit->lq_hash); + cfs_list_del_init(&qunit->lq_hash); QUNIT_SET_STATE(qunit, QUNIT_RM_FROM_HASH); qunit_put(qunit); } @@ -517,19 +521,20 @@ void* quota_barrier(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit, *find_qunit; int cycle = 1; - OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit)); + OBD_SLAB_ALLOC_PTR(qunit, qunit_cachep); if (qunit == NULL) { - CERROR("locating qunit failed.(id=%u isblk=%d %s)\n", - oqctl->qc_id, isblk, oqctl->qc_type ? "grp" : "usr"); + CERROR("locating %sunit failed for %sid %u\n", + isblk ? "b" : "i", oqctl->qc_type ? 
"g" : "u", + oqctl->qc_id); qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id, oqctl->qc_type, isblk); return NULL; } - INIT_LIST_HEAD(&qunit->lq_hash); - qunit->lq_lock = SPIN_LOCK_UNLOCKED; - init_waitqueue_head(&qunit->lq_waitq); - atomic_set(&qunit->lq_refcnt, 1); + CFS_INIT_LIST_HEAD(&qunit->lq_hash); + qunit->lq_lock = CFS_SPIN_LOCK_UNLOCKED; + cfs_waitq_init(&qunit->lq_waitq); + cfs_atomic_set(&qunit->lq_refcnt, 1); qunit->lq_ctxt = qctxt; qunit->lq_data.qd_id = oqctl->qc_id; qunit->lq_data.qd_flags = oqctl->qc_type; @@ -540,10 +545,10 @@ void* quota_barrier(struct lustre_quota_ctxt *qctxt, qunit->lq_opc = QUOTA_LAST_OPC; while (1) { - spin_lock(&qunit_hash_lock); + cfs_spin_lock(&qunit_hash_lock); find_qunit = dqacq_in_flight(qctxt, &qunit->lq_data); if (find_qunit) { - spin_unlock(&qunit_hash_lock); + cfs_spin_unlock(&qunit_hash_lock); qunit_put(find_qunit); qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id, oqctl->qc_type, isblk); @@ -553,7 +558,7 @@ void* quota_barrier(struct lustre_quota_ctxt *qctxt, break; } insert_qunit_nolock(qctxt, qunit); - spin_unlock(&qunit_hash_lock); + cfs_spin_unlock(&qunit_hash_lock); return qunit; } @@ -567,11 +572,11 @@ void quota_unbarrier(void *handle) } LASSERT(qunit->lq_opc == QUOTA_LAST_OPC); - spin_lock(&qunit_hash_lock); + cfs_spin_lock(&qunit_hash_lock); remove_qunit_nolock(qunit); - spin_unlock(&qunit_hash_lock); + cfs_spin_unlock(&qunit_hash_lock); QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, QUOTA_REQ_RETURNED); - wake_up(&qunit->lq_waitq); + cfs_waitq_signal(&qunit->lq_waitq); qunit_put(qunit); } @@ -698,12 +703,12 @@ out_mem: } out: /* remove the qunit from hash */ - spin_lock(&qunit_hash_lock); + cfs_spin_lock(&qunit_hash_lock); qunit = dqacq_in_flight(qctxt, qdata); /* this qunit has been removed by qctxt_cleanup() */ if (!qunit) { - spin_unlock(&qunit_hash_lock); + cfs_spin_unlock(&qunit_hash_lock); QDATA_DEBUG(qdata, "%s is discarded because qunit isn't found\n", opc == QUOTA_DQACQ ? 
"DQACQ" : "DQREL"); RETURN(err); @@ -713,7 +718,7 @@ out: /* remove this qunit from lq_hash so that new processes cannot be added * to qunit->lq_waiters */ remove_qunit_nolock(qunit); - spin_unlock(&qunit_hash_lock); + cfs_spin_unlock(&qunit_hash_lock); compute_lqs_after_removing_qunit(qunit); @@ -721,14 +726,12 @@ out: rc = QUOTA_REQ_RETURNED; QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, rc); /* wake up all waiters */ - wake_up_all(&qunit->lq_waitq); + cfs_waitq_broadcast(&qunit->lq_waitq); /* this is for dqacq_in_flight() */ qunit_put(qunit); - /* this is for alloc_qunit() */ - qunit_put(qunit); if (rc < 0 && rc != -EDQUOT) - RETURN(err); + GOTO(out1, err); /* don't reschedule in such cases: * - acq/rel failure and qunit isn't changed, @@ -738,21 +741,21 @@ out: */ OBD_ALLOC_PTR(oqaq); if (!oqaq) - RETURN(-ENOMEM); + GOTO(out1, err = -ENOMEM); qdata_to_oqaq(qdata, oqaq); /* adjust the qunit size in slaves */ rc1 = quota_adjust_slave_lqs(oqaq, qctxt); OBD_FREE_PTR(oqaq); if (rc1 < 0) { CERROR("adjust slave's qunit size failed!(rc:%d)\n", rc1); - RETURN(rc1); + GOTO(out1, err = rc1); } if (err || (rc < 0 && rc != -EBUSY && rc1 == 0) || is_master(qctxt)) - RETURN(err); + GOTO(out1, err); if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880 && OBD_FAIL_CHECK(OBD_FAIL_QUOTA_DELAY_REL)) - RETURN(err); + GOTO(out1, err); /* reschedule another dqacq/dqrel if needed */ qdata->qd_count = 0; @@ -764,6 +767,9 @@ out: rc1 = schedule_dqacq(obd, qctxt, qdata, opc, 0, NULL); QDATA_DEBUG(qdata, "reschedudle opc(%d) rc(%d)\n", opc, rc1); } + out1: + /* this is for alloc_qunit() */ + qunit_put(qunit); RETURN(err); } @@ -786,7 +792,7 @@ static int dqacq_interpret(const struct lu_env *env, LASSERT(req); LASSERT(req->rq_import); - down_read(&obt->obt_rwsem); + cfs_down_read(&obt->obt_rwsem); /* if a quota req timeouts or is dropped, we should update quota * statistics which will be handled in dqacq_completion. And in * this situation we should get qdata from request instead of @@ -798,7 +804,7 @@ static int dqacq_interpret(const struct lu_env *env, DEBUG_REQ(D_ERROR, req, "error unpacking qunit_data(rc: %ld)\n", PTR_ERR(qdata)); - RETURN(PTR_ERR(qdata)); + qdata = &qunit->lq_data; } QDATA_DEBUG(qdata, "qdata: interpret rc(%d).\n", rc); @@ -806,14 +812,14 @@ static int dqacq_interpret(const struct lu_env *env, if (qdata->qd_id != qunit->lq_data.qd_id || OBD_FAIL_CHECK(OBD_FAIL_QUOTA_RET_QDATA)) { - CDEBUG(D_ERROR, "the returned qd_id isn't expected!" + CERROR("the returned qd_id isn't expected!" "(qdata: %u, lq_data: %u)\n", qdata->qd_id, qunit->lq_data.qd_id); qdata->qd_id = qunit->lq_data.qd_id; rc = -EPROTO; } if (QDATA_IS_GRP(qdata) != QDATA_IS_GRP(&qunit->lq_data)) { - CDEBUG(D_ERROR, "the returned grp/usr isn't expected!" + CERROR("the returned grp/usr isn't expected!" "(qdata: %u, lq_data: %u)\n", qdata->qd_flags, qunit->lq_data.qd_flags); if (QDATA_IS_GRP(&qunit->lq_data)) @@ -823,7 +829,7 @@ static int dqacq_interpret(const struct lu_env *env, rc = -EPROTO; } if (qdata->qd_count > qunit->lq_data.qd_count) { - CDEBUG(D_ERROR, "the returned qd_count isn't expected!" + CERROR("the returned qd_count isn't expected!" 
"(qdata: "LPU64", lq_data: "LPU64")\n", qdata->qd_count, qunit->lq_data.qd_count); rc = -EPROTO; @@ -836,7 +842,7 @@ static int dqacq_interpret(const struct lu_env *env, rc = dqacq_completion(obd, qctxt, qdata, rc, lustre_msg_get_opc(req->rq_reqmsg)); - up_read(&obt->obt_rwsem); + cfs_up_read(&obt->obt_rwsem); RETURN(rc); } @@ -848,10 +854,10 @@ int check_qm(struct lustre_quota_ctxt *qctxt) int rc; ENTRY; - spin_lock(&qctxt->lqc_lock); + cfs_spin_lock(&qctxt->lqc_lock); /* quit waiting when mds is back or qctxt is cleaned up */ rc = qctxt->lqc_import || !qctxt->lqc_valid; - spin_unlock(&qctxt->lqc_lock); + cfs_spin_unlock(&qctxt->lqc_lock); RETURN(rc); } @@ -863,9 +869,10 @@ void dqacq_interrupt(struct lustre_quota_ctxt *qctxt) int i; ENTRY; - spin_lock(&qunit_hash_lock); + cfs_spin_lock(&qunit_hash_lock); for (i = 0; i < NR_DQHASH; i++) { - list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], lq_hash) { + cfs_list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], + lq_hash) { if (qunit->lq_ctxt != qctxt) continue; @@ -874,10 +881,10 @@ void dqacq_interrupt(struct lustre_quota_ctxt *qctxt) * if no others change it, then the waiters will return * -EAGAIN to caller who can perform related quota * acq/rel if necessary. */ - wake_up_all(&qunit->lq_waitq); + cfs_waitq_broadcast(&qunit->lq_waitq); } } - spin_unlock(&qunit_hash_lock); + cfs_spin_unlock(&qunit_hash_lock); EXIT; } @@ -887,7 +894,7 @@ static int got_qunit(struct lustre_qunit *qunit, int is_master) int rc = 0; ENTRY; - spin_lock(&qunit->lq_lock); + cfs_spin_lock(&qunit->lq_lock); switch (qunit->lq_state) { case QUNIT_IN_HASH: case QUNIT_RM_FROM_HASH: @@ -898,19 +905,29 @@ static int got_qunit(struct lustre_qunit *qunit, int is_master) default: CERROR("invalid qunit state %d\n", qunit->lq_state); } - spin_unlock(&qunit->lq_lock); + cfs_spin_unlock(&qunit->lq_lock); if (!rc) { - spin_lock(&qctxt->lqc_lock); + cfs_spin_lock(&qctxt->lqc_lock); rc = !qctxt->lqc_valid; if (!is_master) rc |= !qctxt->lqc_import; - spin_unlock(&qctxt->lqc_lock); + cfs_spin_unlock(&qctxt->lqc_lock); } RETURN(rc); } +static inline void +revoke_lqs_rec(struct lustre_qunit_size *lqs, struct qunit_data *qdata, int opc) +{ + /* revoke lqs_xxx_rec which is computed in check_cur_qunit + * b=18630 */ + cfs_spin_lock(&lqs->lqs_lock); + quota_compute_lqs(qdata, lqs, 0, (opc == QUOTA_DQACQ) ? 
1 : 0); + cfs_spin_unlock(&lqs->lqs_lock); +} + static int schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata, int opc, int wait, @@ -929,36 +946,46 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, ENTRY; LASSERT(opc == QUOTA_DQACQ || opc == QUOTA_DQREL); - do_gettimeofday(&work_start); - if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL) + cfs_gettimeofday(&work_start); + + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), + qctxt, 0); + if (lqs == NULL || IS_ERR(lqs)) { + CERROR("Can't find the lustre qunit size!\n"); + RETURN(-EPERM); + } + + if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL) { + revoke_lqs_rec(lqs, qdata, opc); + /* this is for quota_search_lqs */ + lqs_putref(lqs); RETURN(-ENOMEM); + } - spin_lock(&qunit_hash_lock); + OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_SD, 5); + + cfs_spin_lock(&qunit_hash_lock); qunit = dqacq_in_flight(qctxt, qdata); if (qunit) { - spin_unlock(&qunit_hash_lock); + cfs_spin_unlock(&qunit_hash_lock); qunit_put(empty); + revoke_lqs_rec(lqs, qdata, opc); + /* this is for quota_search_lqs */ + lqs_putref(lqs); goto wait_completion; } qunit = empty; qunit_get(qunit); insert_qunit_nolock(qctxt, qunit); - spin_unlock(&qunit_hash_lock); + cfs_spin_unlock(&qunit_hash_lock); - lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), - qctxt, 0); - if (lqs && !IS_ERR(lqs)) { - spin_lock(&lqs->lqs_lock); - quota_compute_lqs(qdata, lqs, 1, (opc == QUOTA_DQACQ) ? 1 : 0); - /* when this qdata returned from mds, it will call lqs_putref */ - lqs_getref(lqs); - spin_unlock(&lqs->lqs_lock); - /* this is for quota_search_lqs */ - lqs_putref(lqs); - } else { - CDEBUG(D_ERROR, "Can't find the lustre qunit size!\n"); - } + /* From here, the quota request will be sent anyway. + * When this qdata request returned or is cancelled, + * lqs_putref will be called at that time */ + lqs_getref(lqs); + /* this is for quota_search_lqs */ + lqs_putref(lqs); QDATA_DEBUG(qdata, "obd(%s): send %s quota req\n", obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel"); @@ -973,7 +1000,7 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, /* this is for qunit_get() */ qunit_put(qunit); - do_gettimeofday(&work_end); + cfs_gettimeofday(&work_end); timediff = cfs_timeval_sub(&work_end, &work_start, NULL); if (opc == QUOTA_DQACQ) lprocfs_counter_add(qctxt->lqc_stats, @@ -986,46 +1013,48 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, RETURN(rc ? 
rc : rc2); } - spin_lock(&qctxt->lqc_lock); + cfs_spin_lock(&qctxt->lqc_lock); if (!qctxt->lqc_import) { - spin_unlock(&qctxt->lqc_lock); + cfs_spin_unlock(&qctxt->lqc_lock); QDATA_DEBUG(qdata, "lqc_import is invalid.\n"); - spin_lock(&qunit_hash_lock); + cfs_spin_lock(&qunit_hash_lock); remove_qunit_nolock(qunit); - spin_unlock(&qunit_hash_lock); + cfs_spin_unlock(&qunit_hash_lock); compute_lqs_after_removing_qunit(qunit); QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, -EAGAIN); - wake_up_all(&qunit->lq_waitq); + cfs_waitq_broadcast(&qunit->lq_waitq); /* this is for qunit_get() */ qunit_put(qunit); /* this for alloc_qunit() */ qunit_put(qunit); - spin_lock(&qctxt->lqc_lock); + cfs_spin_lock(&qctxt->lqc_lock); if (wait && !qctxt->lqc_import) { - spin_unlock(&qctxt->lqc_lock); - - LASSERT(oti && oti->oti_thread && - oti->oti_thread->t_watchdog); - - lc_watchdog_disable(oti->oti_thread->t_watchdog); + cfs_spin_unlock(&qctxt->lqc_lock); + LASSERT(oti && oti->oti_thread); + /* The recovery thread doesn't have watchdog + * attached. LU-369 */ + if (oti->oti_thread->t_watchdog) + lc_watchdog_disable(oti->oti_thread->\ + t_watchdog); CDEBUG(D_QUOTA, "sleep for quota master\n"); l_wait_event(qctxt->lqc_wait_for_qmaster, check_qm(qctxt), &lwi); CDEBUG(D_QUOTA, "wake up when quota master is back\n"); - lc_watchdog_touch(oti->oti_thread->t_watchdog, - GET_TIMEOUT(oti->oti_thread->t_svc)); + if (oti->oti_thread->t_watchdog) + lc_watchdog_touch(oti->oti_thread->t_watchdog, + CFS_GET_TIMEOUT(oti->oti_thread->t_svc)); } else { - spin_unlock(&qctxt->lqc_lock); + cfs_spin_unlock(&qctxt->lqc_lock); } RETURN(-EAGAIN); } imp = class_import_get(qctxt->lqc_import); - spin_unlock(&qctxt->lqc_lock); + cfs_spin_unlock(&qctxt->lqc_lock); /* build dqacq/dqrel request */ LASSERT(imp); @@ -1034,7 +1063,7 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, LUSTRE_MDS_VERSION, opc); class_import_put(imp); if (req == NULL) { - CDEBUG(D_ERROR, "Can't alloc request\n"); + CERROR("Can't alloc request\n"); dqacq_completion(obd, qctxt, qdata, -ENOMEM, opc); /* this is for qunit_get() */ qunit_put(qunit); @@ -1045,7 +1074,7 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, req->rq_no_resend = req->rq_no_delay = 1; rc = quota_copy_qdata(req, qdata, QUOTA_REQUEST, QUOTA_IMPORT); if (rc < 0) { - CDEBUG(D_ERROR, "Can't pack qunit_data(rc: %d)\n", rc); + CERROR("Can't pack qunit_data(rc: %d)\n", rc); ptlrpc_req_finished(req); dqacq_completion(obd, qctxt, qdata, -EPROTO, opc); /* this is for qunit_get() */ @@ -1059,7 +1088,7 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, aa->aa_qunit = qunit; req->rq_interpret_reply = dqacq_interpret; - ptlrpcd_add_req(req, PSCOPE_OTHER); + ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1); QDATA_DEBUG(qdata, "%s scheduled.\n", opc == QUOTA_DQACQ ? 
"DQACQ" : "DQREL"); @@ -1076,16 +1105,16 @@ wait_completion: * rc = -EBUSY, it means recovery is happening * other rc < 0, it means real errors, functions who call * schedule_dqacq should take care of this */ - spin_lock(&qunit->lq_lock); + cfs_spin_lock(&qunit->lq_lock); rc = qunit->lq_rc; - spin_unlock(&qunit->lq_lock); + cfs_spin_unlock(&qunit->lq_lock); CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: id(%u) flag(%u) " "rc(%d) owner(%d)\n", qunit, qunit->lq_data.qd_id, qunit->lq_data.qd_flags, rc, qunit->lq_owner); } qunit_put(qunit); - do_gettimeofday(&work_end); + cfs_gettimeofday(&work_end); timediff = cfs_timeval_sub(&work_end, &work_start, NULL); if (opc == QUOTA_DQACQ) lprocfs_counter_add(qctxt->lqc_stats, @@ -1152,16 +1181,16 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id, int rc = 0; ENTRY; - do_gettimeofday(&work_start); + cfs_gettimeofday(&work_start); qdata.qd_id = id; qdata.qd_flags = type; if (isblk) QDATA_SET_BLK(&qdata); qdata.qd_count = 0; - spin_lock(&qunit_hash_lock); + cfs_spin_lock(&qunit_hash_lock); qunit = dqacq_in_flight(qctxt, &qdata); - spin_unlock(&qunit_hash_lock); + cfs_spin_unlock(&qunit_hash_lock); if (qunit) { struct qunit_data *p = &qunit->lq_data; @@ -1172,19 +1201,19 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id, CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: rc(%d) " "owner(%d)\n", qunit, qunit->lq_rc, qunit->lq_owner); /* keep same as schedule_dqacq() b=17030 */ - spin_lock(&qunit->lq_lock); + cfs_spin_lock(&qunit->lq_lock); rc = qunit->lq_rc; - spin_unlock(&qunit->lq_lock); + cfs_spin_unlock(&qunit->lq_lock); /* this is for dqacq_in_flight() */ qunit_put(qunit); - do_gettimeofday(&work_end); + cfs_gettimeofday(&work_end); timediff = cfs_timeval_sub(&work_end, &work_start, NULL); lprocfs_counter_add(qctxt->lqc_stats, isblk ? LQUOTA_WAIT_PENDING_BLK_QUOTA : LQUOTA_WAIT_PENDING_INO_QUOTA, timediff); } else { - do_gettimeofday(&work_end); + cfs_gettimeofday(&work_end); timediff = cfs_timeval_sub(&work_end, &work_start, NULL); lprocfs_counter_add(qctxt->lqc_stats, isblk ? 
LQUOTA_NOWAIT_PENDING_BLK_QUOTA : @@ -1212,9 +1241,9 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler) cfs_waitq_init(&qctxt->lqc_wait_for_qmaster); cfs_waitq_init(&qctxt->lqc_lqs_waitq); - atomic_set(&qctxt->lqc_lqs, 0); - spin_lock_init(&qctxt->lqc_lock); - spin_lock(&qctxt->lqc_lock); + cfs_atomic_set(&qctxt->lqc_lqs, 0); + cfs_spin_lock_init(&qctxt->lqc_lock); + cfs_spin_lock(&qctxt->lqc_lock); qctxt->lqc_handler = handler; qctxt->lqc_sb = sb; qctxt->lqc_obt = obt; @@ -1235,12 +1264,16 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler) qctxt->lqc_switch_seconds = 300; /* enlarging will wait 5 minutes * after the last shrinking */ qctxt->lqc_sync_blk = 0; - spin_unlock(&qctxt->lqc_lock); + cfs_spin_unlock(&qctxt->lqc_lock); qctxt->lqc_lqs_hash = cfs_hash_create("LQS_HASH", - HASH_LQS_CUR_BITS, + hash_lqs_cur_bits, HASH_LQS_MAX_BITS, - &lqs_hash_ops, CFS_HASH_REHASH); + min(hash_lqs_cur_bits, + HASH_LQS_BKT_BITS), + 0, CFS_HASH_MIN_THETA, + CFS_HASH_MAX_THETA, + &lqs_hash_ops, CFS_HASH_DEFAULT); if (!qctxt->lqc_lqs_hash) { CERROR("initialize hash lqs for %s error!\n", obd->obd_name); RETURN(-ENOMEM); @@ -1260,21 +1293,23 @@ static int check_lqs(struct lustre_quota_ctxt *qctxt) int rc; ENTRY; - rc = !atomic_read(&qctxt->lqc_lqs); + rc = !cfs_atomic_read(&qctxt->lqc_lqs); RETURN(rc); } - -void hash_put_lqs(void *obj, void *data) +int qctxt_del_lqs(cfs_hash_t *hs, cfs_hash_bd_t *bd, + cfs_hlist_node_t *hnode, void *data) { - lqs_putref((struct lustre_qunit_size *)obj); + /* remove from hash and -1 refcount */ + cfs_hash_bd_del_locked(hs, bd, hnode); + return 0; } void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) { struct lustre_qunit *qunit, *tmp; - struct list_head tmp_list; + cfs_list_t tmp_list; struct l_wait_info lwi = { 0 }; struct obd_device_target *obt = qctxt->lqc_obt; int i; @@ -1282,28 +1317,29 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) CFS_INIT_LIST_HEAD(&tmp_list); - spin_lock(&qctxt->lqc_lock); + cfs_spin_lock(&qctxt->lqc_lock); qctxt->lqc_valid = 0; - spin_unlock(&qctxt->lqc_lock); + cfs_spin_unlock(&qctxt->lqc_lock); - spin_lock(&qunit_hash_lock); + cfs_spin_lock(&qunit_hash_lock); for (i = 0; i < NR_DQHASH; i++) { - list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], lq_hash) { + cfs_list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], + lq_hash) { if (qunit->lq_ctxt != qctxt) continue; remove_qunit_nolock(qunit); - list_add(&qunit->lq_hash, &tmp_list); + cfs_list_add(&qunit->lq_hash, &tmp_list); } } - spin_unlock(&qunit_hash_lock); + cfs_spin_unlock(&qunit_hash_lock); - list_for_each_entry_safe(qunit, tmp, &tmp_list, lq_hash) { - list_del_init(&qunit->lq_hash); + cfs_list_for_each_entry_safe(qunit, tmp, &tmp_list, lq_hash) { + cfs_list_del_init(&qunit->lq_hash); compute_lqs_after_removing_qunit(qunit); /* wake up all waiters */ QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, 0); - wake_up_all(&qunit->lq_waitq); + cfs_waitq_broadcast(&qunit->lq_waitq); qunit_put(qunit); } @@ -1311,16 +1347,18 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) * unpredicted. 
So we must wait until lqc_wait_for_qmaster is empty */ while (cfs_waitq_active(&qctxt->lqc_wait_for_qmaster)) { cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster); - cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE, - cfs_time_seconds(1)); + cfs_schedule_timeout_and_set_state(CFS_TASK_INTERRUPTIBLE, + cfs_time_seconds(1)); } - cfs_hash_for_each_safe(qctxt->lqc_lqs_hash, hash_put_lqs, NULL); + /* release refcount on lustre_qunit_size holding by lqs_hash */ + cfs_hash_for_each_safe(qctxt->lqc_lqs_hash, qctxt_del_lqs, NULL); + l_wait_event(qctxt->lqc_lqs_waitq, check_lqs(qctxt), &lwi); - down_write(&obt->obt_rwsem); - cfs_hash_destroy(qctxt->lqc_lqs_hash); + cfs_down_write(&obt->obt_rwsem); + cfs_hash_putref(qctxt->lqc_lqs_hash); qctxt->lqc_lqs_hash = NULL; - up_write(&obt->obt_rwsem); + cfs_up_write(&obt->obt_rwsem); ptlrpcd_decref(); @@ -1335,7 +1373,7 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) struct qslave_recov_thread_data { struct obd_device *obd; struct lustre_quota_ctxt *qctxt; - struct completion comp; + cfs_completion_t comp; }; /* FIXME only recovery block quota by now */ @@ -1353,22 +1391,22 @@ static int qslave_recovery_main(void *arg) /* for obdfilter */ class_incref(obd, "qslave_recovd_filter", obd); - complete(&data->comp); + cfs_complete(&data->comp); - spin_lock(&qctxt->lqc_lock); + cfs_spin_lock(&qctxt->lqc_lock); if (qctxt->lqc_recovery) { - spin_unlock(&qctxt->lqc_lock); + cfs_spin_unlock(&qctxt->lqc_lock); class_decref(obd, "qslave_recovd_filter", obd); RETURN(0); } else { qctxt->lqc_recovery = 1; - spin_unlock(&qctxt->lqc_lock); + cfs_spin_unlock(&qctxt->lqc_lock); } for (type = USRQUOTA; type < MAXQUOTAS; type++) { struct qunit_data qdata; struct quota_info *dqopt = sb_dqopt(qctxt->lqc_sb); - struct list_head id_list; + cfs_list_t id_list; struct dquot_id *dqid, *tmp; int ret; @@ -1389,8 +1427,8 @@ static int qslave_recovery_main(void *arg) if (rc) CERROR("Get ids from quota file failed. (rc:%d)\n", rc); - list_for_each_entry_safe(dqid, tmp, &id_list, di_link) { - list_del_init(&dqid->di_link); + cfs_list_for_each_entry_safe(dqid, tmp, &id_list, di_link) { + cfs_list_del_init(&dqid->di_link); /* skip slave recovery on itself */ if (is_master(qctxt)) goto free; @@ -1414,18 +1452,17 @@ static int qslave_recovery_main(void *arg) rc = 0; } - if (rc) - CDEBUG(rc == -EBUSY ? D_QUOTA : D_ERROR, - "qslave recovery failed! (id:%d type:%d " + if (rc && rc != -EBUSY) + CERROR("qslave recovery failed! 
(id:%d type:%d " " rc:%d)\n", dqid->di_id, type, rc); free: OBD_FREE_PTR(dqid); } } - spin_lock(&qctxt->lqc_lock); + cfs_spin_lock(&qctxt->lqc_lock); qctxt->lqc_recovery = 0; - spin_unlock(&qctxt->lqc_lock); + cfs_spin_unlock(&qctxt->lqc_lock); class_decref(obd, "qslave_recovd_filter", obd); RETURN(rc); } @@ -1442,42 +1479,30 @@ qslave_start_recovery(struct obd_device *obd, struct lustre_quota_ctxt *qctxt) data.obd = obd; data.qctxt = qctxt; - init_completion(&data.comp); + cfs_init_completion(&data.comp); - rc = kernel_thread(qslave_recovery_main, &data, CLONE_VM|CLONE_FILES); + rc = cfs_create_thread(qslave_recovery_main, &data, + CFS_DAEMON_FLAGS); if (rc < 0) { CERROR("Cannot start quota recovery thread: rc %d\n", rc); goto exit; } - wait_for_completion(&data.comp); + cfs_wait_for_completion(&data.comp); exit: EXIT; } -int quota_is_on(struct lustre_quota_ctxt *qctxt, struct obd_quotactl *oqctl) +inline int quota_is_on(struct lustre_quota_ctxt *qctxt, + struct obd_quotactl *oqctl) { - unsigned int type; - - for (type = USRQUOTA; type < MAXQUOTAS; type++) { - if (!Q_TYPESET(oqctl, type)) - continue; - if (!(qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type))) - return 0; - } - return 1; + return ((qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type)) == + UGQUOTA2LQC(oqctl->qc_type)); } -int quota_is_off(struct lustre_quota_ctxt *qctxt, struct obd_quotactl *oqctl) +inline int quota_is_off(struct lustre_quota_ctxt *qctxt, + struct obd_quotactl *oqctl) { - unsigned int type; - - for (type = USRQUOTA; type < MAXQUOTAS; type++) { - if (!Q_TYPESET(oqctl, type)) - continue; - if (qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type)) - return 0; - } - return 1; + return !(qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type)); } /** @@ -1491,11 +1516,11 @@ void build_lqs(struct obd_device *obd) { struct obd_device_target *obt = &obd->u.obt; struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt; - struct list_head id_list; + cfs_list_t id_list; int i, rc; LASSERT_SEM_LOCKED(&obt->obt_quotachecking); - INIT_LIST_HEAD(&id_list); + CFS_INIT_LIST_HEAD(&id_list); for (i = 0; i < MAXQUOTAS; i++) { struct dquot_id *dqid, *tmp; @@ -1510,24 +1535,24 @@ void build_lqs(struct obd_device *obd) i, &id_list); #endif if (rc) { - CDEBUG(D_ERROR, "fail to get %s qids!\n", + CERROR("%s: failed to get %s qids!\n", obd->obd_name, i ? "group" : "user"); continue; } - list_for_each_entry_safe(dqid, tmp, &id_list, - di_link) { + cfs_list_for_each_entry_safe(dqid, tmp, &id_list, + di_link) { struct lustre_qunit_size *lqs; - list_del_init(&dqid->di_link); + cfs_list_del_init(&dqid->di_link); lqs = quota_search_lqs(LQS_KEY(i, dqid->di_id), qctxt, 1); if (lqs && !IS_ERR(lqs)) { lqs->lqs_flags |= dqid->di_flag; lqs_putref(lqs); } else { - CDEBUG(D_ERROR, "fail to create a lqs" - "(%s id: %u)!\n", i ? "group" : "user", + CERROR("%s: failed to create a lqs for %sid %u" + "\n", obd->obd_name, i ? "g" : "u", dqid->di_id); } @@ -1544,84 +1569,74 @@ void build_lqs(struct obd_device *obd) * string hashing using djb2 hash algorithm */ static unsigned -lqs_hash(cfs_hash_t *hs, void *key, unsigned mask) +lqs_hash(cfs_hash_t *hs, const void *key, unsigned mask) { - struct quota_adjust_qunit *lqs_key; + unsigned long long id; unsigned hash; ENTRY; LASSERT(key); - lqs_key = (struct quota_adjust_qunit *)key; - hash = (QAQ_IS_GRP(lqs_key) ? 5381 : 5387) * lqs_key->qaq_id; + id = *((unsigned long long *)key); + hash = (LQS_KEY_GRP(id) ? 
5381 : 5387) * (unsigned)LQS_KEY_ID(id); RETURN(hash & mask); } -static int -lqs_compare(void *key, struct hlist_node *hnode) +static void * +lqs_key(cfs_hlist_node_t *hnode) { - struct lustre_qunit_size *q; - int rc; + struct lustre_qunit_size *lqs; ENTRY; - LASSERT(key); - q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); - - spin_lock(&q->lqs_lock); - rc = (q->lqs_key == *((unsigned long long *)key)); - spin_unlock(&q->lqs_lock); - - RETURN(rc); + lqs = cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); + RETURN(&lqs->lqs_key); } -static void * -lqs_get(struct hlist_node *hnode) +static int +lqs_keycmp(const void *key, cfs_hlist_node_t *hnode) { struct lustre_qunit_size *q = - hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); - ENTRY; - - __lqs_getref(q); + cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); - RETURN(q); + RETURN(q->lqs_key == *((unsigned long long *)key)); } static void * -lqs_put(struct hlist_node *hnode) +lqs_object(cfs_hlist_node_t *hnode) { - struct lustre_qunit_size *q = - hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); - ENTRY; + return cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); +} - __lqs_putref(q); +static void +lqs_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + struct lustre_qunit_size *q = + cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); - RETURN(q); + lqs_getref(q); } static void -lqs_exit(struct hlist_node *hnode) +lqs_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode) { struct lustre_qunit_size *q = - hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); - ENTRY; + cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); - /* - * Nothing should be left. User of lqs put it and - * lqs also was deleted from table by this time - * so we should have 0 refs. - */ - LASSERTF(atomic_read(&q->lqs_refcount) == 0, - "Busy lqs %p with %d refs\n", q, - atomic_read(&q->lqs_refcount)); - OBD_FREE_PTR(q); - EXIT; + lqs_putref(q); +} + +static void +lqs_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + CERROR("It should not have any item left to be handled by this!"); } static cfs_hash_ops_t lqs_hash_ops = { - .hs_hash = lqs_hash, - .hs_compare = lqs_compare, - .hs_get = lqs_get, - .hs_put = lqs_put, - .hs_exit = lqs_exit + .hs_hash = lqs_hash, + .hs_key = lqs_key, + .hs_keycmp = lqs_keycmp, + .hs_object = lqs_object, + .hs_get = lqs_get, + .hs_put_locked = lqs_put_locked, + .hs_exit = lqs_exit }; -#endif /* HAVE_QUOTA_SUPPORT */
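
Reviewer note (not part of the patch above):

Beyond the mechanical libcfs renaming (spin_lock -> cfs_spin_lock, list_head ->
cfs_list_t, and so on), the most substantive change at the bottom of this diff
is the rework of the lqs hash: the old pointer-returning lqs_get()/lqs_put()
and lqs_compare() callbacks are replaced by the key-based cfs_hash_ops_t set
(hs_key/hs_keycmp/hs_object/hs_get/hs_put_locked), and the table is now sized
from the new hash_lqs_cur_bits module parameter instead of the fixed
HASH_LQS_CUR_BITS constant.

The standalone sketch below illustrates only the bucket computation that the
new lqs_hash() callback performs. The lqs_key_make() helper, the *_SKETCH
macros, and the bit layout (group flag in the high 32 bits, quota id in the
low 32 bits) are assumptions modelled on the LQS_KEY/LQS_KEY_GRP/LQS_KEY_ID
macros referenced by the patch; treat it as an illustration, not as code taken
from Lustre.

#include <stdio.h>

typedef unsigned long long u64;

/* Assumed key layout, modelled on LQS_KEY(): group flag in the high
 * 32 bits, quota id in the low 32 bits. */
static u64 lqs_key_make(int is_grp, unsigned int id)
{
        return (is_grp ? 1ULL << 32 : 0ULL) + id;
}

/* Assumed accessors, modelled on LQS_KEY_GRP()/LQS_KEY_ID(). */
#define LQS_KEY_GRP_SKETCH(key) ((unsigned)((key) >> 32))
#define LQS_KEY_ID_SKETCH(key)  ((unsigned)((key) & 0xffffffffULL))

/* Mirrors the arithmetic of the new lqs_hash() callback: group ids
 * are spread with the djb2-style constant 5381 and user ids with
 * 5387, then the product is masked into the bucket table. */
static unsigned lqs_hash_sketch(u64 key, unsigned mask)
{
        unsigned hash = (LQS_KEY_GRP_SKETCH(key) ? 5381 : 5387) *
                        LQS_KEY_ID_SKETCH(key);

        return hash & mask;
}

int main(void)
{
        /* The patch sizes the real table from the hash_lqs_cur_bits
         * module parameter; 7 here is only an example exponent. */
        unsigned mask = (1U << 7) - 1;
        unsigned int id = 500;

        printf("uid %u -> bucket %u\n", id,
               lqs_hash_sketch(lqs_key_make(0, id), mask));
        printf("gid %u -> bucket %u\n", id,
               lqs_hash_sketch(lqs_key_make(1, id), mask));
        return 0;
}

Using two multipliers keyed on the id type means uid N and gid N usually land
in different buckets (some ids, e.g. 0, still collide), which a single shared
multiplier would not give. Two other changes in the diff deserve a reviewer's
attention: dqacq_completion() now drops the alloc_qunit() reference at the new
out1 label, keeping the qunit pinned across the reschedule/adjust path instead
of releasing it early, and schedule_dqacq() looks up the lqs before allocating
a qunit so that every early-exit path can undo the check_cur_qunit()
accounting through the new revoke_lqs_rec() helper (b=18630).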