Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / quota / quota_context.c
index 92313d2..c96cbc2 100644 (file)
@@ -44,7 +44,7 @@
 # define EXPORT_SYMTAB
 #endif
 
-#define DEBUG_SUBSYSTEM S_MDS
+#define DEBUG_SUBSYSTEM S_LQUOTA
 
 #include <linux/version.h>
 #include <linux/fs.h>
 #include <obd_class.h>
 #include <lustre_quota.h>
 #include <lustre_fsfilt.h>
+#include <class_hash.h>
+#include <lprocfs_status.h>
 #include "quota_internal.h"
 
-unsigned long default_bunit_sz = 100 * 1024 * 1024;       /* 100M bytes */
-unsigned long default_btune_ratio = 50;                   /* 50 percentage */
-unsigned long default_iunit_sz = 5000;       /* 5000 inodes */
-unsigned long default_itune_ratio = 50;      /* 50 percentage */
+#ifdef HAVE_QUOTA_SUPPORT
+
+static lustre_hash_ops_t lqs_hash_ops;
+
+unsigned long default_bunit_sz = 128 * 1024 * 1024; /* 128M bytes */
+unsigned long default_btune_ratio = 50;             /* 50 percentage */
+unsigned long default_iunit_sz = 5120;              /* 5120 inodes */
+unsigned long default_itune_ratio = 50;             /* 50 percentage */
 
 cfs_mem_cache_t *qunit_cachep = NULL;
 struct list_head qunit_hash[NR_DQHASH];
 spinlock_t qunit_hash_lock = SPIN_LOCK_UNLOCKED;
 
+/* keep enum qunit_state in sync with qunit_state_names[] below */
+enum qunit_state {
+        /**
+         * a qunit is created
+         */
+        QUNIT_CREATED      = 0,
+        /**
+         * a qunit has been added to the qunit hash, which means
+         * a quota req is about to be sent or is in flight
+         */
+        QUNIT_IN_HASH      = 1,
+        /**
+         * a qunit has been removed from the qunit hash, which
+         * means its quota req has been handled and has come back
+         */
+        QUNIT_RM_FROM_HASH = 2,
+        /**
+         * qunit can wake up all threads waiting for it
+         */
+        QUNIT_FINISHED     = 3,
+};
+
+static const char *qunit_state_names[] = {
+        [QUNIT_CREATED]      = "CREATED",
+        [QUNIT_IN_HASH]      = "IN_HASH",
+        [QUNIT_RM_FROM_HASH] = "RM_FROM_HASH",
+        [QUNIT_FINISHED]     = "FINISHED",
+};
+
 struct lustre_qunit {
-        struct list_head lq_hash;               /* Hash list in memory */
-        atomic_t lq_refcnt;                     /* Use count */
-        struct lustre_quota_ctxt *lq_ctxt;      /* Quota context this applies to */
-        struct qunit_data lq_data;              /* See qunit_data */
-        unsigned int lq_opc;                    /* QUOTA_DQACQ, QUOTA_DQREL */
-        struct list_head lq_waiters;            /* All write threads waiting for this qunit */
+        struct list_head lq_hash;          /** Hash list in memory */
+        atomic_t lq_refcnt;                /** Use count */
+        struct lustre_quota_ctxt *lq_ctxt; /** Quota context this applies to */
+        struct qunit_data lq_data;         /** See qunit_data */
+        unsigned int lq_opc;               /** QUOTA_DQACQ, QUOTA_DQREL */
+        cfs_waitq_t lq_waitq;              /** Threads waiting for this qunit */
+        spinlock_t lq_lock;                /** Protect the whole structure */
+        enum qunit_state lq_state;         /** Present the status of qunit */
+        int lq_rc;                         /** The rc of lq_data */
+        pid_t lq_owner;
 };
 
