Whamcloud - gitweb
LU-1842 quota: add acq/rel logic on QMT
authorJohann Lombardi <johann.lombardi@intel.com>
Thu, 4 Oct 2012 08:34:09 +0000 (10:34 +0200)
committerOleg Drokin <green@whamcloud.com>
Sat, 6 Oct 2012 06:31:51 +0000 (02:31 -0400)
This patch adds the quota acquire/release logic on the QMT.
The QMT is in charge of distributing quota space across quota
slaves. It grants a significant amount of quota space (75%) initially
and then tries to balance spare quota space among slaves.
To do so, it sends glimpses on per-ID locks to notify slaves of qunit
shrinking. qunit is now the maximal amount of available quota space a
slave can own. As for qtune, this value (typically 1/4 of qunit) is
used on the quota slave to decide when to release or pre-acquire quota
space.

Signed-off-by: Johann Lombardi <johann.lombardi@intel.com>
Change-Id: I8064ce614955421681ca1da7e2bb774001ca3bea
Reviewed-on: http://review.whamcloud.com/4178
Tested-by: Hudson
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_dlm.h
lustre/include/obd_support.h
lustre/ldlm/ldlm_lockd.c
lustre/quota/lquota_disk.c
lustre/quota/lquota_internal.h
lustre/quota/qmt_entry.c
lustre/quota/qmt_handler.c
lustre/quota/qmt_internal.h
lustre/quota/qmt_lock.c
lustre/quota/qmt_pool.c

index 8b0a593..2420817 100644 (file)
@@ -1060,6 +1060,7 @@ int ldlm_get_ref(void);
 void ldlm_put_ref(void);
 int ldlm_init_export(struct obd_export *exp);
 void ldlm_destroy_export(struct obd_export *exp);
 void ldlm_put_ref(void);
 int ldlm_init_export(struct obd_export *exp);
 void ldlm_destroy_export(struct obd_export *exp);
+struct ldlm_lock *ldlm_request_lock(struct ptlrpc_request *req);
 
 /* ldlm_lock.c */
 #ifdef HAVE_SERVER_SUPPORT
 
 /* ldlm_lock.c */
 #ifdef HAVE_SERVER_SUPPORT
index d34c464..73a1145 100644 (file)
@@ -405,9 +405,9 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_MGS_PAUSE_TARGET_REG    0x905
 
 #define OBD_FAIL_QUOTA_DQACQ             0xA01
 #define OBD_FAIL_MGS_PAUSE_TARGET_REG    0x905
 
 #define OBD_FAIL_QUOTA_DQACQ             0xA01
-#define OBD_FAIL_QUOTA_RET_QDATA         0xA02
-#define OBD_FAIL_QUOTA_DELAY_REL         0xA03
-#define OBD_FAIL_QUOTA_DELAY_SD          0xA04
+#define OBD_FAIL_QUOTA_EDQUOT            0xA02
+#define OBD_FAIL_QUOTA_DELAY_REINT       0xA03
+#define OBD_FAIL_QUOTA_RECOVERABLE_ERR   0xA04
 
 #define OBD_FAIL_LPROC_REMOVE            0xB00
 
 
 #define OBD_FAIL_LPROC_REMOVE            0xB00
 
