Whamcloud - gitweb
LU-14927 quota: move qsd_transfer to lquota module
[fs/lustre-release.git] / lustre / quota / qsd_handler.c
index 5bdb0cf..1c8e43a 100644 (file)
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2012 Intel, Inc.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  * Use is subject to license terms.
  *
  * Author: Johann Lombardi <johann.lombardi@intel.com>
  * Author: Niu    Yawei    <yawei.niu@intel.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-
 #define DEBUG_SUBSYSTEM S_LQUOTA
 
-#include <linux/version.h>
-#include <linux/fs.h>
-#include <asm/unistd.h>
-#include <linux/quotaops.h>
-#include <linux/init.h>
-
-#include <obd_class.h>
-#include <lustre_param.h>
-#include <lprocfs_status.h>
-
 #include "qsd_internal.h"
 
-/*
- * helper function returning how much space is currently reserved for requests
- * in flight.
+/**
+ * Helper function bumping lqe_pending_req if there is no quota request in
+ * flight for the lquota entry \a lqe. Otherwise, EBUSY is returned.
  */
-static inline int lqe_pending_dqacq(struct lquota_entry *lqe)
+static inline int qsd_request_enter(struct lquota_entry *lqe)
 {
-       int     pending;
+       /* is there already a quota request in flight? */
+       if (lqe->lqe_pending_req != 0) {
+               LQUOTA_DEBUG(lqe, "already a request in flight");
+               return -EBUSY;
+       }
 
-       lqe_read_lock(lqe);
-       pending = lqe->lqe_pending_req;
-       lqe_read_unlock(lqe);
+       if (lqe->lqe_pending_rel != 0) {
+               LQUOTA_ERROR(lqe, "no request in flight with pending_rel=%llu",
+                            lqe->lqe_pending_rel);
+               LBUG();
+       }
 
-       return pending;
+       lqe->lqe_pending_req++;
+       return 0;
 }
 
-/*
- * helper function returning true when the connection to master is ready to be
- * used.
+/**
+ * Companion of qsd_request_enter() dropping lqe_pending_req to 0.
  */
-static inline int qsd_ready(struct qsd_instance *qsd)
+static inline void qsd_request_exit(struct lquota_entry *lqe)
 {
+       if (lqe->lqe_pending_req != 1) {
+               LQUOTA_ERROR(lqe, "lqe_pending_req != 1!!!");
+               LBUG();
+       }
+       lqe->lqe_pending_req--;
+       lqe->lqe_pending_rel = 0;
+       wake_up(&lqe->lqe_waiters);
+}
+
+/**
+ * Check whether a qsd instance is all set to send quota request to master.
+ * This includes checking whether:
+ * - the connection to master is set up and usable,
+ * - the qsd isn't stopping
+ * - reintegration has been successfully completed and all indexes are
+ *   up-to-date
+ *
+ * \param lqe - is the lquota entry for which we would like to send a quota
+ *              request
+ * \param lockh - is the remote handle of the global lock returned on success
+ *
+ * \retval 0 on success, appropriate error on failure
+ */
+static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
+{
+       struct qsd_qtype_info   *qqi = lqe2qqi(lqe);
+       struct qsd_instance     *qsd = qqi->qqi_qsd;
        struct obd_import       *imp = NULL;
+       struct ldlm_lock        *lock;
+       ENTRY;
 
-       cfs_read_lock(&qsd->qsd_lock);
+       read_lock(&qsd->qsd_lock);
+       /* is the qsd about to shut down? */
+       if (qsd->qsd_stopping) {
+               read_unlock(&qsd->qsd_lock);
+               LQUOTA_DEBUG(lqe, "dropping quota req since qsd is stopping");
+               /* Target is about to shut down, client will retry */
+               RETURN(-EINPROGRESS);
+       }
+
+       /* is the connection to the quota master ready? */
        if (qsd->qsd_exp_valid)
                imp = class_exp2cliimp(qsd->qsd_exp);
-       cfs_read_unlock(&qsd->qsd_lock);
+       if (imp == NULL || imp->imp_invalid) {
+               read_unlock(&qsd->qsd_lock);
+               LQUOTA_DEBUG(lqe, "connection to master not ready");
+               RETURN(-ENOTCONN);
+       }
+
+       /* In most case, reintegration must have been triggered (when enable
+        * quota or on OST start), however, in rare race condition (enabling
+        * quota when starting OSTs), we might miss triggering reintegration
+        * for some qqi.
+        *
+        * If the previous reintegration failed for some reason, we'll
+        * re-trigger it here as well. */
+       if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
+               read_unlock(&qsd->qsd_lock);
+               LQUOTA_DEBUG(lqe, "not up-to-date, dropping request and "
+                            "kicking off reintegration");
+               qsd_start_reint_thread(qqi);
+               RETURN(-EINPROGRESS);
+       }
+
+       /* Fill the remote global lock handle, master will check this handle
+        * to see if the slave is sending request with stale lock */
+       lustre_handle_copy(lockh, &qqi->qqi_lockh);
+       read_unlock(&qsd->qsd_lock);
+
+       if (!lustre_handle_is_used(lockh))
+               RETURN(-ENOLCK);
+
+       lock = ldlm_handle2lock(lockh);
+       if (lock == NULL)
+               RETURN(-ENOLCK);
 
-       return (imp == NULL || imp->imp_invalid) ? false : true;
+       /* return remote lock handle to be packed in quota request */
+       lustre_handle_copy(lockh, &lock->l_remote_handle);
+       LDLM_LOCK_PUT(lock);
+
+       RETURN(0);
 }
 
-/*
- * Helper function returning true when quota space need to be adjusted (some
- * unused space should be free or pre-acquire) and false otherwise.
+/**
+ * Check whether any quota space adjustment (pre-acquire/release/report) is
+ * needed for a given quota ID. If a non-null \a qbody is passed, then the
+ * \a qbody structure (qb_count/flags/usage) is filled with appropriate data
+ * to be packed in the quota request.
+ *
+ * \param lqe   - is the lquota entry for which we would like to adjust quota
+ *                space.
+ * \param qbody - is the quota body to fill, if not NULL.
+ *
+ * \retval true  - space adjustment is required and \a qbody is filled, if not
+ *                 NULL
+ * \retval false - no space adjustment required
  */
