Whamcloud - gitweb
LU-2211 quota: cap how long a thread can wait for quota
authorJohann Lombardi <johann.lombardi@intel.com>
Fri, 19 Oct 2012 13:59:12 +0000 (15:59 +0200)
committerOleg Drokin <green@whamcloud.com>
Tue, 30 Oct 2012 19:56:45 +0000 (15:56 -0400)
Change qsd_op_begin() path to wait for quota space for less than
obd_timeout / 2.
This patch also abandons the qsd_ops enum in favor of a more generic
qsd_adjust() implementation which will always do the same processing
even if adjustment is delayed because of a quota request in flight.

Signed-off-by: Johann Lombard <johann.lombardi@intel.com>
Signed-off-by: Niu Yawei <niu@whamcloud.com>
Change-Id: I5faf637c5330ca7f503c292e0e28edb84458ee89
Reviewed-on: http://review.whamcloud.com/4314
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Fan Yong <yong.fan@whamcloud.com>
lustre/doc/osd-api.txt
lustre/include/lustre_quota.h
lustre/osd-ldiskfs/osd_handler.c
lustre/quota/qsd_handler.c
lustre/quota/qsd_internal.h
lustre/quota/qsd_lib.c
lustre/quota/qsd_lock.c
lustre/quota/qsd_reint.c
lustre/quota/qsd_request.c
lustre/quota/qsd_writeback.c
lustre/tests/sanity-quota.sh

index a039e45..f423a52 100644 (file)
@@ -1211,8 +1211,8 @@ int qsd_op_begin(const struct lu_env *, struct qsd_instance *,
                  struct lquota_trans *, struct lquota_id_info *, int *);
 void qsd_op_end(const struct lu_env *, struct qsd_instance *,
                 struct lquota_trans *);
                  struct lquota_trans *, struct lquota_id_info *, int *);
 void qsd_op_end(const struct lu_env *, struct qsd_instance *,
                 struct lquota_trans *);
-void qsd_adjust_quota(const struct lu_env *, struct qsd_instance *,
-                      union lquota_id *, int);
+void qsd_op_adjust(const struct lu_env *, struct qsd_instance *,
+                   union lquota_id *, int);
 
 qsd_init
        The OSD module should first allocate a qsd instance via qsd_init.
 
 qsd_init
        The OSD module should first allocate a qsd instance via qsd_init.
@@ -1252,11 +1252,11 @@ qsd_op_end
        each time a new operation is declared, qsd_op_end should be called only
        once for the whole transaction.
 
        each time a new operation is declared, qsd_op_end should be called only
        once for the whole transaction.
 
-qsd_adjust_quota
+qsd_op_adjust
        Trigger pre-acquire/release if necessary, it's only used for ldiskfs osd
        so far. When unlink a file in ldiskfs, the quota accounting isn't
        updated when the transaction stopped. Instead, it'll be updated on the
        Trigger pre-acquire/release if necessary, it's only used for ldiskfs osd
        so far. When unlink a file in ldiskfs, the quota accounting isn't
        updated when the transaction stopped. Instead, it'll be updated on the
-       final iput, so qsd_adjust_quota() will be called then (in
+       final iput, so qsd_op_adjust() will be called then (in
        osd_object_delete()) to trigger quota release if necessary.
 
 Appendix 1. A brief note on Lustre configuration.
        osd_object_delete()) to trigger quota release if necessary.
 
 Appendix 1. A brief note on Lustre configuration.
index 131b7d9..8925380 100644 (file)
@@ -161,7 +161,7 @@ struct qsd_instance;
  *                 operation is declared, qsd_op_end() should be called only
  *                 once for the whole transaction.
  *
  *                 operation is declared, qsd_op_end() should be called only
  *                 once for the whole transaction.
  *
- * - qsd_adjust_quota(): triggers pre-acquire/release if necessary.
+ * - qsd_op_adjust(): triggers pre-acquire/release if necessary.
  *
  * Below are the function prototypes to be used by OSD layer to manage quota
  * enforcement. Arguments are documented where each function is defined.  */
  *
  * Below are the function prototypes to be used by OSD layer to manage quota
  * enforcement. Arguments are documented where each function is defined.  */
@@ -175,8 +175,8 @@ int qsd_op_begin(const struct lu_env *, struct qsd_instance *,
                 struct lquota_trans *, struct lquota_id_info *, int *);
 void qsd_op_end(const struct lu_env *, struct qsd_instance *,
                struct lquota_trans *);
                 struct lquota_trans *, struct lquota_id_info *, int *);
 void qsd_op_end(const struct lu_env *, struct qsd_instance *,
                struct lquota_trans *);
-void qsd_adjust_quota(const struct lu_env *, struct qsd_instance *,
-                     union lquota_id *, int);
+void qsd_op_adjust(const struct lu_env *, struct qsd_instance *,
+                  union lquota_id *, int);
 
 /*
  * Quota information attached to a transaction
 
 /*
  * Quota information attached to a transaction
index 602e4ab..819d963 100644 (file)
@@ -865,10 +865,10 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
 
                        /* Release granted quota to master if necessary */
                        qi->lqi_id.qid_uid = uid;
 
                        /* Release granted quota to master if necessary */
                        qi->lqi_id.qid_uid = uid;
-                       qsd_adjust_quota(env, qsd, &qi->lqi_id, USRQUOTA);
+                       qsd_op_adjust(env, qsd, &qi->lqi_id, USRQUOTA);
 
                        qi->lqi_id.qid_uid = gid;
 
                        qi->lqi_id.qid_uid = gid;
-                       qsd_adjust_quota(env, qsd, &qi->lqi_id, GRPQUOTA);
+                       qsd_op_adjust(env, qsd, &qi->lqi_id, GRPQUOTA);
                }
         }
 }
                }
         }
 }
index 5bdb0cf..68803d0 100644 (file)
 
 #define DEBUG_SUBSYSTEM S_LQUOTA
 
 
 #define DEBUG_SUBSYSTEM S_LQUOTA
 
-#include <linux/version.h>
-#include <linux/fs.h>
-#include <asm/unistd.h>
-#include <linux/quotaops.h>
-#include <linux/init.h>
-
-#include <obd_class.h>
-#include <lustre_param.h>
-#include <lprocfs_status.h>
-
 #include "qsd_internal.h"
 
 #include "qsd_internal.h"
 
-/*
- * helper function returning how much space is currently reserved for requests
- * in flight.
+/**
+ * helper function bumping lqe_pending_req if there is no quota request in
+ * flight for the lquota entry \a lqe. Otherwise, EBUSY is returned.
  */
  */
-static inline int lqe_pending_dqacq(struct lquota_entry *lqe)
+static inline int qsd_request_enter(struct lquota_entry *lqe)
 {
 {
-       int     pending;
+       /* is there already a quota request in flight? */
+       if (lqe->lqe_pending_req != 0) {
+               LQUOTA_DEBUG(lqe, "already a request in flight");
+               return -EBUSY;
+       }
 
 
-       lqe_read_lock(lqe);
-       pending = lqe->lqe_pending_req;
-       lqe_read_unlock(lqe);
+       if (lqe->lqe_pending_rel != 0) {
+               LQUOTA_ERROR(lqe, "no request in flight with pending_rel="LPU64,
+                            lqe->lqe_pending_rel);
+               LBUG();
+       }
 
 
-       return pending;
+       lqe->lqe_pending_req++;
+       return 0;
 }
 
 }
 
-/*
- * helper function returning true when the connection to master is ready to be
- * used.
+/**
+ * Companion of qsd_request_enter() dropping lqe_pending_req to 0.
  */
  */