index d5e4323..e960325 100644 (file)
@@ -1081,6 +1081,22 @@ int ldlm_glimpse_locks(struct ldlm_resource *res, cfs_list_t *gl_work_list)
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
+/* return ldlm lock associated with a lock callback request; ERR_PTR(-EFAULT)
+ * is returned when the request's async args carry no lock */
+struct ldlm_lock *ldlm_request_lock(struct ptlrpc_request *req)
+{
+       struct ldlm_cb_async_args       *ca;
+       struct ldlm_lock                *lock;
+       ENTRY;
+
+       ca = ptlrpc_req_async_args(req);
+       lock = ca->ca_lock;
+       if (lock == NULL)
+               RETURN(ERR_PTR(-EFAULT));
+
+       RETURN(lock);
+}
+EXPORT_SYMBOL(ldlm_request_lock);
+
 static void ldlm_svc_get_eopc(const struct ldlm_request *dlm_req,
                        struct lprocfs_stats *srv_stats)
 {
 static void ldlm_svc_get_eopc(const struct ldlm_request *dlm_req,
                        struct lprocfs_stats *srv_stats)
 {
index 777b2ab..2b037e8 100644 (file)
@@ -100,7 +100,7 @@ out:
 /*
  * helper function to generate the filename associated with a slave index file
  */
 /*
  * helper function to generate the filename associated with a slave index file
  */
-static inline int lquota_disk_slv_filename(struct lu_fid *glb_fid,
+static inline int lquota_disk_slv_filename(const struct lu_fid *glb_fid,
                                           struct obd_uuid *uuid,
                                           char *filename)
 {
                                           struct obd_uuid *uuid,
                                           char *filename)
 {
@@ -320,7 +320,7 @@ struct dt_object *lquota_disk_glb_find_create(const struct lu_env *env,
 struct dt_object *lquota_disk_slv_find(const struct lu_env *env,
                                       struct dt_device *dev,
                                       struct dt_object *parent,
 struct dt_object *lquota_disk_slv_find(const struct lu_env *env,
                                       struct dt_device *dev,
                                       struct dt_object *parent,
-                                      struct lu_fid *glb_fid,
+                                      const struct lu_fid *glb_fid,
                                       struct obd_uuid *uuid)
 {
        struct lquota_thread_info       *qti = lquota_info(env);
                                       struct obd_uuid *uuid)
 {
        struct lquota_thread_info       *qti = lquota_info(env);
index 7fedd29..7b2bc57 100644 (file)
@@ -207,11 +207,13 @@ struct lquota_site {
 /* helper routine to get/put reference on lquota_entry */
 static inline void lqe_getref(struct lquota_entry *lqe)
 {
 /* helper routine to get/put reference on lquota_entry */
 static inline void lqe_getref(struct lquota_entry *lqe)
 {
+       LASSERT(lqe != NULL);
        cfs_atomic_inc(&lqe->lqe_ref);
 }
 
 static inline void lqe_putref(struct lquota_entry *lqe)
 {
        cfs_atomic_inc(&lqe->lqe_ref);
 }
 
 static inline void lqe_putref(struct lquota_entry *lqe)
 {
+       LASSERT(lqe != NULL);
        LASSERT(atomic_read(&lqe->lqe_ref) > 0);
        if (atomic_dec_and_test(&lqe->lqe_ref))
                OBD_FREE_PTR(lqe);
        LASSERT(atomic_read(&lqe->lqe_ref) > 0);
        if (atomic_dec_and_test(&lqe->lqe_ref))
                OBD_FREE_PTR(lqe);
@@ -299,6 +301,11 @@ struct lquota_thread_info *lquota_info(const struct lu_env *env)
        return info;
 }
 
        return info;
 }
 
+#define req_is_acq(flags)    ((flags & QUOTA_DQACQ_FL_ACQ) != 0)
+#define req_is_preacq(flags) ((flags & QUOTA_DQACQ_FL_PREACQ) != 0)
+#define req_is_rel(flags)    ((flags & QUOTA_DQACQ_FL_REL) != 0)
+#define req_has_rep(flags)   ((flags & QUOTA_DQACQ_FL_REPORT) != 0)
+
 /* debugging macros */
 #ifdef LIBCFS_DEBUG
 #define lquota_lqe_debug(msgdata, mask, cdls, lqe, fmt, a...) do {      \
 /* debugging macros */
 #ifdef LIBCFS_DEBUG
 #define lquota_lqe_debug(msgdata, mask, cdls, lqe, fmt, a...) do {      \
@@ -376,7 +383,8 @@ int lquota_disk_for_each_slv(const struct lu_env *, struct dt_object *,
                             struct lu_fid *, lquota_disk_slv_cb_t, void *);
 struct dt_object *lquota_disk_slv_find(const struct lu_env *,
                                       struct dt_device *, struct dt_object *,
                             struct lu_fid *, lquota_disk_slv_cb_t, void *);
 struct dt_object *lquota_disk_slv_find(const struct lu_env *,
                                       struct dt_device *, struct dt_object *,
-                                      struct lu_fid *, struct obd_uuid *);
+                                      const struct lu_fid *,
+                                      struct obd_uuid *);
 int lquota_disk_read(const struct lu_env *, struct dt_object *,
                     union lquota_id *, struct dt_rec *);
 int lquota_disk_declare_write(const struct lu_env *, struct thandle *,
 int lquota_disk_read(const struct lu_env *, struct dt_object *,
                     union lquota_id *, struct dt_rec *);
 int lquota_disk_declare_write(const struct lu_env *, struct thandle *,
index ee57900..588440a 100644 (file)
@@ -412,3 +412,236 @@ int qmt_validate_limits(struct lquota_entry *lqe, __u64 hard, __u64 soft)
                RETURN(-EINVAL);
        RETURN(0);
 }
                RETURN(-EINVAL);
        RETURN(0);
 }
+
+/*
+ * Set/clear edquot flag after quota space allocation/release or settings
+ * change. Slaves will be notified of changes via glimpse on per-ID lock
+ *
+ * \param lqe - is the quota entry to check
+ * \param now - is the current time in second used for grace time management
+ */
+void qmt_adjust_edquot(struct lquota_entry *lqe, __u64 now)
+{
+       struct qmt_pool_info    *pool = lqe2qpi(lqe);
+
+       if (!lqe->lqe_enforced)
+               RETURN_EXIT;
+
+       if (!lqe->lqe_edquot) {
+               /* space exhausted flag not set, let's check whether it is time
+                * to set the flag */
+
+               if (!qmt_space_exhausted(lqe, now))
+                       /* the qmt still has available space */
+                       RETURN_EXIT;
+
+               if (lqe->lqe_qunit != pool->qpi_least_qunit)
+                       /* we haven't reached the minimal qunit yet, so there is
+                        * still hope that the rebalancing process might free up
+                        * some quota space */
+                       RETURN_EXIT;
+
+               if (lqe->lqe_may_rel != 0 &&
+                   cfs_time_beforeq_64(lqe->lqe_revoke_time,
+                                       cfs_time_shift_64(-QMT_REBA_TIMEOUT)))
+                       /* Let's give more time to slave to release space */
+                       RETURN_EXIT;
+
+               /* set edquot flag */
+               lqe->lqe_edquot = true;
+       } else {
+               /* space exhausted flag set, let's check whether it is time to
+                * clear it */
+
+               if (qmt_space_exhausted(lqe, now))
+                       /* the qmt still has no space available */
+                       RETURN_EXIT;
+
+               if (lqe->lqe_hardlimit != 0 &&
+                   lqe->lqe_granted + pool->qpi_least_qunit >
+                                                       lqe->lqe_hardlimit)
+                       /* we clear the flag only when at least one least qunit
+                        * is available */
+                       RETURN_EXIT;
+
+               /* clear edquot flag */
+               lqe->lqe_edquot = false;
+       }
+
+       LQUOTA_DEBUG(lqe, "changing edquot flag");
+
+       /* let's notify slave by issuing glimpse on per-ID lock.
+        * the rebalance thread will take care of this */
+       qmt_id_lock_notify(pool->qpi_qmt, lqe);
+}
+
+/*
+ * Try to grant more quota space back to slave.
+ *
+ * \param lqe     - is the quota entry for which we would like to allocate more
+ *                  space
+ * \param granted - is how much was already granted as part of the request
+ *                  processing
+ * \param spare   - is how much unused quota space the slave already owns
+ *
+ * \retval how much additional space can be granted to the slave
+ */
+__u64 qmt_alloc_expand(struct lquota_entry *lqe, __u64 granted, __u64 spare)
+{
+       struct qmt_pool_info    *pool = lqe2qpi(lqe);
+       __u64                    remaining, qunit;
+       int                      slv_cnt;
+
+       LASSERT(lqe->lqe_enforced && lqe->lqe_qunit != 0);
+
+       slv_cnt = lqe2qpi(lqe)->qpi_slv_nr[lqe->lqe_site->lqs_qtype];
+       qunit   = lqe->lqe_qunit;
+
+       /* quota space that can still be handed out is bounded by the soft
+        * limit, if set, the hard limit otherwise */
+       if (lqe->lqe_softlimit != 0)
+               remaining = lqe->lqe_softlimit;
+       else
+               remaining = lqe->lqe_hardlimit;
+
+       if (lqe->lqe_granted >= remaining)
+               RETURN(0);
+
+       remaining -= lqe->lqe_granted;
+
+       do {
+               if (spare >= qunit)
+                       break;
+
+               granted &= (qunit - 1);
+
+               if (remaining > (slv_cnt * qunit) >> 1) {
+                       /* enough room to grant more space w/o additional
+                        * shrinking ... at least for now */
+                       remaining -= (slv_cnt * qunit) >> 1;
+               } else if (qunit != pool->qpi_least_qunit) {
+                       qunit >>= 2;
+                       continue;
+               }
+
+               granted &= (qunit - 1);
+               if (spare > 0)
+                       RETURN(min_t(__u64, qunit - spare, remaining));
+               else
+                       RETURN(min_t(__u64, qunit - granted, remaining));
+       } while (qunit >= pool->qpi_least_qunit);
+
+       RETURN(0);
+}
+
+/*
+ * Adjust qunit size according to quota limits and total granted count.
+ * The caller must have locked the lqe.
+ *
+ * \param env - the environment passed by the caller
+ * \param lqe - is the qid entry to be adjusted
+ */
+void qmt_adjust_qunit(const struct lu_env *env, struct lquota_entry *lqe)
+{
+       struct qmt_pool_info    *pool = lqe2qpi(lqe);
+       int                      slv_cnt;
+       __u64                    qunit, limit;
+       ENTRY;
+
+       LASSERT(lqe_is_locked(lqe));
+
+       if (!lqe->lqe_enforced)
+               /* no quota limits */
+               RETURN_EXIT;
+
+       /* record how many slaves have already registered */
+       slv_cnt = pool->qpi_slv_nr[lqe->lqe_site->lqs_qtype];
+       if (slv_cnt == 0)
+               /* wait for at least one slave to join */
+               RETURN_EXIT;
+
+       /* Qunit calculation is based on soft limit, if any, hard limit
+        * otherwise. This means that qunit is shrunk to the minimum when
+        * beyond the soft limit. This will impact performance, but that's the
+        * price of accurate grace time management. */
+       if (lqe->lqe_softlimit != 0) {
+               limit = lqe->lqe_softlimit;
+       } else if (lqe->lqe_hardlimit != 0) {
+               limit = lqe->lqe_hardlimit;
+       } else {
+               LQUOTA_ERROR(lqe, "enforced bit set, but neither hard nor soft "
+                            "limit are set");
+               RETURN_EXIT;
+       }
+
+       qunit = lqe->lqe_qunit == 0 ? pool->qpi_least_qunit : lqe->lqe_qunit;
+
+       /* The qunit value is computed as follows: limit / (2 * slv_cnt).
+        * Then 75% of the quota space can be granted with current qunit value.
+        * The remaining 25% are then used with reduced qunit size (by a factor
+        * of 4) which is then divided in a similar manner.
+        *
+        * |---------------------limit---------------------|
+        * |-------limit / 2-------|-limit / 4-|-limit / 4-|
+        * |qunit|qunit|qunit|qunit|           |           |
+        * |----slv_cnt * qunit----|           |           |
+        * |-grow limit-|          |           |           |
+        * |--------------shrink limit---------|           |
+        * |---space granted in qunit chunks---|-remaining-|
+        *                                    /             \
+        *                                   /               \
+        *                                  /                 \
+        *                                 /                   \
+        *                                /                     \
+        *     qunit >>= 2;            |qunit*slv_cnt|qunit*slv_cnt|
+        *                             |---space in qunit---|remain|
+        *                                  ...                               */
+       if (qunit == pool->qpi_least_qunit ||
+           limit >= lqe->lqe_granted + ((slv_cnt * qunit) >> 1)) {
+               /* current qunit value still fits, let's see if we can afford to
+                * increase qunit now ...
+                * To increase qunit again, we have to be under 25% */
+               while (limit >= lqe->lqe_granted + 6 * qunit * slv_cnt)
+                       qunit <<= 2;
+       } else {
+               /* shrink qunit until we find a suitable value */
+               while (qunit > pool->qpi_least_qunit &&
+                      limit < lqe->lqe_granted + ((slv_cnt * qunit) >> 1))
+                       qunit >>= 2;
+       }
+
+       if (lqe->lqe_qunit == qunit)
+               /* keep current qunit */
+               RETURN_EXIT;
+
+       LQUOTA_DEBUG(lqe, "%s qunit to "LPU64,
+                    lqe->lqe_qunit < qunit ? "increasing" : "decreasing",
+                    qunit);
+
+       /* store new qunit value */
+       swap(lqe->lqe_qunit, qunit);
+
+       /* reset revoke time */
+       lqe->lqe_revoke_time = 0;
+
+       if (lqe->lqe_qunit < qunit)
+               /* let's notify slave of qunit shrinking */
+               qmt_id_lock_notify(pool->qpi_qmt, lqe);
+       else if (lqe->lqe_qunit == pool->qpi_least_qunit)
+               /* initial qunit value is the smallest one */
+               lqe->lqe_revoke_time = cfs_time_current_64();
+       EXIT;
+}
+
+/*
+ * Adjust qunit & edquot flag in case it wasn't initialized already (e.g.
+ * limit set while no slaves were connected yet)
+ */
+void qmt_revalidate(const struct lu_env *env, struct lquota_entry *lqe)
+{
+       if (lqe->lqe_qunit == 0) {
+               /* lqe was read from disk, but neither qunit nor the edquot
+                * flag was initialized */
+               qmt_adjust_qunit(env, lqe);
+               if (lqe->lqe_qunit != 0)
+                       qmt_adjust_edquot(lqe, cfs_time_current_sec());
+       }
+}
index 9a4c416..dc3f280 100644 (file)
@@ -52,7 +52,7 @@ static int qmt_getinfo(const struct lu_env *env, struct qmt_device *qmt,
                       __u16 pool_id, __u8 restype, __u8 qtype, __u64 *time)
 {
        struct qmt_thread_info  *qti = qmt_info(env);
                       __u16 pool_id, __u8 restype, __u8 qtype, __u64 *time)
 {
        struct qmt_thread_info  *qti = qmt_info(env);
-       union lquota_id         *id  = &qti->qti_id_bis;
+       union lquota_id         *id  = &qti->qti_id;
        struct lquota_entry     *lqe;
        ENTRY;
 
        struct lquota_entry     *lqe;
        ENTRY;
 
@@ -90,7 +90,7 @@ static int qmt_setinfo(const struct lu_env *env, struct qmt_device *qmt,
                       __u16 pool_id, __u8 restype, __u8 qtype, __u64 time)
 {
        struct qmt_thread_info  *qti = qmt_info(env);
                       __u16 pool_id, __u8 restype, __u8 qtype, __u64 time)
 {
        struct qmt_thread_info  *qti = qmt_info(env);
-       union lquota_id         *id  = &qti->qti_id_bis;
+       union lquota_id         *id  = &qti->qti_id;
        struct lquota_entry     *lqe;
        struct thandle          *th = NULL;
        int                      rc;
        struct lquota_entry     *lqe;
        struct thandle          *th = NULL;
        int                      rc;
@@ -204,16 +204,11 @@ static int qmt_setquota(const struct lu_env *env, struct qmt_device *qmt,
        struct qmt_thread_info  *qti = qmt_info(env);
        struct lquota_entry     *lqe;
        struct thandle          *th = NULL;
        struct qmt_thread_info  *qti = qmt_info(env);
        struct lquota_entry     *lqe;
        struct thandle          *th = NULL;
-       __u64                    grace, ver;
+       __u64                    ver, now;
        bool                     dirtied = false, bump_version = false;
        int                      rc = 0;
        ENTRY;
 
        bool                     dirtied = false, bump_version = false;
        int                      rc = 0;
        ENTRY;
 
-       /* fetch global grace time */
-       rc = qmt_getinfo(env, qmt, pool_id, restype, qtype, &grace);
-       if (rc)
-               RETURN(rc);
-
        /* look-up quota entry associated with this ID */
        lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id);
        if (IS_ERR(lqe))
        /* look-up quota entry associated with this ID */
        lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id);
        if (IS_ERR(lqe))
@@ -225,6 +220,8 @@ static int qmt_setquota(const struct lu_env *env, struct qmt_device *qmt,
        if (IS_ERR(th))
                GOTO(out_nolock, rc = PTR_ERR(th));
 
        if (IS_ERR(th))
                GOTO(out_nolock, rc = PTR_ERR(th));
 
+       now = cfs_time_current_sec();
+
        lqe_write_lock(lqe);
        LQUOTA_DEBUG(lqe, "setquota valid:%x hard:"LPU64" soft:"LPU64
                     " time:"LPU64, valid, hard, soft, time);
        lqe_write_lock(lqe);
        LQUOTA_DEBUG(lqe, "setquota valid:%x hard:"LPU64" soft:"LPU64
                     " time:"LPU64, valid, hard, soft, time);
@@ -243,6 +240,9 @@ static int qmt_setquota(const struct lu_env *env, struct qmt_device *qmt,
                if (rc)
                        GOTO(out, rc);
 
                if (rc)
                        GOTO(out, rc);
 
+               /* recompute qunit in case it was never initialized */
+               qmt_revalidate(env, lqe);
+
                /* change quota limits */
                lqe->lqe_hardlimit = hard;
                lqe->lqe_softlimit = soft;
                /* change quota limits */
                lqe->lqe_hardlimit = hard;
                lqe->lqe_softlimit = soft;
@@ -255,7 +255,7 @@ static int qmt_setquota(const struct lu_env *env, struct qmt_device *qmt,
                        lqe->lqe_gracetime = 0;
                else if ((valid & QIF_TIMES) == 0)
                        /* set grace only if user hasn't provided his own */
                        lqe->lqe_gracetime = 0;
                else if ((valid & QIF_TIMES) == 0)
                        /* set grace only if user hasn't provided his own */
-                        lqe->lqe_gracetime = cfs_time_current_sec() + grace;
+                        lqe->lqe_gracetime = now + qmt_lqe_grace(lqe);
 
                /* change enforced status based on new parameters */
                if (lqe->lqe_hardlimit == 0 && lqe->lqe_softlimit == 0)
 
                /* change enforced status based on new parameters */
                if (lqe->lqe_hardlimit == 0 && lqe->lqe_softlimit == 0)
@@ -281,6 +281,13 @@ static int qmt_setquota(const struct lu_env *env, struct qmt_device *qmt,
                        qmt_restore(lqe, &qti->qti_restore);
                        GOTO(out, rc);
                }
                        qmt_restore(lqe, &qti->qti_restore);
                        GOTO(out, rc);
                }
+
+               /* compute new qunit value now that we have modified the quota
+                * settings */
+               qmt_adjust_qunit(env, lqe);
+
+               /* clear/set edquot flag as needed */
+               qmt_adjust_edquot(lqe, now);
        }
        EXIT;
 out:
        }
        EXIT;
 out:
@@ -431,6 +438,255 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld,
 }
 
 /*
 }
 
 /*
+ * Helper function to handle quota request from slave.
+ *
+ * \param env     - is the environment passed by the caller
+ * \param lqe     - is the lquota_entry subject to the quota request
+ * \param qmt     - is the master device
+ * \param uuid    - is the uuid associated with the slave
+ * \param qb_flags - are the quota request flags as packed in the quota_body
+ * \param qb_count - is the amount of quota space the slave wants to
+ *                   acquire/release
+ * \param qb_usage - is the current space usage on the slave
+ * \param repbody - is the quota_body of reply
+ *
+ * \retval 0            : success
+ * \retval -EDQUOT      : out of quota
+ *         -EINPROGRESS : inform client to retry write/create
+ *         -ve          : other appropriate errors
+ */
+int qmt_dqacq0(const struct lu_env *env, struct lquota_entry *lqe,
+              struct qmt_device *qmt, struct obd_uuid *uuid, __u32 qb_flags,
+              __u64 qb_count, __u64 qb_usage, struct quota_body *repbody)
+{
+       struct qmt_thread_info  *qti = qmt_info(env);
+       __u64                    now, count;
+       struct dt_object        *slv_obj = NULL;
+       __u64                    slv_granted, slv_granted_bck;
+       struct thandle          *th = NULL;
+       int                      rc, ret;
+       ENTRY;
+
+       LASSERT(uuid != NULL);
+
+       /* initialize reply */
+       memset(repbody, 0, sizeof(*repbody));
+       memcpy(&repbody->qb_id, &lqe->lqe_id, sizeof(repbody->qb_id));
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_QUOTA_RECOVERABLE_ERR))
+               RETURN(-cfs_fail_val);
+
+       /* look-up index file associated with acquiring slave */
+       slv_obj = lquota_disk_slv_find(env, qmt->qmt_child, LQE_ROOT(lqe),
+                                      lu_object_fid(&LQE_GLB_OBJ(lqe)->do_lu),
+                                      uuid);
+       if (IS_ERR(slv_obj))
+               GOTO(out, rc = PTR_ERR(slv_obj));
+
+       /* pack slave fid in reply just for sanity check */
+       memcpy(&repbody->qb_slv_fid, lu_object_fid(&slv_obj->do_lu),
+              sizeof(struct lu_fid));
+
+       /* allocate & start transaction with enough credits to update
+        * global & slave indexes */
+       th = qmt_trans_start_with_slv(env, lqe, slv_obj, &qti->qti_restore);
+       if (IS_ERR(th))
+               GOTO(out, rc = PTR_ERR(th));
+
+       lqe_write_lock(lqe);
+       LQUOTA_DEBUG(lqe, "dqacq starts uuid:%s flags:0x%x wanted:"LPU64
+                    " usage:"LPU64, obd_uuid2str(uuid), qb_flags, qb_count,
+                    qb_usage);
+
+       /* Legal race, limits have been removed on master, but slave didn't
+        * receive the change yet. Just return EINPROGRESS until the slave gets
+        * notified. */
+       if (!lqe->lqe_enforced && !req_is_rel(qb_flags))
+               GOTO(out_locked, rc = -EINPROGRESS);
+
+       /* recompute qunit in case it was never initialized */
+       qmt_revalidate(env, lqe);
+
+       /* slave just wants to acquire per-ID lock */
+       if (req_is_acq(qb_flags) && qb_count == 0)
+               GOTO(out_locked, rc = 0);
+
+       /* fetch how much quota space is already granted to this slave */
+       rc = qmt_slv_read(env, lqe, slv_obj, &slv_granted);
+       if (rc) {
+               LQUOTA_ERROR(lqe, "Failed to get granted for slave %s, rc=%d",
+                            obd_uuid2str(uuid), rc);
+               GOTO(out_locked, rc);
+       }
+       /* recall how much space this slave currently owns in order to restore
+        * it in case of failure */
+       slv_granted_bck = slv_granted;
+
+       /* record current time for soft limit & grace time management */
+       now = (__u64)cfs_time_current_sec();
+
+       if (req_is_rel(qb_flags)) {
+               /* Slave would like to release quota space */
+               if (slv_granted < qb_count ||
+                   lqe->lqe_granted < qb_count) {
+                       /* can't release more than granted */
+                       LQUOTA_ERROR(lqe, "Release too much! uuid:%s release:"
+                                    LPU64" granted:"LPU64", total:"LPU64,
+                                    obd_uuid2str(uuid), qb_count,
+                                    slv_granted, lqe->lqe_granted);
+                       GOTO(out_locked, rc = -EINVAL);
+               }
+
+               repbody->qb_count = qb_count;
+               /* put released space back to global pool */
+               QMT_REL(lqe, slv_granted, qb_count);
+               GOTO(out_write, rc = 0);
+       }
+
+       if (req_has_rep(qb_flags) && slv_granted < qb_usage) {
+               /* Slave is reporting space usage in quota request and it turns
+                * out to be using more quota space than owned, so we adjust
+                * granted space regardless of the current state of affairs */
+               repbody->qb_count = qb_usage - slv_granted;
+               QMT_GRANT(lqe, slv_granted, repbody->qb_count);
+       }
+
+       if (!req_is_acq(qb_flags) && !req_is_preacq(qb_flags))
+               GOTO(out_write, rc = 0);
+
+       qmt_adjust_edquot(lqe, now);
+       if (lqe->lqe_edquot)
+               /* no hope to claim further space back */
+               GOTO(out_write, rc = -EDQUOT);
+
+       if (qmt_space_exhausted(lqe, now)) {
+               /* might have some free space once rebalancing is completed */
+               rc = req_is_acq(qb_flags) ? -EINPROGRESS : -EDQUOT;
+               GOTO(out_write, rc);
+       }
+
+       if (req_is_preacq(qb_flags)) {
+               /* slave would like to pre-acquire quota space. To do so, it
+                * reports in qb_count how much spare quota space it owns and we
+                * can grant back quota space which is consistent with qunit
+                * value. */
+
+               if (qb_count >= lqe->lqe_qunit)
+                       /* slave already owns the maximum it should */
+                       GOTO(out_write, rc = 0);
+
+               count = qmt_alloc_expand(lqe, slv_granted, qb_count);
+               if (count == 0)
+                       GOTO(out_write, rc = -EDQUOT);
+
+               repbody->qb_count += count;
+               QMT_GRANT(lqe, slv_granted, count);
+               GOTO(out_write, rc = 0);
+       }
+
+       /* processing acquire request with clients waiting */
+       if (lqe->lqe_hardlimit != 0 &&
+           lqe->lqe_granted + qb_count > lqe->lqe_hardlimit) {
+               /* cannot grant as much as asked, but can still afford to grant
+                * some quota space back */
+               count = lqe->lqe_hardlimit - lqe->lqe_granted;
+               repbody->qb_count += count;
+               QMT_GRANT(lqe, slv_granted, count);
+               GOTO(out_write, rc = 0);
+       }
+
+       /* Whouhou! we can satisfy the slave request! */
+       repbody->qb_count += qb_count;
+       QMT_GRANT(lqe, slv_granted, qb_count);
+
+       /* Try to expand the acquired count for DQACQ */
+       count = qmt_alloc_expand(lqe, slv_granted, 0);
+       if (count != 0) {
+               /* can even grant more than asked, it is like xmas ... */
+               repbody->qb_count += count;
+               QMT_GRANT(lqe, slv_granted, count);
+               GOTO(out_write, rc = 0);
+       }
+
+       GOTO(out_write, rc = 0);
+out_write:
+       if (repbody->qb_count == 0)
+               GOTO(out_locked, rc);
+
+       /* start/stop grace timer if required */
+       if (lqe->lqe_softlimit != 0) {
+               if (lqe->lqe_granted > lqe->lqe_softlimit &&
+                   lqe->lqe_gracetime == 0)
+                       /* first time over soft limit, let's start grace
+                        * timer */
+                       lqe->lqe_gracetime = now + qmt_lqe_grace(lqe);
+               else if (lqe->lqe_granted <= lqe->lqe_softlimit &&
+                        lqe->lqe_gracetime != 0)
+                       /* Clear grace timer */
+                       lqe->lqe_gracetime = 0;
+       }
+
+       /* Update slave index first since it is easier to roll back */
+       ret = qmt_slv_write(env, th, lqe, slv_obj, LQUOTA_BUMP_VER,
+                           &repbody->qb_slv_ver, slv_granted);
+       if (ret) {
+               /* restore initial quota settings */
+               qmt_restore(lqe, &qti->qti_restore);
+               /* reset qb_count */
+               repbody->qb_count = 0;
+               GOTO(out_locked, rc = ret);
+       }
+
+       /* Update global index, no version bump needed */
+       ret = qmt_glb_write(env, th, lqe, 0, NULL);
+       if (ret) {
+               rc = ret;
+               /* restore initial quota settings */
+               qmt_restore(lqe, &qti->qti_restore);
+               /* reset qb_count */
+               repbody->qb_count = 0;
+
+               /* restore previous granted value */
+               ret = qmt_slv_write(env, th, lqe, slv_obj, 0, NULL,
+                                   slv_granted_bck);
+               if (ret) {
+                       LQUOTA_ERROR(lqe, "failed to restore initial slave "
+                                    "value rc:%d ret%d", rc, ret);
+                       LBUG();
+               }
+               qmt_adjust_edquot(lqe, now);
+               GOTO(out_locked, rc);
+       }
+
+       /* Total granted has been changed, let's try to adjust the qunit
+        * size according to the total granted & limits. */
+       qmt_adjust_qunit(env, lqe);
+
+       /* clear/set edquot flag and notify slaves via glimpse if needed */
+       qmt_adjust_edquot(lqe, now);
+out_locked:
+       LQUOTA_DEBUG(lqe, "dqacq ends count:"LPU64" ver:"LPU64" rc:%d",
+                    repbody->qb_count, repbody->qb_slv_ver, rc);
+       lqe_write_unlock(lqe);
+out:
+       if (th != NULL && !IS_ERR(th))
+               dt_trans_stop(env, qmt->qmt_child, th);
+
+       if (slv_obj != NULL && !IS_ERR(slv_obj))
+               lu_object_put(env, &slv_obj->do_lu);
+
+       if ((req_is_acq(qb_flags) || req_is_preacq(qb_flags)) &&
+           OBD_FAIL_CHECK(OBD_FAIL_QUOTA_EDQUOT)) {
+               /* fault injection: introduce inconsistency between the granted
+                * value in the slave index and the slave's local copy of it */
+               repbody->qb_count = 0;
+               rc = -EDQUOT;
+       }
+
+       RETURN(rc);
+}
+
+/*
  * Handle quota request from slave.
  *
  * \param env  - is the environment passed by the caller
  * Handle quota request from slave.
  *
  * \param env  - is the environment passed by the caller
@@ -440,7 +696,13 @@ static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld,
 static int qmt_dqacq(const struct lu_env *env, struct lu_device *ld,
                     struct ptlrpc_request *req)
 {
 static int qmt_dqacq(const struct lu_env *env, struct lu_device *ld,
                     struct ptlrpc_request *req)
 {
+       struct qmt_device       *qmt = lu2qmt_dev(ld);
        struct quota_body       *qbody, *repbody;
        struct quota_body       *qbody, *repbody;
+       struct obd_uuid         *uuid;
+       struct ldlm_lock        *lock;
+       struct lquota_entry     *lqe;
+       int                      pool_id, pool_type, qtype;
+       int                      rc;
        ENTRY;
 
        qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
        ENTRY;
 
        qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
@@ -451,9 +713,82 @@ static int qmt_dqacq(const struct lu_env *env, struct lu_device *ld,
        if (repbody == NULL)
                RETURN(err_serious(-EFAULT));
 
        if (repbody == NULL)
                RETURN(err_serious(-EFAULT));
 
-       /* XXX: to be implemented */
+       /* verify if global lock is stale */
+       if (!lustre_handle_is_used(&qbody->qb_glb_lockh))
+               RETURN(-ENOLCK);
 
 
-       RETURN(0);
+       lock = ldlm_handle2lock(&qbody->qb_glb_lockh);
+       if (lock == NULL)
+               RETURN(-ENOLCK);
+       LDLM_LOCK_PUT(lock);
+
+       uuid = &req->rq_export->exp_client_uuid;
+
+       if (req_is_rel(qbody->qb_flags) + req_is_acq(qbody->qb_flags) +
+           req_is_preacq(qbody->qb_flags) > 1) {
+               CERROR("%s: malformed quota request with conflicting flags set "
+                      "(%x) from slave %s\n", qmt->qmt_svname,
+                      qbody->qb_flags, obd_uuid2str(uuid));
+               RETURN(-EPROTO);
+       }
+
+       if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) {
+               /* acquire and pre-acquire should use a valid ID lock */
+
+               if (!lustre_handle_is_used(&qbody->qb_lockh))
+                       RETURN(-ENOLCK);
+
+               lock = ldlm_handle2lock(&qbody->qb_lockh);
+               if (lock == NULL)
+                       /* no lock associated with this handle */
+                       RETURN(-ENOLCK);
+
+               LDLM_DEBUG(lock, "%sacquire request",
+                          req_is_preacq(qbody->qb_flags) ? "pre" : "");
+
+               if (!obd_uuid_equals(&lock->l_export->exp_client_uuid, uuid)) {
+                       /* sorry, no way to cheat ... */
+                       LDLM_LOCK_PUT(lock);
+                       RETURN(-ENOLCK);
+               }
+
+               if ((lock->l_flags & LDLM_FL_AST_SENT) != 0) {
+                       struct ptlrpc_service_part      *svc;
+                       unsigned int                     timeout;
+
+                       svc = req->rq_rqbd->rqbd_svcpt;
+                       timeout = at_est2timeout(at_get(&svc->scp_at_estimate));
+                       timeout = max(timeout, ldlm_timeout);
+
+                       /* lock is being cancelled, prolong timeout */
+                       ldlm_refresh_waiting_lock(lock, timeout);
+               }
+               LDLM_LOCK_PUT(lock);
+       }
+
+       /* extract pool & quota information from global index FID packed in the
+        * request */
+       rc = lquota_extract_fid(&qbody->qb_fid, &pool_id, &pool_type, &qtype);
+       if (rc)
+               RETURN(-EINVAL);
+
+       /* Find the quota entry associated with the quota id */
+       lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, pool_type, qtype,
+                                 &qbody->qb_id);
+       if (IS_ERR(lqe))
+               RETURN(PTR_ERR(lqe));
+
+       /* process quota request */
+       rc = qmt_dqacq0(env, lqe, qmt, uuid, qbody->qb_flags, qbody->qb_count,
+                       qbody->qb_usage, repbody);
+
+       if (lustre_handle_is_used(&qbody->qb_lockh))
+               /* return current qunit value only to slaves owning a per-ID
+                * quota lock. For enqueue, the qunit value will be returned in
+                * the LVB */
+                repbody->qb_qunit = lqe->lqe_qunit;
+       lqe_putref(lqe);
+       RETURN(rc);
 }
 
 /* Vector of quota request handlers. This vector is used by the MDT to forward
 }
 
 /* Vector of quota request handlers. This vector is used by the MDT to forward
index e3291c6..7cad493 100644 (file)
@@ -126,6 +126,9 @@ struct qmt_pool_info {
        /* number of slaves registered for each quota type */
        int                      qpi_slv_nr[MAXQUOTAS];
 
        /* number of slaves registered for each quota type */
        int                      qpi_slv_nr[MAXQUOTAS];
 
+       /* reference on lqe (ID 0) storing grace time. */
+       struct lquota_entry     *qpi_grace_lqe[MAXQUOTAS];
+
        /* procfs root directory for this pool */
        cfs_proc_dir_entry_t    *qpi_proc;
 
        /* procfs root directory for this pool */
        cfs_proc_dir_entry_t    *qpi_proc;
 
@@ -172,13 +175,11 @@ struct qmt_lqe_restore {
 struct qmt_thread_info {
        union lquota_rec        qti_rec;
        union lquota_id         qti_id;
 struct qmt_thread_info {
        union lquota_rec        qti_rec;
        union lquota_id         qti_id;
-       union lquota_id         qti_id_bis;
        char                    qti_buf[MTI_NAME_MAXLEN];
        struct lu_fid           qti_fid;
        struct ldlm_res_id      qti_resid;
        union ldlm_gl_desc      qti_gl_desc;
        struct quota_body       qti_body;
        char                    qti_buf[MTI_NAME_MAXLEN];
        struct lu_fid           qti_fid;
        struct ldlm_res_id      qti_resid;
        union ldlm_gl_desc      qti_gl_desc;
        struct quota_body       qti_body;
-       struct quota_body       qti_repbody;
        struct qmt_lqe_restore  qti_restore;
 };
 
        struct qmt_lqe_restore  qti_restore;
 };
 
@@ -214,6 +215,18 @@ static inline struct lu_device *qmt2lu_dev(struct qmt_device *qmt)
 #define LQE_ROOT(lqe)    (lqe2qpi(lqe)->qpi_root)
 #define LQE_GLB_OBJ(lqe) (lqe2qpi(lqe)->qpi_glb_obj[lqe->lqe_site->lqs_qtype])
 
 #define LQE_ROOT(lqe)    (lqe2qpi(lqe)->qpi_root)
 #define LQE_GLB_OBJ(lqe) (lqe2qpi(lqe)->qpi_glb_obj[lqe->lqe_site->lqs_qtype])
 
+/* helper function returning grace time to use for a given lquota entry */
+static inline __u64 qmt_lqe_grace(struct lquota_entry *lqe)
+{
+       struct qmt_pool_info    *pool = lqe2qpi(lqe);
+       struct lquota_entry     *grace_lqe;
+
+       grace_lqe = pool->qpi_grace_lqe[lqe->lqe_site->lqs_qtype];
+       LASSERT(grace_lqe != NULL);
+
+       return grace_lqe->lqe_gracetime;
+}
+
 static inline void qmt_restore(struct lquota_entry *lqe,
                               struct qmt_lqe_restore *restore)
 {
 static inline void qmt_restore(struct lquota_entry *lqe,
                               struct qmt_lqe_restore *restore)
 {
@@ -224,6 +237,36 @@ static inline void qmt_restore(struct lquota_entry *lqe,
        lqe->lqe_qunit     = restore->qlr_qunit;
 }
 
        lqe->lqe_qunit     = restore->qlr_qunit;
 }
 
+#define QMT_GRANT(lqe, slv, cnt)             \
+       do {                                 \
+               (lqe)->lqe_granted += (cnt); \
+               (slv) += (cnt);              \
+       } while (0)
+#define QMT_REL(lqe, slv, cnt)               \
+       do {                                 \
+               (lqe)->lqe_granted -= (cnt); \
+               (slv) -= (cnt);              \
+       } while (0)
+
+/* helper routine returning true when the id has run out of quota space, which
+ * means that it has either:
+ * - reached hardlimit
+ * OR
+ * - reached softlimit and grace time expired already */
+static inline bool qmt_space_exhausted(struct lquota_entry *lqe, __u64 now)
+{
+       if (lqe->lqe_hardlimit != 0 && lqe->lqe_granted >= lqe->lqe_hardlimit)
+               return true;
+       if (lqe->lqe_softlimit != 0 && lqe->lqe_granted > lqe->lqe_softlimit &&
+           lqe->lqe_gracetime != 0 && now >= lqe->lqe_gracetime)
+               return true;
+       return false;
+}
+
+/* number of seconds to wait for slaves to release quota space after
+ * rebalancing */
+#define QMT_REBA_TIMEOUT 2
+
 /* qmt_pool.c */
 void qmt_pool_fini(const struct lu_env *, struct qmt_device *);
 int qmt_pool_init(const struct lu_env *, struct qmt_device *);
 /* qmt_pool.c */
 void qmt_pool_fini(const struct lu_env *, struct qmt_device *);
 int qmt_pool_init(const struct lu_env *, struct qmt_device *);
@@ -251,6 +294,15 @@ int qmt_slv_write(const struct lu_env *, struct thandle *,
 int qmt_slv_read(const struct lu_env *, struct lquota_entry *,
                 struct dt_object *, __u64 *);
 int qmt_validate_limits(struct lquota_entry *, __u64, __u64);
 int qmt_slv_read(const struct lu_env *, struct lquota_entry *,
                 struct dt_object *, __u64 *);
 int qmt_validate_limits(struct lquota_entry *, __u64, __u64);
+void qmt_adjust_qunit(const struct lu_env *, struct lquota_entry *);
+void qmt_adjust_edquot(struct lquota_entry *, __u64);
+void qmt_revalidate(const struct lu_env *, struct lquota_entry *);
+__u64 qmt_alloc_expand(struct lquota_entry *, __u64, __u64);
+
+/* qmt_handler.c */
+int qmt_dqacq0(const struct lu_env *, struct lquota_entry *,
+              struct qmt_device *, struct obd_uuid *, __u32, __u64, __u64,
+              struct quota_body *);
 
 /* qmt_lock.c */
 int qmt_intent_policy(const struct lu_env *, struct lu_device *,
 
 /* qmt_lock.c */
 int qmt_intent_policy(const struct lu_env *, struct lu_device *,
index e37d041..3067ce7 100644 (file)
@@ -51,6 +51,7 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
        struct quota_body       *repbody;
        struct obd_uuid         *uuid;
        struct lquota_lvb       *lvb;
        struct quota_body       *repbody;
        struct obd_uuid         *uuid;
        struct lquota_lvb       *lvb;
+       struct ldlm_resource    *res = (*lockp)->l_resource;
        int                      rc;
        ENTRY;
 
        int                      rc;
        ENTRY;
 
@@ -79,13 +80,44 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
        uuid = &(*lockp)->l_export->exp_client_uuid;
        switch (it->opc) {
 
        uuid = &(*lockp)->l_export->exp_client_uuid;
        switch (it->opc) {
 
-       case IT_QUOTA_DQACQ:
-               /* XXX: to be added in a next patch */
-               GOTO(out, -EOPNOTSUPP);
+       case IT_QUOTA_DQACQ: {
+               struct lquota_entry     *lqe;
+               struct ldlm_lock        *lock;
+
+               if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
+                       /* acquire on global lock? something is wrong ... */
+                       GOTO(out, rc = -EPROTO);
+
+               /* verify global lock isn't stale */
+               if (!lustre_handle_is_used(&reqbody->qb_glb_lockh))
+                       GOTO(out, rc = -ENOLCK);
+
+               lock = ldlm_handle2lock(&reqbody->qb_glb_lockh);
+               if (lock == NULL)
+                       GOTO(out, rc = -ENOLCK);
+               LDLM_LOCK_PUT(lock);
+
+               lqe = res->lr_lvb_data;
+               LASSERT(lqe != NULL);
+               lqe_getref(lqe);
+
+               /* acquire quota space */
+               rc = qmt_dqacq0(env, lqe, qmt, uuid, reqbody->qb_flags,
+                               reqbody->qb_count, reqbody->qb_usage,
+                               repbody);
+               lqe_putref(lqe);
+               if (rc)
+                       GOTO(out, rc);
                break;
                break;
+       }
 
        case IT_QUOTA_CONN:
                /* new connection from slave */
 
        case IT_QUOTA_CONN:
                /* new connection from slave */
+
+               if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0)
+                       /* connection on per-ID lock? something is wrong ... */
+                       GOTO(out, rc = -EPROTO);
+
                rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
                                       &repbody->qb_slv_fid,
                                       &repbody->qb_slv_ver, uuid);
                rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
                                       &repbody->qb_slv_fid,
                                       &repbody->qb_slv_ver, uuid);
@@ -206,6 +238,8 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
        struct qmt_device       *qmt = lu2qmt_dev(ld);
        struct lquota_entry     *lqe;
        struct lquota_lvb       *lvb;
        struct qmt_device       *qmt = lu2qmt_dev(ld);
        struct lquota_entry     *lqe;
        struct lquota_lvb       *lvb;
+       struct ldlm_lock        *lock;
+       struct obd_export       *exp;
        int                      rc = 0;
        ENTRY;
 
        int                      rc = 0;
        ENTRY;
 
@@ -218,35 +252,77 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
                /* no need to update lvb for global quota locks */
                RETURN(0);
 
                /* no need to update lvb for global quota locks */
                RETURN(0);
 
+       lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
+       if (lvb == NULL) {
+               CERROR("%s: failed to extract lvb from request\n",
+                      qmt->qmt_svname);
+               RETURN(-EFAULT);
+       }
+
        lqe = res->lr_lvb_data;
        LASSERT(lqe != NULL);
        lqe = res->lr_lvb_data;
        LASSERT(lqe != NULL);
+       lqe_getref(lqe);
+
+       LQUOTA_DEBUG(lqe, "releasing:"LPU64" may release:"LPU64,
+                    lvb->lvb_id_rel, lvb->lvb_id_may_rel);
+
+       if (lvb->lvb_id_rel == 0) {
+               /* nothing to release */
+               if (lvb->lvb_id_may_rel != 0)
+                       /* but might still release later ... */
+                       lqe->lqe_may_rel += lvb->lvb_id_may_rel;
+               GOTO(out_lqe, rc = 0);
+       }
 
        /* allocate environment */
        OBD_ALLOC_PTR(env);
        if (env == NULL)
 
        /* allocate environment */
        OBD_ALLOC_PTR(env);
        if (env == NULL)
-               RETURN(-ENOMEM);
+               GOTO(out_lqe, rc = -ENOMEM);
 
        /* initialize environment */
        rc = lu_env_init(env, LCT_MD_THREAD);
 
        /* initialize environment */
        rc = lu_env_init(env, LCT_MD_THREAD);
-       if (rc) {
-               OBD_FREE_PTR(env);
-               RETURN(rc);
-       }
+       if (rc)
+               GOTO(out_env, rc);
        qti = qmt_info(env);
 
        qti = qmt_info(env);
 
-       lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
-       if (lvb == NULL) {
-               CERROR("%s: failed to extract lvb from request\n",
+       /* The request is a glimpse callback which was sent via the
+        * reverse import to the slave. What we care about here is the
+        * export associated with the slave and req->rq_export is
+        * definitely not what we are looking for (it is actually set to
+        * NULL here).
+        * Therefore we extract the lock from the request argument
+        * and use lock->l_export. */
+       lock = ldlm_request_lock(req);
+       if (IS_ERR(lock)) {
+               CERROR("%s: failed to get lock from request!\n",
                       qmt->qmt_svname);
                       qmt->qmt_svname);
-               GOTO(out, rc);
+               GOTO(out_env_init, rc = PTR_ERR(lock));
        }
 
        }
 
-       /* XXX: Space release handling to be added in a next patch */
+       exp = class_export_get(lock->l_export);
+       if (exp == NULL) {
+               CERROR("%s: failed to get export from lock!\n",
+                      qmt->qmt_svname);
+               GOTO(out_env_init, rc = -EFAULT);
+       }
 
 
+       /* release quota space */
+       rc = qmt_dqacq0(env, lqe, qmt, &exp->exp_client_uuid,
+                       QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, 0, &qti->qti_body);
+       if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
+               LQUOTA_ERROR(lqe, "failed to release quota space on glimpse "
+                            LPU64"!="LPU64" rc:%d\n", qti->qti_body.qb_count,
+                            lvb->lvb_id_rel, rc);
+       class_export_put(exp);
+       if (rc)
+               GOTO(out_env_init, rc);
        EXIT;
        EXIT;
-out:
+out_env_init:
        lu_env_fini(env);
        lu_env_fini(env);
+out_env:
        OBD_FREE_PTR(env);
        OBD_FREE_PTR(env);
+out_lqe:
+       lqe_putref(lqe);
        return rc;
 }
 
        return rc;
 }
 
@@ -537,10 +613,15 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
                 * opportunity to enqueue quota lock yet. */
                LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
                             "lock "DFID, PFID(&qti->qti_fid));
                 * opportunity to enqueue quota lock yet. */
                LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
                             "lock "DFID, PFID(&qti->qti_fid));
+               lqe_write_lock(lqe);
+               if (lqe->lqe_revoke_time == 0 &&
+                   lqe->lqe_qunit == pool->qpi_least_qunit)
+                       lqe->lqe_revoke_time = cfs_time_current_64();
+               lqe_write_unlock(lqe);
                RETURN_EXIT;
        }
 
                RETURN_EXIT;
        }
 