-static bool qsd_adjust_needed(struct lquota_entry *lqe)
+static bool qsd_calc_adjust(struct lquota_entry *lqe, struct quota_body *qbody)
 {
-       struct qsd_qtype_info   *qqi;
-       __u64                    usage, granted;
+       __u64   usage, granted;
+       ENTRY;
 
-       qqi = lqe2qqi(lqe);
+       usage   = lqe->lqe_usage;
+       usage  += lqe->lqe_pending_write + lqe->lqe_waiting_write;
+       granted = lqe->lqe_granted;
 
-       if (!lqe->lqe_enforced || qqi->qqi_qsd->qsd_stopping)
-               /* if quota isn't enforced for this id, no need to adjust
-                * Similarly, no need to perform adjustment if the target is in
-                * the process of shutting down. */
-               return false;
+       if (qbody != NULL)
+               qbody->qb_flags = 0;
+
+       if (!lqe->lqe_enforced) {
+               /* quota not enforced any more for this ID */
+               if (granted != 0) {
+                       /* release all quota space unconditionally */
+                       LQUOTA_DEBUG(lqe, "not enforced, releasing all space");
+                       if (qbody != NULL) {
+                               qbody->qb_count = granted;
+                               qbody->qb_flags = QUOTA_DQACQ_FL_REL;
+                       }
+                       RETURN(true);
+               }
+               RETURN(false);
+       }
+
+       if (!lustre_handle_is_used(&lqe->lqe_lockh)) {
+               /* No valid per-ID lock
+                * When reporting quota (during reintegration or on setquota
+                * glimpse), we should release granted space if usage is 0.
+                * Otherwise, if the usage is less than granted, we need to
+                * acquire the per-ID lock to make sure the unused grant can be
+                * reclaimed by per-ID lock glimpse. */
+               if (usage == 0) {
+                       /* no on-disk usage and no outstanding activity, release
+                        * space */
+                       if (granted != 0) {
+                               LQUOTA_DEBUG(lqe, "no usage, releasing all "
+                                            "space");
+                               if (qbody != NULL) {
+                                       qbody->qb_count = granted;
+                                       qbody->qb_flags = QUOTA_DQACQ_FL_REL;
+                               }
+                               RETURN(true);
+                       }
+                       LQUOTA_DEBUG(lqe, "no usage + no granted, nothing to "
+                                    "do");
+                       RETURN(false);
+               }
 
-       usage  = lqe->lqe_usage;
-       usage += lqe->lqe_pending_write + lqe->lqe_waiting_write;
-       granted = lqe->lqe_granted - lqe->lqe_pending_rel;
+               if (lqe->lqe_usage < lqe->lqe_granted) {
+                       /* holding quota space w/o any lock, enqueue per-ID lock
+                        * again */
+                       LQUOTA_DEBUG(lqe, "(re)acquiring per-ID lock");
+                       if (qbody != NULL) {
+                               qbody->qb_count = 0;
+                               qbody->qb_flags = QUOTA_DQACQ_FL_ACQ;
+                       }
+                       RETURN(true);
+               }
 
-       /* need to re-acquire per-ID lock or release all grant */
-       if (!lustre_handle_is_used(&lqe->lqe_lockh) &&
-           lqe->lqe_granted > lqe->lqe_usage)
-               return true;
+               if (lqe->lqe_usage > lqe->lqe_granted) {
+                       /* quota overrun, report usage */
+                       LQUOTA_DEBUG(lqe, "overrun, reporting usage");
+                       if (qbody != NULL) {
+                               qbody->qb_usage = lqe->lqe_usage;
+                               qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
+                       }
+                       RETURN(true);
+               }
+               LQUOTA_DEBUG(lqe, "granted matches usage, nothing to do");
+               RETURN(false);
+       }
 
-       /* good old quota qunit adjustment logic which has been around since
-        * lustre 1.4:
-        * 1. Need to release some space? */
-       if (granted > usage + lqe->lqe_qunit)
-               return true;
+       /* valid per-ID lock
+        * Apply good old quota qunit adjustment logic which has been around
+        * since lustre 1.4:
+        * 1. release spare quota space? */
+       if (granted > usage + lqe->lqe_qunit) {
+               /* pre-release quota space */
+               if (qbody == NULL)
+                       RETURN(true);
+               qbody->qb_count = granted - usage;
+               /* if usage == 0, release all granted space */
+               if (usage) {
+                       /* try to keep one qunit of quota space */
+                       qbody->qb_count -= lqe->lqe_qunit;
+                       /* but don't release less than qtune to avoid releasing
+                        * space too often */
+                       if (qbody->qb_count < lqe->lqe_qtune)
+                               qbody->qb_count = lqe->lqe_qtune;
+               }
+               qbody->qb_flags = QUOTA_DQACQ_FL_REL;
+               RETURN(true);
+       }
 
        /* 2. Any quota overrun? */
-       if (lqe->lqe_usage > lqe->lqe_granted)
-               /* we ended up consuming more than we own, we need to have this
-                * fixed ASAP */
-               return true;
+       if (lqe->lqe_usage > lqe->lqe_granted) {
+               /* we overconsumed quota space, we report usage in request so
+                * that master can adjust it unconditionally */
+               if (qbody == NULL)
+                       RETURN(true);
+               qbody->qb_usage = lqe->lqe_usage;
+               granted         = lqe->lqe_usage;
+               qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
+       }
 
        /* 3. Time to pre-acquire? */
-       if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && lqe->lqe_qunit != 0 &&
-           granted < usage + lqe->lqe_qtune)
-               /* need to pre-acquire some space if we don't want to block
-                * client's requests */
-               return true;
+       if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && usage > 0 &&
+           lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) {
+               /* To pre-acquire quota space, we report how much spare quota
+                * space the slave currently owns, then the master will grant us
+                * back how much we can pretend given the current state of
+                * affairs */
+               if (qbody == NULL)
+                       RETURN(true);
+               if (granted <= usage)
+                       qbody->qb_count = 0;
+               else
+                       qbody->qb_count = granted - usage;
+               qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ;
+               RETURN(true);
+       }
 
-       return false;
+       if (qbody != NULL)
+               RETURN(qbody->qb_flags != 0);
+       else
+               RETURN(false);
 }
 
-/*
+/**
+ * Helper function returning true when quota space needs to be adjusted (some
+ * unused space should be freed or pre-acquired) and false otherwise.
+ */
+static inline bool qsd_adjust_needed(struct lquota_entry *lqe)
+{
+       return qsd_calc_adjust(lqe, NULL);
+}
+
+/**
  * Callback function called when an acquire/release request sent to the master
  * is completed
  */