-static inline int qsd_ready(struct qsd_instance *qsd)
+static inline void qsd_request_exit(struct lquota_entry *lqe)
 {
 {
+       if (lqe->lqe_pending_req != 1) {
+               LQUOTA_ERROR(lqe, "lqe_pending_req != 1!!!");
+               LBUG();
+       }
+       lqe->lqe_pending_req--;
+       lqe->lqe_pending_rel = 0;
+       cfs_waitq_broadcast(&lqe->lqe_waiters);
+}
+
+/**
+ * Check whether a qsd instance is all set to send quota request to master.
+ * This includes checking whether:
+ * - the connection to master is set up and usable,
+ * - the qsd isn't stopping
+ * - reintegration has been successfully completed and all indexes are
+ *   up-to-date
+ *
+ * \param lqe - is the lquota entry for which we would like to send an quota
+ *              request
+ * \param lockh - is the remote handle of the global lock returned on success
+ *
+ * \retval 0 on success, appropriate error on failure
+ */
+static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
+{
+       struct qsd_qtype_info   *qqi = lqe2qqi(lqe);
+       struct qsd_instance     *qsd = qqi->qqi_qsd;
        struct obd_import       *imp = NULL;
        struct obd_import       *imp = NULL;
+       struct ldlm_lock        *lock;
+       ENTRY;
 
        cfs_read_lock(&qsd->qsd_lock);
 
        cfs_read_lock(&qsd->qsd_lock);
+       /* is the qsd about to shut down? */
+       if (qsd->qsd_stopping) {
+               cfs_read_unlock(&qsd->qsd_lock);
+               LQUOTA_DEBUG(lqe, "dropping quota req since qsd is stopping");
+               /* Target is about to shut down, client will retry */
+               RETURN(-EINPROGRESS);
+       }
+
+       /* is the connection to the quota master ready? */
        if (qsd->qsd_exp_valid)
                imp = class_exp2cliimp(qsd->qsd_exp);
        if (qsd->qsd_exp_valid)
                imp = class_exp2cliimp(qsd->qsd_exp);
+       if (imp == NULL || imp->imp_invalid) {
+               cfs_read_unlock(&qsd->qsd_lock);
+               LQUOTA_DEBUG(lqe, "connection to master not ready");
+               RETURN(-ENOTCONN);
+       }
+
+       /* In most case, reintegration must have been triggered (when enable
+        * quota or on OST start), however, in rare race condition (enabling
+        * quota when starting OSTs), we might miss triggering reintegration
+        * for some qqi.
+        *
+        * If the previous reintegration failed for some reason, we'll
+        * re-trigger it here as well. */
+       if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
+               cfs_read_unlock(&qsd->qsd_lock);
+               LQUOTA_DEBUG(lqe, "not up-to-date, dropping request and "
+                            "kicking off reintegration");
+               qsd_start_reint_thread(qqi);
+               RETURN(-EINPROGRESS);
+       }
+
+       /* Fill the remote global lock handle, master will check this handle
+        * to see if the slave is sending request with stale lock */
+       lustre_handle_copy(lockh, &qqi->qqi_lockh);
        cfs_read_unlock(&qsd->qsd_lock);
 
        cfs_read_unlock(&qsd->qsd_lock);
 
-       return (imp == NULL || imp->imp_invalid) ? false : true;
+       if (!lustre_handle_is_used(lockh))
+               RETURN(-ENOLCK);
+
+       lock = ldlm_handle2lock(lockh);
+       if (lock == NULL)
+               RETURN(-ENOLCK);
+
+       /* return remote lock handle to be packed in quota request */
+       lustre_handle_copy(lockh, &lock->l_remote_handle);
+       LDLM_LOCK_PUT(lock);
+
+       RETURN(0);
 }
 
 }
 
-/*
- * Helper function returning true when quota space need to be adjusted (some
- * unused space should be free or pre-acquire) and false otherwise.
+/**
+ * Check whether any quota space adjustment (pre-acquire/release/report) is
+ * needed for a given quota ID. If a non-null \a qbody is passed, then the
+ * \a qbody structure (qb_count/flags/usage) is filled with appropriate data
+ * to be packed in the quota request.
+ *
+ * \param lqe   - is the lquota entry for which we would like to adjust quota
+ *                space.
+ * \param qbody - is the quota body to fill, if not NULL.
+ *
+ * \retval true  - space adjustment is required and \a qbody is filled, if not
+ *                 NULL
+ * \retval false - no space adjustment required
  */
  */
-static bool qsd_adjust_needed(struct lquota_entry *lqe)
+static bool qsd_calc_adjust(struct lquota_entry *lqe, struct quota_body *qbody)
 {
 {
-       struct qsd_qtype_info   *qqi;
-       __u64                    usage, granted;
+       __u64   usage, granted;
+       ENTRY;
 
 
-       qqi = lqe2qqi(lqe);
+       usage   = lqe->lqe_usage;
+       usage  += lqe->lqe_pending_write + lqe->lqe_waiting_write;
+       granted = lqe->lqe_granted;
 
 
-       if (!lqe->lqe_enforced || qqi->qqi_qsd->qsd_stopping)
-               /* if quota isn't enforced for this id, no need to adjust
-                * Similarly, no need to perform adjustment if the target is in
-                * the process of shutting down. */
-               return false;
+       if (qbody != NULL)
+               qbody->qb_flags = 0;
+
+       if (!lqe->lqe_enforced) {
+               /* quota not enforced any more for this ID */
+               if (granted != 0) {
+                       /* release all quota space unconditionally */
+                       LQUOTA_DEBUG(lqe, "not enforced, releasing all space");
+                       if (qbody != NULL) {
+                               qbody->qb_count = granted;
+                               qbody->qb_flags = QUOTA_DQACQ_FL_REL;
+                       }
+                       RETURN(true);
+               }
+               RETURN(false);
+       }
 
 
-       usage  = lqe->lqe_usage;
-       usage += lqe->lqe_pending_write + lqe->lqe_waiting_write;
-       granted = lqe->lqe_granted - lqe->lqe_pending_rel;
+       if (!lustre_handle_is_used(&lqe->lqe_lockh)) {
+               /* No valid per-ID lock
+                * When reporting quota (during reintegration or on setquota
+                * glimpse), we should release granted space if usage is 0.
+                * Otherwise, if the usage is less than granted, we need to
+                * acquire the per-ID lock to make sure the unused grant can be
+                * reclaimed by per-ID lock glimpse. */
+               if (usage == 0) {
+                       /* no on-disk usage and no outstanding activity, release
+                        * space */
+                       if (granted != 0) {
+                               LQUOTA_DEBUG(lqe, "no usage, releasing all "
+                                            "space");
+                               if (qbody != NULL) {
+                                       qbody->qb_count = granted;
+                                       qbody->qb_flags = QUOTA_DQACQ_FL_REL;
+                               }
+                               RETURN(true);
+                       }
+                       LQUOTA_DEBUG(lqe, "no usage + no granted, nothing to "
+                                    "do");
+                       RETURN(false);
+               }
 
 
-       /* need to re-acquire per-ID lock or release all grant */
-       if (!lustre_handle_is_used(&lqe->lqe_lockh) &&
-           lqe->lqe_granted > lqe->lqe_usage)
-               return true;
+               if (lqe->lqe_usage < lqe->lqe_granted) {
+                       /* holding quota space w/o any lock, enqueue per-ID lock
+                        * again */
+                       LQUOTA_DEBUG(lqe, "(re)acquiring per-ID lock");
+                       if (qbody != NULL) {
+                               qbody->qb_count = 0;
+                               qbody->qb_flags = QUOTA_DQACQ_FL_ACQ;
+                       }
+                       RETURN(true);
+               }
 
 
-       /* good old quota qunit adjustment logic which has been around since
-        * lustre 1.4:
-        * 1. Need to release some space? */
-       if (granted > usage + lqe->lqe_qunit)
-               return true;
+               if (lqe->lqe_usage > lqe->lqe_granted) {
+                       /* quota overrun, report usage */
+                       LQUOTA_DEBUG(lqe, "overrun, reporting usage");
+                       if (qbody != NULL) {
+                               qbody->qb_usage = lqe->lqe_usage;
+                               qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
+                       }
+                       RETURN(true);
+               }
+               LQUOTA_DEBUG(lqe, "granted matches usage, nothing to do");
+               RETURN(false);
+       }
+
+       /* valid per-ID lock
+        * Apply good old quota qunit adjustment logic which has been around
+        * since lustre 1.4:
+        * 1. release spare quota space? */
+       if (granted > usage + lqe->lqe_qunit) {
+               /* pre-release quota space */
+               if (qbody == NULL)
+                       RETURN(true);
+               qbody->qb_count = granted - usage;
+               /* if usage == 0, release all granted space */
+               if (usage) {
+                       /* try to keep one qunit of quota space */
+                       qbody->qb_count -= lqe->lqe_qunit;
+                       /* but don't release less than qtune to avoid releasing
+                        * space too often */
+                       if (qbody->qb_count < lqe->lqe_qtune)
+                               qbody->qb_count = lqe->lqe_qtune;
+               }
+               qbody->qb_flags = QUOTA_DQACQ_FL_REL;
+               RETURN(true);
+       }
 
        /* 2. Any quota overrun? */
 
        /* 2. Any quota overrun? */
-       if (lqe->lqe_usage > lqe->lqe_granted)
-               /* we ended up consuming more than we own, we need to have this
-                * fixed ASAP */
-               return true;
+       if (lqe->lqe_usage > lqe->lqe_granted) {
+               /* we overconsumed quota space, we report usage in request so
+                * that master can adjust it unconditionally */
+               if (qbody == NULL)
+                       RETURN(true);
+               qbody->qb_usage = lqe->lqe_usage;
+               granted         = lqe->lqe_usage;
+               qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
+       }
 
        /* 3. Time to pre-acquire? */
 
        /* 3. Time to pre-acquire? */
-       if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && lqe->lqe_qunit != 0 &&
-           granted < usage + lqe->lqe_qtune)
-               /* need to pre-acquire some space if we don't want to block
-                * client's requests */
-               return true;
+       if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && usage > 0 &&
+           lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) {
+               /* To pre-acquire quota space, we report how much spare quota
+                * space the slave currently owns, then the master will grant us
+                * back how much we can pretend given the current state of
+                * affairs */
+               if (qbody == NULL)
+                       RETURN(true);
+               if (granted <= usage)
+                       qbody->qb_count = 0;
+               else
+                       qbody->qb_count = granted - usage;
+               qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ;
+               RETURN(true);
+       }
 
 
-       return false;
+       if (qbody != NULL)
+               RETURN(qbody->qb_flags != 0);
+       else
+               RETURN(false);
 }
 
 }
 