-       lqe_read_lock(lqe);
+       lqe_write_lock(lqe);
        /* The purpose of glimpse callback on per-ID lock is twofold:
         * - notify slaves of new qunit value and hope they will release some
         *   spare quota space in return
        /* The purpose of glimpse callback on per-ID lock is twofold:
         * - notify slaves of new qunit value and hope they will release some
         *   spare quota space in return
@@ -553,18 +634,32 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
        else
                qti->qti_gl_desc.lquota_desc.gl_flags = 0;
        qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
        else
                qti->qti_gl_desc.lquota_desc.gl_flags = 0;
        qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
-       lqe_read_unlock(lqe);
+
+       if (lqe->lqe_revoke_time == 0 &&
+           qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit)
+               /* reset lqe_may_rel, it will be updated on glimpse callback
+                * replies if needed */
+               lqe->lqe_may_rel = 0;
 
        /* The rebalance thread is the only thread which can issue glimpses */
        LASSERT(!lqe->lqe_gl);
        lqe->lqe_gl = true;
 
        /* The rebalance thread is the only thread which can issue glimpses */
        LASSERT(!lqe->lqe_gl);
        lqe->lqe_gl = true;
+       lqe_write_unlock(lqe);
 
        /* issue glimpse callback to slaves */
        rc = qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
                              uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
 
 
        /* issue glimpse callback to slaves */
        rc = qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
                              uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
 