-static void qsd_dqacq_completion(const struct lu_env *env,
-                                struct qsd_qtype_info *qqi,
-                                struct quota_body *reqbody,
-                                struct quota_body *repbody,
-                                struct lustre_handle *lockh,
-                                union ldlm_wire_lvb *lvb,
-                                void *arg, int ret)
+static void qsd_req_completion(const struct lu_env *env,
+                              struct qsd_qtype_info *qqi,
+                              struct quota_body *reqbody,
+                              struct quota_body *repbody,
+                              struct lustre_handle *lockh,
+                              struct lquota_lvb *lvb,
+                              void *arg, int ret)
 {
        struct lquota_entry     *lqe = (struct lquota_entry *)arg;
        struct qsd_thread_info  *qti;
@@ -157,7 +327,6 @@ static void qsd_dqacq_completion(const struct lu_env *env,
        qti = qsd_info(env);
 
        lqe_write_lock(lqe);
-
        LQUOTA_DEBUG(lqe, "DQACQ returned %d, flags:0x%x", ret,
                     reqbody->qb_flags);
 
@@ -181,12 +350,12 @@ static void qsd_dqacq_completion(const struct lu_env *env,
         * the DQACQ since the limit for this ID has been removed, so we
         * should not update quota entry & slave index copy neither. */
        if (repbody != NULL && repbody->qb_count != 0) {
-               LQUOTA_DEBUG(lqe, "DQACQ qb_count:"LPU64, repbody->qb_count);
+               LQUOTA_DEBUG(lqe, "DQACQ qb_count:%llu", repbody->qb_count);
 
                if (req_is_rel(reqbody->qb_flags)) {
                        if (lqe->lqe_granted < repbody->qb_count) {
                                LQUOTA_ERROR(lqe, "can't release more space "
-                                            "than owned "LPU64"<"LPU64,
+                                            "than owned %llu<%llu",
                                             lqe->lqe_granted,
                                             repbody->qb_count);
                                lqe->lqe_granted = 0;
@@ -216,13 +385,10 @@ static void qsd_dqacq_completion(const struct lu_env *env,
        }
 
        /* extract information from lvb */
-       if (ret == 0 && lvb != 0) {
-               if (lvb->l_lquota.lvb_id_qunit != 0)
-                       qsd_set_qunit(lqe, lvb->l_lquota.lvb_id_qunit);
-               if (lvb->l_lquota.lvb_flags & LQUOTA_FL_EDQUOT)
-                       lqe->lqe_edquot = true;
-               else
-                       lqe->lqe_edquot = false;
+       if (ret == 0 && lvb != NULL) {
+               if (lvb->lvb_id_qunit != 0)
+                       qsd_set_qunit(lqe, lvb->lvb_id_qunit);
+               qsd_set_edquot(lqe, !!(lvb->lvb_flags & LQUOTA_FL_EDQUOT));
        } else if (repbody != NULL && repbody->qb_qunit != 0) {
                qsd_set_qunit(lqe, repbody->qb_qunit);
        }
@@ -234,13 +400,14 @@ static void qsd_dqacq_completion(const struct lu_env *env,
                lqe->lqe_nopreacq = true;
 out:
        adjust = qsd_adjust_needed(lqe);
+       if (reqbody && req_is_acq(reqbody->qb_flags) && ret != -EDQUOT) {
+               lqe->lqe_acq_rc = ret;
+               lqe->lqe_acq_time = ktime_get_seconds();
+       }
 out_noadjust:
-       lqe->lqe_pending_req--;
-       lqe->lqe_pending_rel = 0;
+       qsd_request_exit(lqe);
        lqe_write_unlock(lqe);
 
-       cfs_waitq_broadcast(&lqe->lqe_waiters);
-
        /* release reference on per-ID lock */
        if (lustre_handle_is_used(lockh))
                ldlm_lock_decref(lockh, qsd_id_einfo.ei_mode);
@@ -253,15 +420,23 @@ out_noadjust:
                else
                        qsd_adjust_schedule(lqe, true, false);
        }
+       lqe_putref(lqe);
 
        if (lvb)
-               /* free lvb allocated in qsd_dqacq */
                OBD_FREE_PTR(lvb);
-
-       lqe_putref(lqe);
        EXIT;
 }
 
+/**
+ * Try to consume local quota space.
+ *
+ * \param lqe   - is the qid entry to be processed
+ * \param space - is the amount of quota space needed to complete the operation
+ *
+ * \retval 0       - success
+ * \retval -EDQUOT - out of quota
+ * \retval -EAGAIN - need to acquire space from master
+ */
 static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
 {
        __u64   usage;
@@ -270,7 +445,7 @@ static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
 
        if (!lqe->lqe_enforced)
                /* not enforced any more, we are good */
-               RETURN(0);
+               RETURN(-ESRCH);
 
        lqe_write_lock(lqe);
        /* use latest usage */
@@ -283,9 +458,14 @@ static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
                lqe->lqe_pending_write += space;
                lqe->lqe_waiting_write -= space;
                rc = 0;
-       } else if (lqe->lqe_edquot) {
+       /* lqe_edquot flag is used to avoid flooding dqacq requests when
+        * the user is over quota, however, the lqe_edquot could be stale
+        * sometimes due to the race reply of dqacq vs. id lock glimpse
+        * (see LU-4505), so we revalidate it every 5 seconds. */
+       } else if (lqe->lqe_edquot &&
+                  (lqe->lqe_edquot_time > ktime_get_seconds() - 5)) {
                rc = -EDQUOT;
-       } else {
+       } else {
                rc = -EAGAIN;
        }
        lqe_write_unlock(lqe);
@@ -293,294 +473,216 @@ static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
        RETURN(rc);
 }
 
-static bool qsd_calc_space(struct lquota_entry *lqe, enum qsd_ops op,
-                          struct quota_body *qbody)
+/**
+ * Compute how much quota space should be acquire from the master based
+ * on how much is currently granted to this slave and pending/waiting
+ * operations.
+ *
+ * \param lqe - is the lquota entry for which we would like to adjust quota
+ *              space.
+ * \param qbody - is the quota body of the acquire request to fill
+ *
+ * \retval true  - space acquisition is needed and qbody is filled
+ * \retval false - no space acquisition required
+ */
+static inline bool qsd_calc_acquire(struct lquota_entry *lqe,
+                                   struct quota_body *qbody)
 {
-       struct qsd_qtype_info   *qqi;
-       __u64                    usage, granted;
-
-       if (!lqe->lqe_enforced && op != QSD_REL)
-               return 0;
+       __u64   usage, granted;
 
-       qqi = lqe2qqi(lqe);
-
-       LASSERT(lqe->lqe_pending_rel == 0);
        usage   = lqe->lqe_usage;
        usage  += lqe->lqe_pending_write + lqe->lqe_waiting_write;
        granted = lqe->lqe_granted;
 
        qbody->qb_flags = 0;
-again:
-       switch (op) {
-       case QSD_ACQ:
-               /* if we overconsumed quota space, we report usage in request
-                * so that master can adjust it unconditionally */
-               if (lqe->lqe_usage > lqe->lqe_granted) {
-                       qbody->qb_usage = lqe->lqe_usage;
-                       qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
-                       granted = lqe->lqe_usage;
-               }
-               /* acquire as much as needed, but not more */
-               if (usage > granted) {
-                       qbody->qb_count  = usage - granted;
-                       qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ;
-               }
-               break;
-       case QSD_REP:
-               /* When reporting quota (during reintegration or on setquota
-                * glimpse), we should release granted space if usage is 0.
-                * Otherwise, if the usage is less than granted, we need to
-                * acquire the per-ID lock to make sure the unused grant can be
-                * reclaimed by per-ID lock glimpse. */
-               if (lqe->lqe_usage == 0 && lqe->lqe_granted != 0) {
-                       LQUOTA_DEBUG(lqe, "Release on report!");
-                       GOTO(again, op = QSD_REL);
-               } else if (lqe->lqe_usage == lqe->lqe_granted) {
-                       LQUOTA_DEBUG(lqe, "Usage matches granted, needn't do "
-                                    "anything on report!");
-               } else if (lqe->lqe_usage < lqe->lqe_granted) {
-                       LQUOTA_DEBUG(lqe, "Acquire per-ID lock on report!");
-                       qbody->qb_count = 0;
-                       qbody->qb_flags = QUOTA_DQACQ_FL_ACQ;
-               } else {
-                       LASSERT(lqe->lqe_usage > lqe->lqe_granted);
-                       LQUOTA_DEBUG(lqe, "Reporting usage");
-                       qbody->qb_usage = lqe->lqe_usage;
-                       qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
-               }
-               break;
-       case QSD_REL:
-               /* release unused quota space unconditionally */
-               if (lqe->lqe_granted > lqe->lqe_usage) {
-                       qbody->qb_count = lqe->lqe_granted - lqe->lqe_usage;
-                       qbody->qb_flags = QUOTA_DQACQ_FL_REL;
-               }
-               break;
-       case QSD_ADJ: {
-               /* need to re-acquire per-ID lock or release all grant */
-               if (!lustre_handle_is_used(&lqe->lqe_lockh) &&
-                   lqe->lqe_granted > lqe->lqe_usage)
-                       GOTO(again, op = QSD_REP);
-
-               /* release spare grant */
-               if (granted > usage + lqe->lqe_qunit) {
-                       /* pre-release quota space */
-                       qbody->qb_count  = granted - usage;
-                       /* if usage == 0, release all granted space */
-                       if (usage) {
-                               /* try to keep one qunit of quota space */
-                               qbody->qb_count -= lqe->lqe_qunit;
-                               /* but don't release less than qtune to avoid
-                                * releasing space too often */
-                               if (qbody->qb_count < lqe->lqe_qtune)
-                                       qbody->qb_count = lqe->lqe_qtune;
-                       }
-                       qbody->qb_flags = QUOTA_DQACQ_FL_REL;
-                       break;
-               }
 
-               /* if we overconsumed quota space, we report usage in request
-                * so that master can adjust it unconditionally */
-               if (lqe->lqe_usage > lqe->lqe_granted) {
-                       qbody->qb_usage = lqe->lqe_usage;
-                       qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
-                       granted         = lqe->lqe_usage;
-               }
-
-               if (!lqe->lqe_edquot && !lqe->lqe_nopreacq &&
-                   lustre_handle_is_used(&lqe->lqe_lockh) && usage > 0 &&
-                   lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) {
-                       /* To pre-acquire quota space, we report how much spare
-                        * quota space the slave currently owns, then the master
-                        * will grant us back how much we can pretend given the
-                        * current state of affairs */
-                       if (granted <= usage)
-                               qbody->qb_count = 0;
-                       else
-                               qbody->qb_count = granted - usage;
-                       qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ;
-               }
-               break;
+       /* if we overconsumed quota space, we report usage in request so that
+        * master can adjust it unconditionally */
+       if (lqe->lqe_usage > lqe->lqe_granted) {
+               qbody->qb_usage = lqe->lqe_usage;
+               qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
+               granted = lqe->lqe_usage;
        }
-       default:
-               CERROR("Invalid qsd operation:%u\n", op);
-               LBUG();
-               break;
+
+       /* acquire as much as needed, but not more */
+       if (usage > granted) {
+               qbody->qb_count  = usage - granted;
+               qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ;
        }
+
        return qbody->qb_flags != 0;
 }
 
-/*
- * Acquire/release quota space from master.
+/**
+ * Acquire quota space from master.
  * There are at most 1 in-flight dqacq/dqrel.
  *
  * \param env    - the environment passed by the caller
  * \param lqe    - is the qid entry to be processed
- * \param op     - operation that want to be performed by the caller
  *
- * \retval 0     - success
- * \retval -EDQUOT      : out of quota
- *         -EINPROGRESS : inform client to retry write/create
- *         -ve          : other appropriate errors
+ * \retval 0            - success
+ * \retval -EDQUOT      - out of quota
+ * \retval -EINPROGRESS - inform client to retry write/create
+ * \retval -EBUSY       - already a quota request in flight
+ * \retval -ve          - other appropriate errors
  */
-int qsd_dqacq(const struct lu_env *env, struct lquota_entry *lqe,
-             enum qsd_ops op)
+static int qsd_acquire_remote(const struct lu_env *env,
+                             struct lquota_entry *lqe)
 {
        struct qsd_thread_info  *qti = qsd_info(env);
        struct quota_body       *qbody = &qti->qti_body;
        struct qsd_instance     *qsd;
        struct qsd_qtype_info   *qqi;
-       struct ldlm_lock        *lock;
        int                      rc;
-       bool                     intent = false, sync;
        ENTRY;
 
+       memset(qbody, 0, sizeof(*qbody));
+       rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
+       if (rc)
+               RETURN(rc);
+
        qqi = lqe2qqi(lqe);
        qsd = qqi->qqi_qsd;
 
-       if (qsd->qsd_stopping) {
-               LQUOTA_DEBUG(lqe, "Dropping quota req since qsd is stopping");
-               /* Target is about to shut down, client will retry */
-               RETURN(-EINPROGRESS);
-       }
-
-       if (!qsd_ready(qsd)) {
-               LQUOTA_DEBUG(lqe, "Connection to master not ready");
-               RETURN(-ENOTCONN);
-       }
-
-       /* In most case, reintegration must have been triggered (when enable
-        * quota or on OST start), however, in rare race condition (enabling
-        * quota when starting OSTs), we might miss triggering reintegration
-        * for some qqi.
-        *
-        * If the previous reintegration failed for some reason, we'll
-        * re-trigger it here as well. */
-       if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
-               LQUOTA_DEBUG(lqe, "Not up-to-date, dropping request and kicking"
-                            " off reintegration");
-               qsd_start_reint_thread(qqi);
-               RETURN(-EINPROGRESS);
-       }
-
-       LQUOTA_DEBUG(lqe, "DQACQ starts op=%u", op);
-
-       /* Fill the remote global lock handle, master will check this handle
-        * to see if the slave is sending request with stale lock */
-       cfs_read_lock(&qsd->qsd_lock);
-       lustre_handle_copy(&qbody->qb_glb_lockh, &qqi->qqi_lockh);
-       cfs_read_unlock(&qsd->qsd_lock);
-
-       if (!lustre_handle_is_used(&qbody->qb_glb_lockh))
-               RETURN(-ENOLCK);
-
-       lock = ldlm_handle2lock(&qbody->qb_glb_lockh);
-       if (lock == NULL)
-               RETURN(-ENOLCK);
-       lustre_handle_copy(&qbody->qb_glb_lockh, &lock->l_remote_handle);
-       LDLM_LOCK_PUT(lock);
-
-       /* We allow only one in-flight dqacq/dqrel for specified qid, if
-        * there is already in-flight dqacq/dqrel:
-        *
-        * - For QSD_ADJ: we should just abort it, since local limit is going
-        *   to be changed soon;
-        * - For QSD_ACQ & QSD_REL: we just wait for the in-flight dqacq/dqrel
-        *   finished, and return success to the caller. The caller is
-        *   responsible for retrying;
-        * - For QSD_REP: we should just abort it, since slave has already
-        *   acquired/released grant; */
-       sync = (op == QSD_ACQ || op == QSD_REL) ? true : false;
-       LASSERTF(lqe->lqe_pending_req <= 1, "pending dqacq/dqrel:%d",
-                lqe->lqe_pending_req);
-
        lqe_write_lock(lqe);
-       if (lqe->lqe_pending_req != 0) {
-               struct l_wait_info lwi = { 0 };
 
+       /* is quota really enforced for this id? */
+       if (!lqe->lqe_enforced) {
                lqe_write_unlock(lqe);
-               if (!sync) {
-                       LQUOTA_DEBUG(lqe, "Abort DQACQ, op=%d", op);
-                       RETURN(0);
-               }
-
-               LQUOTA_DEBUG(lqe, "waiting for in-flight dqacq/dqrel");
-               l_wait_event(lqe->lqe_waiters,
-                            !lqe_pending_dqacq(lqe) || qsd->qsd_stopping,
-                            &lwi);
+               LQUOTA_DEBUG(lqe, "quota not enforced any more");
                RETURN(0);
        }
 
        /* fill qb_count & qb_flags */
-       if (!qsd_calc_space(lqe, op, qbody)) {
+       if (!qsd_calc_acquire(lqe, qbody)) {
                lqe_write_unlock(lqe);
-               LQUOTA_DEBUG(lqe, "No DQACQ required, op=%u", op);
+               LQUOTA_DEBUG(lqe, "No acquire required");
                RETURN(0);
        }
-       lqe->lqe_pending_req++;
-       lqe_write_unlock(lqe);
 
-       /* fill other quota body fields */
-       qbody->qb_fid = qqi->qqi_fid;
-       qbody->qb_id  = lqe->lqe_id;
-       memset(&qbody->qb_lockh, 0, sizeof(qbody->qb_lockh));
-       memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
+       /* check whether an acquire request completed recently */
+       if (lqe->lqe_acq_rc != 0 &&
+           lqe->lqe_acq_time > ktime_get_seconds() - 1) {
+               lqe_write_unlock(lqe);
+               LQUOTA_DEBUG(lqe, "using cached return code %d", lqe->lqe_acq_rc);
+               RETURN(lqe->lqe_acq_rc);
+       }
+
+       /* only 1 quota request in flight for a given ID is allowed */
+       rc = qsd_request_enter(lqe);
+       if (rc) {
+               lqe_write_unlock(lqe);
+               RETURN(rc);
+       }
+
+       lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
+       lqe_write_unlock(lqe);
 
        /* hold a refcount until completion */
        lqe_getref(lqe);
 
-       if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) {
-               /* check whether we already own a lock for this ID */
-               lqe_read_lock(lqe);
-               lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
-               lqe_read_unlock(lqe);
-
-               rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
-               if (rc) {
-                       memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
-                       if (req_is_preacq(qbody->qb_flags)) {
-                               if (req_has_rep(qbody->qb_flags))
-                                       /* still want to report usage */
-                                       qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
-                               else
-                                       /* no pre-acquire if no per-ID lock */
-                                       GOTO(out, rc = -ENOLCK);
-                       } else {
-                               /* no lock found, should use intent */
-                               intent = true;
-                       }
-               } else if (req_is_acq(qbody->qb_flags) &&
-                          qbody->qb_count == 0) {
-                       /* found cached lock, no need to acquire */
-                       GOTO(out, rc = 0);
-               }
-       }
+       /* fill other quota body fields */
+       qbody->qb_fid = qqi->qqi_fid;
+       qbody->qb_id  = lqe->lqe_id;
 
-       if (!intent) {
-               rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, sync,
-                                   qsd_dqacq_completion, qqi, &qti->qti_lockh,
-                                   lqe);
-        } else {
-               union ldlm_wire_lvb *lvb;
+       /* check whether we already own a valid lock for this ID */
+       rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
+       if (rc) {
+               struct lquota_lvb *lvb;
 
                OBD_ALLOC_PTR(lvb);
-               if (lvb == NULL)
-                       GOTO(out, rc = -ENOMEM);
-
-               rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, sync,
-                                    IT_QUOTA_DQACQ, qsd_dqacq_completion,
+               if (lvb == NULL) {
+                       rc = -ENOMEM;
+                       qsd_req_completion(env, qqi, qbody, NULL,
+                                          &qti->qti_lockh, NULL, lqe, rc);
+                       RETURN(rc);
+               }
+               /* no lock found, should use intent */
+               rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, true,
+                                    IT_QUOTA_DQACQ, qsd_req_completion,
                                     qqi, lvb, (void *)lqe);
+       } else {
+               /* lock found, should use regular dqacq */
+               rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, true,
+                                   qsd_req_completion, qqi, &qti->qti_lockh,
+                                   lqe);
        }
+
        /* the completion function will be called by qsd_send_dqacq or
         * qsd_intent_lock */
        RETURN(rc);
-out:
-       qsd_dqacq_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe,
-                            rc);
-       return rc;
 }
 
-/*
+/**
+ * Acquire \a space amount of quota in order to complete an operation.
+ * Try to consume local quota space first and send acquire request to quota
+ * master if required.
+ *
+ * \param env   - the environment passed by the caller
+ * \param lqe   - is the qid entry to be processed
+ * \param space - is the amount of quota required for the operation
+ * \param ret   - is the return code (-EDQUOT, -EINPROGRESS, ...)
+ *
+ * \retval true  - stop waiting in wait_event_idle_timeout,
+ *                 and real return value in \a ret
+ * \retval false - continue waiting
+ */
+static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
+                       long long space, int *ret)
+{
+       int rc = 0, count;
+       int wait_pending = 0;
+       struct qsd_qtype_info *qqi = lqe2qqi(lqe);
+
+       ENTRY;
+
+       for (count = 0; rc == 0; count++) {
+               LQUOTA_DEBUG(lqe, "acquiring:%lld count=%d", space, count);
+again:
+               if (lqe2qqi(lqe)->qqi_qsd->qsd_stopping) {
+                       rc = -EINPROGRESS;
+                       break;
+               }
+
+               /* refresh disk usage */
+               rc = qsd_refresh_usage(env, lqe);
+               if (rc)
+                       break;
+
+               /* try to consume local quota space first */
+               rc = qsd_acquire_local(lqe, space);
+               if (rc != -EAGAIN)
+                       /* rc == 0, Wouhou! enough local quota space
+                        * rc < 0, something bad happened */
+                        break;
+               /*
+                * There might be a window where a committed transaction
+                * has updated usage but the pending write hasn't changed;
+                * wait for it before acquiring remotely.
+                */
+               if (lqe->lqe_pending_write >= space && !wait_pending) {
+                       wait_pending = 1;
+                       dt_sync(env, qqi->qqi_qsd->qsd_dev);
+                       goto again;
+               }
+
+               /* if we have gotten some quota and still wait for more quota,
+                * it's better to give QMT some time to reclaim from clients */
+               if (count > 0)
+                       schedule_timeout_interruptible(cfs_time_seconds(1));
+
+               /* need to acquire more quota space from master */
+               rc = qsd_acquire_remote(env, lqe);
+       }
+
+       if (rc == -EBUSY)
+               /* already a request in flight, continue waiting */
+               RETURN(false);
+       *ret = rc;
+       RETURN(true);
+}
+
+/**
  * Quota enforcement handler. If local quota can satisfy this operation,
  * return success, otherwise, acquire more quota from master.
  * (for write operation, if master isn't available at this moment, return
@@ -594,17 +696,18 @@ out:
  * \param flags - if the operation is write, return caller no user/group
  *                and sync commit flags
  *
- * \retval 0        - success
- * \retval -EDQUOT      : out of quota
- *         -EINPROGRESS : inform client to retry write
- *         -ve          : other appropriate errors
+ * \retval 0            - success
+ * \retval -EDQUOT      - out of quota
+ * \retval -EINPROGRESS - inform client to retry write
+ * \retval -ve          - other appropriate errors
  */
 static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
                         struct lquota_id_info *qid, long long space,
-                        int *flags)
+                        enum osd_quota_local_flags *local_flags)
 {
        struct lquota_entry *lqe;
-       int                  rc = 0, retry_cnt;
+       enum osd_quota_local_flags qtype_flag = 0;
+       int rc, ret = -EINPROGRESS;
        ENTRY;
 
        if (qid->lqi_qentry != NULL) {
@@ -630,114 +733,125 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
                 * quota space. That said, we still want to perform space
                 * adjustments in qsd_op_end, so we return here, but with
                 * a reference on the lqe */
-               if (flags != NULL) {
+               if (local_flags != NULL) {
                        rc = qsd_refresh_usage(env, lqe);
                        GOTO(out_flags, rc);
                }
                RETURN(0);
        }
 
-       LQUOTA_DEBUG(lqe, "op_begin space:"LPD64, space);
+       LQUOTA_DEBUG(lqe, "op_begin space:%lld", space);
 
        lqe_write_lock(lqe);
        lqe->lqe_waiting_write += space;
        lqe_write_unlock(lqe);
 
-       for (retry_cnt = 0; rc == 0; retry_cnt++) {
-               /* refresh disk usage if required */
-               rc = qsd_refresh_usage(env, lqe);
-               if (rc)
-                       break;
-
-               /* try to consume local quota space */
-               rc = qsd_acquire_local(lqe, space);
-               if (rc != -EAGAIN)
-                       /* rc == 0, Wouhou! enough local quota space
-                        * rc < 0, something bad happened */
-                       break;
+       /* acquire quota space for the operation, cap overall wait time to
+        * prevent a service thread from being stuck for too long */
+       rc = wait_event_idle_timeout(
+               lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
+               cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)));
 
-               /* need to acquire more quota space from master, this is done
-                * synchronously */
-               rc = qsd_dqacq(env, lqe, QSD_ACQ);
-               LQUOTA_DEBUG(lqe, "Acquired quota space, retry cnt:%d rc:%d",
-                            retry_cnt, rc);
-       }
-
-       if (rc == 0) {
+       if (rc > 0 && ret == 0) {
                qid->lqi_space += space;
+               rc = 0;
        } else {
-               LQUOTA_DEBUG(lqe, "Acquire quota failed:%d", rc);
+               if (rc > 0)
+                       rc = ret;
+               else if (rc == 0)
+                       rc = -ETIMEDOUT;
+
+               LQUOTA_DEBUG(lqe, "acquire quota failed:%d", rc);
 
                lqe_write_lock(lqe);
                lqe->lqe_waiting_write -= space;
 
-               if (flags && lqe->lqe_pending_write != 0)
+               if (local_flags && lqe->lqe_pending_write != 0)
                        /* Inform OSD layer that there are pending writes.
                         * It might want to retry after a sync if appropriate */
-                        *flags |= QUOTA_FL_SYNC;
+                        *local_flags |= QUOTA_FL_SYNC;
                lqe_write_unlock(lqe);
 
-               /* convert recoverable error into -EINPROGRESS, and client will
-                * retry write on -EINPROGRESS. */
+               /* convert recoverable error into -EINPROGRESS, client will
+                * retry */
                if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ENOLCK ||
-                   rc == -EAGAIN || rc == -EINTR)
+                   rc == -EAGAIN || rc == -EINTR) {
                        rc = -EINPROGRESS;
+               } else if (rc == -ESRCH) {
+                       rc = 0;
+                       LQUOTA_ERROR(lqe, "ID isn't enforced on master, "
+                                    "probably due to a legal race; if this "
+                                    "message shows up constantly, there "
+                                    "could be some inconsistency between "
+                                    "master & slave, and quota reintegration "
+                                    "needs to be re-triggered.");
+               }
        }
 