+#define QUNIT_SET_STATE(qunit, state)                                   \
+do {                                                                    \
+        spin_lock(&qunit->lq_lock);                                     \
+        QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), "   \
+                    "lq_rc(%d), lq_owner(%d)\n",                        \
+                    qunit, qunit_state_names[qunit->lq_state],          \
+                    qunit_state_names[state], qunit->lq_rc,             \
+                    qunit->lq_owner);                                   \
+        qunit->lq_state = state;                                        \
+        spin_unlock(&qunit->lq_lock);                                   \
+} while(0)
+
+#define QUNIT_SET_STATE_AND_RC(qunit, state, rc)                        \
+do {                                                                    \
+        spin_lock(&qunit->lq_lock);                                     \
+        qunit->lq_rc = rc;                                              \
+        QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), "   \
+                    "lq_rc(%d), lq_owner(%d)\n",                        \
+                    qunit, qunit_state_names[qunit->lq_state],          \
+                    qunit_state_names[state], qunit->lq_rc,             \
+                    qunit->lq_owner);                                   \
+        qunit->lq_state = state;                                        \
+        spin_unlock(&qunit->lq_lock);                                   \
+} while(0)
+
+
 int should_translate_quota (struct obd_import *imp)
 {
         ENTRY;
 
         LASSERT(imp);
-        if ((imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_QUOTA64) &&
-            !OBD_FAIL_CHECK(OBD_FAIL_QUOTA_QD_COUNT_32BIT))
+        if (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_QUOTA64)
                 RETURN(0);
         else
                 RETURN(1);
@@ -135,66 +199,13 @@ static inline int
 qunit_hashfn(struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata)
 {
         unsigned int id = qdata->qd_id;
-        unsigned int type = qdata->qd_flags & QUOTA_IS_GRP;
+        unsigned int type = QDATA_IS_GRP(qdata);
 
         unsigned long tmp = ((unsigned long)qctxt >> L1_CACHE_SHIFT) ^ id;
         tmp = (tmp * (MAXQUOTAS - type)) % NR_DQHASH;
         return tmp;
 }
 
-/* compute the remaining quota for certain gid or uid b=11693 */
-int compute_remquota(struct obd_device *obd,
-                     struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata)
-{
-        struct super_block *sb = qctxt->lqc_sb;
-        __u64 usage, limit;
-        struct obd_quotactl *qctl;
-        int ret = QUOTA_RET_OK;
-        __u32 qdata_type = qdata->qd_flags & QUOTA_IS_GRP;
-        ENTRY;
-
-        if (!sb_any_quota_enabled(sb))
-                RETURN(QUOTA_RET_NOQUOTA);
-
-        /* ignore root user */
-        if (qdata->qd_id == 0 && qdata_type == USRQUOTA)
-                RETURN(QUOTA_RET_NOLIMIT);
-
-        OBD_ALLOC_PTR(qctl);
-        if (qctl == NULL)
-                RETURN(-ENOMEM);
-
-        /* get fs quota usage & limit */
-        qctl->qc_cmd = Q_GETQUOTA;
-        qctl->qc_id = qdata->qd_id;
-        qctl->qc_type = qdata_type;
-        ret = fsfilt_quotactl(obd, sb, qctl);
-        if (ret) {
-                if (ret == -ESRCH)      /* no limit */
-                        ret = QUOTA_RET_NOLIMIT;
-                else
-                        CDEBUG(D_QUOTA, "can't get fs quota usage! (rc:%d)",
-                               ret);
-                GOTO(out, ret);
-        }
-
-        usage = qctl->qc_dqblk.dqb_curspace;
-        limit = qctl->qc_dqblk.dqb_bhardlimit << QUOTABLOCK_BITS;
-        if (!limit){            /* no limit */
-                ret = QUOTA_RET_NOLIMIT;
-                GOTO(out, ret);
-        }
-
-        if (limit >= usage)
-                qdata->qd_count = limit - usage;
-        else
-                qdata->qd_count = 0;
-        EXIT;
-out:
-        OBD_FREE_PTR(qctl);
-        return ret;
-}
-
 /* caller must hold qunit_hash_lock */
 static inline struct lustre_qunit *find_qunit(unsigned int hashent,
                                               struct lustre_quota_ctxt *qctxt,
@@ -207,7 +218,9 @@ static inline struct lustre_qunit *find_qunit(unsigned int hashent,
         list_for_each_entry(qunit, qunit_hash + hashent, lq_hash) {
                 tmp = &qunit->lq_data;
                 if (qunit->lq_ctxt == qctxt &&
-                    qdata->qd_id == tmp->qd_id && qdata->qd_flags == tmp->qd_flags)
+                    qdata->qd_id == tmp->qd_id &&
+                    (qdata->qd_flags & LQUOTA_QUNIT_FLAGS) ==
+                    (tmp->qd_flags & LQUOTA_QUNIT_FLAGS))
                         return qunit;
         }
         return NULL;
@@ -218,9 +231,9 @@ static inline struct lustre_qunit *find_qunit(unsigned int hashent,
  * @qdata: the type of quota unit to be checked
  *
  * return: 1 - need acquire qunit;
- *        2 - need release qunit;
- *        0 - need do nothing.
- *      < 0 - error.
+ *         2 - need release qunit;
+ *         0 - need do nothing.
+ *       < 0 - error.
  */
 static int
 check_cur_qunit(struct obd_device *obd,
@@ -228,16 +241,23 @@ check_cur_qunit(struct obd_device *obd,
 {
         struct super_block *sb = qctxt->lqc_sb;
         unsigned long qunit_sz, tune_sz;
-        __u64 usage, limit;
+        __u64 usage, limit, limit_org, pending_write = 0;
+        long long record = 0;
         struct obd_quotactl *qctl;
+        struct lustre_qunit_size *lqs = NULL;
         int ret = 0;
-        __u32 qdata_type = qdata->qd_flags & QUOTA_IS_GRP;
-        __u32 is_blk = (qdata->qd_flags & QUOTA_IS_BLOCK) >> 1;
         ENTRY;
 
         if (!sb_any_quota_enabled(sb))
                 RETURN(0);
 
+        spin_lock(&qctxt->lqc_lock);
+        if (!qctxt->lqc_valid){
+                spin_unlock(&qctxt->lqc_lock);
+                RETURN(0);
+        }
+        spin_unlock(&qctxt->lqc_lock);
+
         OBD_ALLOC_PTR(qctl);
         if (qctl == NULL)
                 RETURN(-ENOMEM);
@@ -245,7 +265,7 @@ check_cur_qunit(struct obd_device *obd,
         /* get fs quota usage & limit */
         qctl->qc_cmd = Q_GETQUOTA;
         qctl->qc_id = qdata->qd_id;
-        qctl->qc_type = qdata_type;
+        qctl->qc_type = QDATA_IS_GRP(qdata);
         ret = fsfilt_quotactl(obd, sb, qctl);
         if (ret) {
                 if (ret == -ESRCH)      /* no limit */
@@ -255,57 +275,141 @@ check_cur_qunit(struct obd_device *obd,
                 GOTO(out, ret);
         }
 
-        if (is_blk) {
+        if (QDATA_IS_BLK(qdata)) {
                 usage = qctl->qc_dqblk.dqb_curspace;
                 limit = qctl->qc_dqblk.dqb_bhardlimit << QUOTABLOCK_BITS;
-                qunit_sz = qctxt->lqc_bunit_sz;
-                tune_sz = qctxt->lqc_btune_sz;
-
-                LASSERT(!(qunit_sz % QUOTABLOCK_SIZE));
         } else {
                 usage = qctl->qc_dqblk.dqb_curinodes;
                 limit = qctl->qc_dqblk.dqb_ihardlimit;
-                qunit_sz = qctxt->lqc_iunit_sz;
-                tune_sz = qctxt->lqc_itune_sz;
         }
 
-        /* ignore the no quota limit case */
+        /* ignore the no quota limit case; this also avoids creating
+         * an unnecessary lqs for this uid/gid */
         if (!limit)
                 GOTO(out, ret = 0);
 
+        lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
+                               qctxt, 1);
+        if (IS_ERR(lqs))
+                GOTO (out, ret = PTR_ERR(lqs));
+        spin_lock(&lqs->lqs_lock);
+
+        if (QDATA_IS_BLK(qdata)) {
+                qunit_sz = lqs->lqs_bunit_sz;
+                tune_sz  = lqs->lqs_btune_sz;
+                pending_write = lqs->lqs_bwrite_pending;
+                record   = lqs->lqs_blk_rec;
+                LASSERT(!(qunit_sz % QUOTABLOCK_SIZE));
+        } else {
+                /* we don't need to change the inode qunit size for now */
+                qunit_sz = lqs->lqs_iunit_sz;
+                tune_sz  = lqs->lqs_itune_sz;
+                pending_write = lqs->lqs_iwrite_pending;
+                record   = lqs->lqs_ino_rec;
+        }
+
         /* we don't count the MIN_QLIMIT */
-        if ((limit == MIN_QLIMIT && !is_blk) ||
-            (toqb(limit) == MIN_QLIMIT && is_blk))
+        if ((limit == MIN_QLIMIT && !QDATA_IS_BLK(qdata)) ||
+            (toqb(limit) == MIN_QLIMIT && QDATA_IS_BLK(qdata)))
                 limit = 0;
 
+        usage += pending_write;
+        limit_org = limit;
+        /* a pending release req leaves limit small, so limit + record may wrap.
+         * NOTE(review): limit is __u64, so "< 0" looks always-false; confirm */
+        if (limit + record < 0)
+                usage -= record;
+        else
+                limit += record;
+
         LASSERT(qdata->qd_count == 0);
         if (limit <= usage + tune_sz) {
-                while (qdata->qd_count + limit <= usage + tune_sz)
+                while (qdata->qd_count + limit <=
+                       usage + tune_sz)
                         qdata->qd_count += qunit_sz;
                 ret = 1;
-        } else if (limit > usage + qunit_sz + tune_sz) {
-                while (limit - qdata->qd_count > usage + qunit_sz + tune_sz)
+        } else if (limit > usage + qunit_sz + tune_sz &&
+                   limit_org > qdata->qd_count + qunit_sz) {
+                while (limit - qdata->qd_count > usage + qunit_sz + tune_sz &&
+                       limit_org > qdata->qd_count + qunit_sz)
                         qdata->qd_count += qunit_sz;
                 ret = 2;
+                /* if there are other pending writes for this uid/gid, releasing
+                 * quota is put off until the last pending write b=16645 */
+                if (ret == 2 && pending_write) {
+                        CDEBUG(D_QUOTA, "delay quota release\n");
+                        ret = 0;
+                }
         }
+        CDEBUG(D_QUOTA, "type: %c, limit: "LPU64", usage: "LPU64
+               ", pending_write: "LPU64", record: "LPD64
+               ", qunit_sz: %lu, tune_sz: %lu, ret: %d.\n",
+               QDATA_IS_BLK(qdata) ? 'b' : 'i', limit, usage, pending_write,
+               record, qunit_sz, tune_sz, ret);
         LASSERT(ret == 0 || qdata->qd_count);
+
+        spin_unlock(&lqs->lqs_lock);
+        lqs_putref(lqs);
         EXIT;
-out:
+ out:
         OBD_FREE_PTR(qctl);
         return ret;
 }
 
-/* caller must hold qunit_hash_lock */
-static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt,
-                                            struct qunit_data *qdata)
+/**
+ * Compute the remaining quota for certain gid or uid b=11693
+ */
+int compute_remquota(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
+                     struct qunit_data *qdata, int isblk)
 {
-        unsigned int hashent = qunit_hashfn(qctxt, qdata);
-        struct lustre_qunit *qunit;
+        struct super_block *sb = qctxt->lqc_sb;
+        __u64 usage, limit;
+        struct obd_quotactl *qctl;
+        int ret = QUOTA_RET_OK;
         ENTRY;
 
-        LASSERT_SPIN_LOCKED(&qunit_hash_lock);
-        qunit = find_qunit(hashent, qctxt, qdata);
-        RETURN(qunit);
+        if (!sb_any_quota_enabled(sb))
+                RETURN(QUOTA_RET_NOQUOTA);
+
+        /* ignore root user */
+        if (qdata->qd_id == 0 && QDATA_IS_GRP(qdata) == USRQUOTA)
+                RETURN(QUOTA_RET_NOLIMIT);
+
+        OBD_ALLOC_PTR(qctl);
+        if (qctl == NULL)
+                RETURN(-ENOMEM);
+
+        /* get fs quota usage & limit */
+        qctl->qc_cmd = Q_GETQUOTA;
+        qctl->qc_id = qdata->qd_id;
+        qctl->qc_type = QDATA_IS_GRP(qdata);
+        ret = fsfilt_quotactl(obd, sb, qctl);
+        if (ret) {
+                if (ret == -ESRCH)      /* no limit */
+                        ret = QUOTA_RET_NOLIMIT;
+                else
+                        CDEBUG(D_QUOTA, "can't get fs quota usage! (rc:%d)",
+                               ret);
+                GOTO(out, ret);
+        }
+
+        usage = isblk ? qctl->qc_dqblk.dqb_curspace :
+                qctl->qc_dqblk.dqb_curinodes;
+        limit = isblk ? qctl->qc_dqblk.dqb_bhardlimit << QUOTABLOCK_BITS :
+                qctl->qc_dqblk.dqb_ihardlimit;
+        if (!limit){            /* no limit */
+                ret = QUOTA_RET_NOLIMIT;
+                GOTO(out, ret);
+        }
+
+        if (limit >= usage)
+                qdata->qd_count = limit - usage;
+        else
+                qdata->qd_count = 0;
+        EXIT;
+out:
+        OBD_FREE_PTR(qctl);
+        return ret;
 }
 
 static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt,
@@ -314,17 +418,19 @@ static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt,
         struct lustre_qunit *qunit = NULL;
         ENTRY;
 
-        OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit));
+        OBD_SLAB_ALLOC_PTR_GFP(qunit, qunit_cachep, CFS_ALLOC_IO);
         if (qunit == NULL)
                 RETURN(NULL);
 
         CFS_INIT_LIST_HEAD(&qunit->lq_hash);
-        CFS_INIT_LIST_HEAD(&qunit->lq_waiters);
+        init_waitqueue_head(&qunit->lq_waitq);
         atomic_set(&qunit->lq_refcnt, 1);
         qunit->lq_ctxt = qctxt;
         memcpy(&qunit->lq_data, qdata, sizeof(*qdata));
         qunit->lq_opc = opc;
-
+        qunit->lq_lock = SPIN_LOCK_UNLOCKED;
+        QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0);
+        qunit->lq_owner = cfs_curproc_pid();
         RETURN(qunit);
 }
 
@@ -345,102 +451,114 @@ static void qunit_put(struct lustre_qunit *qunit)
                 free_qunit(qunit);
 }
 
+/* caller must hold qunit_hash_lock and release ref of qunit after using it */
+static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt,
+                                            struct qunit_data *qdata)
+{
+        unsigned int hashent = qunit_hashfn(qctxt, qdata);
+        struct lustre_qunit *qunit;
+        ENTRY;
+
+        LASSERT_SPIN_LOCKED(&qunit_hash_lock);
+        qunit = find_qunit(hashent, qctxt, qdata);
+        if (qunit)
+                qunit_get(qunit);
+        RETURN(qunit);
+}
+
 static void
 insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit)
 {
         struct list_head *head;
 
         LASSERT(list_empty(&qunit->lq_hash));
+        qunit_get(qunit);
         head = qunit_hash + qunit_hashfn(qctxt, &qunit->lq_data);
         list_add(&qunit->lq_hash, head);
+        QUNIT_SET_STATE(qunit, QUNIT_IN_HASH);
+}
+
+static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit)
+{
+        struct lustre_qunit_size *lqs;
+
+        lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(&qunit->lq_data),
+                                       qunit->lq_data.qd_id),
+                               qunit->lq_ctxt, 0);
+        if (lqs && !IS_ERR(lqs)) {
+                spin_lock(&lqs->lqs_lock);
+                if (qunit->lq_opc == QUOTA_DQACQ)
+                        quota_compute_lqs(&qunit->lq_data, lqs, 0, 1);
+                if (qunit->lq_opc == QUOTA_DQREL)
+                        quota_compute_lqs(&qunit->lq_data, lqs, 0, 0);
+                spin_unlock(&lqs->lqs_lock);
+                /* this is for quota_search_lqs */
+                lqs_putref(lqs);
+                /* this is for schedule_dqacq */
+                lqs_putref(lqs);
+        }
 }
 
 static void remove_qunit_nolock(struct lustre_qunit *qunit)
 {
         LASSERT(!list_empty(&qunit->lq_hash));
+        LASSERT_SPIN_LOCKED(&qunit_hash_lock);
+
         list_del_init(&qunit->lq_hash);
+        QUNIT_SET_STATE(qunit, QUNIT_RM_FROM_HASH);
+        qunit_put(qunit);
 }
 