-/*
+/**
+ * Helper function returning true when quota space need to be adjusted (some
+ * unused space should be free or pre-acquire) and false otherwise.
+ */
+static inline bool qsd_adjust_needed(struct lquota_entry *lqe)
+{
+       return qsd_calc_adjust(lqe, NULL);
+}
+
+/**
  * Callback function called when an acquire/release request sent to the master
  * is completed
  */
  * Callback function called when an acquire/release request sent to the master
  * is completed
  */
-static void qsd_dqacq_completion(const struct lu_env *env,
-                                struct qsd_qtype_info *qqi,
-                                struct quota_body *reqbody,
-                                struct quota_body *repbody,
-                                struct lustre_handle *lockh,
-                                union ldlm_wire_lvb *lvb,
-                                void *arg, int ret)
+static void qsd_req_completion(const struct lu_env *env,
+                              struct qsd_qtype_info *qqi,
+                              struct quota_body *reqbody,
+                              struct quota_body *repbody,
+                              struct lustre_handle *lockh,
+                              union ldlm_wire_lvb *lvb,
+                              void *arg, int ret)
 {
        struct lquota_entry     *lqe = (struct lquota_entry *)arg;
        struct qsd_thread_info  *qti;
 {
        struct lquota_entry     *lqe = (struct lquota_entry *)arg;
        struct qsd_thread_info  *qti;
@@ -157,7 +331,6 @@ static void qsd_dqacq_completion(const struct lu_env *env,
        qti = qsd_info(env);
 
        lqe_write_lock(lqe);
        qti = qsd_info(env);
 
        lqe_write_lock(lqe);
-
        LQUOTA_DEBUG(lqe, "DQACQ returned %d, flags:0x%x", ret,
                     reqbody->qb_flags);
 
        LQUOTA_DEBUG(lqe, "DQACQ returned %d, flags:0x%x", ret,
                     reqbody->qb_flags);
 
@@ -235,12 +408,9 @@ static void qsd_dqacq_completion(const struct lu_env *env,
 out:
        adjust = qsd_adjust_needed(lqe);
 out_noadjust:
 out:
        adjust = qsd_adjust_needed(lqe);
 out_noadjust:
-       lqe->lqe_pending_req--;
-       lqe->lqe_pending_rel = 0;
+       qsd_request_exit(lqe);
        lqe_write_unlock(lqe);
 
        lqe_write_unlock(lqe);
 
-       cfs_waitq_broadcast(&lqe->lqe_waiters);
-
        /* release reference on per-ID lock */
        if (lustre_handle_is_used(lockh))
                ldlm_lock_decref(lockh, qsd_id_einfo.ei_mode);
        /* release reference on per-ID lock */
        if (lustre_handle_is_used(lockh))
                ldlm_lock_decref(lockh, qsd_id_einfo.ei_mode);
@@ -253,15 +423,23 @@ out_noadjust:
                else
                        qsd_adjust_schedule(lqe, true, false);
        }
                else
                        qsd_adjust_schedule(lqe, true, false);
        }
+       lqe_putref(lqe);
 
        if (lvb)
 
        if (lvb)
-               /* free lvb allocated in qsd_dqacq */
                OBD_FREE_PTR(lvb);
                OBD_FREE_PTR(lvb);
-
-       lqe_putref(lqe);
        EXIT;
 }
 
        EXIT;
 }
 