-       if (flags != NULL) {
+       if (local_flags != NULL) {
 out_flags:
                LASSERT(qid->lqi_is_blk);
                if (rc != 0) {
-                       *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
+                       *local_flags |= lquota_over_fl(qqi->qqi_qtype);
                } else {
                        __u64   usage;
 
                        lqe_read_lock(lqe);
-                       usage  = lqe->lqe_usage;
-                       usage += lqe->lqe_pending_write;
+                       usage = lqe->lqe_pending_write;
                        usage += lqe->lqe_waiting_write;
-                       usage += qqi->qqi_qsd->qsd_sync_threshold;
+                       /* There is a chance to successfully grant more quota
+                        * but get edquot flag through glimpse. */
+                       if (lqe->lqe_edquot || (lqe->lqe_qunit != 0 &&
+                          (usage % lqe->lqe_qunit >
+                           qqi->qqi_qsd->qsd_sync_threshold)))
+                               usage += qqi->qqi_qsd->qsd_sync_threshold;
 
+                       usage += lqe->lqe_usage;
+
+                       qtype_flag = lquota_over_fl(qqi->qqi_qtype);
                        /* if we should notify client to start sync write */
                        if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
-                               *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
+                               *local_flags |= qtype_flag;
                        else
-                               *flags &= ~LQUOTA_OVER_FL(qqi->qqi_qtype);
+                               *local_flags &= ~qtype_flag;
                        lqe_read_unlock(lqe);
                }
        }
        RETURN(rc);
 }
 