-struct qunit_waiter {
-        struct list_head qw_entry;
-        cfs_waitq_t      qw_waitq;
-        int qw_rc;
-};
-
 #define INC_QLIMIT(limit, count) (limit == MIN_QLIMIT) ? \
                                  (limit = count) : (limit += count)
 
 
-/* FIXME check if this mds is the master of specified id */
-static int
-is_master(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
-          unsigned int id, int type)
+static inline int is_master(struct lustre_quota_ctxt *qctxt)
 {
         return qctxt->lqc_handler ? 1 : 0;
 }
 
 static int
 schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
-               struct qunit_data *qdata, int opc, int wait);
+               struct qunit_data *qdata, int opc, int wait,
+               struct obd_trans_info *oti);
 
-static int split_before_schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
-                                       struct qunit_data *qdata, int opc, int wait)
+static inline void qdata_to_oqaq(struct qunit_data *qdata,
+                                 struct quota_adjust_qunit *oqaq)
 {
-        int rc = 0;
-        unsigned long factor;
-        struct qunit_data tmp_qdata;
-        ENTRY;
-
-        LASSERT(qdata && qdata->qd_count);
-        QDATA_DEBUG(qdata, "%s quota split.\n",
-                    (qdata->qd_flags & QUOTA_IS_BLOCK) ? "block" : "inode");
-        if (qdata->qd_flags & QUOTA_IS_BLOCK)
-                factor = MAX_QUOTA_COUNT32 / qctxt->lqc_bunit_sz *
-                        qctxt->lqc_bunit_sz;
-        else
-                factor = MAX_QUOTA_COUNT32 / qctxt->lqc_iunit_sz *
-                        qctxt->lqc_iunit_sz;
-
-        if (qctxt->lqc_import && should_translate_quota(qctxt->lqc_import) &&
-            qdata->qd_count > factor) {
-                        tmp_qdata = *qdata;
-                tmp_qdata.qd_count = factor;
-                        qdata->qd_count -= tmp_qdata.qd_count;
-                QDATA_DEBUG((&tmp_qdata), "be split.\n");
-                rc = schedule_dqacq(obd, qctxt, &tmp_qdata, opc, wait);
-        } else{
-                QDATA_DEBUG(qdata, "don't be split.\n");
-                rc = schedule_dqacq(obd, qctxt, qdata, opc, wait);
-        }
-
-        RETURN(rc);
+        LASSERT(qdata);
+        LASSERT(oqaq);
+
+        oqaq->qaq_flags = qdata->qd_flags;
+        oqaq->qaq_id    = qdata->qd_id;
+        if (QDATA_IS_ADJBLK(qdata))
+                oqaq->qaq_bunit_sz = qdata->qd_qunit;
+        if (QDATA_IS_ADJINO(qdata))
+                oqaq->qaq_iunit_sz = qdata->qd_qunit;
 }
 
 static int