+/**
+ * Try to consume local quota space.
+ *
+ * \param lqe   - is the qid entry to be processed
+ * \param space - is the amount of quota space needed to complete the operation
+ *
+ * \retval 0       - success
+ * \retval -EDQUOT - out of quota
+ * \retval -EAGAIN - need to acquire space from master
+ */
 static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
 {
        __u64   usage;
 static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
 {
        __u64   usage;
@@ -270,7 +448,7 @@ static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
 
        if (!lqe->lqe_enforced)
                /* not enforced any more, we are good */
 
        if (!lqe->lqe_enforced)
                /* not enforced any more, we are good */
-               RETURN(0);
+               RETURN(-ESRCH);
 
        lqe_write_lock(lqe);
        /* use latest usage */
 
        lqe_write_lock(lqe);
        /* use latest usage */
@@ -293,294 +471,189 @@ static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
-static bool qsd_calc_space(struct lquota_entry *lqe, enum qsd_ops op,
-                          struct quota_body *qbody)
+/**
+ * Compute how much quota space should be acquire from the master based
+ * on how much is currently granted to this slave and pending/waiting
+ * operations.
+ *
+ * \param lqe - is the lquota entry for which we would like to adjust quota
+ *              space.
+ * \param qbody - is the quota body of the acquire request to fill
+ *
+ * \retval true  - space acquisition is needed and qbody is filled
+ * \retval false - no space acquisition required
+ */
+static inline bool qsd_calc_acquire(struct lquota_entry *lqe,
+                                   struct quota_body *qbody)
 {
 {
-       struct qsd_qtype_info   *qqi;
-       __u64                    usage, granted;
+       __u64   usage, granted;
 
 
-       if (!lqe->lqe_enforced && op != QSD_REL)
-               return 0;
-
-       qqi = lqe2qqi(lqe);
-
-       LASSERT(lqe->lqe_pending_rel == 0);
        usage   = lqe->lqe_usage;
        usage  += lqe->lqe_pending_write + lqe->lqe_waiting_write;
        granted = lqe->lqe_granted;
 
        qbody->qb_flags = 0;
        usage   = lqe->lqe_usage;
        usage  += lqe->lqe_pending_write + lqe->lqe_waiting_write;
        granted = lqe->lqe_granted;
 
        qbody->qb_flags = 0;
-again:
-       switch (op) {
-       case QSD_ACQ:
-               /* if we overconsumed quota space, we report usage in request
-                * so that master can adjust it unconditionally */
-               if (lqe->lqe_usage > lqe->lqe_granted) {
-                       qbody->qb_usage = lqe->lqe_usage;
-                       qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
-                       granted = lqe->lqe_usage;
-               }
-               /* acquire as much as needed, but not more */
-               if (usage > granted) {
-                       qbody->qb_count  = usage - granted;
-                       qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ;
-               }
-               break;
-       case QSD_REP:
-               /* When reporting quota (during reintegration or on setquota
-                * glimpse), we should release granted space if usage is 0.
-                * Otherwise, if the usage is less than granted, we need to
-                * acquire the per-ID lock to make sure the unused grant can be
-                * reclaimed by per-ID lock glimpse. */
-               if (lqe->lqe_usage == 0 && lqe->lqe_granted != 0) {
-                       LQUOTA_DEBUG(lqe, "Release on report!");
-                       GOTO(again, op = QSD_REL);
-               } else if (lqe->lqe_usage == lqe->lqe_granted) {
-                       LQUOTA_DEBUG(lqe, "Usage matches granted, needn't do "
-                                    "anything on report!");
-               } else if (lqe->lqe_usage < lqe->lqe_granted) {
-                       LQUOTA_DEBUG(lqe, "Acquire per-ID lock on report!");
-                       qbody->qb_count = 0;
-                       qbody->qb_flags = QUOTA_DQACQ_FL_ACQ;
-               } else {
-                       LASSERT(lqe->lqe_usage > lqe->lqe_granted);
-                       LQUOTA_DEBUG(lqe, "Reporting usage");
-                       qbody->qb_usage = lqe->lqe_usage;
-                       qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
-               }
-               break;
-       case QSD_REL:
-               /* release unused quota space unconditionally */
-               if (lqe->lqe_granted > lqe->lqe_usage) {
-                       qbody->qb_count = lqe->lqe_granted - lqe->lqe_usage;
-                       qbody->qb_flags = QUOTA_DQACQ_FL_REL;
-               }
-               break;
-       case QSD_ADJ: {
-               /* need to re-acquire per-ID lock or release all grant */
-               if (!lustre_handle_is_used(&lqe->lqe_lockh) &&
-                   lqe->lqe_granted > lqe->lqe_usage)
-                       GOTO(again, op = QSD_REP);
-
-               /* release spare grant */
-               if (granted > usage + lqe->lqe_qunit) {
-                       /* pre-release quota space */
-                       qbody->qb_count  = granted - usage;
-                       /* if usage == 0, release all granted space */
-                       if (usage) {
-                               /* try to keep one qunit of quota space */
-                               qbody->qb_count -= lqe->lqe_qunit;
-                               /* but don't release less than qtune to avoid
-                                * releasing space too often */
-                               if (qbody->qb_count < lqe->lqe_qtune)
-                                       qbody->qb_count = lqe->lqe_qtune;
-                       }
-                       qbody->qb_flags = QUOTA_DQACQ_FL_REL;
-                       break;
-               }
 
 
-               /* if we overconsumed quota space, we report usage in request
-                * so that master can adjust it unconditionally */
-               if (lqe->lqe_usage > lqe->lqe_granted) {
-                       qbody->qb_usage = lqe->lqe_usage;
-                       qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
-                       granted         = lqe->lqe_usage;
-               }
-
-               if (!lqe->lqe_edquot && !lqe->lqe_nopreacq &&
-                   lustre_handle_is_used(&lqe->lqe_lockh) && usage > 0 &&
-                   lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) {
-                       /* To pre-acquire quota space, we report how much spare
-                        * quota space the slave currently owns, then the master
-                        * will grant us back how much we can pretend given the
-                        * current state of affairs */
-                       if (granted <= usage)
-                               qbody->qb_count = 0;
-                       else
-                               qbody->qb_count = granted - usage;
-                       qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ;
-               }
-               break;
+       /* if we overconsumed quota space, we report usage in request so that
+        * master can adjust it unconditionally */
+       if (lqe->lqe_usage > lqe->lqe_granted) {
+               qbody->qb_usage = lqe->lqe_usage;
+               qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
+               granted = lqe->lqe_usage;
        }
        }
-       default:
-               CERROR("Invalid qsd operation:%u\n", op);
-               LBUG();
-               break;
+
+       /* acquire as much as needed, but not more */
+       if (usage > granted) {
+               qbody->qb_count  = usage - granted;
+               qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ;
        }
        }
+
        return qbody->qb_flags != 0;
 }
 
        return qbody->qb_flags != 0;
 }
 
-/*
- * Acquire/release quota space from master.
+/**
+ * Acquire quota space from master.
  * There are at most 1 in-flight dqacq/dqrel.
  *
  * \param env    - the environment passed by the caller
  * \param lqe    - is the qid entry to be processed
  * There are at most 1 in-flight dqacq/dqrel.
  *
  * \param env    - the environment passed by the caller
  * \param lqe    - is the qid entry to be processed
- * \param op     - operation that want to be performed by the caller
  *
  *
- * \retval 0     - success
- * \retval -EDQUOT      : out of quota
- *         -EINPROGRESS : inform client to retry write/create
- *         -ve          : other appropriate errors
+ * \retval 0            - success
+ * \retval -EDQUOT      - out of quota
+ * \retval -EINPROGRESS - inform client to retry write/create
+ * \retval -EBUSY       - already a quota request in flight
+ * \retval -ve          - other appropriate errors
  */
  */
-int qsd_dqacq(const struct lu_env *env, struct lquota_entry *lqe,
-             enum qsd_ops op)
+static int qsd_acquire_remote(const struct lu_env *env,
+                             struct lquota_entry *lqe)
 {
        struct qsd_thread_info  *qti = qsd_info(env);
        struct quota_body       *qbody = &qti->qti_body;
        struct qsd_instance     *qsd;
        struct qsd_qtype_info   *qqi;
 {
        struct qsd_thread_info  *qti = qsd_info(env);
        struct quota_body       *qbody = &qti->qti_body;
        struct qsd_instance     *qsd;
        struct qsd_qtype_info   *qqi;
-       struct ldlm_lock        *lock;
        int                      rc;
        int                      rc;
-       bool                     intent = false, sync;
        ENTRY;
 
        ENTRY;
 
+       memset(qbody, 0, sizeof(*qbody));
+       rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
+       if (rc)
+               RETURN(rc);
+
        qqi = lqe2qqi(lqe);
        qsd = qqi->qqi_qsd;
 
        qqi = lqe2qqi(lqe);
        qsd = qqi->qqi_qsd;
 
-       if (qsd->qsd_stopping) {
-               LQUOTA_DEBUG(lqe, "Dropping quota req since qsd is stopping");
-               /* Target is about to shut down, client will retry */
-               RETURN(-EINPROGRESS);
-       }
-
-       if (!qsd_ready(qsd)) {
-               LQUOTA_DEBUG(lqe, "Connection to master not ready");
-               RETURN(-ENOTCONN);
-       }
-
-       /* In most case, reintegration must have been triggered (when enable
-        * quota or on OST start), however, in rare race condition (enabling
-        * quota when starting OSTs), we might miss triggering reintegration
-        * for some qqi.
-        *
-        * If the previous reintegration failed for some reason, we'll
-        * re-trigger it here as well. */
-       if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
-               LQUOTA_DEBUG(lqe, "Not up-to-date, dropping request and kicking"
-                            " off reintegration");
-               qsd_start_reint_thread(qqi);
-               RETURN(-EINPROGRESS);
-       }
-
-       LQUOTA_DEBUG(lqe, "DQACQ starts op=%u", op);
-
-       /* Fill the remote global lock handle, master will check this handle
-        * to see if the slave is sending request with stale lock */
-       cfs_read_lock(&qsd->qsd_lock);
-       lustre_handle_copy(&qbody->qb_glb_lockh, &qqi->qqi_lockh);
-       cfs_read_unlock(&qsd->qsd_lock);
-
-       if (!lustre_handle_is_used(&qbody->qb_glb_lockh))
-               RETURN(-ENOLCK);
-
-       lock = ldlm_handle2lock(&qbody->qb_glb_lockh);
-       if (lock == NULL)
-               RETURN(-ENOLCK);
-       lustre_handle_copy(&qbody->qb_glb_lockh, &lock->l_remote_handle);
-       LDLM_LOCK_PUT(lock);
-
-       /* We allow only one in-flight dqacq/dqrel for specified qid, if
-        * there is already in-flight dqacq/dqrel:
-        *
-        * - For QSD_ADJ: we should just abort it, since local limit is going
-        *   to be changed soon;
-        * - For QSD_ACQ & QSD_REL: we just wait for the in-flight dqacq/dqrel
-        *   finished, and return success to the caller. The caller is
-        *   responsible for retrying;
-        * - For QSD_REP: we should just abort it, since slave has already
-        *   acquired/released grant; */
-       sync = (op == QSD_ACQ || op == QSD_REL) ? true : false;
-       LASSERTF(lqe->lqe_pending_req <= 1, "pending dqacq/dqrel:%d",
-                lqe->lqe_pending_req);
-
        lqe_write_lock(lqe);
        lqe_write_lock(lqe);
-       if (lqe->lqe_pending_req != 0) {
-               struct l_wait_info lwi = { 0 };
 
 
+       /* is quota really enforced for this id? */
+       if (!lqe->lqe_enforced) {
                lqe_write_unlock(lqe);
                lqe_write_unlock(lqe);
-               if (!sync) {
-                       LQUOTA_DEBUG(lqe, "Abort DQACQ, op=%d", op);
-                       RETURN(0);
-               }
-
-               LQUOTA_DEBUG(lqe, "waiting for in-flight dqacq/dqrel");
-               l_wait_event(lqe->lqe_waiters,
-                            !lqe_pending_dqacq(lqe) || qsd->qsd_stopping,
-                            &lwi);
+               LQUOTA_DEBUG(lqe, "quota not enforced any more");
                RETURN(0);
        }
 
        /* fill qb_count & qb_flags */
                RETURN(0);
        }
 
        /* fill qb_count & qb_flags */
-       if (!qsd_calc_space(lqe, op, qbody)) {
+       if (!qsd_calc_acquire(lqe, qbody)) {
                lqe_write_unlock(lqe);
                lqe_write_unlock(lqe);
-               LQUOTA_DEBUG(lqe, "No DQACQ required, op=%u", op);
+               LQUOTA_DEBUG(lqe, "No acquire required");
                RETURN(0);
        }
                RETURN(0);
        }
-       lqe->lqe_pending_req++;
-       lqe_write_unlock(lqe);
 
 
-       /* fill other quota body fields */
-       qbody->qb_fid = qqi->qqi_fid;
-       qbody->qb_id  = lqe->lqe_id;
-       memset(&qbody->qb_lockh, 0, sizeof(qbody->qb_lockh));
-       memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
+       /* only 1 quota request in flight for a given ID is allowed */
+       rc = qsd_request_enter(lqe);
+       if (rc) {
+               lqe_write_unlock(lqe);
+               RETURN(rc);
+       }
+
+       lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
+       lqe_write_unlock(lqe);
 
        /* hold a refcount until completion */
        lqe_getref(lqe);
 
 
        /* hold a refcount until completion */
        lqe_getref(lqe);
 
-       if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) {
-               /* check whether we already own a lock for this ID */
-               lqe_read_lock(lqe);
-               lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
-               lqe_read_unlock(lqe);
-
-               rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
-               if (rc) {
-                       memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
-                       if (req_is_preacq(qbody->qb_flags)) {
-                               if (req_has_rep(qbody->qb_flags))
-                                       /* still want to report usage */
-                                       qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
-                               else
-                                       /* no pre-acquire if no per-ID lock */
-                                       GOTO(out, rc = -ENOLCK);
-                       } else {
-                               /* no lock found, should use intent */
-                               intent = true;
-                       }
-               } else if (req_is_acq(qbody->qb_flags) &&
-                          qbody->qb_count == 0) {
-                       /* found cached lock, no need to acquire */
-                       GOTO(out, rc = 0);
-               }
-       }
+       /* fill other quota body fields */
+       qbody->qb_fid = qqi->qqi_fid;
+       qbody->qb_id  = lqe->lqe_id;
 
 
-       if (!intent) {
-               rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, sync,
-                                   qsd_dqacq_completion, qqi, &qti->qti_lockh,
-                                   lqe);
-        } else {
+       /* check whether we already own a valid lock for this ID */
+       rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
+       if (rc) {
                union ldlm_wire_lvb *lvb;
 
                OBD_ALLOC_PTR(lvb);
                union ldlm_wire_lvb *lvb;
 
                OBD_ALLOC_PTR(lvb);
-               if (lvb == NULL)
-                       GOTO(out, rc = -ENOMEM);
-
-               rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, sync,
-                                    IT_QUOTA_DQACQ, qsd_dqacq_completion,
+               if (lvb == NULL) {
+                       rc = -ENOMEM;
+                       qsd_req_completion(env, qqi, qbody, NULL,
+                                          &qti->qti_lockh, NULL, lqe, rc);
+                       RETURN(rc);
+               }
+               /* no lock found, should use intent */
+               rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, true,
+                                    IT_QUOTA_DQACQ, qsd_req_completion,
                                     qqi, lvb, (void *)lqe);
                                     qqi, lvb, (void *)lqe);
+       } else {
+               /* lock found, should use regular dqacq */
+               rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, true,
+                                   qsd_req_completion, qqi, &qti->qti_lockh,
+                                   lqe);
        }
        }