+/**
+ * helper function comparing two lquota_id_info structures
+ */
 static inline bool qid_equal(struct lquota_id_info *q1,
                             struct lquota_id_info *q2)
 {
-       if (q1->lqi_type != q2->lqi_type)
+       if (q1->lqi_is_blk != q2->lqi_is_blk || q1->lqi_type != q2->lqi_type)
                return false;
        return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
 }
 
-/*
+/**
  * Enforce quota, it's called in the declaration of each operation.
  * qsd_op_end() will then be called later once all the operations have been
  * completed in order to release/adjust the quota space.
  *
- * \param env        - the environment passed by the caller
- * \param qsd        - is the qsd instance associated with the device in charge
- *                     of the operation.
- * \param trans      - is the quota transaction information
- * \param qi         - qid & space required by current operation
- * \param flags      - if the operation is write, return caller no user/group
- *                     and sync commit flags
- *
- * \retval 0        - success
- * \retval -EDQUOT      : out of quota
- *         -EINPROGRESS : inform client to retry write
- *         -ve          : other appropriate errors
+ * \param env   - the environment passed by the caller
+ * \param qsd   - is the qsd instance associated with the device in charge of
+ *                the operation.
+ * \param trans - is the quota transaction information
+ * \param qi    - qid & space required by current operation
+ * \param flags - if the operation is write, return caller no user/group and
+ *                sync commit flags
+ *
+ * \retval 0            - success
+ * \retval -EDQUOT      - out of quota
+ * \retval -EINPROGRESS - inform client to retry write
+ * \retval -ve          - other appropriate errors
  */
 int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
                 struct lquota_trans *trans, struct lquota_id_info *qi,