-dqacq_completion(struct obd_device *obd,
-                 struct lustre_quota_ctxt *qctxt,
+dqacq_completion(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                  struct qunit_data *qdata, int rc, int opc)
 {
         struct lustre_qunit *qunit = NULL;
         struct super_block *sb = qctxt->lqc_sb;
-        unsigned long qunit_sz;
-        struct qunit_waiter *qw, *tmp;
         int err = 0;
-        __u32 qdata_type = qdata->qd_flags & QUOTA_IS_GRP;
-        __u32 is_blk = (qdata->qd_flags & QUOTA_IS_BLOCK) >> 1;
-        __u64 qd_tmp = qdata->qd_count;
-        unsigned long div_r;
+        struct quota_adjust_qunit *oqaq = NULL;
+        int rc1 = 0;
         ENTRY;
 
         LASSERT(qdata);
-        qunit_sz = is_blk ? qctxt->lqc_bunit_sz : qctxt->lqc_iunit_sz;
-        div_r = do_div(qd_tmp, qunit_sz);
-        LASSERTF(!div_r, "qunit_sz: %lu, return qunit_sz: "LPU64"\n",
-                 qunit_sz, qd_tmp);
+        QDATA_DEBUG(qdata, "obd(%s): complete %s quota req\n",
+                    obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel");
+
+        /* only hit the DELAY_REL fail point for release reqs of >= 5MB b=18491 */
+        if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880)
+                OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_REL, 5);
 
         /* update local operational quota file */
         if (rc == 0) {
-                __u32 count = QUSG(qdata->qd_count, is_blk);
+                __u64 count = QUSG(qdata->qd_count, QDATA_IS_BLK(qdata));
                 struct obd_quotactl *qctl;
                 __u64 *hardlimit;
 
@@ -453,14 +571,14 @@ dqacq_completion(struct obd_device *obd,
                  * set fs quota limit */
                 qctl->qc_cmd = Q_GETQUOTA;
                 qctl->qc_id = qdata->qd_id;
-                qctl->qc_type = qdata_type;
+                qctl->qc_type = QDATA_IS_GRP(qdata);
                 err = fsfilt_quotactl(obd, sb, qctl);
                 if (err) {
                         CERROR("error get quota fs limit! (rc:%d)\n", err);
                         GOTO(out_mem, err);
                 }
 
-                if (is_blk) {
+                if (QDATA_IS_BLK(qdata)) {
                         qctl->qc_dqblk.dqb_valid = QIF_BLIMITS;
                         hardlimit = &qctl->qc_dqblk.dqb_bhardlimit;
                 } else {
@@ -468,20 +586,24 @@ dqacq_completion(struct obd_device *obd,
                         hardlimit = &qctl->qc_dqblk.dqb_ihardlimit;
                 }
 
+                CDEBUG(D_QUOTA, "hardlimt: "LPU64"\n", *hardlimit);
+
+                if (*hardlimit == 0)
+                        goto out_mem;
+
                 switch (opc) {
                 case QUOTA_DQACQ:
-                        CDEBUG(D_QUOTA, "%s(acq):count: %d, hardlimt: "LPU64
-                               ",type: %s.\n", obd->obd_name, count, *hardlimit,
-                               qdata_type ? "grp": "usr");
                         INC_QLIMIT(*hardlimit, count);
                         break;
                 case QUOTA_DQREL:
-                        CDEBUG(D_QUOTA, "%s(rel):count: %d, hardlimt: "LPU64
-                               ",type: %s.\n", obd->obd_name, count, *hardlimit,
-                               qdata_type ? "grp": "usr");
                         LASSERTF(count < *hardlimit,
-                                 "count: %d, hardlimit: "LPU64".\n",
-                                 count, *hardlimit);
+                                 "id(%u) flag(%u) type(%c) isblk(%c) "
+                                 "count("LPU64") qd_qunit("LPU64") "
+                                 "hardlimit("LPU64").\n",
+                                 qdata->qd_id, qdata->qd_flags,
+                                 QDATA_IS_GRP(qdata) ? 'g' : 'u',
+                                 QDATA_IS_BLK(qdata) ? 'b': 'i',
+                                 qdata->qd_count, qdata->qd_qunit, *hardlimit);
                         *hardlimit -= count;
                         break;
                 default:
@@ -516,40 +638,65 @@ out:
         /* this qunit has been removed by qctxt_cleanup() */
         if (!qunit) {
                 spin_unlock(&qunit_hash_lock);
+                QDATA_DEBUG(qdata, "%s is discarded because qunit isn't found\n",
+                            opc == QUOTA_DQACQ ? "DQACQ" : "DQREL");
                 RETURN(err);
         }
 
         LASSERT(opc == qunit->lq_opc);
+        /* remove this qunit from the qunit hash so that no new threads can
+         * start waiting on qunit->lq_waitq */
         remove_qunit_nolock(qunit);
+        spin_unlock(&qunit_hash_lock);
 
-        /* wake up all waiters */
-        list_for_each_entry_safe(qw, tmp, &qunit->lq_waiters, qw_entry) {
-                list_del_init(&qw->qw_entry);
-                qw->qw_rc = rc;
-                wake_up(&qw->qw_waitq);
-        }
+        compute_lqs_after_removing_qunit(qunit);
 
-        spin_unlock(&qunit_hash_lock);
+        if (rc == 0)
+                rc = QUOTA_REQ_RETURNED;
+        QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, rc);
+        /* wake up all waiters */
+        wake_up_all(&qunit->lq_waitq);
 
+        /* this is for dqacq_in_flight() */
         qunit_put(qunit);
+        /* this is for alloc_qunit() */
+        qunit_put(qunit);
+        if (rc < 0 && rc != -EDQUOT)
+                 RETURN(err);
 
         /* don't reschedule in such cases:
-         *   - acq/rel failure, but not for quota recovery.
+         *   - acq/rel failure and qunit isn't changed,
+         *     but not for quota recovery.
          *   - local dqacq/dqrel.
          *   - local disk io failure.
          */
-        if (err || (rc && rc != -EBUSY) ||
-            is_master(obd, qctxt, qdata->qd_id, qdata_type))
-                RETURN(err);
+         OBD_ALLOC_PTR(oqaq);
+         if (!oqaq)
+                 RETURN(-ENOMEM);
+         qdata_to_oqaq(qdata, oqaq);
+         /* adjust the qunit size in slaves */
+         rc1 = quota_adjust_slave_lqs(oqaq, qctxt);
+         OBD_FREE_PTR(oqaq);
+         if (rc1 < 0) {
+                 CERROR("adjust slave's qunit size failed!(rc:%d)\n", rc1);
+                 RETURN(rc1);
+         }
+         if (err || (rc < 0 && rc != -EBUSY && rc1 == 0) || is_master(qctxt))
+                 RETURN(err);
+
+         if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880 &&
+             OBD_FAIL_CHECK(OBD_FAIL_QUOTA_DELAY_REL))
+                 RETURN(err);
 
         /* reschedule another dqacq/dqrel if needed */
         qdata->qd_count = 0;
-        rc = check_cur_qunit(obd, qctxt, qdata);
-        if (rc > 0) {
+        qdata->qd_flags &= LQUOTA_QUNIT_FLAGS;
+        rc1 = check_cur_qunit(obd, qctxt, qdata);
+        if (rc1 > 0) {
                 int opc;
-                opc = rc == 1 ? QUOTA_DQACQ : QUOTA_DQREL;
-                rc = split_before_schedule_dqacq(obd, qctxt, qdata, opc, 0);
-                QDATA_DEBUG(qdata, "reschedudle opc(%d) rc(%d)\n", opc, rc);
+                opc = rc1 == 1 ? QUOTA_DQACQ : QUOTA_DQREL;
+                rc1 = schedule_dqacq(obd, qctxt, qdata, opc, 0, NULL);
+                QDATA_DEBUG(qdata, "reschedudle opc(%d) rc(%d)\n", opc, rc1);
         }
         RETURN(err);
 }
@@ -564,158 +711,280 @@ static int dqacq_interpret(const struct lu_env *env,
 {
         struct dqacq_async_args *aa = (struct dqacq_async_args *)data;
         struct lustre_quota_ctxt *qctxt = aa->aa_ctxt;
+        struct obd_device_target *obt = qctxt->lqc_obt;
         struct lustre_qunit *qunit = aa->aa_qunit;
         struct obd_device *obd = req->rq_import->imp_obd;
         struct qunit_data *qdata = NULL;
-        struct qunit_data_old *qdata_old = NULL;
         ENTRY;
 
         LASSERT(req);
         LASSERT(req->rq_import);
 
-        if ((req->rq_import->imp_connect_data.ocd_connect_flags &
-             OBD_CONNECT_QUOTA64) &&
-            !OBD_FAIL_CHECK(OBD_FAIL_QUOTA_QD_COUNT_32BIT)) {
-                CDEBUG(D_QUOTA, "qd_count is 64bit!\n");
+        down_read(&obt->obt_rwsem);
+        /* if a quota req timeouts or is dropped, we should update quota
+         * statistics which will be handled in dqacq_completion. And in
+         * this situation we should get qdata from request instead of
+         * reply */
+        qdata = quota_get_qdata(req, (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY,
+                                QUOTA_IMPORT);
+        if (IS_ERR(qdata)) {
+                rc = PTR_ERR(qdata);
+                DEBUG_REQ(D_ERROR, req,
+                          "error unpacking qunit_data(rc: %ld)\n",
+                          PTR_ERR(qdata));
+                RETURN(PTR_ERR(qdata));
+        }
 
-                qdata = req_capsule_server_swab_get(&req->rq_pill,
-                                                    &RMF_QUNIT_DATA,
-                                          (void*)lustre_swab_qdata);
-        } else {
-                CDEBUG(D_QUOTA, "qd_count is 32bit!\n");
+        QDATA_DEBUG(qdata, "qdata: interpret rc(%d).\n", rc);
+        QDATA_DEBUG((&qunit->lq_data), "lq_data: \n");
 
-                qdata = req_capsule_server_swab_get(&req->rq_pill,
-                                                    &RMF_QUNIT_DATA,
-                                       (void*)lustre_swab_qdata_old);
-                qdata = lustre_quota_old_to_new(qdata_old);
+        if (qdata->qd_id != qunit->lq_data.qd_id ||
+            OBD_FAIL_CHECK(OBD_FAIL_QUOTA_RET_QDATA)) {
+                CDEBUG(D_ERROR, "the returned qd_id isn't expected!"
+                       "(qdata: %u, lq_data: %u)\n", qdata->qd_id,
+                       qunit->lq_data.qd_id);
+                qdata->qd_id = qunit->lq_data.qd_id;
+                rc = -EPROTO;
+        }
+        if (QDATA_IS_GRP(qdata) != QDATA_IS_GRP(&qunit->lq_data)) {
+                CDEBUG(D_ERROR, "the returned grp/usr isn't expected!"
+                       "(qdata: %u, lq_data: %u)\n", qdata->qd_flags,
+                       qunit->lq_data.qd_flags);
+                if (QDATA_IS_GRP(&qunit->lq_data))
+                        QDATA_SET_GRP(qdata);
+                else
+                        QDATA_CLR_GRP(qdata);
+                rc = -EPROTO;
         }
-        if (qdata == NULL) {
-                DEBUG_REQ(D_ERROR, req, "error unpacking qunit_data");
-                RETURN(-EPROTO);
+        if (qdata->qd_count > qunit->lq_data.qd_count) {
+                CDEBUG(D_ERROR, "the returned qd_count isn't expected!"
+                       "(qdata: "LPU64", lq_data: "LPU64")\n", qdata->qd_count,
+                       qunit->lq_data.qd_count);
+                rc = -EPROTO;
         }
 
-        LASSERT(qdata->qd_id == qunit->lq_data.qd_id &&
-                (qdata->qd_flags & QUOTA_IS_GRP) ==
-                 (qunit->lq_data.qd_flags & QUOTA_IS_GRP) &&
-                (qdata->qd_count == qunit->lq_data.qd_count ||
-                 qdata->qd_count == 0));
-
-        QDATA_DEBUG(qdata, "%s interpret rc(%d).\n",
-                    lustre_msg_get_opc(req->rq_reqmsg) == QUOTA_DQACQ ?
-                    "DQACQ" : "DQREL", rc);
-
         rc = dqacq_completion(obd, qctxt, qdata, rc,
                               lustre_msg_get_opc(req->rq_reqmsg));
 
+        up_read(&obt->obt_rwsem);
         RETURN(rc);
 }
 
-static int got_qunit(struct qunit_waiter *waiter)
+/** Wait condition used with lqc_wait_for_qmaster (see schedule_dqacq()).
+ * \retval non-zero lqc_import is set, or lqc_valid has been cleared
+ * \retval 0        otherwise (no import yet and qctxt still valid) */
+int check_qm(struct lustre_quota_ctxt *qctxt)
 {
-        int rc = 0;
+        int rc;
+        ENTRY;
+
+        spin_lock(&qctxt->lqc_lock);
+        /* stop waiting once the import is back or qctxt was invalidated; both are read under lqc_lock */
+        rc = qctxt->lqc_import || !qctxt->lqc_valid;
+        spin_unlock(&qctxt->lqc_lock);
+
+        RETURN(rc);
+}
+
+/* Wake up every thread waiting on any hashed qunit of \a qctxt; meant for when lqc_import is NULL */
+void dqacq_interrupt(struct lustre_quota_ctxt *qctxt)
+{
+        struct lustre_qunit *qunit, *tmp;
+        int i;
         ENTRY;
+
         spin_lock(&qunit_hash_lock);
-        rc = list_empty(&waiter->qw_entry);
+        for (i = 0; i < NR_DQHASH; i++) { /* walk every bucket of the global qunit hash */
+                list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], lq_hash) {
+                        if (qunit->lq_ctxt != qctxt) /* qunit of another quota context */
+                                continue;
+
+                        /* Wake up all waiters without touching lq_state.
+                         * got_qunit() then ends their wait only if qctxt is
+                         * invalid or (on a slave) lqc_import is NULL. lq_rc
+                         * stays 0 unless someone else set it; per the original
+                         * note, waiters are expected to report -EAGAIN then. */
+                        wake_up_all(&qunit->lq_waitq);
+                }
+        }
         spin_unlock(&qunit_hash_lock);
+        EXIT;
+}
+
+static int got_qunit(struct lustre_qunit *qunit, int is_master) /* l_wait_event() condition: non-zero ends the wait */
+{
+        struct lustre_quota_ctxt *qctxt = qunit->lq_ctxt;
+        int rc = 0;
+        ENTRY;
+
+        spin_lock(&qunit->lq_lock); /* lq_state is read under lq_lock */
+        switch (qunit->lq_state) {
+        case QUNIT_IN_HASH:
+        case QUNIT_RM_FROM_HASH:
+                break; /* not finished yet: rc stays 0 */
+        case QUNIT_FINISHED:
+                rc = 1; /* qunit is done, waiters may go on */
+                break;
+        default:
+                CERROR("invalid qunit state %d\n", qunit->lq_state); /* only logged; rc stays 0 */
+        }
+        spin_unlock(&qunit->lq_lock);
+
+        if (!rc) { /* not finished: still stop waiting if qctxt is unusable */
+                spin_lock(&qctxt->lqc_lock);
+                rc = !qctxt->lqc_valid; /* cleared by qctxt_cleanup() */
+                if (!is_master) /* the import is only checked when not the master */
+                        rc |= !qctxt->lqc_import;
+                spin_unlock(&qctxt->lqc_lock);
+        }
+
         RETURN(rc);
 }
 
 static int
-schedule_dqacq(struct obd_device *obd,
-               struct lustre_quota_ctxt *qctxt,
-               struct qunit_data *qdata, int opc, int wait)
+schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
+               struct qunit_data *qdata, int opc, int wait, /* wait != 0: sleep until the qunit finishes */
+               struct obd_trans_info *oti) /* only dereferenced when wait != 0 and the import is missing */
 {
         struct lustre_qunit *qunit, *empty;
-        struct qunit_waiter qw;
         struct l_wait_info lwi = { 0 };
         struct ptlrpc_request *req;
-        struct qunit_data *reqdata;
         struct dqacq_async_args *aa;
-       unsigned long factor;   
+        struct obd_import *imp = NULL;
+        struct lustre_qunit_size *lqs = NULL;
+        struct timeval work_start; /* work_start/work_end bracket the time added to lqc_stats */
+        struct timeval work_end;
+        long timediff;
         int rc = 0;
         ENTRY;
 
-        CFS_INIT_LIST_HEAD(&qw.qw_entry);
-        init_waitqueue_head(&qw.qw_waitq);
-        qw.qw_rc = 0;
-
+        LASSERT(opc == QUOTA_DQACQ || opc == QUOTA_DQREL);
+        do_gettimeofday(&work_start);
         if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL)
                 RETURN(-ENOMEM);
 
         spin_lock(&qunit_hash_lock);
-
         qunit = dqacq_in_flight(qctxt, qdata);
         if (qunit) {
-                if (wait)
-                        list_add_tail(&qw.qw_entry, &qunit->lq_waiters);
                 spin_unlock(&qunit_hash_lock);
+                qunit_put(empty); /* a matching qunit is already in the hash: drop our spare one */
 
-                free_qunit(empty);
                 goto wait_completion;
         }
         qunit = empty;
+        qunit_get(qunit); /* extra ref for this function; see the "this is for qunit_get()" puts */
         insert_qunit_nolock(qctxt, qunit);
-        if (wait)
-                list_add_tail(&qw.qw_entry, &qunit->lq_waiters);
         spin_unlock(&qunit_hash_lock);
 
-        LASSERT(qunit);
+        lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
+                               qctxt, 0);
+        if (lqs && !IS_ERR(lqs)) {
+                spin_lock(&lqs->lqs_lock);
+                quota_compute_lqs(qdata, lqs, 1, (opc == QUOTA_DQACQ) ? 1 : 0);
+                /* extra lqs ref for the in-flight qdata; per the original note
+                 * it is dropped by lqs_putref once this qdata returns from mds */
+                lqs_getref(lqs);
+                spin_unlock(&lqs->lqs_lock);
+                /* this drops the ref taken by quota_search_lqs */
+                lqs_putref(lqs);
+        } else {
+                CDEBUG(D_ERROR, "Can't find the lustre qunit size!\n"); /* logged only; the request still goes on */
+        }
 
+        QDATA_DEBUG(qdata, "obd(%s): send %s quota req\n",
+                    obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel");
         /* master is going to dqacq/dqrel from itself */
-        if (is_master(obd, qctxt, qdata->qd_id, qdata->qd_flags & QUOTA_IS_BLOCK))
-        {
+        if (is_master(qctxt)) {
                 int rc2;
                 QDATA_DEBUG(qdata, "local %s.\n",
                             opc == QUOTA_DQACQ ? "DQACQ" : "DQREL");
+                QDATA_SET_CHANGE_QS(qdata);
                 rc = qctxt->lqc_handler(obd, qdata, opc);
                 rc2 = dqacq_completion(obd, qctxt, qdata, rc, opc);
-                RETURN((rc && rc != -EDQUOT) ? rc : rc2);
+                /* this is for qunit_get() */
+                qunit_put(qunit);
+
+                do_gettimeofday(&work_end);
+                timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
+                if (opc == QUOTA_DQACQ)
+                        lprocfs_counter_add(qctxt->lqc_stats,
+                                            wait ? LQUOTA_SYNC_ACQ : LQUOTA_ASYNC_ACQ,
+                                            timediff);
+                else
+                        lprocfs_counter_add(qctxt->lqc_stats,
+                                            wait ? LQUOTA_SYNC_REL : LQUOTA_ASYNC_REL,
+                                            timediff);
+                RETURN(rc ? rc : rc2); /* the handler's error wins over the completion's rc */
+        }
+
+        spin_lock(&qctxt->lqc_lock);
+        if (!qctxt->lqc_import) { /* no import to the quota master: undo the insert and fail with -EAGAIN */
+                spin_unlock(&qctxt->lqc_lock);
+                QDATA_DEBUG(qdata, "lqc_import is invalid.\n");
+
+                spin_lock(&qunit_hash_lock);
+                remove_qunit_nolock(qunit);
+                spin_unlock(&qunit_hash_lock);
+
+                compute_lqs_after_removing_qunit(qunit);
+
+                QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, -EAGAIN);
+                wake_up_all(&qunit->lq_waitq);
+
+                /* this is for qunit_get() */
+                qunit_put(qunit);
+                /* this is for alloc_qunit() */
+                qunit_put(qunit);
+                spin_lock(&qctxt->lqc_lock);
+                if (wait && !qctxt->lqc_import) { /* re-checked under lqc_lock before sleeping */
+                        spin_unlock(&qctxt->lqc_lock);
+
+                        LASSERT(oti && oti->oti_thread &&
+                                oti->oti_thread->t_watchdog);
+
+                        lc_watchdog_disable(oti->oti_thread->t_watchdog); /* paired with lc_watchdog_touch() after the sleep */
+                        CDEBUG(D_QUOTA, "sleep for quota master\n");
+                        l_wait_event(qctxt->lqc_wait_for_qmaster,
+                                     check_qm(qctxt), &lwi);
+                        CDEBUG(D_QUOTA, "wake up when quota master is back\n");
+                        lc_watchdog_touch(oti->oti_thread->t_watchdog,
+                                 GET_TIMEOUT(oti->oti_thread->t_svc));
+                } else {
+                        spin_unlock(&qctxt->lqc_lock);
+                }
+
+                RETURN(-EAGAIN);
         }
+        imp = class_import_get(qctxt->lqc_import); /* reference taken while lqc_lock is held */
+        spin_unlock(&qctxt->lqc_lock);
 
         /* build dqacq/dqrel request */
-        LASSERT(qctxt->lqc_import);
+        LASSERT(imp);
 
-        req = ptlrpc_request_alloc_pack(qctxt->lqc_import, &RQF_MDS_QUOTA_DQACQ,
+        req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_QUOTA_DQACQ,
                                         LUSTRE_MDS_VERSION, opc);
+        class_import_put(imp); /* drops the ref taken by class_import_get() above */
         if (req == NULL) {
+                CDEBUG(D_ERROR, "Can't alloc request\n");
                 dqacq_completion(obd, qctxt, qdata, -ENOMEM, opc);
+                /* this is for qunit_get() */
+                qunit_put(qunit);
                 RETURN(-ENOMEM);
         }
 
-       if (qdata->qd_flags & QUOTA_IS_BLOCK)
-               factor = MAX_QUOTA_COUNT32 / qctxt->lqc_bunit_sz *
-                         qctxt->lqc_bunit_sz;
-        else
-                factor = MAX_QUOTA_COUNT32 / qctxt->lqc_iunit_sz *
-                         qctxt->lqc_iunit_sz;
-
-        LASSERT(!should_translate_quota(qctxt->lqc_import) ||
-                qdata->qd_count <= factor);
-        if (should_translate_quota(qctxt->lqc_import))
-        {
-                struct qunit_data_old *reqdata_old, *tmp;
-
-                reqdata_old = req_capsule_client_get(&req->rq_pill,
-                                                     &RMF_QUNIT_DATA);
-
-                tmp = lustre_quota_new_to_old(qdata);
-                *reqdata_old = *tmp;
-                req_capsule_set_size(&req->rq_pill, &RMF_QUNIT_DATA, RCL_SERVER,
-                                     sizeof(*reqdata_old));
-                CDEBUG(D_QUOTA, "qd_count is 32bit!\n");
-        } else {
-                reqdata = req_capsule_client_get(&req->rq_pill,
-                                                 &RMF_QUNIT_DATA);
-
-                *reqdata = *qdata;
-                req_capsule_set_size(&req->rq_pill, &RMF_QUNIT_DATA, RCL_SERVER,
-                                     sizeof(*reqdata));
-                CDEBUG(D_QUOTA, "qd_count is 64bit!\n");
-        }
         ptlrpc_request_set_replen(req);
+        req->rq_no_resend = req->rq_no_delay = 1;
+        rc = quota_copy_qdata(req, qdata, QUOTA_REQUEST, QUOTA_IMPORT);
+        if (rc < 0) {
+                CDEBUG(D_ERROR, "Can't pack qunit_data(rc: %d)\n", rc);
+                ptlrpc_req_finished(req);
+                dqacq_completion(obd, qctxt, qdata, -EPROTO, opc); /* completion sees -EPROTO, the caller gets rc */
+                /* this is for qunit_get() */
+                qunit_put(qunit);
+                RETURN(rc);
+        }
 
         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-        aa = (struct dqacq_async_args *)&req->rq_async_args;
+        aa = ptlrpc_req_async_args(req); /* read back as aa_ctxt/aa_qunit in dqacq_interpret() */
         aa->aa_ctxt = qctxt;
         aa->aa_qunit = qunit;
 
@@ -727,23 +996,45 @@ schedule_dqacq(struct obd_device *obd,
 wait_completion:
         if (wait && qunit) {
                 struct qunit_data *p = &qunit->lq_data;
-                QDATA_DEBUG(p, "wait for dqacq.\n");
-
-                l_wait_event(qw.qw_waitq, got_qunit(&qw), &lwi);
-                if (qw.qw_rc == 0)
-                        rc = -EAGAIN;
 
-                CDEBUG(D_QUOTA, "wait dqacq done. (rc:%d)\n", qw.qw_rc);
+                QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
+                l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)),
+                             &lwi);
+                /* lq_rc as left by whoever finished the qunit:
+                 * -EAGAIN: the quota master isn't ready yet;
+                 * QUOTA_REQ_RETURNED: a quota req is finished;
+                 * -EDQUOT: out of quota; -EBUSY: recovery is happening;
+                 * any other rc < 0 is a real error that callers of
+                 * schedule_dqacq() have to handle */
+                spin_lock(&qunit->lq_lock);
+                rc = qunit->lq_rc;
+                spin_unlock(&qunit->lq_lock);
+                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: id(%u) flag(%u) "
+                       "rc(%d) owner(%d)\n", qunit, qunit->lq_data.qd_id,
+                       qunit->lq_data.qd_flags, rc, qunit->lq_owner);
         }
+
+        qunit_put(qunit); /* drops the ref from dqacq_in_flight() or from qunit_get() above */
+        do_gettimeofday(&work_end);
+        timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
+        if (opc == QUOTA_DQACQ)
+                lprocfs_counter_add(qctxt->lqc_stats,
+                                    wait ? LQUOTA_SYNC_ACQ : LQUOTA_ASYNC_ACQ,
+                                    timediff);
+        else
+                lprocfs_counter_add(qctxt->lqc_stats,
+                                    wait ? LQUOTA_SYNC_REL : LQUOTA_ASYNC_REL,
+                                    timediff);
+
         RETURN(rc);
 }
 
 int
 qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