+
        /* the completion function will be called by qsd_send_dqacq or
         * qsd_intent_lock */
        RETURN(rc);
        /* the completion function will be called by qsd_send_dqacq or
         * qsd_intent_lock */
        RETURN(rc);
-out:
-       qsd_dqacq_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe,
-                            rc);
-       return rc;
 }
 
 }
 
-/*
+/**
+ * Acquire \a space of quota space in order to complete an operation.
+ * Try to consume local quota space first and send acquire request to quota
+ * master if required.
+ *
+ * \param env   - the environment passed by the caller
+ * \param lqe   - is the qid entry to be processed
+ * \param space - is the amount of quota required for the operation
+ * \param ret   - is the return code (-EDQUOT, -EINPROGRESS, ...)
+ *
+ * \retval true  - exit from l_wait_event and real return value in \a ret
+ * \retval false - continue waiting
+ */
+static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
+                       long long space, int *ret)
+{
+       int rc = 0, count;
+       ENTRY;
+
+       for (count = 0; rc == 0; count++) {
+               LQUOTA_DEBUG(lqe, "acquiring:"LPD64 " count=%d", space, count);
+
+               if (lqe2qqi(lqe)->qqi_qsd->qsd_stopping) {
+                       rc = -EINPROGRESS;
+                       break;
+               }
+
+               /* refresh disk usage */
+               rc = qsd_refresh_usage(env, lqe);
+               if (rc)
+                       break;
+
+               /* try to consume local quota space first */
+               rc = qsd_acquire_local(lqe, space);
+               if (rc != -EAGAIN)
+                       /* rc == 0, Wouhou! enough local quota space
+                        * rc < 0, something bad happened */
+                        break;
+
+               /* need to acquire more quota space from master */
+               rc = qsd_acquire_remote(env, lqe);
+       }
+
+       if (rc == -EBUSY)
+               /* already a request in flight, continue waiting */
+               RETURN(false);
+       *ret = rc;
+       RETURN(true); /* exit from l_wait_event */
+}
+
+/**
  * Quota enforcement handler. If local quota can satisfy this operation,
  * return success, otherwise, acquire more quota from master.
  * (for write operation, if master isn't available at this moment, return
  * Quota enforcement handler. If local quota can satisfy this operation,
  * return success, otherwise, acquire more quota from master.
  * (for write operation, if master isn't available at this moment, return
@@ -594,17 +667,18 @@ out:
  * \param flags - if the operation is write, return caller no user/group
  *                and sync commit flags
  *
  * \param flags - if the operation is write, return caller no user/group
  *                and sync commit flags
  *
- * \retval 0        - success
- * \retval -EDQUOT      : out of quota
- *         -EINPROGRESS : inform client to retry write
- *         -ve          : other appropriate errors
+ * \retval 0            - success
+ * \retval -EDQUOT      - out of quota
+ * \retval -EINPROGRESS - inform client to retry write
+ * \retval -ve          - other appropriate errors
  */
 static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
                         struct lquota_id_info *qid, long long space,
                         int *flags)
 {
  */
 static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
                         struct lquota_id_info *qid, long long space,
                         int *flags)
 {
-       struct lquota_entry *lqe;
-       int                  rc = 0, retry_cnt;
+       struct lquota_entry     *lqe;
+       int                      rc, ret = -EINPROGRESS;
+       struct l_wait_info       lwi;
        ENTRY;
 
        if (qid->lqi_qentry != NULL) {
        ENTRY;
 
        if (qid->lqi_qentry != NULL) {
@@ -643,30 +717,20 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
        lqe->lqe_waiting_write += space;
        lqe_write_unlock(lqe);
 
        lqe->lqe_waiting_write += space;
        lqe_write_unlock(lqe);
 
-       for (retry_cnt = 0; rc == 0; retry_cnt++) {
-               /* refresh disk usage if required */
-               rc = qsd_refresh_usage(env, lqe);
-               if (rc)
-                       break;
-
-               /* try to consume local quota space */
-               rc = qsd_acquire_local(lqe, space);
-               if (rc != -EAGAIN)
-                       /* rc == 0, Wouhou! enough local quota space
-                        * rc < 0, something bad happened */
-                       break;
-
-               /* need to acquire more quota space from master, this is done
-                * synchronously */
-               rc = qsd_dqacq(env, lqe, QSD_ACQ);
-               LQUOTA_DEBUG(lqe, "Acquired quota space, retry cnt:%d rc:%d",
-                            retry_cnt, rc);
-       }
+       /* acquire quota space for the operation, cap overall wait time to
+        * prevent a service thread from being stuck for too long */
+       lwi = LWI_TIMEOUT(cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)),
+                         NULL, NULL);
+       rc = l_wait_event(lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
+                         &lwi);
 
 
-       if (rc == 0) {
+       if (rc == 0 && ret == 0) {
                qid->lqi_space += space;
        } else {
                qid->lqi_space += space;
        } else {
-               LQUOTA_DEBUG(lqe, "Acquire quota failed:%d", rc);
+               if (rc == 0)
+                       rc = ret;
+
+               LQUOTA_DEBUG(lqe, "acquire quota failed:%d", rc);
 
                lqe_write_lock(lqe);
                lqe->lqe_waiting_write -= space;
 
                lqe_write_lock(lqe);
                lqe->lqe_waiting_write -= space;
@@ -677,11 +741,20 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
                         *flags |= QUOTA_FL_SYNC;
                lqe_write_unlock(lqe);
 
                         *flags |= QUOTA_FL_SYNC;
                lqe_write_unlock(lqe);
 
-               /* convert recoverable error into -EINPROGRESS, and client will
-                * retry write on -EINPROGRESS. */
+               /* convert recoverable error into -EINPROGRESS, client will
+                * retry */
                if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ENOLCK ||
                if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ENOLCK ||
-                   rc == -EAGAIN || rc == -EINTR)
+                   rc == -EAGAIN || rc == -EINTR) {
                        rc = -EINPROGRESS;
                        rc = -EINPROGRESS;
+               } else if (rc == -ESRCH) {
+                       rc = 0;
+                       LQUOTA_ERROR(lqe, "ID isn't enforced on master, it "
+                                    "probably due to a legeal race, if this "
+                                    "message is showing up constantly, there "
+                                    "could be some inconsistence between "
+                                    "master & slave, and quota reintegration "
+                                    "needs be re-triggered.");
+               }
        }
 
        if (flags != NULL) {
        }
 
        if (flags != NULL) {
@@ -709,6 +782,9 @@ out_flags:
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
+/**
+ * helper function comparing two lquota_id_info structures
+ */
 static inline bool qid_equal(struct lquota_id_info *q1,
                             struct lquota_id_info *q2)
 {
 static inline bool qid_equal(struct lquota_id_info *q1,
                             struct lquota_id_info *q2)
 {
@@ -717,23 +793,23 @@ static inline bool qid_equal(struct lquota_id_info *q1,
        return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
 }
 
        return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
 }
 
-/*
+/**
  * Enforce quota, it's called in the declaration of each operation.
  * qsd_op_end() will then be called later once all the operations have been
  * completed in order to release/adjust the quota space.
  *
  * Enforce quota, it's called in the declaration of each operation.
  * qsd_op_end() will then be called later once all the operations have been
  * completed in order to release/adjust the quota space.
  *
- * \param env        - the environment passed by the caller
- * \param qsd        - is the qsd instance associated with the device in charge
- *                     of the operation.
- * \param trans      - is the quota transaction information
- * \param qi         - qid & space required by current operation
- * \param flags      - if the operation is write, return caller no user/group
- *                     and sync commit flags
+ * \param env   - the environment passed by the caller
+ * \param qsd   - is the qsd instance associated with the device in charge of
+ *                the operation.
+ * \param trans - is the quota transaction information
+ * \param qi    - qid & space required by current operation
+ * \param flags - if the operation is write, return caller no user/group and
+ *                sync commit flags
  *
  *
- * \retval 0        - success
- * \retval -EDQUOT      : out of quota
- *         -EINPROGRESS : inform client to retry write
- *         -ve          : other appropriate errors
+ * \retval 0            - success
+ * \retval -EDQUOT      - out of quota
+ * \retval -EINPROGRESS - inform client to retry write
+ * \retval -ve          - other appropriate errors
  */
 int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
                 struct lquota_trans *trans, struct lquota_id_info *qi,
  */
 int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
                 struct lquota_trans *trans, struct lquota_id_info *qi,
@@ -799,6 +875,116 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
 EXPORT_SYMBOL(qsd_op_begin);
 
 /**
 EXPORT_SYMBOL(qsd_op_begin);
 
 /**
+ * Adjust quota space (by acquiring or releasing) hold by the quota slave.
+ * This function is called after each quota request completion and during
+ * reintegration in order to report usage or re-acquire quota locks.
+ * Space adjustment is aborted if there is already a quota request in flight
+ * for this ID.
+ *
+ * \param env    - the environment passed by the caller
+ * \param lqe    - is the qid entry to be processed
+ *
+ * \retval 0 on success, appropriate errors on failure
+ */
+int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe)
+{
+       struct qsd_thread_info  *qti = qsd_info(env);
+       struct quota_body       *qbody = &qti->qti_body;
+       struct qsd_instance     *qsd;
+       struct qsd_qtype_info   *qqi;
+       int                      rc;
+       bool                     intent = false;
+       ENTRY;
+
+       memset(qbody, 0, sizeof(*qbody));
+       rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
+       if (rc)
+               RETURN(rc);
+
+       qqi = lqe2qqi(lqe);
+       qsd = qqi->qqi_qsd;
+
+       lqe_write_lock(lqe);
+
+       /* fill qb_count & qb_flags */
+       if (!qsd_calc_adjust(lqe, qbody)) {
+               lqe_write_unlock(lqe);
+               LQUOTA_DEBUG(lqe, "No adjust required");
+               RETURN(0);
+       }
+
+       /* only 1 quota request in flight for a given ID is allowed */
+       rc = qsd_request_enter(lqe);
+       if (rc) {
+               /* already a request in flight, space adjustment will be run
+                * again on request completion */
+               lqe_write_unlock(lqe);
+               RETURN(0);
+       }
+
+       if (req_is_rel(qbody->qb_flags))
+               lqe->lqe_pending_rel = qbody->qb_count;
+       lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
+       lqe_write_unlock(lqe);
+
+       /* hold a refcount until completion */
+       lqe_getref(lqe);
+
+       /* fill other quota body fields */
+       qbody->qb_fid = qqi->qqi_fid;
+       qbody->qb_id  = lqe->lqe_id;
+
+       if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) {
+               /* check whether we own a valid lock for this ID */
+               rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
+               if (rc) {
+                       memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
+                       if (req_is_preacq(qbody->qb_flags)) {
+                               if (req_has_rep(qbody->qb_flags))
+                                       /* still want to report usage */
+                                       qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
+                               else
+                                       /* no pre-acquire if no per-ID lock */
+                                       GOTO(out, rc = -ENOLCK);
+                       } else {
+                               /* no lock found, should use intent */
+                               intent = true;
+                       }
+               } else if (req_is_acq(qbody->qb_flags) &&
+                          qbody->qb_count == 0) {
+                       /* found cached lock, no need to acquire */
+                       GOTO(out, rc = 0);
+               }
+       } else {
+               /* release and report don't need a per-ID lock */
+               memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
+       }
+
+       if (!intent) {
+               rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, false,
+                                   qsd_req_completion, qqi, &qti->qti_lockh,
+                                   lqe);
+       } else {
+               union ldlm_wire_lvb *lvb;
+
+               OBD_ALLOC_PTR(lvb);
+               if (lvb == NULL)
+                       GOTO(out, rc = -ENOMEM);
+
+               rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, false,
+                                    IT_QUOTA_DQACQ, qsd_req_completion,
+                                    qqi, lvb, (void *)lqe);
+       }
+       /* the completion function will be called by qsd_send_dqacq or
+        * qsd_intent_lock */
+       RETURN(rc);
+out:
+       qsd_req_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe,
+                          rc);
+       return rc;
+}
+
+/**
  * Post quota operation, pre-acquire/release quota from master.
  *
  * \param  env  - the environment passed by the caller
  * Post quota operation, pre-acquire/release quota from master.
  *
  * \param  env  - the environment passed by the caller
@@ -840,7 +1026,7 @@ static void qsd_op_end0(const struct lu_env *env, struct qsd_qtype_info *qqi,
        if (adjust) {
                /* pre-acquire/release quota space is needed */
                if (env != NULL)
        if (adjust) {
                /* pre-acquire/release quota space is needed */
                if (env != NULL)
-                       qsd_dqacq(env, lqe, QSD_ADJ);
+                       qsd_adjust(env, lqe);
                else
                        /* no suitable environment, handle adjustment in
                         * separate thread context */
                else
                        /* no suitable environment, handle adjustment in
                         * separate thread context */
@@ -850,7 +1036,7 @@ static void qsd_op_end0(const struct lu_env *env, struct qsd_qtype_info *qqi,
        EXIT;
 }
 
        EXIT;
 }
 