+       lqe_write_lock(lqe);
+       if (lqe->lqe_revoke_time == 0 &&
+           qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit &&
+           lqe->lqe_qunit == pool->qpi_least_qunit) {
+               lqe->lqe_revoke_time = cfs_time_current_64();
+               qmt_adjust_edquot(lqe, cfs_time_current_sec());
+       }
        LASSERT(lqe->lqe_gl);
        lqe->lqe_gl = false;
        LASSERT(lqe->lqe_gl);
        lqe->lqe_gl = false;
+       lqe_write_unlock(lqe);
 
        ldlm_resource_putref(res);
        EXIT;
 
        ldlm_resource_putref(res);
        EXIT;
index 3bea3d9..c1f8b72 100644 (file)
@@ -273,6 +273,10 @@ static void qmt_pool_free(const struct lu_env *env, struct qmt_pool_info *pool)
        /* release per-quota type site used to manage quota entries as well as
         * references to global index files */
        for (qtype = 0; qtype < MAXQUOTAS; qtype++) {
        /* release per-quota type site used to manage quota entries as well as
         * references to global index files */
        for (qtype = 0; qtype < MAXQUOTAS; qtype++) {
+               /* release lqe storing grace time */
+               if (pool->qpi_grace_lqe[qtype] != NULL)
+                       lqe_putref(pool->qpi_grace_lqe[qtype]);
+
                /* release site */
                if (pool->qpi_site[qtype] != NULL &&
                    !IS_ERR(pool->qpi_site[qtype]))
                /* release site */
                if (pool->qpi_site[qtype] != NULL &&
                    !IS_ERR(pool->qpi_site[qtype]))
@@ -460,8 +464,9 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
        /* iterate over each pool in the hash and allocate a quota site for each
         * one. This involves creating a global index file on disk */
        cfs_list_for_each(pos, &qmt->qmt_pool_list) {
        /* iterate over each pool in the hash and allocate a quota site for each
         * one. This involves creating a global index file on disk */
        cfs_list_for_each(pos, &qmt->qmt_pool_list) {
-               struct dt_object *obj;
-               int               pool_type, pool_id;
+               struct dt_object        *obj;
+               int                      pool_type, pool_id;
+               struct lquota_entry     *lqe;
 
                pool = cfs_list_entry(pos, struct qmt_pool_info,
                                      qpi_linkage);
 
                pool = cfs_list_entry(pos, struct qmt_pool_info,
                                      qpi_linkage);
@@ -525,6 +530,17 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
                                       qmt->qmt_svname, QTYPE_NAME(qtype), rc);
                                RETURN(rc);
                        }
                                       qmt->qmt_svname, QTYPE_NAME(qtype), rc);
                                RETURN(rc);
                        }