-                   uid_t uid, gid_t gid, __u32 isblk, int wait)
+                   const unsigned int id[], __u32 isblk, int wait,
+                   struct obd_trans_info *oti)
 {
-        int ret, rc = 0, i = USRQUOTA;
-        __u32 id[MAXQUOTAS] = { uid, gid };
+        int rc = 0, i = USRQUOTA;
         struct qunit_data qdata[MAXQUOTAS];
         ENTRY;
 
@@ -753,20 +1044,26 @@ qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
 
         for (i = 0; i < MAXQUOTAS; i++) {
                 qdata[i].qd_id = id[i];
-                qdata[i].qd_flags = 0;
-                qdata[i].qd_flags |= i;
-                qdata[i].qd_flags |= isblk ? QUOTA_IS_BLOCK : 0;
+                qdata[i].qd_flags = i;
+                if (isblk)
+                        QDATA_SET_BLK(&qdata[i]);
                 qdata[i].qd_count = 0;
 
-                ret = check_cur_qunit(obd, qctxt, &qdata[i]);
-                if (ret > 0) {
+                rc = check_cur_qunit(obd, qctxt, &qdata[i]);
+                if (rc > 0) {
                         int opc;
                         /* need acquire or release */
-                        opc = ret == 1 ? QUOTA_DQACQ : QUOTA_DQREL;
-                        ret = split_before_schedule_dqacq(obd, qctxt, &qdata[i],
-                                                          opc, wait);
-                        if (!rc)
-                                rc = ret;
+                        opc = rc == 1 ? QUOTA_DQACQ : QUOTA_DQREL;
+                        rc = schedule_dqacq(obd, qctxt, &qdata[i], opc,
+                                            wait,oti);
+                        if (rc < 0)
+                                RETURN(rc);
+                } else if (wait == 1) {
+                        /* when wait equates 1, that means mds_quota_acquire
+                         * or filter_quota_acquire is calling it. */
+                        rc = qctxt_wait_pending_dqacq(qctxt, id[i], i, isblk);
+                        if (rc < 0)
+                                RETURN(rc);
                 }
         }
 
@@ -778,93 +1075,170 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
                          unsigned short type, int isblk)
 {
         struct lustre_qunit *qunit = NULL;
-        struct qunit_waiter qw;
         struct qunit_data qdata;
+        struct timeval work_start; /* work_start/work_end bracket the time added to lqc_stats */
+        struct timeval work_end;
+        long timediff;
         struct l_wait_info lwi = { 0 };
+        int rc = 0; /* stays 0 when no matching qunit is in flight */
         ENTRY;
 
-        CFS_INIT_LIST_HEAD(&qw.qw_entry);
-        init_waitqueue_head(&qw.qw_waitq);
-        qw.qw_rc = 0;
-
+        do_gettimeofday(&work_start);
         qdata.qd_id = id;
-        qdata.qd_flags = 0;
-        qdata.qd_flags |= type;
-        qdata.qd_flags |= isblk ? QUOTA_IS_BLOCK : 0;
+        qdata.qd_flags = type; /* lookup key only: id, type and the block flag */
+        if (isblk)
+                QDATA_SET_BLK(&qdata);
         qdata.qd_count = 0;
 
         spin_lock(&qunit_hash_lock);
-
         qunit = dqacq_in_flight(qctxt, &qdata);
-        if (qunit)
-                list_add_tail(&qw.qw_entry, &qunit->lq_waiters);
-
         spin_unlock(&qunit_hash_lock);
 
         if (qunit) {
-                struct qunit_data *p = &qdata;
-                QDATA_DEBUG(p, "wait for dqacq completion.\n");
-                l_wait_event(qw.qw_waitq, got_qunit(&qw), &lwi);
-                QDATA_DEBUG(p, "wait dqacq done. (rc:%d)\n", qw.qw_rc);
+                struct qunit_data *p = &qunit->lq_data;
+
+                QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
+                l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)),
+                             &lwi);
+                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: rc(%d) "
+                       "owner(%d)\n", qunit, qunit->lq_rc, qunit->lq_owner);
+                /* fetch lq_rc under lq_lock, as schedule_dqacq() does (b=17030); the CDEBUG above reads it unlocked */
+                spin_lock(&qunit->lq_lock);
+                rc = qunit->lq_rc;
+                spin_unlock(&qunit->lq_lock);
+                /* this drops the ref obtained through dqacq_in_flight() */
+                qunit_put(qunit);
+                do_gettimeofday(&work_end);
+                timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
+                lprocfs_counter_add(qctxt->lqc_stats,
+                                    isblk ? LQUOTA_WAIT_PENDING_BLK_QUOTA :
+                                            LQUOTA_WAIT_PENDING_INO_QUOTA,
+                                    timediff);
+        } else { /* nothing in flight: only account the lookup under the NOWAIT counters */
+                do_gettimeofday(&work_end);
+                timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
+                lprocfs_counter_add(qctxt->lqc_stats,
+                                    isblk ? LQUOTA_NOWAIT_PENDING_BLK_QUOTA :
+                                            LQUOTA_NOWAIT_PENDING_INO_QUOTA,
+                                    timediff);
         }