-/*
+/**
  * Post quota operation. It's called after each operation transaction stopped.
  *
  * \param  env   - the environment passed by the caller
  * Post quota operation. It's called after each operation transaction stopped.
  *
  * \param  env   - the environment passed by the caller
@@ -898,8 +1084,22 @@ void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
 }
 EXPORT_SYMBOL(qsd_op_end);
 
 }
 EXPORT_SYMBOL(qsd_op_end);
 
-void qsd_adjust_quota(const struct lu_env *env, struct qsd_instance *qsd,
-                     union lquota_id *qid, int qtype)
+/**
+ * Trigger pre-acquire/release if necessary.
+ * It's only used by ldiskfs osd so far. When unlink a file in ldiskfs, the
+ * quota accounting isn't updated when the transaction stopped. Instead, it'll
+ * be updated on the final iput, so qsd_op_adjust() will be called then (in
+ * osd_object_delete()) to trigger quota release if necessary.
+ *
+ * \param env - the environment passed by the caller
+ * \param qsd - is the qsd instance associated with the device in charge
+ *              of the operation.
+ * \param qid - is the lquota ID of the user/group for which to trigger
+ *              quota space adjustment
+ * \param qtype - is the quota type (USRQUOTA or GRPQUOTA)
+ */
+void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd,
+                  union lquota_id *qid, int qtype)
 {
        struct lquota_entry    *lqe;
        struct qsd_qtype_info  *qqi;
 {
        struct lquota_entry    *lqe;
        struct qsd_qtype_info  *qqi;
@@ -945,9 +1145,9 @@ void qsd_adjust_quota(const struct lu_env *env, struct qsd_instance *qsd,
        lqe_read_unlock(lqe);
 
        if (adjust)
        lqe_read_unlock(lqe);
 
        if (adjust)
-               qsd_dqacq(env, lqe, QSD_ADJ);
+               qsd_adjust(env, lqe);
 
        lqe_putref(lqe);
        EXIT;
 }
 
        lqe_putref(lqe);
        EXIT;
 }
-EXPORT_SYMBOL(qsd_adjust_quota);
+EXPORT_SYMBOL(qsd_op_adjust);
index 4b8cde5..c640a69 100644 (file)
@@ -56,10 +56,10 @@ struct qsd_instance {
        cfs_proc_dir_entry_t    *qsd_proc;
 
        /* export used for the connection to quota master */
        cfs_proc_dir_entry_t    *qsd_proc;
 
        /* export used for the connection to quota master */
-       struct obd_export     *qsd_exp;
+       struct obd_export       *qsd_exp;
 
        /* ldlm namespace used for quota locks */
 
        /* ldlm namespace used for quota locks */
-       struct ldlm_namespace *qsd_ns;
+       struct ldlm_namespace   *qsd_ns;
 
        /* on-disk directory where to store index files for this qsd instance */
        struct dt_object        *qsd_root;
 
        /* on-disk directory where to store index files for this qsd instance */
        struct dt_object        *qsd_root;
@@ -102,6 +102,11 @@ struct qsd_instance {
         * should be sync. b=16642 */
        unsigned long            qsd_sync_threshold;
 
         * should be sync. b=16642 */
        unsigned long            qsd_sync_threshold;
 
+       /* how long a service thread can wait for quota space.
+        * value dynamically computed from obd_timeout and at_max if not
+        * enforced here (via procfs) */
+       int                      qsd_timeout;
+
        unsigned long            qsd_is_md:1,    /* managing quota for mdt */
                                 qsd_started:1,  /* instance is now started */
                                 qsd_prepared:1, /* qsd_prepare() successfully
        unsigned long            qsd_is_md:1,    /* managing quota for mdt */
                                 qsd_started:1,  /* instance is now started */
                                 qsd_prepared:1, /* qsd_prepare() successfully
@@ -217,14 +222,6 @@ static inline void qqi_putref(struct qsd_qtype_info *qqi)
        cfs_atomic_dec(&qqi->qqi_ref);
 }
 
        cfs_atomic_dec(&qqi->qqi_ref);
 }
 
-/* all kind of operations supported by qsd_dqacq() */
-enum qsd_ops {
-       QSD_ADJ, /* adjust quota space based on current qunit */
-       QSD_ACQ, /* acquire space for requests */
-       QSD_REL, /* release all space quota space uncondionnally */
-       QSD_REP, /* report space usage during reintegration */
-};
-
 #define QSD_RES_TYPE(qsd) ((qsd)->qsd_is_md ? LQUOTA_RES_MD : LQUOTA_RES_DT)
 
 /* udpate record for slave & global index copy */
 #define QSD_RES_TYPE(qsd) ((qsd)->qsd_is_md ? LQUOTA_RES_MD : LQUOTA_RES_DT)
 
 /* udpate record for slave & global index copy */
@@ -320,6 +317,15 @@ static inline void qsd_set_qunit(struct lquota_entry *lqe, __u64 qunit)
 
 #define QSD_WB_INTERVAL        60 /* 60 seconds */
 
 
 #define QSD_WB_INTERVAL        60 /* 60 seconds */
 
+/* helper function calculating how long a service thread should be waiting for
+ * quota space */
+static inline int qsd_wait_timeout(struct qsd_instance *qsd)
+{
+       if (qsd->qsd_timeout != 0)
+               return qsd->qsd_timeout;
+       return min_t(int, at_max / 2, obd_timeout / 2);
+}
+
 /* qsd_entry.c */
 extern struct lquota_entry_operations qsd_lqe_ops;
 int qsd_refresh_usage(const struct lu_env *, struct lquota_entry *);
 /* qsd_entry.c */
 extern struct lquota_entry_operations qsd_lqe_ops;
 int qsd_refresh_usage(const struct lu_env *, struct lquota_entry *);
@@ -366,8 +372,7 @@ void qsd_put_fsinfo(struct qsd_fsinfo *);
 int qsd_process_config(struct lustre_cfg *);
 
 /* qsd_handler.c */
 int qsd_process_config(struct lustre_cfg *);
 
 /* qsd_handler.c */
-int qsd_dqacq(const struct lu_env *, struct lquota_entry *, enum qsd_ops);
-__u64 qsd_calc_grants(struct lquota_entry *, __u64, __u32);
+int qsd_adjust(const struct lu_env *, struct lquota_entry *);
 
 /* qsd_writeback.c */
 void qsd_upd_schedule(struct qsd_qtype_info *, struct lquota_entry *,
 
 /* qsd_writeback.c */
 void qsd_upd_schedule(struct qsd_qtype_info *, struct lquota_entry *,
index 5a8c7b0..2514a63 100644 (file)
@@ -174,10 +174,37 @@ static int lprocfs_qsd_wr_force_reint(struct file *file, const char *buffer,
        return rc == 0 ? count : rc;
 }
 
        return rc == 0 ? count : rc;
 }
 
+static int lprocfs_qsd_rd_timeout(char *page, char **start, off_t off,
+                                 int count, int *eof, void *data)
+{
+       struct qsd_instance     *qsd = (struct qsd_instance *)data;
+       LASSERT(qsd != NULL);
+
+       return snprintf(page, count, "%d\n", qsd_wait_timeout(qsd));
+}
+
+static int lprocfs_qsd_wr_timeout(struct file *file, const char *buffer,
+                                 unsigned long count, void *data)
+{
+       struct qsd_instance     *qsd = (struct qsd_instance *)data;
+       int                      timeout, rc;
+       LASSERT(qsd != NULL);
+
+       rc = lprocfs_write_helper(buffer, count, &timeout);
+       if (rc)
+               return rc;
+       if (timeout < 0)
+               return -EINVAL;
+
+       qsd->qsd_timeout = timeout;
+       return count;
+}
+
 static struct lprocfs_vars lprocfs_quota_qsd_vars[] = {
        { "info", lprocfs_qsd_rd_state, 0, 0},
        { "enabled", lprocfs_qsd_rd_enabled, 0, 0},
        { "force_reint", 0, lprocfs_qsd_wr_force_reint, 0},
 static struct lprocfs_vars lprocfs_quota_qsd_vars[] = {
        { "info", lprocfs_qsd_rd_state, 0, 0},
        { "enabled", lprocfs_qsd_rd_enabled, 0, 0},
        { "force_reint", 0, lprocfs_qsd_wr_force_reint, 0},
+       { "timeout", lprocfs_qsd_rd_timeout, lprocfs_qsd_wr_timeout, 0},
        { NULL }
 };
 
        { NULL }
 };
 
index ae916c7..f49abb6 100644 (file)
@@ -339,7 +339,7 @@ static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *de
                 * which means there could be a short window that slave is
                 * holding spare grant wihtout per-ID lock. */
                if (rel)
                 * which means there could be a short window that slave is
                 * holding spare grant wihtout per-ID lock. */
                if (rel)
-                       rc = qsd_dqacq(env, lqe, QSD_REL);
+                       rc = qsd_adjust(env, lqe);
 
                /* release lqe reference grabbed by qsd_id_ast_data_get() */
                lqe_putref(lqe);
 
                /* release lqe reference grabbed by qsd_id_ast_data_get() */
                lqe_putref(lqe);
index 936117e..85c97cb 100644 (file)
@@ -344,38 +344,32 @@ static int qsd_reconciliation(const struct lu_env *env,
 
                lqe = lqe_locate(env, qqi->qqi_site, qid);
                if (IS_ERR(lqe)) {
 
                lqe = lqe_locate(env, qqi->qqi_site, qid);
                if (IS_ERR(lqe)) {
-                       CWARN("%s: Fail to locate lqe. "DFID", %ld\n",
+                       CWARN("%s: failed to locate lqe. "DFID", %ld\n",
                              qsd->qsd_svname, PFID(&qqi->qqi_fid),
                              PTR_ERR(lqe));
                        GOTO(out, rc = PTR_ERR(lqe));
                }
 
                              qsd->qsd_svname, PFID(&qqi->qqi_fid),
                              PTR_ERR(lqe));
                        GOTO(out, rc = PTR_ERR(lqe));
                }
 
-               if (!lqe->lqe_enforced) {
-                       lqe_putref(lqe);
-                       goto next;
-               }
-
                rc = qsd_refresh_usage(env, lqe);
                if (rc) {
                rc = qsd_refresh_usage(env, lqe);
                if (rc) {
-                       CWARN("%s: Fail to get usage. "DFID", %d\n",
+                       CWARN("%s: failed to get usage. "DFID", %d\n",
                              qsd->qsd_svname, PFID(&qqi->qqi_fid), rc);
                        lqe_putref(lqe);
                        GOTO(out, rc);
                }
 
                              qsd->qsd_svname, PFID(&qqi->qqi_fid), rc);
                        lqe_putref(lqe);
                        GOTO(out, rc);
                }
 
-               rc = qsd_dqacq(env, lqe, QSD_REP);
+               rc = qsd_adjust(env, lqe);
                lqe_putref(lqe);
                lqe_putref(lqe);
-
                if (rc) {
                if (rc) {
-                       CWARN("%s: Fail to report quota. "DFID", %d\n",
+                       CWARN("%s: failed to report quota. "DFID", %d\n",
                              qsd->qsd_svname, PFID(&qqi->qqi_fid), rc);
                        GOTO(out, rc);
                }
 next:
                rc = iops->next(env, it);
                if (rc < 0)
                              qsd->qsd_svname, PFID(&qqi->qqi_fid), rc);
                        GOTO(out, rc);
                }
 next:
                rc = iops->next(env, it);
                if (rc < 0)
-                       CWARN("%s: Error next "DFID". %d\n", qsd->qsd_svname,
-                             PFID(&qqi->qqi_fid), rc);
+                       CWARN("%s: failed to parse index, ->next error:%d "DFID
+                             "\n", qsd->qsd_svname, rc, PFID(&qqi->qqi_fid));
        } while (rc == 0);
 
        /* reach the end */
        } while (rc == 0);
 
        /* reach the end */
index 485e2ce..9697bbe 100644 (file)
@@ -325,7 +325,7 @@ int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp,
 
        RETURN(rc);
 out:
 
        RETURN(rc);
 out:
-       completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, arg, rc);
+       completion(env, qqi, qbody, NULL, &qti->qti_lockh, lvb, arg, rc);
        return rc;
 }
 
        return rc;
 }
 