-                int *flags)
+                enum osd_quota_local_flags *local_flags)
 {
        int     i, rc;
        bool    found = false;
@@ -746,13 +860,16 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
        if (unlikely(qsd == NULL))
                RETURN(0);
 
+       if (qsd->qsd_dev->dd_rdonly)
+               RETURN(0);
+
        /* We don't enforce quota until the qsd_instance is started */
-       cfs_read_lock(&qsd->qsd_lock);
+       read_lock(&qsd->qsd_lock);
        if (!qsd->qsd_started) {
-               cfs_read_unlock(&qsd->qsd_lock);
+               read_unlock(&qsd->qsd_lock);
                RETURN(0);
        }
-       cfs_read_unlock(&qsd->qsd_lock);
+       read_unlock(&qsd->qsd_lock);
 
        /* ignore block quota on MDTs, ignore inode quota on OSTs */
        if ((!qsd->qsd_is_md && !qi->lqi_is_blk) ||
@@ -761,18 +878,18 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
 
        /* ignore quota enforcement request when:
         *    - quota isn't enforced for this quota type
-        * or - the user/group is root */
-       if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0)
+        * or - the user/group is root
+        * or - quota accounting isn't enabled */
+       if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0 ||
+           (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed)
                RETURN(0);
 
-       LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d",
+       LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d\n",
                 trans->lqt_id_cnt);
        /* check whether we already allocated a slot for this id */
        for (i = 0; i < trans->lqt_id_cnt; i++) {
                if (qid_equal(qi, &trans->lqt_ids[i])) {
                        found = true;
-                       /* make sure we are not mixing inodes & blocks */
-                       LASSERT(trans->lqt_ids[i].lqi_is_blk == qi->lqi_is_blk);
                        break;
                }
        }
@@ -793,12 +910,130 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
 
        /* manage quota enforcement for this ID */
        rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type],