-        RETURN(0);
+
+        RETURN(rc); /* the awaited qunit's lq_rc, or 0 if there was nothing to wait for */
 }
 
 int
-qctxt_init(struct lustre_quota_ctxt *qctxt, struct super_block *sb,
-           dqacq_handler_t handler)
+qctxt_init(struct obd_device *obd, dqacq_handler_t handler) /* set up obd->u.obt.obt_qctxt; 0 on success, else the failing step's rc */
 {
+        struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+        struct obd_device_target *obt = &obd->u.obt;
+        struct super_block *sb = obt->obt_sb;
         int rc = 0;
         ENTRY;
 
+        LASSERT(qctxt);
+
         rc = ptlrpcd_addref();
         if (rc)
                 RETURN(rc);
 
+        cfs_waitq_init(&qctxt->lqc_wait_for_qmaster); /* schedule_dqacq() sleeps here until check_qm() is true */
+        spin_lock_init(&qctxt->lqc_lock);
+        spin_lock(&qctxt->lqc_lock); /* all defaults below are set under lqc_lock */
         qctxt->lqc_handler = handler;
         qctxt->lqc_sb = sb;
+        qctxt->lqc_obt = obt;
         qctxt->lqc_import = NULL;
         qctxt->lqc_recovery = 0;
-        qctxt->lqc_atype = 0;
-        qctxt->lqc_status= 0;
+        qctxt->lqc_switch_qs = 1; /* qunit size switching is on by default */
+        qctxt->lqc_valid = 1; /* cleared again by qctxt_cleanup() */
+        qctxt->lqc_cqs_boundary_factor = 4;
+        qctxt->lqc_cqs_least_bunit = PTLRPC_MAX_BRW_SIZE;
+        qctxt->lqc_cqs_least_iunit = 2;
+        qctxt->lqc_cqs_qs_factor = 2;
+        qctxt->lqc_flags = 0;
+        QUOTA_MASTER_UNREADY(qctxt);
         qctxt->lqc_bunit_sz = default_bunit_sz;
         qctxt->lqc_btune_sz = default_bunit_sz / 100 * default_btune_ratio;
         qctxt->lqc_iunit_sz = default_iunit_sz;
         qctxt->lqc_itune_sz = default_iunit_sz * default_itune_ratio / 100;
+        qctxt->lqc_switch_seconds = 300; /* enlarging waits 5 minutes (300s)
+                                          * after the last shrinking */
+        qctxt->lqc_sync_blk = 0;
+        spin_unlock(&qctxt->lqc_lock);
+
+        qctxt->lqc_lqs_hash = lustre_hash_init("LQS_HASH",
+                                               HASH_LQS_CUR_BITS,
+                                               HASH_LQS_MAX_BITS,
+                                               &lqs_hash_ops, 0);
+        if (!qctxt->lqc_lqs_hash) {
+                CERROR("initialize hash lqs for %s error!\n", obd->obd_name);
+                RETURN(-ENOMEM); /* NOTE(review): ptlrpcd_addref() above is not undone on this path - confirm the caller cleans up */
+        }
 
-        RETURN(0);
+#ifdef LPROCFS
+        rc = lquota_proc_setup(obd, is_master(qctxt));
+        if (rc)
+                CERROR("initialize proc for %s error!\n", obd->obd_name); /* rc is still returned to the caller below */
+#endif
+
+        RETURN(rc);
 }
 
 void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
 {
         struct lustre_qunit *qunit, *tmp;
-        struct qunit_waiter *qw, *tmp2;
+        struct list_head tmp_list; /* qunits of this qctxt pulled out of qunit_hash */
+        struct obd_device_target *obt = qctxt->lqc_obt;
         int i;
         ENTRY;
 
-        spin_lock(&qunit_hash_lock);
+        CFS_INIT_LIST_HEAD(&tmp_list);
+
+        spin_lock(&qctxt->lqc_lock);
+        qctxt->lqc_valid = 0; /* got_qunit() and check_qm() treat !lqc_valid as "stop waiting" */
+        spin_unlock(&qctxt->lqc_lock);
 
+        spin_lock(&qunit_hash_lock);
         for (i = 0; i < NR_DQHASH; i++) {
                 list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], lq_hash) {
                         if (qunit->lq_ctxt != qctxt)
                                 continue;
-
                         remove_qunit_nolock(qunit);
-                        /* wake up all waiters */
-                        list_for_each_entry_safe(qw, tmp2, &qunit->lq_waiters,
-                                                 qw_entry) {
-                                list_del_init(&qw->qw_entry);
-                                qw->qw_rc = 0;
-                                wake_up(&qw->qw_waitq);
-                        }
-                        qunit_put(qunit);
+                        list_add(&qunit->lq_hash, &tmp_list); /* finished below, outside qunit_hash_lock */
                 }
         }