index 6eb8931..546bd37 100644 (file)
@@ -276,12 +276,12 @@ static int qsd_process_upd(const struct lu_env *env, struct qsd_upd_rec *upd)
                rc = qsd_update_lqe(env, lqe, upd->qur_global, &upd->qur_rec);
                if (rc)
                        GOTO(out, rc);
                rc = qsd_update_lqe(env, lqe, upd->qur_global, &upd->qur_rec);
                if (rc)
                        GOTO(out, rc);
+               /* refresh usage */
+               qsd_refresh_usage(env, lqe);
                /* Report usage asynchronously */
                /* Report usage asynchronously */
-               if (lqe->lqe_enforced &&
-                   !qsd_refresh_usage(env, lqe)) {
-                       rc = qsd_dqacq(env, lqe, QSD_REP);
-                       LQUOTA_DEBUG(lqe, "Report usage. rc:%d", rc);
-               }
+               rc = qsd_adjust(env, lqe);
+               if (rc)
+                       LQUOTA_ERROR(lqe, "failed to report usage, rc:%d", rc);
        }
 
        rc = qsd_update_index(env, qqi, &upd->qur_qid, upd->qur_global,
        }
 
        rc = qsd_update_index(env, qqi, &upd->qur_qid, upd->qur_global,
@@ -440,7 +440,7 @@ static int qsd_upd_thread(void *arg)
                                if (lqe->lqe_adjust_time == 0)
                                        qsd_id_lock_cancel(env, lqe);
                                else
                                if (lqe->lqe_adjust_time == 0)
                                        qsd_id_lock_cancel(env, lqe);
                                else
-                                       qsd_dqacq(env, lqe, QSD_ADJ);
+                                       qsd_adjust(env, lqe);
                        }
 
                        lqe_putref(lqe);
                        }
 
                        lqe_putref(lqe);