-                          &trans->lqt_ids[i], qi->lqi_space, flags);
+                          &trans->lqt_ids[i], qi->lqi_space, local_flags);
        RETURN(rc);
 }
 EXPORT_SYMBOL(qsd_op_begin);
 
 /**
+ * Adjust quota space (by acquiring or releasing) hold by the quota slave.
+ * This function is called after each quota request completion and during
+ * reintegration in order to report usage or re-acquire quota locks.
+ * Space adjustment is aborted if there is already a quota request in flight
+ * for this ID.
+ *
+ * \param env    - the environment passed by the caller
+ * \param lqe    - is the qid entry to be processed
+ *
+ * \retval 0 on success, appropriate errors on failure
+ */
+int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe)
+{
+       struct qsd_thread_info  *qti = qsd_info(env);
+       struct quota_body       *qbody = &qti->qti_body;
+       struct qsd_instance     *qsd;
+       struct qsd_qtype_info   *qqi;
+       int                      rc;
+       bool                     intent = false;
+       ENTRY;
+
+       memset(qbody, 0, sizeof(*qbody));
+       rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
+       if (rc) {
+               /* add to adjust list again to trigger adjustment later when
+                * slave is ready */
+               LQUOTA_DEBUG(lqe, "delaying adjustment since qsd isn't ready");
+               qsd_adjust_schedule(lqe, true, false);
+               RETURN(0);
+       }
+
+       qqi = lqe2qqi(lqe);
+       qsd = qqi->qqi_qsd;
+
+       if (qsd->qsd_dev->dd_rdonly)
+               RETURN(0);
+
+       lqe_write_lock(lqe);
+
+       /* fill qb_count & qb_flags */
+       if (!qsd_calc_adjust(lqe, qbody)) {
+               lqe_write_unlock(lqe);
+               LQUOTA_DEBUG(lqe, "no adjustment required");
+               RETURN(0);
+       }
+
+       /* only 1 quota request in flight for a given ID is allowed */
+       rc = qsd_request_enter(lqe);
+       if (rc) {
+               /* already a request in flight, space adjustment will be run
+                * again on request completion */
+               lqe_write_unlock(lqe);
+               RETURN(0);
+       }
+
+       if (req_is_rel(qbody->qb_flags))
+               lqe->lqe_pending_rel = qbody->qb_count;
+       lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
+       lqe_write_unlock(lqe);
+
+       /* hold a refcount until completion */
+       lqe_getref(lqe);
+
+       /* fill other quota body fields */
+       qbody->qb_fid = qqi->qqi_fid;
+       qbody->qb_id  = lqe->lqe_id;
+
+       if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) {
+               /* check whether we own a valid lock for this ID */
+               rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
+               if (rc) {
+                       memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
+                       if (req_is_preacq(qbody->qb_flags)) {
+                               if (req_has_rep(qbody->qb_flags))
+                                       /* still want to report usage */
+                                       qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
+                               else
+                                       /* no pre-acquire if no per-ID lock */
+                                       GOTO(out, rc = -ENOLCK);
+                       } else {
+                               /* no lock found, should use intent */
+                               intent = true;
+                       }
+               } else if (req_is_acq(qbody->qb_flags) &&
+                          qbody->qb_count == 0) {
+                       /* found cached lock, no need to acquire */
+                       GOTO(out, rc = 0);
+               }
+       } else {
+               /* release and report don't need a per-ID lock */
+               memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
+       }
+
+       if (!intent) {
+               rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, false,
+                                   qsd_req_completion, qqi, &qti->qti_lockh,
+                                   lqe);
+       } else {
+               struct lquota_lvb *lvb;
+
+               OBD_ALLOC_PTR(lvb);
+               if (lvb == NULL)
+                       GOTO(out, rc = -ENOMEM);
+
+               rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, false,
+                                    IT_QUOTA_DQACQ, qsd_req_completion,
+                                    qqi, lvb, (void *)lqe);
+       }
+       /* the completion function will be called by qsd_send_dqacq or
+        * qsd_intent_lock */
+       RETURN(rc);
+out:
+       qsd_req_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe,
+                          rc);
+       return rc;
+}
+
+/**
  * Post quota operation, pre-acquire/release quota from master.
  *
  * \param  env  - the environment passed by the caller
@@ -840,7 +1075,7 @@ static void qsd_op_end0(const struct lu_env *env, struct qsd_qtype_info *qqi,
        if (adjust) {
                /* pre-acquire/release quota space is needed */
                if (env != NULL)
-                       qsd_dqacq(env, lqe, QSD_ADJ);
+                       qsd_adjust(env, lqe);
                else
                        /* no suitable environment, handle adjustment in
                         * separate thread context */
@@ -850,7 +1085,7 @@ static void qsd_op_end0(const struct lu_env *env, struct qsd_qtype_info *qqi,
        EXIT;
 }
 
-/*
+/**
  * Post quota operation. It's called after each operation transaction stopped.
  *
  * \param  env   - the environment passed by the caller
@@ -871,13 +1106,16 @@ void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
        if (unlikely(qsd == NULL))
                RETURN_EXIT;
 
+       if (qsd->qsd_dev->dd_rdonly)
+               RETURN_EXIT;
+
        /* We don't enforce quota until the qsd_instance is started */
-       cfs_read_lock(&qsd->qsd_lock);
+       read_lock(&qsd->qsd_lock);
        if (!qsd->qsd_started) {
-               cfs_read_unlock(&qsd->qsd_lock);
+               read_unlock(&qsd->qsd_lock);
                RETURN_EXIT;
        }
-       cfs_read_unlock(&qsd->qsd_lock);
+       read_unlock(&qsd->qsd_lock);
 
        LASSERT(trans != NULL);
 
@@ -898,8 +1136,88 @@ void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
 }
 EXPORT_SYMBOL(qsd_op_end);
 
