LU-882 quota: Quota code compares unsigned < 0
[fs/lustre-release.git] lustre/quota/quota_context.c
index 537f101..f1be4f8 100644
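
The subject line names the classic unsigned-comparison pitfall: in check_cur_qunit() below, limit is an unsigned 64-bit value (it is printed with LPU64) while record is signed (printed with %lld), so the old test "if (limit + record < 0)" converts record to unsigned and the condition can never be true. The patch tests the signed operand directly with "if (record < 0)". A minimal standalone illustration of the pattern (the variable names are illustrative only, not taken from the Lustre sources):

    #include <stdio.h>

    int main(void)
    {
            unsigned long long limit = 100;  /* unsigned, like the quota limit */
            long long record = -200;         /* signed, may go negative */

            /* Broken: record is converted to unsigned long long, the sum
             * wraps around, and a comparison with 0 is always false. */
            if (limit + record < 0)
                    printf("never reached\n");

            /* Fixed: test the signed operand before mixing the types. */
            if (record < 0)
                    printf("handle the negative record separately\n");

            return 0;
    }
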
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
 #include <obd_class.h>
 #include <lustre_quota.h>
 #include <lustre_fsfilt.h>
-#include <class_hash.h>
 #include <lprocfs_status.h>
 #include "quota_internal.h"
 
+static int hash_lqs_cur_bits = HASH_LQS_CUR_BITS;
+CFS_MODULE_PARM(hash_lqs_cur_bits, "i", int, 0444,
+                "the current bits of lqs hash");
+
 #ifdef HAVE_QUOTA_SUPPORT
 
-static lustre_hash_ops_t lqs_hash_ops;
+static cfs_hash_ops_t lqs_hash_ops;
 
 unsigned long default_bunit_sz = 128 * 1024 * 1024; /* 128M bytes */
 unsigned long default_btune_ratio = 50;             /* 50 percentage */
@@ -71,8 +77,8 @@ unsigned long default_iunit_sz = 5120;              /* 5120 inodes */
 unsigned long default_itune_ratio = 50;             /* 50 percentage */
 
 cfs_mem_cache_t *qunit_cachep = NULL;
-struct list_head qunit_hash[NR_DQHASH];
-spinlock_t qunit_hash_lock = SPIN_LOCK_UNLOCKED;
+cfs_list_t qunit_hash[NR_DQHASH];
+cfs_spinlock_t qunit_hash_lock = CFS_SPIN_LOCK_UNLOCKED;
 
 /* please sync qunit_state with qunit_state_names */
 enum qunit_state {
@@ -104,41 +110,43 @@ static const char *qunit_state_names[] = {
 };
 
 struct lustre_qunit {
-        struct list_head lq_hash;          /** Hash list in memory */
-        atomic_t lq_refcnt;                /** Use count */
+        cfs_list_t lq_hash;      /** Hash list in memory */
+        cfs_atomic_t lq_refcnt;            /** Use count */
         struct lustre_quota_ctxt *lq_ctxt; /** Quota context this applies to */
         struct qunit_data lq_data;         /** See qunit_data */
         unsigned int lq_opc;               /** QUOTA_DQACQ, QUOTA_DQREL */
         cfs_waitq_t lq_waitq;              /** Threads waiting for this qunit */
-        spinlock_t lq_lock;                /** Protect the whole structure */
+        cfs_spinlock_t lq_lock;            /** Protect the whole structure */
         enum qunit_state lq_state;         /** Present the status of qunit */
         int lq_rc;                         /** The rc of lq_data */
+        pid_t lq_owner;                    /** PID of the process that created it */
 };
 
 #define QUNIT_SET_STATE(qunit, state)                                   \
 do {                                                                    \
-        spin_lock(&qunit->lq_lock);                                     \
+        cfs_spin_lock(&qunit->lq_lock);                                 \
         QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), "   \
-                    "lq_rc(%d)\n",                                      \
+                    "lq_rc(%d), lq_owner(%d)\n",                        \
                     qunit, qunit_state_names[qunit->lq_state],          \
-                    qunit_state_names[state], qunit->lq_rc);            \
+                    qunit_state_names[state], qunit->lq_rc,             \
+                    qunit->lq_owner);                                   \
         qunit->lq_state = state;                                        \
-        spin_unlock(&qunit->lq_lock);                                   \
+        cfs_spin_unlock(&qunit->lq_lock);                               \
 } while(0)
 
 #define QUNIT_SET_STATE_AND_RC(qunit, state, rc)                        \
 do {                                                                    \
-        spin_lock(&qunit->lq_lock);                                     \
+        cfs_spin_lock(&qunit->lq_lock);                                 \
         qunit->lq_rc = rc;                                              \
         QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), "   \
-                    "lq_rc(%d)\n",                                      \
+                    "lq_rc(%d), lq_owner(%d)\n",                        \
                     qunit, qunit_state_names[qunit->lq_state],          \
-                    qunit_state_names[state], qunit->lq_rc);            \
+                    qunit_state_names[state], qunit->lq_rc,             \
+                    qunit->lq_owner);                                   \
         qunit->lq_state = state;                                        \
-        spin_unlock(&qunit->lq_lock);                                   \
+        cfs_spin_unlock(&qunit->lq_lock);                               \
 } while(0)
 
-
 int should_translate_quota (struct obd_import *imp)
 {
         ENTRY;
@@ -155,10 +163,10 @@ void qunit_cache_cleanup(void)
         int i;
         ENTRY;
 
-        spin_lock(&qunit_hash_lock);
+        cfs_spin_lock(&qunit_hash_lock);
         for (i = 0; i < NR_DQHASH; i++)
-                LASSERT(list_empty(qunit_hash + i));
-        spin_unlock(&qunit_hash_lock);
+                LASSERT(cfs_list_empty(qunit_hash + i));
+        cfs_spin_unlock(&qunit_hash_lock);
 
         if (qunit_cachep) {
                 int rc;
@@ -181,10 +189,10 @@ int qunit_cache_init(void)
         if (!qunit_cachep)
                 RETURN(-ENOMEM);
 
-        spin_lock(&qunit_hash_lock);
+        cfs_spin_lock(&qunit_hash_lock);
         for (i = 0; i < NR_DQHASH; i++)
                 CFS_INIT_LIST_HEAD(qunit_hash + i);
-        spin_unlock(&qunit_hash_lock);
+        cfs_spin_unlock(&qunit_hash_lock);
         RETURN(0);
 }
 
@@ -212,7 +220,7 @@ static inline struct lustre_qunit *find_qunit(unsigned int hashent,
         struct qunit_data *tmp;
 
         LASSERT_SPIN_LOCKED(&qunit_hash_lock);
-        list_for_each_entry(qunit, qunit_hash + hashent, lq_hash) {
+        cfs_list_for_each_entry(qunit, qunit_hash + hashent, lq_hash) {
                 tmp = &qunit->lq_data;
                 if (qunit->lq_ctxt == qctxt &&
                     qdata->qd_id == tmp->qd_id &&
@@ -245,15 +253,15 @@ check_cur_qunit(struct obd_device *obd,
         int ret = 0;
         ENTRY;
 
-        if (!sb_any_quota_enabled(sb))
+        if (!ll_sb_any_quota_active(sb))
                 RETURN(0);
 
-        spin_lock(&qctxt->lqc_lock);
+        cfs_spin_lock(&qctxt->lqc_lock);
         if (!qctxt->lqc_valid){
-                spin_unlock(&qctxt->lqc_lock);
+                cfs_spin_unlock(&qctxt->lqc_lock);
                 RETURN(0);
         }
-        spin_unlock(&qctxt->lqc_lock);
+        cfs_spin_unlock(&qctxt->lqc_lock);
 
         OBD_ALLOC_PTR(qctl);
         if (qctl == NULL)
@@ -285,19 +293,14 @@ check_cur_qunit(struct obd_device *obd,
         if (!limit)
                 GOTO(out, ret = 0);
 
- search_lqs:
-        quota_search_lqs(qdata, NULL, qctxt, &lqs);
-        if (!lqs) {
-                CDEBUG(D_QUOTA, "Can't find the lustre qunit size!\n");
-                ret = quota_create_lqs(qdata, NULL, qctxt, &lqs);
-                if (ret == -EALREADY) {
-                        ret = 0;
-                        goto search_lqs;
-                }
-                if (ret < 0)
-                        GOTO (out, ret);
+        lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
+                               qctxt, 0);
+        if (IS_ERR(lqs) || lqs == NULL) {
+                CERROR("fail to find a lqs for %sid: %u)!\n",
+                       QDATA_IS_GRP(qdata) ? "g" : "u", qdata->qd_id);
+                GOTO (out, ret = 0);
         }
-        spin_lock(&lqs->lqs_lock);
+        cfs_spin_lock(&lqs->lqs_lock);
 
         if (QDATA_IS_BLK(qdata)) {
                 qunit_sz = lqs->lqs_bunit_sz;
@@ -322,7 +325,7 @@ check_cur_qunit(struct obd_device *obd,
         limit_org = limit;
         /* when a releasing quota req is sent, before it returned
            limit is assigned a small value. limit will overflow */
-        if (limit + record < 0)
+        if (record < 0)
                 usage -= record;
         else
                 limit += record;
@@ -341,20 +344,27 @@ check_cur_qunit(struct obd_device *obd,
                 ret = 2;
                 /* if there are other pending writes for this uid/gid, releasing
                  * quota is put off until the last pending write b=16645 */
-                if (ret == 2 && pending_write) {
+                /* if there is an ongoing quota request, a release request is
+                 * aborted; the ongoing request will call this function again
+                 * when it returns. b=18630 */
+                if (pending_write || record) {
                         CDEBUG(D_QUOTA, "delay quota release\n");
                         ret = 0;
                 }
         }
+        if (ret > 0)
+                quota_compute_lqs(qdata, lqs, 1, (ret == 1) ? 1 : 0);
+
         CDEBUG(D_QUOTA, "type: %c, limit: "LPU64", usage: "LPU64
-               ", pending_write: "LPU64", record: "LPD64
+               ", pending_write: "LPU64", record: %lld"
                ", qunit_sz: %lu, tune_sz: %lu, ret: %d.\n",
                QDATA_IS_BLK(qdata) ? 'b' : 'i', limit, usage, pending_write,
                record, qunit_sz, tune_sz, ret);
         LASSERT(ret == 0 || qdata->qd_count);
 
-        spin_unlock(&lqs->lqs_lock);
+        cfs_spin_unlock(&lqs->lqs_lock);
         lqs_putref(lqs);
+
         EXIT;
  out:
         OBD_FREE_PTR(qctl);
@@ -373,7 +383,7 @@ int compute_remquota(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         int ret = QUOTA_RET_OK;
         ENTRY;
 
-        if (!sb_any_quota_enabled(sb))
+        if (!ll_sb_any_quota_active(sb))
                 RETURN(QUOTA_RET_NOQUOTA);
 
         /* ignore root user */
@@ -428,13 +438,14 @@ static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt,
                 RETURN(NULL);
 
         CFS_INIT_LIST_HEAD(&qunit->lq_hash);
-        init_waitqueue_head(&qunit->lq_waitq);
-        atomic_set(&qunit->lq_refcnt, 1);
+        cfs_waitq_init(&qunit->lq_waitq);
+        cfs_atomic_set(&qunit->lq_refcnt, 1);
         qunit->lq_ctxt = qctxt;
         memcpy(&qunit->lq_data, qdata, sizeof(*qdata));
         qunit->lq_opc = opc;
-        qunit->lq_lock = SPIN_LOCK_UNLOCKED;
+        qunit->lq_lock = CFS_SPIN_LOCK_UNLOCKED;
         QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0);
+        qunit->lq_owner = cfs_curproc_pid();
         RETURN(qunit);
 }
 
@@ -445,13 +456,13 @@ static inline void free_qunit(struct lustre_qunit *qunit)
 
 static inline void qunit_get(struct lustre_qunit *qunit)
 {
-        atomic_inc(&qunit->lq_refcnt);
+        cfs_atomic_inc(&qunit->lq_refcnt);
 }
 
 static void qunit_put(struct lustre_qunit *qunit)
 {
-        LASSERT(atomic_read(&qunit->lq_refcnt));
-        if (atomic_dec_and_test(&qunit->lq_refcnt))
+        LASSERT(cfs_atomic_read(&qunit->lq_refcnt));
+        if (cfs_atomic_dec_and_test(&qunit->lq_refcnt))
                 free_qunit(qunit);
 }
 
@@ -473,27 +484,29 @@ static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt,
 static void
 insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit)
 {
-        struct list_head *head;
+        cfs_list_t *head;
 
-        LASSERT(list_empty(&qunit->lq_hash));
+        LASSERT(cfs_list_empty(&qunit->lq_hash));
         qunit_get(qunit);
         head = qunit_hash + qunit_hashfn(qctxt, &qunit->lq_data);
-        list_add(&qunit->lq_hash, head);
+        cfs_list_add(&qunit->lq_hash, head);
         QUNIT_SET_STATE(qunit, QUNIT_IN_HASH);
 }
 
 static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit)
 {
-        struct lustre_qunit_size *lqs = NULL;
+        struct lustre_qunit_size *lqs;
 
-        quota_search_lqs(&qunit->lq_data, NULL, qunit->lq_ctxt, &lqs);
-        if (lqs) {
-                spin_lock(&lqs->lqs_lock);
+        lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(&qunit->lq_data),
+                                       qunit->lq_data.qd_id),
+                               qunit->lq_ctxt, 0);
+        if (lqs && !IS_ERR(lqs)) {
+                cfs_spin_lock(&lqs->lqs_lock);
                 if (qunit->lq_opc == QUOTA_DQACQ)
                         quota_compute_lqs(&qunit->lq_data, lqs, 0, 1);
                 if (qunit->lq_opc == QUOTA_DQREL)
                         quota_compute_lqs(&qunit->lq_data, lqs, 0, 0);
-                spin_unlock(&lqs->lqs_lock);
+                cfs_spin_unlock(&lqs->lqs_lock);
                 /* this is for quota_search_lqs */
                 lqs_putref(lqs);
                 /* this is for schedule_dqacq */
@@ -503,14 +516,79 @@ static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit)
 
 static void remove_qunit_nolock(struct lustre_qunit *qunit)
 {
-        LASSERT(!list_empty(&qunit->lq_hash));
+        LASSERT(!cfs_list_empty(&qunit->lq_hash));
         LASSERT_SPIN_LOCKED(&qunit_hash_lock);
 
-        list_del_init(&qunit->lq_hash);
+        cfs_list_del_init(&qunit->lq_hash);
         QUNIT_SET_STATE(qunit, QUNIT_RM_FROM_HASH);
         qunit_put(qunit);
 }
 
+void* quota_barrier(struct lustre_quota_ctxt *qctxt,
+                    struct obd_quotactl *oqctl, int isblk)
+{
+        struct lustre_qunit *qunit, *find_qunit;
+        int cycle = 1;
+
+        OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit));
+        if (qunit == NULL) {
+                CERROR("locating %sunit failed for %sid %u\n",
+                       isblk ? "b" : "i", oqctl->qc_type ? "g" : "u",
+                       oqctl->qc_id);
+                qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id,
+                                         oqctl->qc_type, isblk);
+                return NULL;
+        }
+
+        CFS_INIT_LIST_HEAD(&qunit->lq_hash);
+        qunit->lq_lock = CFS_SPIN_LOCK_UNLOCKED;
+        cfs_waitq_init(&qunit->lq_waitq);
+        cfs_atomic_set(&qunit->lq_refcnt, 1);
+        qunit->lq_ctxt = qctxt;
+        qunit->lq_data.qd_id = oqctl->qc_id;
+        qunit->lq_data.qd_flags =  oqctl->qc_type;
+        if (isblk)
+                QDATA_SET_BLK(&qunit->lq_data);
+        QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0);
+        /* mark this as an invalid qunit, used only as a barrier */
+        qunit->lq_opc = QUOTA_LAST_OPC;
+
+        while (1) {
+                cfs_spin_lock(&qunit_hash_lock);
+                find_qunit = dqacq_in_flight(qctxt, &qunit->lq_data);
+                if (find_qunit) {
+                        cfs_spin_unlock(&qunit_hash_lock);
+                        qunit_put(find_qunit);
+                        qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id,
+                                                 oqctl->qc_type, isblk);
+                        CDEBUG(D_QUOTA, "cycle=%d\n", cycle++);
+                        continue;
+                }
+                break;
+        }
+        insert_qunit_nolock(qctxt, qunit);
+        cfs_spin_unlock(&qunit_hash_lock);
+        return qunit;
+}
+
+void quota_unbarrier(void *handle)
+{
+        struct lustre_qunit *qunit = (struct lustre_qunit *)handle;
+
+        if (qunit == NULL) {
+                CERROR("handle is NULL\n");
+                return;
+        }
+
+        LASSERT(qunit->lq_opc == QUOTA_LAST_OPC);
+        cfs_spin_lock(&qunit_hash_lock);
+        remove_qunit_nolock(qunit);
+        cfs_spin_unlock(&qunit_hash_lock);
+        QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, QUOTA_REQ_RETURNED);
+        cfs_waitq_signal(&qunit->lq_waitq);
+        qunit_put(qunit);
+}
+
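
quota_barrier() parks a placeholder qunit (lq_opc == QUOTA_LAST_OPC) in qunit_hash so that no new acquire/release for that id can start, after first waiting out anything already in flight; quota_unbarrier() wakes the waiters and drops it. A hypothetical caller (the surrounding function and the helper name are illustrative, not part of this patch) would bracket an update that must not race with DQACQ/DQREL like this:

    static int example_set_limit(struct lustre_quota_ctxt *qctxt,
                                 struct obd_quotactl *oqctl)
    {
            void *bhandle, *ihandle;
            int rc;

            /* stall new block and inode qunit traffic for this id */
            bhandle = quota_barrier(qctxt, oqctl, 1 /* isblk */);
            ihandle = quota_barrier(qctxt, oqctl, 0);

            rc = do_protected_update(oqctl);   /* hypothetical helper */

            /* wake any waiters and drop the placeholder qunits;
             * quota_barrier() may return NULL after an allocation failure */
            if (ihandle)
                    quota_unbarrier(ihandle);
            if (bhandle)
                    quota_unbarrier(bhandle);
            return rc;
    }
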
 #define INC_QLIMIT(limit, count) (limit == MIN_QLIMIT) ? \
                                  (limit = count) : (limit += count)
 
@@ -525,6 +603,20 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                struct qunit_data *qdata, int opc, int wait,
                struct obd_trans_info *oti);
 
+static inline void qdata_to_oqaq(struct qunit_data *qdata,
+                                 struct quota_adjust_qunit *oqaq)
+{
+        LASSERT(qdata);
+        LASSERT(oqaq);
+
+        oqaq->qaq_flags = qdata->qd_flags;
+        oqaq->qaq_id    = qdata->qd_id;
+        if (QDATA_IS_ADJBLK(qdata))
+                oqaq->qaq_bunit_sz = qdata->qd_qunit;
+        if (QDATA_IS_ADJINO(qdata))
+                oqaq->qaq_iunit_sz = qdata->qd_qunit;
+}
+
 static int
 dqacq_completion(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                  struct qunit_data *qdata, int rc, int opc)
@@ -620,12 +712,12 @@ out_mem:
         }
 out:
         /* remove the qunit from hash */
-        spin_lock(&qunit_hash_lock);
+        cfs_spin_lock(&qunit_hash_lock);
 
         qunit = dqacq_in_flight(qctxt, qdata);
         /* this qunit has been removed by qctxt_cleanup() */
         if (!qunit) {
-                spin_unlock(&qunit_hash_lock);
+                cfs_spin_unlock(&qunit_hash_lock);
                 QDATA_DEBUG(qdata, "%s is discarded because qunit isn't found\n",
                             opc == QUOTA_DQACQ ? "DQACQ" : "DQREL");
                 RETURN(err);
@@ -635,7 +727,7 @@ out:
         /* remove this qunit from lq_hash so that new processes cannot be added
          * to qunit->lq_waiters */
         remove_qunit_nolock(qunit);
-        spin_unlock(&qunit_hash_lock);
+        cfs_spin_unlock(&qunit_hash_lock);
 
         compute_lqs_after_removing_qunit(qunit);
 
@@ -643,14 +735,12 @@ out:
                 rc = QUOTA_REQ_RETURNED;
         QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, rc);
         /* wake up all waiters */
-        wake_up_all(&qunit->lq_waitq);
+        cfs_waitq_broadcast(&qunit->lq_waitq);
 
         /* this is for dqacq_in_flight() */
         qunit_put(qunit);
-        /* this is for alloc_qunit() */
-        qunit_put(qunit);
         if (rc < 0 && rc != -EDQUOT)
-                 RETURN(err);
+                GOTO(out1, err);
 
         /* don't reschedule in such cases:
          *   - acq/rel failure and qunit isn't changed,
@@ -660,21 +750,21 @@ out:
          */
          OBD_ALLOC_PTR(oqaq);
          if (!oqaq)
-                 RETURN(-ENOMEM);
+                 GOTO(out1, err = -ENOMEM);
          qdata_to_oqaq(qdata, oqaq);
          /* adjust the qunit size in slaves */
          rc1 = quota_adjust_slave_lqs(oqaq, qctxt);
          OBD_FREE_PTR(oqaq);
          if (rc1 < 0) {
                  CERROR("adjust slave's qunit size failed!(rc:%d)\n", rc1);
-                 RETURN(rc1);
+                 GOTO(out1, err = rc1);
          }
          if (err || (rc < 0 && rc != -EBUSY && rc1 == 0) || is_master(qctxt))
-                 RETURN(err);
+                 GOTO(out1, err);
 
          if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880 &&
              OBD_FAIL_CHECK(OBD_FAIL_QUOTA_DELAY_REL))
-                 RETURN(err);
+                 GOTO(out1, err);
 
         /* reschedule another dqacq/dqrel if needed */
         qdata->qd_count = 0;
@@ -686,6 +776,9 @@ out:
                 rc1 = schedule_dqacq(obd, qctxt, qdata, opc, 0, NULL);
                 QDATA_DEBUG(qdata, "reschedudle opc(%d) rc(%d)\n", opc, rc1);
         }
+ out1:
+        /* this is for alloc_qunit() */
+        qunit_put(qunit);
         RETURN(err);
 }
 
@@ -703,30 +796,24 @@ static int dqacq_interpret(const struct lu_env *env,
         struct lustre_qunit *qunit = aa->aa_qunit;
         struct obd_device *obd = req->rq_import->imp_obd;
         struct qunit_data *qdata = NULL;
-        int rc1 = 0;
         ENTRY;
 
         LASSERT(req);
         LASSERT(req->rq_import);
 
-        /* there are several forms of qunit(historic causes), so we need to
-         * adjust qunit from slaves to the same form here */
-        OBD_ALLOC(qdata, sizeof(struct qunit_data));
-        if (!qdata)
-                RETURN(-ENOMEM);
-
-        down_read(&obt->obt_rwsem);
+        cfs_down_read(&obt->obt_rwsem);
         /* if a quota req timeouts or is dropped, we should update quota
          * statistics which will be handled in dqacq_completion. And in
          * this situation we should get qdata from request instead of
          * reply */
-        rc1 = quota_get_qdata(req, qdata,
-                              (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY,
-                              QUOTA_IMPORT);
-        if (rc1 < 0) {
+        qdata = quota_get_qdata(req, (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY,
+                                QUOTA_IMPORT);
+        if (IS_ERR(qdata)) {
+                rc = PTR_ERR(qdata);
                 DEBUG_REQ(D_ERROR, req,
-                          "error unpacking qunit_data(rc: %d)\n", rc1);
-                GOTO(exit, rc = rc1);
+                          "error unpacking qunit_data(rc: %ld)\n",
+                          PTR_ERR(qdata));
+                qdata = &qunit->lq_data;
         }
 
         QDATA_DEBUG(qdata, "qdata: interpret rc(%d).\n", rc);
@@ -734,14 +821,14 @@ static int dqacq_interpret(const struct lu_env *env,
 
         if (qdata->qd_id != qunit->lq_data.qd_id ||
             OBD_FAIL_CHECK(OBD_FAIL_QUOTA_RET_QDATA)) {
-                CDEBUG(D_ERROR, "the returned qd_id isn't expected!"
+                CERROR("the returned qd_id isn't expected!"
                        "(qdata: %u, lq_data: %u)\n", qdata->qd_id,
                        qunit->lq_data.qd_id);
                 qdata->qd_id = qunit->lq_data.qd_id;
                 rc = -EPROTO;
         }
         if (QDATA_IS_GRP(qdata) != QDATA_IS_GRP(&qunit->lq_data)) {
-                CDEBUG(D_ERROR, "the returned grp/usr isn't expected!"
+                CERROR("the returned grp/usr isn't expected!"
                        "(qdata: %u, lq_data: %u)\n", qdata->qd_flags,
                        qunit->lq_data.qd_flags);
                 if (QDATA_IS_GRP(&qunit->lq_data))
@@ -751,19 +838,20 @@ static int dqacq_interpret(const struct lu_env *env,
                 rc = -EPROTO;
         }
         if (qdata->qd_count > qunit->lq_data.qd_count) {
-                CDEBUG(D_ERROR, "the returned qd_count isn't expected!"
+                CERROR("the returned qd_count isn't expected!"
                        "(qdata: "LPU64", lq_data: "LPU64")\n", qdata->qd_count,
                        qunit->lq_data.qd_count);
                 rc = -EPROTO;
         }
 
+        if (unlikely(rc == -ESRCH))
+                CERROR("quota for %s has been enabled by master, but disabled "
+                       "by slave.\n", QDATA_IS_GRP(qdata) ? "group" : "user");
+
         rc = dqacq_completion(obd, qctxt, qdata, rc,
                               lustre_msg_get_opc(req->rq_reqmsg));
 
-exit:
-        up_read(&obt->obt_rwsem);
-        OBD_FREE(qdata, sizeof(struct qunit_data));
-
+        cfs_up_read(&obt->obt_rwsem);
         RETURN(rc);
 }
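
quota_get_qdata() now returns the qunit_data pointer itself instead of filling a caller-supplied buffer, using the standard kernel ERR_PTR convention: an errno is encoded into the pointer and recovered with IS_ERR()/PTR_ERR(), which is why dqacq_interpret() above no longer allocates and frees a temporary qdata. A minimal sketch of the convention (the helper and caller below are hypothetical, assuming only the qunit_data definition from lustre_quota.h):

    #include <linux/err.h>
    #include <linux/errno.h>

    /* hypothetical helper following the same convention as quota_get_qdata() */
    static struct qunit_data *example_get_qdata(struct qunit_data *buf, int rc)
    {
            if (rc < 0)
                    return ERR_PTR(rc);     /* encode the negative errno */
            return buf;                     /* plain pointer on success */
    }

    /* hypothetical caller, mirroring the check in dqacq_interpret() */
    static int example_caller(struct qunit_data *buf)
    {
            struct qunit_data *qdata = example_get_qdata(buf, -EPROTO);

            if (IS_ERR(qdata))
                    return PTR_ERR(qdata);  /* decode back to -EPROTO */
            return 0;
    }
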
 
@@ -775,10 +863,10 @@ int check_qm(struct lustre_quota_ctxt *qctxt)
         int rc;
         ENTRY;
 
-        spin_lock(&qctxt->lqc_lock);
+        cfs_spin_lock(&qctxt->lqc_lock);
         /* quit waiting when mds is back or qctxt is cleaned up */
         rc = qctxt->lqc_import || !qctxt->lqc_valid;
-        spin_unlock(&qctxt->lqc_lock);
+        cfs_spin_unlock(&qctxt->lqc_lock);
 
         RETURN(rc);
 }
@@ -790,9 +878,10 @@ void dqacq_interrupt(struct lustre_quota_ctxt *qctxt)
         int i;
         ENTRY;
 
-        spin_lock(&qunit_hash_lock);
+        cfs_spin_lock(&qunit_hash_lock);
         for (i = 0; i < NR_DQHASH; i++) {
-                list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], lq_hash) {
+                cfs_list_for_each_entry_safe(qunit, tmp, &qunit_hash[i],
+                                             lq_hash) {
                         if (qunit->lq_ctxt != qctxt)
                                 continue;
 
@@ -801,20 +890,20 @@ void dqacq_interrupt(struct lustre_quota_ctxt *qctxt)
                          * if no others change it, then the waiters will return
                          * -EAGAIN to caller who can perform related quota
                          * acq/rel if necessary. */
-                        wake_up_all(&qunit->lq_waitq);
+                        cfs_waitq_broadcast(&qunit->lq_waitq);
                 }
         }
-        spin_unlock(&qunit_hash_lock);
+        cfs_spin_unlock(&qunit_hash_lock);
         EXIT;
 }
 
-static int got_qunit(struct lustre_qunit *qunit)
+static int got_qunit(struct lustre_qunit *qunit, int is_master)
 {
         struct lustre_quota_ctxt *qctxt = qunit->lq_ctxt;
         int rc = 0;
         ENTRY;
 
-        spin_lock(&qunit->lq_lock);
+        cfs_spin_lock(&qunit->lq_lock);
         switch (qunit->lq_state) {
         case QUNIT_IN_HASH:
         case QUNIT_RM_FROM_HASH:
@@ -825,17 +914,29 @@ static int got_qunit(struct lustre_qunit *qunit)
         default:
                 CERROR("invalid qunit state %d\n", qunit->lq_state);
         }
-        spin_unlock(&qunit->lq_lock);
+        cfs_spin_unlock(&qunit->lq_lock);
 
         if (!rc) {
-                spin_lock(&qctxt->lqc_lock);
-                rc = !qctxt->lqc_import || !qctxt->lqc_valid;
-                spin_unlock(&qctxt->lqc_lock);
+                cfs_spin_lock(&qctxt->lqc_lock);
+                rc = !qctxt->lqc_valid;
+                if (!is_master)
+                        rc |= !qctxt->lqc_import;
+                cfs_spin_unlock(&qctxt->lqc_lock);
         }
 
         RETURN(rc);
 }
 
+static inline void
+revoke_lqs_rec(struct lustre_qunit_size *lqs, struct qunit_data *qdata, int opc)
+{
+        /* revoke lqs_xxx_rec which is computed in check_cur_qunit
+         * b=18630 */
+        cfs_spin_lock(&lqs->lqs_lock);
+        quota_compute_lqs(qdata, lqs, 0, (opc == QUOTA_DQACQ) ? 1 : 0);
+        cfs_spin_unlock(&lqs->lqs_lock);
+}
+
 static int
 schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                struct qunit_data *qdata, int opc, int wait,
@@ -854,35 +955,46 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         ENTRY;
 
         LASSERT(opc == QUOTA_DQACQ || opc == QUOTA_DQREL);
-        do_gettimeofday(&work_start);
-        if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL)
+        cfs_gettimeofday(&work_start);
+
+        lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
+                               qctxt, 0);
+        if (lqs == NULL || IS_ERR(lqs)) {
+                CERROR("Can't find the lustre qunit size!\n");
+                RETURN(-EPERM);
+        }
+
+        if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL) {
+                revoke_lqs_rec(lqs, qdata, opc);
+                /* this is for quota_search_lqs */
+                lqs_putref(lqs);
                 RETURN(-ENOMEM);
+        }
 
-        spin_lock(&qunit_hash_lock);
+        OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_SD, 5);
+
+        cfs_spin_lock(&qunit_hash_lock);
         qunit = dqacq_in_flight(qctxt, qdata);
         if (qunit) {
-                spin_unlock(&qunit_hash_lock);
+                cfs_spin_unlock(&qunit_hash_lock);
                 qunit_put(empty);
 
+                revoke_lqs_rec(lqs, qdata, opc);
+                /* this is for quota_search_lqs */
+                lqs_putref(lqs);
                 goto wait_completion;
         }
         qunit = empty;
         qunit_get(qunit);
         insert_qunit_nolock(qctxt, qunit);
-        spin_unlock(&qunit_hash_lock);
-
-        quota_search_lqs(qdata, NULL, qctxt, &lqs);
-        if (lqs) {
-                spin_lock(&lqs->lqs_lock);
-                quota_compute_lqs(qdata, lqs, 1, (opc == QUOTA_DQACQ) ? 1 : 0);
-                /* when this qdata returned from mds, it will call lqs_putref */
-                lqs_getref(lqs);
-                spin_unlock(&lqs->lqs_lock);
-                /* this is for quota_search_lqs */
-                lqs_putref(lqs);
-        } else {
-                CDEBUG(D_ERROR, "Can't find the lustre qunit size!\n");
-        }
+        cfs_spin_unlock(&qunit_hash_lock);
+
+        /* From here on, the quota request will be sent regardless.
+         * lqs_putref will be called when this request returns or
+         * is cancelled */
+        lqs_getref(lqs);
+        /* this is for quota_search_lqs */
+        lqs_putref(lqs);
 
         QDATA_DEBUG(qdata, "obd(%s): send %s quota req\n",
                     obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel");
@@ -897,7 +1009,7 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                 /* this is for qunit_get() */
                 qunit_put(qunit);
 
-                do_gettimeofday(&work_end);
+                cfs_gettimeofday(&work_end);
                 timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
                 if (opc == QUOTA_DQACQ)
                         lprocfs_counter_add(qctxt->lqc_stats,
@@ -910,46 +1022,48 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                 RETURN(rc ? rc : rc2);
         }
 
-        spin_lock(&qctxt->lqc_lock);
+        cfs_spin_lock(&qctxt->lqc_lock);
         if (!qctxt->lqc_import) {
-                spin_unlock(&qctxt->lqc_lock);
+                cfs_spin_unlock(&qctxt->lqc_lock);
                 QDATA_DEBUG(qdata, "lqc_import is invalid.\n");
 
-                spin_lock(&qunit_hash_lock);
+                cfs_spin_lock(&qunit_hash_lock);
                 remove_qunit_nolock(qunit);
-                spin_unlock(&qunit_hash_lock);
+                cfs_spin_unlock(&qunit_hash_lock);
 
                 compute_lqs_after_removing_qunit(qunit);
 
                 QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, -EAGAIN);
-                wake_up_all(&qunit->lq_waitq);
+                cfs_waitq_broadcast(&qunit->lq_waitq);
 
                 /* this is for qunit_get() */
                 qunit_put(qunit);
                 /* this for alloc_qunit() */
                 qunit_put(qunit);
-                spin_lock(&qctxt->lqc_lock);
+                cfs_spin_lock(&qctxt->lqc_lock);
                 if (wait && !qctxt->lqc_import) {
-                        spin_unlock(&qctxt->lqc_lock);
-
-                        LASSERT(oti && oti->oti_thread &&
-                                oti->oti_thread->t_watchdog);
-
-                        lc_watchdog_disable(oti->oti_thread->t_watchdog);
+                        cfs_spin_unlock(&qctxt->lqc_lock);
+                        LASSERT(oti && oti->oti_thread);
+                        /* The recovery thread doesn't have a watchdog
+                         * attached. LU-369 */
+                        if (oti->oti_thread->t_watchdog)
+                                lc_watchdog_disable(
+                                        oti->oti_thread->t_watchdog);
                         CDEBUG(D_QUOTA, "sleep for quota master\n");
                         l_wait_event(qctxt->lqc_wait_for_qmaster,
                                      check_qm(qctxt), &lwi);
                         CDEBUG(D_QUOTA, "wake up when quota master is back\n");
-                        lc_watchdog_touch(oti->oti_thread->t_watchdog,
-                                 GET_TIMEOUT(oti->oti_thread->t_svc));
+                        if (oti->oti_thread->t_watchdog)
+                                lc_watchdog_touch(oti->oti_thread->t_watchdog,
+                                      CFS_GET_TIMEOUT(oti->oti_thread->t_svc));
                 } else {
-                        spin_unlock(&qctxt->lqc_lock);
+                        cfs_spin_unlock(&qctxt->lqc_lock);
                 }
 
                 RETURN(-EAGAIN);
         }
         imp = class_import_get(qctxt->lqc_import);
-        spin_unlock(&qctxt->lqc_lock);
+        cfs_spin_unlock(&qctxt->lqc_lock);
 
         /* build dqacq/dqrel request */
         LASSERT(imp);
@@ -958,7 +1072,7 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                                         LUSTRE_MDS_VERSION, opc);
         class_import_put(imp);
         if (req == NULL) {
-                CDEBUG(D_ERROR, "Can't alloc request\n");
+                CERROR("Can't alloc request\n");
                 dqacq_completion(obd, qctxt, qdata, -ENOMEM, opc);
                 /* this is for qunit_get() */
                 qunit_put(qunit);
@@ -969,7 +1083,7 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         req->rq_no_resend = req->rq_no_delay = 1;
         rc = quota_copy_qdata(req, qdata, QUOTA_REQUEST, QUOTA_IMPORT);
         if (rc < 0) {
-                CDEBUG(D_ERROR, "Can't pack qunit_data(rc: %d)\n", rc);
+                CERROR("Can't pack qunit_data(rc: %d)\n", rc);
                 ptlrpc_req_finished(req);
                 dqacq_completion(obd, qctxt, qdata, -EPROTO, opc);
                 /* this is for qunit_get() */
@@ -983,7 +1097,7 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         aa->aa_qunit = qunit;
 
         req->rq_interpret_reply = dqacq_interpret;
-        ptlrpcd_add_req(req, PSCOPE_OTHER);
+        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
 
         QDATA_DEBUG(qdata, "%s scheduled.\n",
                     opc == QUOTA_DQACQ ? "DQACQ" : "DQREL");
@@ -992,22 +1106,24 @@ wait_completion:
                 struct qunit_data *p = &qunit->lq_data;
 
                 QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
-                l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
+                l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)),
+                             &lwi);
                 /* rc = -EAGAIN, it means the quota master isn't ready yet
                  * rc = QUOTA_REQ_RETURNED, it means a quota req is finished;
                  * rc = -EDQUOT, it means out of quota
                  * rc = -EBUSY, it means recovery is happening
                  * other rc < 0, it means real errors, functions who call
                  * schedule_dqacq should take care of this */
-                spin_lock(&qunit->lq_lock);
+                cfs_spin_lock(&qunit->lq_lock);
                 rc = qunit->lq_rc;
-                spin_unlock(&qunit->lq_lock);
-                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
-                       qunit, rc);
+                cfs_spin_unlock(&qunit->lq_lock);
+                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: id(%u) flag(%u) "
+                       "rc(%d) owner(%d)\n", qunit, qunit->lq_data.qd_id,
+                       qunit->lq_data.qd_flags, rc, qunit->lq_owner);
         }
 
         qunit_put(qunit);
-        do_gettimeofday(&work_end);
+        cfs_gettimeofday(&work_end);
         timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
         if (opc == QUOTA_DQACQ)
                 lprocfs_counter_add(qctxt->lqc_stats,
@@ -1023,16 +1139,14 @@ wait_completion:
 
 int
 qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
-                   uid_t uid, gid_t gid, __u32 isblk, int wait,
+                   const unsigned int id[], __u32 isblk, int wait,
                    struct obd_trans_info *oti)
 {
         int rc = 0, i = USRQUOTA;
-        __u32 id[MAXQUOTAS] = { uid, gid };
         struct qunit_data qdata[MAXQUOTAS];
         ENTRY;
 
-        CLASSERT(MAXQUOTAS < 4);
-        if (!sb_any_quota_enabled(qctxt->lqc_sb))
+        if (quota_is_set(obd, id, isblk ? QB_SET : QI_SET) == 0)
                 RETURN(0);
 
         for (i = 0; i < MAXQUOTAS; i++) {
@@ -1076,38 +1190,39 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
         int rc = 0;
         ENTRY;
 
-        do_gettimeofday(&work_start);
+        cfs_gettimeofday(&work_start);
         qdata.qd_id = id;
         qdata.qd_flags = type;
         if (isblk)
                 QDATA_SET_BLK(&qdata);
         qdata.qd_count = 0;
 
-        spin_lock(&qunit_hash_lock);
+        cfs_spin_lock(&qunit_hash_lock);
         qunit = dqacq_in_flight(qctxt, &qdata);
-        spin_unlock(&qunit_hash_lock);
+        cfs_spin_unlock(&qunit_hash_lock);
 
         if (qunit) {
                 struct qunit_data *p = &qunit->lq_data;
 
                 QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
-                l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
-                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
-                       qunit, qunit->lq_rc);
+                l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)),
+                             &lwi);
+                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: rc(%d) "
+                       "owner(%d)\n", qunit, qunit->lq_rc, qunit->lq_owner);
                 /* keep same as schedule_dqacq() b=17030 */
-                spin_lock(&qunit->lq_lock);
+                cfs_spin_lock(&qunit->lq_lock);
                 rc = qunit->lq_rc;
-                spin_unlock(&qunit->lq_lock);
+                cfs_spin_unlock(&qunit->lq_lock);
                 /* this is for dqacq_in_flight() */
                 qunit_put(qunit);
-                do_gettimeofday(&work_end);
+                cfs_gettimeofday(&work_end);
                 timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
                 lprocfs_counter_add(qctxt->lqc_stats,
                                     isblk ? LQUOTA_WAIT_PENDING_BLK_QUOTA :
                                             LQUOTA_WAIT_PENDING_INO_QUOTA,
                                     timediff);
         } else {
-                do_gettimeofday(&work_end);
+                cfs_gettimeofday(&work_end);
                 timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
                 lprocfs_counter_add(qctxt->lqc_stats,
                                     isblk ? LQUOTA_NOWAIT_PENDING_BLK_QUOTA :
@@ -1134,8 +1249,10 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
                 RETURN(rc);
 
         cfs_waitq_init(&qctxt->lqc_wait_for_qmaster);
-        spin_lock_init(&qctxt->lqc_lock);
-        spin_lock(&qctxt->lqc_lock);
+        cfs_waitq_init(&qctxt->lqc_lqs_waitq);
+        cfs_atomic_set(&qctxt->lqc_lqs, 0);
+        cfs_spin_lock_init(&qctxt->lqc_lock);
+        cfs_spin_lock(&qctxt->lqc_lock);
         qctxt->lqc_handler = handler;
         qctxt->lqc_sb = sb;
         qctxt->lqc_obt = obt;
@@ -1156,10 +1273,16 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
         qctxt->lqc_switch_seconds = 300; /* enlarging will wait 5 minutes
                                           * after the last shrinking */
         qctxt->lqc_sync_blk = 0;
-        spin_unlock(&qctxt->lqc_lock);
-
-        qctxt->lqc_lqs_hash = lustre_hash_init("LQS_HASH", 7, 7,
-                                               &lqs_hash_ops, 0);
+        cfs_spin_unlock(&qctxt->lqc_lock);
+
+        qctxt->lqc_lqs_hash = cfs_hash_create("LQS_HASH",
+                                              hash_lqs_cur_bits,
+                                              HASH_LQS_MAX_BITS,
+                                              min(hash_lqs_cur_bits,
+                                                  HASH_LQS_BKT_BITS),
+                                              0, CFS_HASH_MIN_THETA,
+                                              CFS_HASH_MAX_THETA,
+                                              &lqs_hash_ops, CFS_HASH_DEFAULT);
         if (!qctxt->lqc_lqs_hash) {
                 CERROR("initialize hash lqs for %s error!\n", obd->obd_name);
                 RETURN(-ENOMEM);
@@ -1174,54 +1297,78 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
         RETURN(rc);
 }
 
+static int check_lqs(struct lustre_quota_ctxt *qctxt)
+{
+        int rc;
+        ENTRY;
+
+        rc = !cfs_atomic_read(&qctxt->lqc_lqs);
+
+        RETURN(rc);
+}
+
+int qctxt_del_lqs(cfs_hash_t *hs, cfs_hash_bd_t *bd,
+                 cfs_hlist_node_t *hnode, void *data)
+{
+        /* remove from hash and -1 refcount */
+        cfs_hash_bd_del_locked(hs, bd, hnode);
+        return 0;
+}
+
 void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
 {
         struct lustre_qunit *qunit, *tmp;
-        struct list_head tmp_list;
+        cfs_list_t tmp_list;
+        struct l_wait_info lwi = { 0 };
         struct obd_device_target *obt = qctxt->lqc_obt;
         int i;
         ENTRY;
 
         CFS_INIT_LIST_HEAD(&tmp_list);
 
-        spin_lock(&qctxt->lqc_lock);
+        cfs_spin_lock(&qctxt->lqc_lock);
         qctxt->lqc_valid = 0;
-        spin_unlock(&qctxt->lqc_lock);
+        cfs_spin_unlock(&qctxt->lqc_lock);
 
-        spin_lock(&qunit_hash_lock);
+        cfs_spin_lock(&qunit_hash_lock);
         for (i = 0; i < NR_DQHASH; i++) {
-                list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], lq_hash) {
+                cfs_list_for_each_entry_safe(qunit, tmp, &qunit_hash[i],
+                                             lq_hash) {
                         if (qunit->lq_ctxt != qctxt)
                                 continue;
                         remove_qunit_nolock(qunit);
-                        list_add(&qunit->lq_hash, &tmp_list);
+                        cfs_list_add(&qunit->lq_hash, &tmp_list);
                 }
         }
-        spin_unlock(&qunit_hash_lock);
+        cfs_spin_unlock(&qunit_hash_lock);
 
-        list_for_each_entry_safe(qunit, tmp, &tmp_list, lq_hash) {
-                list_del_init(&qunit->lq_hash);
+        cfs_list_for_each_entry_safe(qunit, tmp, &tmp_list, lq_hash) {
+                cfs_list_del_init(&qunit->lq_hash);
                 compute_lqs_after_removing_qunit(qunit);
 
                 /* wake up all waiters */
                 QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, 0);
-                wake_up_all(&qunit->lq_waitq);
+                cfs_waitq_broadcast(&qunit->lq_waitq);
                 qunit_put(qunit);
         }
 
-        down_write(&obt->obt_rwsem);
-        lustre_hash_exit(qctxt->lqc_lqs_hash);
-        qctxt->lqc_lqs_hash = NULL;
-        up_write(&obt->obt_rwsem);
-
         /* after qctxt_cleanup, qctxt might be freed, then check_qm() is
          * unpredicted. So we must wait until lqc_wait_for_qmaster is empty */
         while (cfs_waitq_active(&qctxt->lqc_wait_for_qmaster)) {
                 cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster);
-                cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE,
-                                     cfs_time_seconds(1));
+                cfs_schedule_timeout_and_set_state(CFS_TASK_INTERRUPTIBLE,
+                                                   cfs_time_seconds(1));
         }
 
+        /* release the refcount on each lustre_qunit_size held by lqs_hash */
+        cfs_hash_for_each_safe(qctxt->lqc_lqs_hash, qctxt_del_lqs, NULL);
+
+        l_wait_event(qctxt->lqc_lqs_waitq, check_lqs(qctxt), &lwi);
+        cfs_down_write(&obt->obt_rwsem);
+        cfs_hash_putref(qctxt->lqc_lqs_hash);
+        qctxt->lqc_lqs_hash = NULL;
+        cfs_up_write(&obt->obt_rwsem);
+
         ptlrpcd_decref();
 
 #ifdef LPROCFS
@@ -1235,7 +1382,7 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
 struct qslave_recov_thread_data {
         struct obd_device *obd;
         struct lustre_quota_ctxt *qctxt;
-        struct completion comp;
+        cfs_completion_t comp;
 };
 
 /* FIXME only recovery block quota by now */
@@ -1248,23 +1395,32 @@ static int qslave_recovery_main(void *arg)
         int rc = 0;
         ENTRY;
 
-        ptlrpc_daemonize("qslave_recovd");
+        cfs_daemonize_ctxt("qslave_recovd");
 
-        complete(&data->comp);
+        /* for obdfilter */
+        class_incref(obd, "qslave_recovd_filter", obd);
 
-        if (qctxt->lqc_recovery)
+        cfs_complete(&data->comp);
+
+        cfs_spin_lock(&qctxt->lqc_lock);
+        if (qctxt->lqc_recovery) {
+                cfs_spin_unlock(&qctxt->lqc_lock);
+                class_decref(obd, "qslave_recovd_filter", obd);
                 RETURN(0);
-        qctxt->lqc_recovery = 1;
+        } else {
+                qctxt->lqc_recovery = 1;
+                cfs_spin_unlock(&qctxt->lqc_lock);
+        }
 
         for (type = USRQUOTA; type < MAXQUOTAS; type++) {
                 struct qunit_data qdata;
                 struct quota_info *dqopt = sb_dqopt(qctxt->lqc_sb);
-                struct list_head id_list;
+                cfs_list_t id_list;
                 struct dquot_id *dqid, *tmp;
                 int ret;
 
                 LOCK_DQONOFF_MUTEX(dqopt);
-                if (!sb_has_quota_enabled(qctxt->lqc_sb, type)) {
+                if (!ll_sb_has_quota_active(qctxt->lqc_sb, type)) {
                         UNLOCK_DQONOFF_MUTEX(dqopt);
                         break;
                 }
@@ -1280,8 +1436,8 @@ static int qslave_recovery_main(void *arg)
                 if (rc)
                         CERROR("Get ids from quota file failed. (rc:%d)\n", rc);
 
-                list_for_each_entry_safe(dqid, tmp, &id_list, di_link) {
-                        list_del_init(&dqid->di_link);
+                cfs_list_for_each_entry_safe(dqid, tmp, &id_list, di_link) {
+                        cfs_list_del_init(&dqid->di_link);
                         /* skip slave recovery on itself */
                         if (is_master(qctxt))
                                 goto free;
@@ -1305,16 +1461,18 @@ static int qslave_recovery_main(void *arg)
                                 rc = 0;
                         }
 
-                        if (rc)
-                                CDEBUG(rc == -EBUSY ? D_QUOTA : D_ERROR,
-                                       "qslave recovery failed! (id:%d type:%d "
+                        if (rc && rc != -EBUSY)
+                                CERROR("qslave recovery failed! (id:%d type:%d "
                                        " rc:%d)\n", dqid->di_id, type, rc);
 free:
-                        kfree(dqid);
+                        OBD_FREE_PTR(dqid);
                 }
         }
 
+        cfs_spin_lock(&qctxt->lqc_lock);
         qctxt->lqc_recovery = 0;
+        cfs_spin_unlock(&qctxt->lqc_lock);
+        class_decref(obd, "qslave_recovd_filter", obd);
         RETURN(rc);
 }
 
@@ -1325,23 +1483,92 @@ qslave_start_recovery(struct obd_device *obd, struct lustre_quota_ctxt *qctxt)
         int rc;
         ENTRY;
 
-        if (!sb_any_quota_enabled(qctxt->lqc_sb))
+        if (!ll_sb_any_quota_active(qctxt->lqc_sb))
                 goto exit;
 
         data.obd = obd;
         data.qctxt = qctxt;
-        init_completion(&data.comp);
+        cfs_init_completion(&data.comp);
 
-        rc = kernel_thread(qslave_recovery_main, &data, CLONE_VM|CLONE_FILES);
+        rc = cfs_create_thread(qslave_recovery_main, &data,
+                               CFS_DAEMON_FLAGS);
         if (rc < 0) {
                 CERROR("Cannot start quota recovery thread: rc %d\n", rc);
                 goto exit;
         }
-        wait_for_completion(&data.comp);
+        cfs_wait_for_completion(&data.comp);
 exit:
         EXIT;
 }
 
+inline int quota_is_on(struct lustre_quota_ctxt *qctxt,
+                       struct obd_quotactl *oqctl)
+{
+        return ((qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type)) ==
+                UGQUOTA2LQC(oqctl->qc_type));
+}
+
+inline int quota_is_off(struct lustre_quota_ctxt *qctxt,
+                        struct obd_quotactl *oqctl)
+{
+        return !(qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type));
+}
+
+/**
+ * At quotaon time, build an lqs for every uid/gid that has a quota limit
+ * set. quota_search_lqs() leaves one reference held on each lqs; it is
+ * released when qctxt_cleanup() runs. b=18574
+ *
+ * Should be called with obt->obt_quotachecking held. b=20152
+ */
+void build_lqs(struct obd_device *obd)
+{
+        struct obd_device_target *obt = &obd->u.obt;
+        struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt;
+        cfs_list_t id_list;
+        int i, rc;
+
+        LASSERT_SEM_LOCKED(&obt->obt_quotachecking);
+        CFS_INIT_LIST_HEAD(&id_list);
+        for (i = 0; i < MAXQUOTAS; i++) {
+                struct dquot_id *dqid, *tmp;
+
+                if (sb_dqopt(qctxt->lqc_sb)->files[i] == NULL)
+                        continue;
+
+#ifndef KERNEL_SUPPORTS_QUOTA_READ
+                rc = fsfilt_qids(obd, sb_dqopt(qctxt->lqc_sb)->files[i], NULL,
+                                 i, &id_list);
+#else
+                rc = fsfilt_qids(obd, NULL, sb_dqopt(qctxt->lqc_sb)->files[i],
+                                 i, &id_list);
+#endif
+                if (rc) {
+                        CERROR("%s: failed to get %s qids!\n", obd->obd_name,
+                               i ? "group" : "user");
+                        continue;
+                }
+
+                cfs_list_for_each_entry_safe(dqid, tmp, &id_list,
+                                             di_link) {
+                        struct lustre_qunit_size *lqs;
+
+                        cfs_list_del_init(&dqid->di_link);
+                        lqs = quota_search_lqs(LQS_KEY(i, dqid->di_id),
+                                               qctxt, 1);
+                        if (lqs && !IS_ERR(lqs)) {
+                                lqs->lqs_flags |= dqid->di_flag;
+                                lqs_putref(lqs);
+                        } else {
+                                CERROR("%s: failed to create a lqs for %sid %u"
+                                       "\n", obd->obd_name, i ? "g" : "u",
+                                       dqid->di_id);
+                        }
+
+                        OBD_FREE_PTR(dqid);
+                }
+        }
+}
 
 /**
  * lqs<->qctxt hash operations
@@ -1351,92 +1578,75 @@ exit:
  * string hashing using djb2 hash algorithm
  */
 static unsigned
-lqs_hash(lustre_hash_t *lh, void *key, unsigned mask)
+lqs_hash(cfs_hash_t *hs, const void *key, unsigned mask)
 {
-        struct quota_adjust_qunit *lqs_key;
+        unsigned long long id;
         unsigned hash;
         ENTRY;
 
         LASSERT(key);
-        lqs_key = (struct quota_adjust_qunit *)key;
-        hash = (QAQ_IS_GRP(lqs_key) ? 5381 : 5387) * lqs_key->qaq_id;
+        id = *((unsigned long long *)key);
+        hash = (LQS_KEY_GRP(id) ? 5381 : 5387) * (unsigned)LQS_KEY_ID(id);
 
         RETURN(hash & mask);
 }
 
-static int
-lqs_compare(void *key, struct hlist_node *hnode)
+static void *
+lqs_key(cfs_hlist_node_t *hnode)
 {
-        struct quota_adjust_qunit *lqs_key;
-        struct lustre_qunit_size *q;
-        int rc;
+        struct lustre_qunit_size *lqs;
         ENTRY;
 
-        LASSERT(key);
-        lqs_key = (struct quota_adjust_qunit *)key;
-        q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
+        lqs = cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
+        RETURN(&lqs->lqs_key);
+}
 
-        spin_lock(&q->lqs_lock);
-        rc = ((lqs_key->qaq_id == q->lqs_id) &&
-              (QAQ_IS_GRP(lqs_key) == LQS_IS_GRP(q)));
-        spin_unlock(&q->lqs_lock);
+static int
+lqs_keycmp(const void *key, cfs_hlist_node_t *hnode)
+{
+        struct lustre_qunit_size *q =
+                cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
 
-        RETURN(rc);
+        RETURN(q->lqs_key == *((unsigned long long *)key));
 }
 
 static void *
-lqs_get(struct hlist_node *hnode)
+lqs_object(cfs_hlist_node_t *hnode)
 {
-        struct lustre_qunit_size *q = 
-            hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
-        ENTRY;
+        return cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
+}
 
-        atomic_inc(&q->lqs_refcount);
-        CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
-               q, atomic_read(&q->lqs_refcount));
+static void
+lqs_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+        struct lustre_qunit_size *q =
+                cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
 
-        RETURN(q);
+        lqs_getref(q);
 }
 
-static void *
-lqs_put(struct hlist_node *hnode)
+static void
+lqs_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
 {
-        struct lustre_qunit_size *q = 
-            hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
-        ENTRY;
-
-        LASSERT(atomic_read(&q->lqs_refcount) > 0);
-        atomic_dec(&q->lqs_refcount);
-        CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
-               q, atomic_read(&q->lqs_refcount));
+        struct lustre_qunit_size *q =
+                cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
 
-        RETURN(q);
+        lqs_putref(q);
 }
 
 static void
-lqs_exit(struct hlist_node *hnode)
+lqs_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
 {
-        struct lustre_qunit_size *q;
-        ENTRY;
-
-        q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
-        /* 
-         * Nothing should be left. User of lqs put it and
-         * lqs also was deleted from table by this time
-         * so we should have 0 refs.
-         */
-        LASSERTF(atomic_read(&q->lqs_refcount) == 0, 
-                 "Busy lqs %p with %d refs\n", q,
-                 atomic_read(&q->lqs_refcount));
-        OBD_FREE_PTR(q);
-        EXIT;
+        CERROR("It should not have any item left to be handled by this!");
 }
 
-static lustre_hash_ops_t lqs_hash_ops = {
-        .lh_hash    = lqs_hash,
-        .lh_compare = lqs_compare,
-        .lh_get     = lqs_get,
-        .lh_put     = lqs_put,
-        .lh_exit    = lqs_exit
+static cfs_hash_ops_t lqs_hash_ops = {
+        .hs_hash        = lqs_hash,
+        .hs_key         = lqs_key,
+        .hs_keycmp      = lqs_keycmp,
+        .hs_object      = lqs_object,
+        .hs_get         = lqs_get,
+        .hs_put_locked  = lqs_put_locked,
+        .hs_exit        = lqs_exit
 };
 #endif /* HAVE_QUOTA_SUPPORT */