index ced8a28..366bbdf 100644 (file)
@@ -118,12 +118,8 @@ resetquota() {
                error "resetquota: wrong specifier $1 passed"
 
        $LFS setquota "$1" "$2" -b 0 -B 0 -i 0 -I 0 $MOUNT
                error "resetquota: wrong specifier $1 passed"
 
        $LFS setquota "$1" "$2" -b 0 -B 0 -i 0 -I 0 $MOUNT
-       # The usage should be 0 now, enforce quota to trigger quota
-       # release for each slave, sleep 3 seconds to wait for the release
-       # done, then we can make sure granted quota for each slave is 0.
-       $LFS setquota "$1" "$2" -b 0 -B 1 -i 0 -I 1 $MOUNT
-       sleep 3
-       $LFS setquota "$1" "$2" -b 0 -B 0 -i 0 -I 0 $MOUNT
+       # give a chance to slave to release space
+       sleep 1
 }
 
 quota_scan() {
 }
 
 quota_scan() {
@@ -855,8 +851,9 @@ test_6() {
        $RUNAS $DD of=$TESTFILE count=$LIMIT seek=1 oflag=sync conv=notrunc &
        DDPID=$!
 
        $RUNAS $DD of=$TESTFILE count=$LIMIT seek=1 oflag=sync conv=notrunc &
        DDPID=$!
 
-       echo "Sleep for $TIMEOUT"
-       sleep $TIMEOUT
+       # watchdog timer uses a factor of 2
+       echo "Sleep for $((TIMEOUT * 2 + 1)) seconds ..."
+       sleep $((TIMEOUT * 2 + 1))
 
        # write should be blocked and never finished
        if ! ps -p $DDPID  > /dev/null 2>&1; then
 
        # write should be blocked and never finished
        if ! ps -p $DDPID  > /dev/null 2>&1; then