-
         spin_unlock(&qunit_hash_lock);
 
+        list_for_each_entry_safe(qunit, tmp, &tmp_list, lq_hash) {
+                list_del_init(&qunit->lq_hash);
+                compute_lqs_after_removing_qunit(qunit);
+
+                /* mark the qunit finished with rc 0 and wake up all its waiters */
+                QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, 0);
+                wake_up_all(&qunit->lq_waitq);
+                qunit_put(qunit);
+        }
+
+        down_write(&obt->obt_rwsem); /* excludes dqacq_interpret(), which holds obt_rwsem for read */
+        lustre_hash_exit(qctxt->lqc_lqs_hash);
+        qctxt->lqc_lqs_hash = NULL;
+        up_write(&obt->obt_rwsem);
+
+        /* after qctxt_cleanup, qctxt might be freed, and check_qm() on it would
+         * be unpredictable. So we must wait until lqc_wait_for_qmaster is empty */
+        while (cfs_waitq_active(&qctxt->lqc_wait_for_qmaster)) {
+                cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster);
+                cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE,
+                                     cfs_time_seconds(1));
+        }
+
         ptlrpcd_decref();
 
+#ifdef LPROCFS
+        if (lquota_proc_cleanup(qctxt))
+                CERROR("cleanup proc error!\n");
+#endif
+
         EXIT;
 }
 