+
+                       /* Global grace time is stored in quota settings of
+                        * ID 0. */
+                       qti->qti_id.qid_uid = 0;
+
+                       /* look-up quota entry storing grace time */
+                       lqe = lqe_locate(env, pool->qpi_site[qtype],
+                                        &qti->qti_id);
+                       if (IS_ERR(lqe))
+                               RETURN(PTR_ERR(lqe));
+                       pool->qpi_grace_lqe[qtype] = lqe;
 #ifdef LPROCFS
                        /* add procfs file to dump the global index, mostly for
                         * debugging purpose */
 #ifdef LPROCFS
                        /* add procfs file to dump the global index, mostly for
                         * debugging purpose */
@@ -633,10 +649,18 @@ struct lquota_entry *qmt_pool_lqe_lookup(const struct lu_env *env,
        if (IS_ERR(pool))
                RETURN((void *)pool);
 
        if (IS_ERR(pool))
                RETURN((void *)pool);
 
+       if (qid->qid_uid == 0) {
+               /* caller wants to access grace time, no need to look up the
+                * entry since we keep a reference on ID 0 all the time */
+               lqe = pool->qpi_grace_lqe[qtype];
+               lqe_getref(lqe);
+               GOTO(out, 0);
+       }
+
        /* now that we have the pool, let's look-up the quota entry in the
         * right quota site */
        lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
        /* now that we have the pool, let's look-up the quota entry in the
         * right quota site */
        lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
-
+out:
        qpi_putref(env, pool);
        RETURN(lqe);
 }
        qpi_putref(env, pool);
        RETURN(lqe);
 }