-void qsd_adjust_quota(const struct lu_env *env, struct qsd_instance *qsd,
-                     union lquota_id *qid, int qtype)
+/* Simple wrapper on top of qsd API which implement quota transfer for osd
+ * setattr needs. As a reminder, only the root user can change ownership of
+ * a file, that's why EDQUOT & EINPROGRESS errors are discarded
+ */
+int qsd_transfer(const struct lu_env *env, struct qsd_instance *qsd,
+                struct lquota_trans *trans, unsigned int qtype,
+                u64 orig_id, u64 new_id, u64 bspace,
+                struct lquota_id_info *qi)
+{
+       int rc;
+
+       if (unlikely(!qsd))
+               return 0;
+
+       LASSERT(qtype < LL_MAXQUOTAS);
+       if (qtype == PRJQUOTA)
+               if (!projid_valid(make_kprojid(&init_user_ns, new_id)))
+                       return -EINVAL;
+
+       qi->lqi_type = qtype;
+
+       /* inode accounting */
+       qi->lqi_is_blk = false;
+
+       /* one more inode for the new owner ... */
+       qi->lqi_id.qid_uid = new_id;
+       qi->lqi_space = 1;
+       rc = qsd_op_begin(env, qsd, trans, qi, NULL);
+       if (rc == -EDQUOT || rc == -EINPROGRESS)
+               rc = 0;
+       if (rc)
+               return rc;
+
+       /* and one less inode for the current id */
+       qi->lqi_id.qid_uid = orig_id;
+       qi->lqi_space = -1;
+       /* can't get EDQUOT when reducing usage */
+       rc = qsd_op_begin(env, qsd, trans, qi, NULL);
+       if (rc == -EINPROGRESS)
+               rc = 0;
+       if (rc)
+               return rc;
+
+       /* block accounting */
+       qi->lqi_is_blk = true;
+
+       /* more blocks for the new owner ... */
+       qi->lqi_id.qid_uid = new_id;
+       qi->lqi_space = bspace;
+       rc = qsd_op_begin(env, qsd, trans, qi, NULL);
+       if (rc == -EDQUOT || rc == -EINPROGRESS)
+               rc = 0;
+       if (rc)
+               return rc;
+
+       /* and finally less blocks for the current owner */
+       qi->lqi_id.qid_uid = orig_id;
+       qi->lqi_space = -bspace;
+       rc = qsd_op_begin(env, qsd, trans, qi, NULL);
+       /* can't get EDQUOT when reducing usage */
+       if (rc == -EINPROGRESS)
+               rc = 0;
+       return rc;
+}
+EXPORT_SYMBOL(qsd_transfer);
+
+/**
+ * Trigger pre-acquire/release if necessary.
+ * It's only used by ldiskfs osd so far. When unlink a file in ldiskfs, the
+ * quota accounting isn't updated when the transaction stopped. Instead, it'll
+ * be updated on the final iput, so qsd_op_adjust() will be called then (in
+ * osd_object_delete()) to trigger quota release if necessary.
+ *
+ * \param env - the environment passed by the caller
+ * \param qsd - is the qsd instance associated with the device in charge
+ *              of the operation.
+ * \param qid - is the lquota ID of the user/group for which to trigger
+ *              quota space adjustment
+ * \param qtype - is the quota type (USRQUOTA or GRPQUOTA)
+ */
+void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd,
+                  union lquota_id *qid, int qtype)
 {
        struct lquota_entry    *lqe;
        struct qsd_qtype_info  *qqi;
@@ -910,12 +1228,12 @@ void qsd_adjust_quota(const struct lu_env *env, struct qsd_instance *qsd,
                RETURN_EXIT;
 
        /* We don't enforce quota until the qsd_instance is started */
-       cfs_read_lock(&qsd->qsd_lock);
+       read_lock(&qsd->qsd_lock);
        if (!qsd->qsd_started) {
-               cfs_read_unlock(&qsd->qsd_lock);
+               read_unlock(&qsd->qsd_lock);
                RETURN_EXIT;
        }
-       cfs_read_unlock(&qsd->qsd_lock);
+       read_unlock(&qsd->qsd_lock);
 
        qqi = qsd->qsd_type_array[qtype];
        LASSERT(qqi);
@@ -924,16 +1242,16 @@ void qsd_adjust_quota(const struct lu_env *env, struct qsd_instance *qsd,
            qid->qid_uid == 0)
                RETURN_EXIT;
 
-       cfs_read_lock(&qsd->qsd_lock);
+       read_lock(&qsd->qsd_lock);
        if (!qsd->qsd_started) {
-               cfs_read_unlock(&qsd->qsd_lock);
+               read_unlock(&qsd->qsd_lock);
                RETURN_EXIT;
        }
-       cfs_read_unlock(&qsd->qsd_lock);
+       read_unlock(&qsd->qsd_lock);
 
        lqe = lqe_locate(env, qqi->qqi_site, qid);
        if (IS_ERR(lqe)) {
-               CERROR("%s: fail to locate lqe for id:"LPU64", type:%d\n",
+               CERROR("%s: fail to locate lqe for id:%llu, type:%d\n",
                       qsd->qsd_svname, qid->qid_uid, qtype);
                RETURN_EXIT;
        }
@@ -945,9 +1263,104 @@ void qsd_adjust_quota(const struct lu_env *env, struct qsd_instance *qsd,
        lqe_read_unlock(lqe);
 
        if (adjust)
-               qsd_dqacq(env, lqe, QSD_ADJ);
+               qsd_adjust(env, lqe);
 
        lqe_putref(lqe);
        EXIT;
 }
-EXPORT_SYMBOL(qsd_adjust_quota);
+EXPORT_SYMBOL(qsd_op_adjust);
+
+/**
+ * Reserve or free quota.
+ *
+ * Currently, it's used to reserve quota space before changing the file's group
+ * for normal user and free the reserved quota after the group change.
+ *
+ * \param env     - the environment passed by the caller
+ * \param qsd     - is the qsd instance associated with the device in charge of
+ *                  the operation.
+ * \param qi      - qid & space required by current operation
+ *
+ * \retval 0            - success
+ * \retval -EDQUOT      - out of quota
+ * \retval -EINPROGRESS - inform client to retry write
+ * \retval -ve          - other appropriate errors
+ */
+int qsd_reserve_or_free_quota(const struct lu_env *env,
+                             struct qsd_instance *qsd,
+                             struct lquota_id_info *qi)
+{
+       struct lquota_entry *lqe;
+       struct qsd_qtype_info  *qqi;
+       int rc = 0;
+       bool is_free = qi->lqi_space < 0;
+
+       ENTRY;
+
+       if (unlikely(qsd == NULL))
+               RETURN(0);
+
+       if (qsd->qsd_dev->dd_rdonly)
+               RETURN(0);
+
+       if (is_free)
+               qi->lqi_space *= -1;
+
+       /* We don't enforce quota until the qsd_instance is started */
+       read_lock(&qsd->qsd_lock);
+       if (!qsd->qsd_started) {
+               read_unlock(&qsd->qsd_lock);
+               RETURN(0);
+       }
+       read_unlock(&qsd->qsd_lock);
+
+       qqi = qsd->qsd_type_array[qi->lqi_type];
+       LASSERT(qqi);
+
+       CDEBUG(D_QUOTA, "type %s, acct %s, free %d, count %llu\n",
+              qsd_type_enabled(qsd, qi->lqi_type) ? "enabled" : "disabled",
+              (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed ? "failed" :
+              "succeed", is_free, qi->lqi_space);
+
+       /* ignore quota enforcement request when:
+        *    - quota isn't enforced for this quota type
+        * or - the user/group is root
+        * or - quota accounting isn't enabled
+        */
+       if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0 ||
+           (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed)
+               RETURN(0);
+
+       if (is_free) {
+               /* look up lquota entry associated with qid */
+               lqe = lqe_locate(env, qqi->qqi_site, &qi->lqi_id);
+               if (IS_ERR(lqe))
+                       RETURN(PTR_ERR(lqe));
+               if (!lqe->lqe_enforced) {
+                       lqe_putref(lqe);
+                       RETURN(0);
+               }
+
+               qi->lqi_qentry = lqe;
+
+               /* lqe will be put in qsd_op_end0 */
+               qsd_op_end0(env, qsd->qsd_type_array[qi->lqi_type], qi);
+               qi->lqi_qentry = NULL;
+       } else {
+               /* manage quota enforcement for this ID */
+               rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type], qi,
+                                  qi->lqi_space, NULL);
+
+               if (qi->lqi_qentry != NULL) {
+                       lqe_putref(qi->lqi_qentry);
+                       qi->lqi_qentry = NULL;
+               }
+       }
+
+       CDEBUG(D_QUOTA, "%s quota: type %i, uid %llu, gid %llu, space %llu\n",
+              is_free ? "Free" : "Reserve", qi->lqi_type, qi->lqi_id.qid_uid,
+              qi->lqi_id.qid_gid, qi->lqi_space);
+
+       RETURN(rc);
+}
+EXPORT_SYMBOL(qsd_reserve_or_free_quota);