@@ -886,11 +1260,20 @@ static int qslave_recovery_main(void *arg)
 
         ptlrpc_daemonize("qslave_recovd");
 
+        /* for obdfilter */
+        class_incref(obd, "qslave_recovd_filter", obd);
+
         complete(&data->comp);
 
-        if (qctxt->lqc_recovery)
+        spin_lock(&qctxt->lqc_lock);
+        if (qctxt->lqc_recovery) {
+                spin_unlock(&qctxt->lqc_lock);
+                class_decref(obd, "qslave_recovd_filter", obd);
                 RETURN(0);
-        qctxt->lqc_recovery = 1;
+        } else {
+                qctxt->lqc_recovery = 1;
+                spin_unlock(&qctxt->lqc_lock);
+        }
 
         for (type = USRQUOTA; type < MAXQUOTAS; type++) {
                 struct qunit_data qdata;
@@ -919,24 +1302,27 @@ static int qslave_recovery_main(void *arg)
                 list_for_each_entry_safe(dqid, tmp, &id_list, di_link) {
                         list_del_init(&dqid->di_link);
                         /* skip slave recovery on itself */
-                        if (is_master(obd, qctxt, dqid->di_id, type))
+                        if (is_master(qctxt))
                                 goto free;
                         if (rc && rc != -EBUSY)
                                 goto free;
 
                         qdata.qd_id = dqid->di_id;
-                        qdata.qd_flags = 0;
-                        qdata.qd_flags |= type;
-                        qdata.qd_flags |= QUOTA_IS_BLOCK;
+                        qdata.qd_flags = type;
+                        QDATA_SET_BLK(&qdata);
                         qdata.qd_count = 0;
 
                         ret = check_cur_qunit(obd, qctxt, &qdata);
                         if (ret > 0) {
                                 int opc;
                                 opc = ret == 1 ? QUOTA_DQACQ : QUOTA_DQREL;
-                                rc = split_before_schedule_dqacq(obd, qctxt, &qdata, opc, 0);
-                        } else
+                                rc = schedule_dqacq(obd, qctxt, &qdata, opc,
+                                                    0, NULL);
+                                if (rc == -EDQUOT)
+                                        rc = 0;
+                        } else {
                                 rc = 0;
+                        }
 
                         if (rc)
                                 CDEBUG(rc == -EBUSY ? D_QUOTA : D_ERROR,
@@ -947,7 +1333,10 @@ free:
                 }
         }
 
+        spin_lock(&qctxt->lqc_lock);
         qctxt->lqc_recovery = 0;
+        spin_unlock(&qctxt->lqc_lock);
+        class_decref(obd, "qslave_recovd_filter", obd);
         RETURN(rc);
 }
 
@@ -974,3 +1363,99 @@ qslave_start_recovery(struct obd_device *obd, struct lustre_quota_ctxt *qctxt)
 exit:
         EXIT;
 }
+
+
+/**
+ * lqs<->qctxt hash operations
+ */
+
+/**
+ * lh_hash hook: (5381 for a group key, 5387 otherwise) * qaq_id, masked by \a mask
+ */
+static unsigned
+lqs_hash(lustre_hash_t *lh, void *key, unsigned mask)
+{
+        struct quota_adjust_qunit *lqs_key;
+        unsigned hash;
+        ENTRY;
+
+        LASSERT(key);
+        lqs_key = (struct quota_adjust_qunit *)key; /* NOTE(review): lqs_compare() reads the same key as unsigned long long - confirm the key layout */
+        hash = (QAQ_IS_GRP(lqs_key) ? 5381 : 5387) * lqs_key->qaq_id;
+
+        RETURN(hash & mask);
+}
+
+static int
+lqs_compare(void *key, struct hlist_node *hnode) /* lh_compare hook: non-zero when \a key equals the node's lqs_key */
+{
+        struct lustre_qunit_size *q;
+        int rc;
+        ENTRY;
+
+        LASSERT(key);
+        q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
+
+        spin_lock(&q->lqs_lock); /* lqs_key is read under lqs_lock */
+        rc = (q->lqs_key == *((unsigned long long *)key)); /* key is taken to point at a 64-bit value */
+        spin_unlock(&q->lqs_lock);
+
+        RETURN(rc);
+}
+
+static void *
+lqs_get(struct hlist_node *hnode) /* lh_get hook: take a reference on the lqs behind \a hnode and return it */
+{
+        struct lustre_qunit_size *q =
+            hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
+        ENTRY;
+
+        atomic_inc(&q->lqs_refcount);
+        CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
+               q, atomic_read(&q->lqs_refcount)); /* logs the count after the increment */
+
+        RETURN(q);
+}
+
+static void *
+lqs_put(struct hlist_node *hnode) /* lh_put hook: drop a reference on the lqs behind \a hnode and return it */
+{
+        struct lustre_qunit_size *q =
+            hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
+        ENTRY;
+
+        LASSERT(atomic_read(&q->lqs_refcount) > 0); /* a put without a matching get is a bug */
+        atomic_dec(&q->lqs_refcount); /* nothing is freed here; lqs_exit() does the OBD_FREE_PTR() */
+        CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
+               q, atomic_read(&q->lqs_refcount));
+
+        RETURN(q);
+}
+
+static void
+lqs_exit(struct hlist_node *hnode) /* lh_exit hook: free an lqs that must have no references left */
+{
+        struct lustre_qunit_size *q;
+        ENTRY;
+
+        q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
+        /*
+         * Nothing should be left: every user of the lqs has put it
+         * and the lqs was already deleted from the table by this
+         * time, so we should have 0 refs (asserted just below).
+         */
+        LASSERTF(atomic_read(&q->lqs_refcount) == 0,
+                 "Busy lqs %p with %d refs\n", q,
+                 atomic_read(&q->lqs_refcount));
+        OBD_FREE_PTR(q);
+        EXIT;
+}
+
+static lustre_hash_ops_t lqs_hash_ops = { /* callbacks handed to lustre_hash_init() in qctxt_init() */
+        .lh_hash    = lqs_hash,
+        .lh_compare = lqs_compare,
+        .lh_get     = lqs_get,
+        .lh_put     = lqs_put,
+        .lh_exit    = lqs_exit
+};
+#endif /* HAVE_QUOTA_SUPPORT */