This patch adds the quota acquire/release logic on the QMT.
The QMT is in charge of distributing quota space across quota
slaves. It grants a significant amount of quota space (75%) initially
and then tries to balance spare quota space among slaves.
To do so, it sends glimpses on per-ID locks to notify slaves of qunit
shrinking. qunit is now the maximal amount of available quota space a
slave can own. As for qtune, this value (typically 1/4 of qunit) is
used on the quota slave to decide when to release/pre-acquire quota
space.
Signed-off-by: Johann Lombardi <johann.lombardi@intel.com>
Change-Id: I8064ce614955421681ca1da7e2bb774001ca3bea
Reviewed-on: http://review.whamcloud.com/4178
Tested-by: Hudson
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
void ldlm_put_ref(void);
int ldlm_init_export(struct obd_export *exp);
void ldlm_destroy_export(struct obd_export *exp);
+struct ldlm_lock *ldlm_request_lock(struct ptlrpc_request *req);
/* ldlm_lock.c */
#ifdef HAVE_SERVER_SUPPORT
#define OBD_FAIL_MGS_PAUSE_TARGET_REG 0x905
#define OBD_FAIL_QUOTA_DQACQ 0xA01
-#define OBD_FAIL_QUOTA_RET_QDATA 0xA02
-#define OBD_FAIL_QUOTA_DELAY_REL 0xA03
-#define OBD_FAIL_QUOTA_DELAY_SD 0xA04
+#define OBD_FAIL_QUOTA_EDQUOT 0xA02
+#define OBD_FAIL_QUOTA_DELAY_REINT 0xA03
+#define OBD_FAIL_QUOTA_RECOVERABLE_ERR 0xA04
#define OBD_FAIL_LPROC_REMOVE 0xB00
RETURN(rc);
}
+/*
+ * Return the ldlm lock associated with a lock callback request.
+ *
+ * The lock pointer was stored in the request's async args by the sender of
+ * the callback; ERR_PTR(-EFAULT) is returned when no lock is attached.
+ *
+ * \param req - is the lock callback request
+ * \retval lock on success, ERR_PTR(-EFAULT) if the request carries no lock
+ */
+struct ldlm_lock *ldlm_request_lock(struct ptlrpc_request *req)
+{
+ struct ldlm_cb_async_args *ca;
+ struct ldlm_lock *lock;
+ ENTRY;
+
+ /* the callback sender packed the lock in the request async args */
+ ca = ptlrpc_req_async_args(req);
+ lock = ca->ca_lock;
+ if (lock == NULL)
+ RETURN(ERR_PTR(-EFAULT));
+
+ RETURN(lock);
+}
+EXPORT_SYMBOL(ldlm_request_lock);
+
static void ldlm_svc_get_eopc(const struct ldlm_request *dlm_req,
struct lprocfs_stats *srv_stats)
{
/*
* helper function to generate the filename associated with a slave index file
*/
-static inline int lquota_disk_slv_filename(struct lu_fid *glb_fid,
+static inline int lquota_disk_slv_filename(const struct lu_fid *glb_fid,
struct obd_uuid *uuid,
char *filename)
{
struct dt_object *lquota_disk_slv_find(const struct lu_env *env,
struct dt_device *dev,
struct dt_object *parent,
- struct lu_fid *glb_fid,
+ const struct lu_fid *glb_fid,
struct obd_uuid *uuid)
{
struct lquota_thread_info *qti = lquota_info(env);
/* helper routine to get/put reference on lquota_entry */
static inline void lqe_getref(struct lquota_entry *lqe)
{
+ LASSERT(lqe != NULL);
cfs_atomic_inc(&lqe->lqe_ref);
}
static inline void lqe_putref(struct lquota_entry *lqe)
{
+ LASSERT(lqe != NULL);
LASSERT(atomic_read(&lqe->lqe_ref) > 0);
if (atomic_dec_and_test(&lqe->lqe_ref))
OBD_FREE_PTR(lqe);
return info;
}
+/* helpers to decode the type of quota request packed in quota_body::qb_flags.
+ * NOTE(review): the flags argument is not parenthesized in the expansions;
+ * fine for current callers which pass a plain variable, but (flags) would be
+ * more robust against expression arguments */
+#define req_is_acq(flags) ((flags & QUOTA_DQACQ_FL_ACQ) != 0)
+#define req_is_preacq(flags) ((flags & QUOTA_DQACQ_FL_PREACQ) != 0)
+#define req_is_rel(flags) ((flags & QUOTA_DQACQ_FL_REL) != 0)
+#define req_has_rep(flags) ((flags & QUOTA_DQACQ_FL_REPORT) != 0)
+
/* debugging macros */
#ifdef LIBCFS_DEBUG
#define lquota_lqe_debug(msgdata, mask, cdls, lqe, fmt, a...) do { \
struct lu_fid *, lquota_disk_slv_cb_t, void *);
struct dt_object *lquota_disk_slv_find(const struct lu_env *,
struct dt_device *, struct dt_object *,
- struct lu_fid *, struct obd_uuid *);
+ const struct lu_fid *,
+ struct obd_uuid *);
int lquota_disk_read(const struct lu_env *, struct dt_object *,
union lquota_id *, struct dt_rec *);
int lquota_disk_declare_write(const struct lu_env *, struct thandle *,
RETURN(-EINVAL);
RETURN(0);
}
+
+/*
+ * Set/clear edquot flag after quota space allocation/release or settings
+ * change. Slaves will be notified of changes via glimpse on per-ID lock
+ * issued by the rebalance thread.
+ *
+ * \param lqe - is the quota entry to check
+ * \param now - is the current time in seconds used for grace time management
+ */
+void qmt_adjust_edquot(struct lquota_entry *lqe, __u64 now)
+{
+ struct qmt_pool_info *pool = lqe2qpi(lqe);
+
+ if (!lqe->lqe_enforced)
+ RETURN_EXIT;
+
+ if (!lqe->lqe_edquot) {
+ /* space exhausted flag not set, let's check whether it is time
+ * to set the flag */
+
+ if (!qmt_space_exhausted(lqe, now))
+ /* the qmt still has available space */
+ RETURN_EXIT;
+
+ if (lqe->lqe_qunit != pool->qpi_least_qunit)
+ /* we haven't reached the minimal qunit yet, so there is
+ * still hope that the rebalancing process might free up
+ * some quota space */
+ RETURN_EXIT;
+
+ if (lqe->lqe_may_rel != 0 &&
+ cfs_time_beforeq_64(lqe->lqe_revoke_time,
+ cfs_time_shift_64(-QMT_REBA_TIMEOUT)))
+ /* Let's give more time to slave to release space */
+ RETURN_EXIT;
+
+ /* set edquot flag */
+ lqe->lqe_edquot = true;
+ } else {
+ /* space exhausted flag set, let's check whether it is time to
+ * clear it */
+
+ if (qmt_space_exhausted(lqe, now))
+ /* the qmt still has no space available */
+ RETURN_EXIT;
+
+ if (lqe->lqe_hardlimit != 0 &&
+ lqe->lqe_granted + pool->qpi_least_qunit >
+ lqe->lqe_hardlimit)
+ /* we only clear the flag once at least one least qunit
+ * is available */
+ RETURN_EXIT;
+
+ /* clear edquot flag */
+ lqe->lqe_edquot = false;
+ }
+
+ LQUOTA_DEBUG(lqe, "changing edquot flag");
+
+ /* let's notify slave by issuing glimpse on per-ID lock.
+ * the rebalance thread will take care of this */
+ qmt_id_lock_notify(pool->qpi_qmt, lqe);
+}
+
+/*
+ * Try to grant more quota space back to slave.
+ *
+ * \param lqe - is the quota entry for which we would like to allocate more
+ * space
+ * \param granted - is how much was already granted as part of the request
+ * processing
+ * \param spare - is how much unused quota space the slave already owns
+ *
+ * \retval how much additional space can be granted to the slave
+ */
+__u64 qmt_alloc_expand(struct lquota_entry *lqe, __u64 granted, __u64 spare)
+{
+ struct qmt_pool_info *pool = lqe2qpi(lqe);
+ __u64 remaining, qunit;
+ int slv_cnt;
+
+ LASSERT(lqe->lqe_enforced && lqe->lqe_qunit != 0);
+
+ slv_cnt = lqe2qpi(lqe)->qpi_slv_nr[lqe->lqe_site->lqs_qtype];
+ qunit = lqe->lqe_qunit;
+
+ /* the allocation is bounded by the soft limit when set, the hard limit
+ * otherwise */
+ if (lqe->lqe_softlimit != 0)
+ remaining = lqe->lqe_softlimit;
+ else
+ remaining = lqe->lqe_hardlimit;
+
+ if (lqe->lqe_granted >= remaining)
+ RETURN(0);
+
+ remaining -= lqe->lqe_granted;
+
+ do {
+ if (spare >= qunit)
+ break;
+
+ /* NOTE(review): the mask below computes granted % qunit,
+ * which presumably assumes qunit is a power of two — confirm.
+ * It is also re-applied after the if/else below, so this first
+ * mask looks redundant — confirm */
+ granted &= (qunit - 1);
+
+ if (remaining > (slv_cnt * qunit) >> 1) {
+ /* enough room to grant more space w/o additional
+ * shrinking ... at least for now */
+ remaining -= (slv_cnt * qunit) >> 1;
+ } else if (qunit != pool->qpi_least_qunit) {
+ /* shrink qunit by a factor of 4 and try again */
+ qunit >>= 2;
+ continue;
+ }
+
+ granted &= (qunit - 1);
+ if (spare > 0)
+ RETURN(min_t(__u64, qunit - spare, remaining));
+ else
+ RETURN(min_t(__u64, qunit - granted, remaining));
+ } while (qunit >= pool->qpi_least_qunit);
+
+ RETURN(0);
+}
+
+/*
+ * Adjust qunit size according to quota limits and total granted count.
+ * The caller must have locked the lqe.
+ *
+ * \param env - the environment passed by the caller
+ * \param lqe - is the qid entry to be adjusted
+ */
+void qmt_adjust_qunit(const struct lu_env *env, struct lquota_entry *lqe)
+{
+ struct qmt_pool_info *pool = lqe2qpi(lqe);
+ int slv_cnt;
+ __u64 qunit, limit;
+ ENTRY;
+
+ LASSERT(lqe_is_locked(lqe));
+
+ if (!lqe->lqe_enforced)
+ /* no quota limits */
+ RETURN_EXIT;
+
+ /* record how many slaves have already registered */
+ slv_cnt = pool->qpi_slv_nr[lqe->lqe_site->lqs_qtype];
+ if (slv_cnt == 0)
+ /* wait for at least one slave to join */
+ RETURN_EXIT;
+
+ /* Qunit calculation is based on soft limit, if any, hard limit
+ * otherwise. This means that qunit is shrunk to the minimum when
+ * beyond the soft limit. This will impact performance, but that's the
+ * price of an accurate grace time management. */
+ if (lqe->lqe_softlimit != 0) {
+ limit = lqe->lqe_softlimit;
+ } else if (lqe->lqe_hardlimit != 0) {
+ limit = lqe->lqe_hardlimit;
+ } else {
+ LQUOTA_ERROR(lqe, "enforced bit set, but neither hard nor soft "
+ "limit are set");
+ RETURN_EXIT;
+ }
+
+ /* start from the least qunit when none has been computed yet */
+ qunit = lqe->lqe_qunit == 0 ? pool->qpi_least_qunit : lqe->lqe_qunit;
+
+ /* The qunit value is computed as follows: limit / (2 * slv_cnt).
+ * Then 75% of the quota space can be granted with current qunit value.
+ * The remaining 25% are then used with reduced qunit size (by a factor
+ * of 4) which is then divided in a similar manner.
+ *
+ * |---------------------limit---------------------|
+ * |-------limit / 2-------|-limit / 4-|-limit / 4-|
+ * |qunit|qunit|qunit|qunit| | |
+ * |----slv_cnt * qunit----| | |
+ * |-grow limit-| | | |
+ * |--------------shrink limit---------| |
+ * |---space granted in qunit chunks---|-remaining-|
+ * / \
+ * / \
+ * / \
+ * / \
+ * / \
+ * qunit >>= 2; |qunit*slv_cnt|qunit*slv_cnt|
+ * |---space in qunit---|remain|
+ * ... */
+ if (qunit == pool->qpi_least_qunit ||
+ limit >= lqe->lqe_granted + ((slv_cnt * qunit) >> 1)) {
+ /* current qunit value still fits, let's see if we can afford to
+ * increase qunit now ...
+ * To increase qunit again, we have to be under 25% */
+ while (limit >= lqe->lqe_granted + 6 * qunit * slv_cnt)
+ qunit <<= 2;
+ } else {
+ /* shrink qunit until we find a suitable value */
+ while (qunit > pool->qpi_least_qunit &&
+ limit < lqe->lqe_granted + ((slv_cnt * qunit) >> 1))
+ qunit >>= 2;
+ }
+
+ if (lqe->lqe_qunit == qunit)
+ /* keep current qunit */
+ RETURN_EXIT;
+
+ LQUOTA_DEBUG(lqe, "%s qunit to "LPU64,
+ lqe->lqe_qunit < qunit ? "increasing" : "decreasing",
+ qunit);
+
+ /* store new qunit value; after the swap, the local variable holds the
+ * previous qunit used in the comparisons below */
+ swap(lqe->lqe_qunit, qunit);
+
+ /* reset revoke time */
+ lqe->lqe_revoke_time = 0;
+
+ if (lqe->lqe_qunit < qunit)
+ /* let's notify slave of qunit shrinking */
+ qmt_id_lock_notify(pool->qpi_qmt, lqe);
+ else if (lqe->lqe_qunit == pool->qpi_least_qunit)
+ /* initial qunit value is the smallest one */
+ lqe->lqe_revoke_time = cfs_time_current_64();
+ EXIT;
+}
+
+/*
+ * Adjust qunit & edquot flag in case it wasn't initialized already (e.g.
+ * limit set while no slaves were connected yet)
+ *
+ * \param env - the environment passed by the caller
+ * \param lqe - is the qid entry to be revalidated
+ */
+void qmt_revalidate(const struct lu_env *env, struct lquota_entry *lqe)
+{
+ if (lqe->lqe_qunit == 0) {
+ /* lqe was read from disk, but neither qunit, nor edquot flag
+ * were initialized */
+ qmt_adjust_qunit(env, lqe);
+ if (lqe->lqe_qunit != 0)
+ qmt_adjust_edquot(lqe, cfs_time_current_sec());
+ }
+}
__u16 pool_id, __u8 restype, __u8 qtype, __u64 *time)
{
struct qmt_thread_info *qti = qmt_info(env);
- union lquota_id *id = &qti->qti_id_bis;
+ union lquota_id *id = &qti->qti_id;
struct lquota_entry *lqe;
ENTRY;
__u16 pool_id, __u8 restype, __u8 qtype, __u64 time)
{
struct qmt_thread_info *qti = qmt_info(env);
- union lquota_id *id = &qti->qti_id_bis;
+ union lquota_id *id = &qti->qti_id;
struct lquota_entry *lqe;
struct thandle *th = NULL;
int rc;
struct qmt_thread_info *qti = qmt_info(env);
struct lquota_entry *lqe;
struct thandle *th = NULL;
- __u64 grace, ver;
+ __u64 ver, now;
bool dirtied = false, bump_version = false;
int rc = 0;
ENTRY;
- /* fetch global grace time */
- rc = qmt_getinfo(env, qmt, pool_id, restype, qtype, &grace);
- if (rc)
- RETURN(rc);
-
/* look-up quota entry associated with this ID */
lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, restype, qtype, id);
if (IS_ERR(lqe))
if (IS_ERR(th))
GOTO(out_nolock, rc = PTR_ERR(th));
+ now = cfs_time_current_sec();
+
lqe_write_lock(lqe);
LQUOTA_DEBUG(lqe, "setquota valid:%x hard:"LPU64" soft:"LPU64
" time:"LPU64, valid, hard, soft, time);
if (rc)
GOTO(out, rc);
+ /* recompute qunit in case it was never initialized */
+ qmt_revalidate(env, lqe);
+
/* change quota limits */
lqe->lqe_hardlimit = hard;
lqe->lqe_softlimit = soft;
lqe->lqe_gracetime = 0;
else if ((valid & QIF_TIMES) == 0)
/* set grace only if user hasn't provided his own */
- lqe->lqe_gracetime = cfs_time_current_sec() + grace;
+ lqe->lqe_gracetime = now + qmt_lqe_grace(lqe);
/* change enforced status based on new parameters */
if (lqe->lqe_hardlimit == 0 && lqe->lqe_softlimit == 0)
qmt_restore(lqe, &qti->qti_restore);
GOTO(out, rc);
}
+
+ /* compute new qunit value now that we have modified the quota
+ * settings */
+ qmt_adjust_qunit(env, lqe);
+
+ /* clear/set edquot flag as needed */
+ qmt_adjust_edquot(lqe, now);
}
EXIT;
out:
}
/*
+ * Helper function to handle quota request from slave.
+ *
+ * \param env - is the environment passed by the caller
+ * \param lqe - is the lquota_entry subject to the quota request
+ * \param qmt - is the master device
+ * \param uuid - is the uuid associated with the slave
+ * \param qb_flags - are the quota request flags as packed in the quota_body
+ * \param qb_count - is the amount of quota space the slave wants to
+ * acquire/release
+ * \param qb_usage - is the current space usage on the slave
+ * \param repbody - is the quota_body of reply
+ *
+ * \retval 0 : success
+ * \retval -EDQUOT : out of quota
+ * -EINPROGRESS : inform client to retry write/create
+ * -ve : other appropriate errors
+ */
+int qmt_dqacq0(const struct lu_env *env, struct lquota_entry *lqe,
+ struct qmt_device *qmt, struct obd_uuid *uuid, __u32 qb_flags,
+ __u64 qb_count, __u64 qb_usage, struct quota_body *repbody)
+{
+ struct qmt_thread_info *qti = qmt_info(env);
+ __u64 now, count;
+ struct dt_object *slv_obj = NULL;
+ __u64 slv_granted, slv_granted_bck;
+ struct thandle *th = NULL;
+ int rc, ret;
+ ENTRY;
+
+ LASSERT(uuid != NULL);
+
+ /* initialize reply */
+ memset(repbody, 0, sizeof(*repbody));
+ memcpy(&repbody->qb_id, &lqe->lqe_id, sizeof(repbody->qb_id));
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_QUOTA_RECOVERABLE_ERR))
+ RETURN(-cfs_fail_val);
+
+ /* look-up index file associated with acquiring slave */
+ slv_obj = lquota_disk_slv_find(env, qmt->qmt_child, LQE_ROOT(lqe),
+ lu_object_fid(&LQE_GLB_OBJ(lqe)->do_lu),
+ uuid);
+ if (IS_ERR(slv_obj))
+ GOTO(out, rc = PTR_ERR(slv_obj));
+
+ /* pack slave fid in reply just for sanity check */
+ memcpy(&repbody->qb_slv_fid, lu_object_fid(&slv_obj->do_lu),
+ sizeof(struct lu_fid));
+
+ /* allocate & start transaction with enough credits to update
+ * global & slave indexes */
+ th = qmt_trans_start_with_slv(env, lqe, slv_obj, &qti->qti_restore);
+ if (IS_ERR(th))
+ GOTO(out, rc = PTR_ERR(th));
+
+ lqe_write_lock(lqe);
+ LQUOTA_DEBUG(lqe, "dqacq starts uuid:%s flags:0x%x wanted:"LPU64
+ " usage:"LPU64, obd_uuid2str(uuid), qb_flags, qb_count,
+ qb_usage);
+
+ /* Legal race, limits have been removed on master, but slave didn't
+ * receive the change yet. Just return EINPROGRESS until the slave gets
+ * notified. */
+ if (!lqe->lqe_enforced && !req_is_rel(qb_flags))
+ GOTO(out_locked, rc = -EINPROGRESS);
+
+ /* recompute qunit in case it was never initialized */
+ qmt_revalidate(env, lqe);
+
+ /* slave just wants to acquire per-ID lock */
+ if (req_is_acq(qb_flags) && qb_count == 0)
+ GOTO(out_locked, rc = 0);
+
+ /* fetch how much quota space is already granted to this slave */
+ rc = qmt_slv_read(env, lqe, slv_obj, &slv_granted);
+ if (rc) {
+ LQUOTA_ERROR(lqe, "Failed to get granted for slave %s, rc=%d",
+ obd_uuid2str(uuid), rc);
+ GOTO(out_locked, rc);
+ }
+ /* recall how much space this slave currently owns in order to restore
+ * it in case of failure */
+ slv_granted_bck = slv_granted;
+
+ /* record current time for soft limit & grace time management */
+ now = (__u64)cfs_time_current_sec();
+
+ if (req_is_rel(qb_flags)) {
+ /* Slave would like to release quota space */
+ if (slv_granted < qb_count ||
+ lqe->lqe_granted < qb_count) {
+ /* can't release more than granted */
+ LQUOTA_ERROR(lqe, "Release too much! uuid:%s release:"
+ LPU64" granted:"LPU64", total:"LPU64,
+ obd_uuid2str(uuid), qb_count,
+ slv_granted, lqe->lqe_granted);
+ GOTO(out_locked, rc = -EINVAL);
+ }
+
+ repbody->qb_count = qb_count;
+ /* put released space back to global pool */
+ QMT_REL(lqe, slv_granted, qb_count);
+ GOTO(out_write, rc = 0);
+ }
+
+ if (req_has_rep(qb_flags) && slv_granted < qb_usage) {
+ /* Slave is reporting space usage in quota request and it turns
+ * out to be using more quota space than owned, so we adjust
+ * granted space regardless of the current state of affairs */
+ repbody->qb_count = qb_usage - slv_granted;
+ QMT_GRANT(lqe, slv_granted, repbody->qb_count);
+ }
+
+ if (!req_is_acq(qb_flags) && !req_is_preacq(qb_flags))
+ GOTO(out_write, rc = 0);
+
+ qmt_adjust_edquot(lqe, now);
+ if (lqe->lqe_edquot)
+ /* no hope to claim further space back */
+ GOTO(out_write, rc = -EDQUOT);
+
+ if (qmt_space_exhausted(lqe, now)) {
+ /* might have some free space once rebalancing is completed */
+ rc = req_is_acq(qb_flags) ? -EINPROGRESS : -EDQUOT;
+ GOTO(out_write, rc);
+ }
+
+ if (req_is_preacq(qb_flags)) {
+ /* slave would like to pre-acquire quota space. To do so, it
+ * reports in qb_count how much spare quota space it owns and we
+ * can grant back quota space which is consistent with qunit
+ * value. */
+
+ if (qb_count >= lqe->lqe_qunit)
+ /* slave already own the maximum it should */
+ GOTO(out_write, rc = 0);
+
+ count = qmt_alloc_expand(lqe, slv_granted, qb_count);
+ if (count == 0)
+ GOTO(out_write, rc = -EDQUOT);
+
+ repbody->qb_count += count;
+ QMT_GRANT(lqe, slv_granted, count);
+ GOTO(out_write, rc = 0);
+ }
+
+ /* processing acquire request with clients waiting */
+ if (lqe->lqe_hardlimit != 0 &&
+ lqe->lqe_granted + qb_count > lqe->lqe_hardlimit) {
+ /* cannot grant as much as asked, but can still afford to grant
+ * some quota space back */
+ count = lqe->lqe_hardlimit - lqe->lqe_granted;
+ repbody->qb_count += count;
+ QMT_GRANT(lqe, slv_granted, count);
+ GOTO(out_write, rc = 0);
+ }
+
+ /* Whouhou! we can satisfy the slave request! */
+ repbody->qb_count += qb_count;
+ QMT_GRANT(lqe, slv_granted, qb_count);
+
+ /* Try to expand the acquired count for DQACQ */
+ count = qmt_alloc_expand(lqe, slv_granted, 0);
+ if (count != 0) {
+ /* can even grant more than asked, it is like xmas ... */
+ repbody->qb_count += count;
+ QMT_GRANT(lqe, slv_granted, count);
+ GOTO(out_write, rc = 0);
+ }
+
+ GOTO(out_write, rc = 0);
+out_write:
+ if (repbody->qb_count == 0)
+ GOTO(out_locked, rc);
+
+ /* start/stop grace timer if required */
+ if (lqe->lqe_softlimit != 0) {
+ if (lqe->lqe_granted > lqe->lqe_softlimit &&
+ lqe->lqe_gracetime == 0)
+ /* first time over soft limit, let's start grace
+ * timer */
+ lqe->lqe_gracetime = now + qmt_lqe_grace(lqe);
+ else if (lqe->lqe_granted <= lqe->lqe_softlimit &&
+ lqe->lqe_gracetime != 0)
+ /* Clear grace timer */
+ lqe->lqe_gracetime = 0;
+ }
+
+ /* Update slave index first since it is easier to roll back */
+ ret = qmt_slv_write(env, th, lqe, slv_obj, LQUOTA_BUMP_VER,
+ &repbody->qb_slv_ver, slv_granted);
+ if (ret) {
+ /* restore initial quota settings */
+ qmt_restore(lqe, &qti->qti_restore);
+ /* reset qb_count */
+ repbody->qb_count = 0;
+ GOTO(out_locked, rc = ret);
+ }
+
+ /* Update global index, no version bump needed */
+ ret = qmt_glb_write(env, th, lqe, 0, NULL);
+ if (ret) {
+ rc = ret;
+ /* restore initial quota settings */
+ qmt_restore(lqe, &qti->qti_restore);
+ /* reset qb_count */
+ repbody->qb_count = 0;
+
+ /* restore previous granted value */
+ ret = qmt_slv_write(env, th, lqe, slv_obj, 0, NULL,
+ slv_granted_bck);
+ if (ret) {
+ LQUOTA_ERROR(lqe, "failed to restore initial slave "
+ "value rc:%d ret%d", rc, ret);
+ LBUG();
+ }
+ qmt_adjust_edquot(lqe, now);
+ GOTO(out_locked, rc);
+ }
+
+ /* Total granted has been changed, let's try to adjust the qunit
+ * size according to the total granted & limits. */
+ qmt_adjust_qunit(env, lqe);
+
+ /* clear/set edquot flag and notify slaves via glimpse if needed */
+ qmt_adjust_edquot(lqe, now);
+out_locked:
+ LQUOTA_DEBUG(lqe, "dqacq ends count:"LPU64" ver:"LPU64" rc:%d",
+ repbody->qb_count, repbody->qb_slv_ver, rc);
+ lqe_write_unlock(lqe);
+out:
+ if (th != NULL && !IS_ERR(th))
+ dt_trans_stop(env, qmt->qmt_child, th);
+
+ if (slv_obj != NULL && !IS_ERR(slv_obj))
+ lu_object_put(env, &slv_obj->do_lu);
+
+ if ((req_is_acq(qb_flags) || req_is_preacq(qb_flags)) &&
+ OBD_FAIL_CHECK(OBD_FAIL_QUOTA_EDQUOT)) {
+ /* introduce inconsistency between granted value in slave index
+ * and slave index copy of slave */
+ repbody->qb_count = 0;
+ rc = -EDQUOT;
+ }
+
+ RETURN(rc);
+}
+
+/*
* Handle quota request from slave.
*
* \param env - is the environment passed by the caller
static int qmt_dqacq(const struct lu_env *env, struct lu_device *ld,
 struct ptlrpc_request *req)
{
+ struct qmt_device *qmt = lu2qmt_dev(ld);
 struct quota_body *qbody, *repbody;
+ struct obd_uuid *uuid;
+ struct ldlm_lock *lock;
+ struct lquota_entry *lqe;
+ int pool_id, pool_type, qtype;
+ int rc;
 ENTRY;
 qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
 if (repbody == NULL)
 RETURN(err_serious(-EFAULT));
- /* XXX: to be implemented */
+ /* verify if global lock is stale */
+ if (!lustre_handle_is_used(&qbody->qb_glb_lockh))
+ RETURN(-ENOLCK);
- RETURN(0);
+ lock = ldlm_handle2lock(&qbody->qb_glb_lockh);
+ if (lock == NULL)
+ RETURN(-ENOLCK);
+ LDLM_LOCK_PUT(lock);
+
+ uuid = &req->rq_export->exp_client_uuid;
+
+ /* a request may not be a release and an (pre-)acquire at the same
+ * time; at most one of those flags can be set */
+ if (req_is_rel(qbody->qb_flags) + req_is_acq(qbody->qb_flags) +
+ req_is_preacq(qbody->qb_flags) > 1) {
+ CERROR("%s: malformed quota request with conflicting flags set "
+ "(%x) from slave %s\n", qmt->qmt_svname,
+ qbody->qb_flags, obd_uuid2str(uuid));
+ RETURN(-EPROTO);
+ }
+
+ if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) {
+ /* acquire and pre-acquire should use a valid ID lock */
+
+ if (!lustre_handle_is_used(&qbody->qb_lockh))
+ RETURN(-ENOLCK);
+
+ lock = ldlm_handle2lock(&qbody->qb_lockh);
+ if (lock == NULL)
+ /* no lock associated with this handle */
+ RETURN(-ENOLCK);
+
+ LDLM_DEBUG(lock, "%sacquire request",
+ req_is_preacq(qbody->qb_flags) ? "pre" : "");
+
+ if (!obd_uuid_equals(&lock->l_export->exp_client_uuid, uuid)) {
+ /* sorry, no way to cheat ... */
+ LDLM_LOCK_PUT(lock);
+ RETURN(-ENOLCK);
+ }
+
+ if ((lock->l_flags & LDLM_FL_AST_SENT) != 0) {
+ struct ptlrpc_service_part *svc;
+ unsigned int timeout;
+
+ svc = req->rq_rqbd->rqbd_svcpt;
+ timeout = at_est2timeout(at_get(&svc->scp_at_estimate));
+ timeout = max(timeout, ldlm_timeout);
+
+ /* lock is being cancelled, prolong timeout */
+ ldlm_refresh_waiting_lock(lock, timeout);
+ }
+ LDLM_LOCK_PUT(lock);
+ }
+
+ /* extract pool & quota information from global index FID packed in the
+ * request */
+ rc = lquota_extract_fid(&qbody->qb_fid, &pool_id, &pool_type, &qtype);
+ if (rc)
+ RETURN(-EINVAL);
+
+ /* Find the quota entry associated with the quota id */
+ lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, pool_type, qtype,
+ &qbody->qb_id);
+ if (IS_ERR(lqe))
+ RETURN(PTR_ERR(lqe));
+
+ /* process quota request */
+ rc = qmt_dqacq0(env, lqe, qmt, uuid, qbody->qb_flags, qbody->qb_count,
+ qbody->qb_usage, repbody);
+
+ if (lustre_handle_is_used(&qbody->qb_lockh))
+ /* return current qunit value only to slaves owning a per-ID
+ * quota lock. For enqueue, the qunit value will be returned in
+ * the LVB */
+ repbody->qb_qunit = lqe->lqe_qunit;
+ lqe_putref(lqe);
+ RETURN(rc);
}
/* Vector of quota request handlers. This vector is used by the MDT to forward
/* number of slaves registered for each quota types */
int qpi_slv_nr[MAXQUOTAS];
+ /* reference on lqe (ID 0) storing grace time. */
+ struct lquota_entry *qpi_grace_lqe[MAXQUOTAS];
+
/* procfs root directory for this pool */
cfs_proc_dir_entry_t *qpi_proc;
struct qmt_thread_info {
union lquota_rec qti_rec;
union lquota_id qti_id;
- union lquota_id qti_id_bis;
char qti_buf[MTI_NAME_MAXLEN];
struct lu_fid qti_fid;
struct ldlm_res_id qti_resid;
union ldlm_gl_desc qti_gl_desc;
struct quota_body qti_body;
- struct quota_body qti_repbody;
struct qmt_lqe_restore qti_restore;
};
#define LQE_ROOT(lqe) (lqe2qpi(lqe)->qpi_root)
#define LQE_GLB_OBJ(lqe) (lqe2qpi(lqe)->qpi_glb_obj[lqe->lqe_site->lqs_qtype])
+/* helper function returning grace time to use for a given lquota entry.
+ * The global grace time is stored in the reserved ID 0 entry kept in
+ * qpi_grace_lqe for the same pool & quota type */
+static inline __u64 qmt_lqe_grace(struct lquota_entry *lqe)
+{
+ struct qmt_pool_info *pool = lqe2qpi(lqe);
+ struct lquota_entry *grace_lqe;
+
+ grace_lqe = pool->qpi_grace_lqe[lqe->lqe_site->lqs_qtype];
+ LASSERT(grace_lqe != NULL);
+
+ return grace_lqe->lqe_gracetime;
+}
+
static inline void qmt_restore(struct lquota_entry *lqe,
struct qmt_lqe_restore *restore)
{
lqe->lqe_qunit = restore->qlr_qunit;
}
+/* grant cnt of quota space: bump both the total granted counter of the
+ * entry and the slave's granted counter */
+#define QMT_GRANT(lqe, slv, cnt) \
+ do { \
+ (lqe)->lqe_granted += (cnt); \
+ (slv) += (cnt); \
+ } while (0)
+/* opposite of QMT_GRANT: return cnt of quota space to the global pool */
+#define QMT_REL(lqe, slv, cnt) \
+ do { \
+ (lqe)->lqe_granted -= (cnt); \
+ (slv) -= (cnt); \
+ } while (0)
+
+/* helper routine returning true when the id has run out of quota space, which
+ * means that it has either:
+ * - reached hardlimit
+ * OR
+ * - reached softlimit and grace time expired already */
+static inline bool qmt_space_exhausted(struct lquota_entry *lqe, __u64 now)
+{
+ /* hard limit set and already reached or exceeded */
+ if (lqe->lqe_hardlimit != 0 && lqe->lqe_granted >= lqe->lqe_hardlimit)
+ return true;
+ /* over soft limit with a running grace timer that has expired */
+ if (lqe->lqe_softlimit != 0 && lqe->lqe_granted > lqe->lqe_softlimit &&
+ lqe->lqe_gracetime != 0 && now >= lqe->lqe_gracetime)
+ return true;
+ return false;
+}
+
+/* number of seconds to wait for slaves to release quota space after
+ * rebalancing */
+#define QMT_REBA_TIMEOUT 2
+
/* qmt_pool.c */
void qmt_pool_fini(const struct lu_env *, struct qmt_device *);
int qmt_pool_init(const struct lu_env *, struct qmt_device *);
int qmt_slv_read(const struct lu_env *, struct lquota_entry *,
struct dt_object *, __u64 *);
int qmt_validate_limits(struct lquota_entry *, __u64, __u64);
+void qmt_adjust_qunit(const struct lu_env *, struct lquota_entry *);
+void qmt_adjust_edquot(struct lquota_entry *, __u64);
+void qmt_revalidate(const struct lu_env *, struct lquota_entry *);
+__u64 qmt_alloc_expand(struct lquota_entry *, __u64, __u64);
+
+/* qmt_handler.c */
+int qmt_dqacq0(const struct lu_env *, struct lquota_entry *,
+ struct qmt_device *, struct obd_uuid *, __u32, __u64, __u64,
+ struct quota_body *);
/* qmt_lock.c */
int qmt_intent_policy(const struct lu_env *, struct lu_device *,
struct quota_body *repbody;
struct obd_uuid *uuid;
struct lquota_lvb *lvb;
+ struct ldlm_resource *res = (*lockp)->l_resource;
int rc;
ENTRY;
uuid = &(*lockp)->l_export->exp_client_uuid;
switch (it->opc) {
- case IT_QUOTA_DQACQ:
- /* XXX: to be added in a next patch */
- GOTO(out, -EOPNOTSUPP);
+ case IT_QUOTA_DQACQ: {
+ struct lquota_entry *lqe;
+ struct ldlm_lock *lock;
+
+ if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
+ /* acquire on global lock? something is wrong ... */
+ GOTO(out, rc = -EPROTO);
+
+ /* verify global lock isn't stale */
+ if (!lustre_handle_is_used(&reqbody->qb_glb_lockh))
+ GOTO(out, rc = -ENOLCK);
+
+ lock = ldlm_handle2lock(&reqbody->qb_glb_lockh);
+ if (lock == NULL)
+ GOTO(out, rc = -ENOLCK);
+ LDLM_LOCK_PUT(lock);
+
+ lqe = res->lr_lvb_data;
+ LASSERT(lqe != NULL);
+ lqe_getref(lqe);
+
+ /* acquire quota space */
+ rc = qmt_dqacq0(env, lqe, qmt, uuid, reqbody->qb_flags,
+ reqbody->qb_count, reqbody->qb_usage,
+ repbody);
+ lqe_putref(lqe);
+ if (rc)
+ GOTO(out, rc);
break;
+ }
case IT_QUOTA_CONN:
/* new connection from slave */
+
+ if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0)
+ /* connection on per-ID lock? something is wrong ... */
+ GOTO(out, rc = -EPROTO);
+
rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
&repbody->qb_slv_fid,
&repbody->qb_slv_ver, uuid);
struct qmt_device *qmt = lu2qmt_dev(ld);
struct lquota_entry *lqe;
struct lquota_lvb *lvb;
+ struct ldlm_lock *lock;
+ struct obd_export *exp;
int rc = 0;
ENTRY;
/* no need to update lvb for global quota locks */
RETURN(0);
+ lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
+ if (lvb == NULL) {
+ CERROR("%s: failed to extract lvb from request\n",
+ qmt->qmt_svname);
+ RETURN(-EFAULT);
+ }
+
lqe = res->lr_lvb_data;
LASSERT(lqe != NULL);
+ lqe_getref(lqe);
+
+ LQUOTA_DEBUG(lqe, "releasing:"LPU64" may release:"LPU64,
+ lvb->lvb_id_rel, lvb->lvb_id_may_rel);
+
+ if (lvb->lvb_id_rel == 0) {
+ /* nothing to release */
+ if (lvb->lvb_id_may_rel != 0)
+ /* but might still release later ... */
+ lqe->lqe_may_rel += lvb->lvb_id_may_rel;
+ GOTO(out_lqe, rc = 0);
+ }
/* allocate environement */
OBD_ALLOC_PTR(env);
if (env == NULL)
- RETURN(-ENOMEM);
+ GOTO(out_lqe, rc = -ENOMEM);
/* initialize environment */
rc = lu_env_init(env, LCT_MD_THREAD);
- if (rc) {
- OBD_FREE_PTR(env);
- RETURN(rc);
- }
+ if (rc)
+ GOTO(out_env, rc);
qti = qmt_info(env);
- lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
- if (lvb == NULL) {
- CERROR("%s: failed to extract lvb from request\n",
+ /* The request is a glimpse callback which was sent via the
+ * reverse import to the slave. What we care about here is the
+ * export associated with the slave and req->rq_export is
+ * definitely not what we are looking for (it is actually set to
+ * NULL here).
+ * Therefore we extract the lock from the request argument
+ * and use lock->l_export. */
+ lock = ldlm_request_lock(req);
+ if (IS_ERR(lock)) {
+ CERROR("%s: failed to get lock from request!\n",
qmt->qmt_svname);
- GOTO(out, rc);
+ GOTO(out_env_init, rc = PTR_ERR(lock));
}
- /* XXX: Space release handling to be added in a next patch */
+ exp = class_export_get(lock->l_export);
+ if (exp == NULL) {
+ CERROR("%s: failed to get export from lock!\n",
+ qmt->qmt_svname);
+ GOTO(out_env_init, rc = -EFAULT);
+ }
+ /* release quota space */
+ rc = qmt_dqacq0(env, lqe, qmt, &exp->exp_client_uuid,
+ QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, 0, &qti->qti_body);
+ if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
+ LQUOTA_ERROR(lqe, "failed to release quota space on glimpse "
+ LPU64"!="LPU64" rc:%d\n", qti->qti_body.qb_count,
+ lvb->lvb_id_rel, rc);
+ class_export_put(exp);
+ if (rc)
+ GOTO(out_env_init, rc);
EXIT;
-out:
+out_env_init:
lu_env_fini(env);
+out_env:
OBD_FREE_PTR(env);
+out_lqe:
+ lqe_putref(lqe);
return rc;
}
* opportunity to enqueue quota lock yet. */
LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
"lock "DFID, PFID(&qti->qti_fid));
+ lqe_write_lock(lqe);
+ if (lqe->lqe_revoke_time == 0 &&
+ lqe->lqe_qunit == pool->qpi_least_qunit)
+ lqe->lqe_revoke_time = cfs_time_current_64();
+ lqe_write_unlock(lqe);
RETURN_EXIT;
}
- lqe_read_lock(lqe);
+ lqe_write_lock(lqe);
/* The purpose of glimpse callback on per-ID lock is twofold:
* - notify slaves of new qunit value and hope they will release some
* spare quota space in return
else
qti->qti_gl_desc.lquota_desc.gl_flags = 0;
qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
- lqe_read_unlock(lqe);
+
+ if (lqe->lqe_revoke_time == 0 &&
+ qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit)
+ /* reset lqe_may_rel, it will be updated on glimpse callback
+ * replies if needed */
+ lqe->lqe_may_rel = 0;
/* The rebalance thread is the only thread which can issue glimpses */
LASSERT(!lqe->lqe_gl);
lqe->lqe_gl = true;
+ lqe_write_unlock(lqe);
/* issue glimpse callback to slaves */
rc = qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
+ lqe_write_lock(lqe);
+ if (lqe->lqe_revoke_time == 0 &&
+ qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit &&
+ lqe->lqe_qunit == pool->qpi_least_qunit) {
+ lqe->lqe_revoke_time = cfs_time_current_64();
+ qmt_adjust_edquot(lqe, cfs_time_current_sec());
+ }
LASSERT(lqe->lqe_gl);
lqe->lqe_gl = false;
+ lqe_write_unlock(lqe);
ldlm_resource_putref(res);
EXIT;
/* release per-quota type site used to manage quota entries as well as
* references to global index files */
for (qtype = 0; qtype < MAXQUOTAS; qtype++) {
+ /* release lqe storing grace time */
+ if (pool->qpi_grace_lqe[qtype] != NULL)
+ lqe_putref(pool->qpi_grace_lqe[qtype]);
+
/* release site */
if (pool->qpi_site[qtype] != NULL &&
!IS_ERR(pool->qpi_site[qtype]))
/* iterate over each pool in the hash and allocate a quota site for each
* one. This involves creating a global index file on disk */
cfs_list_for_each(pos, &qmt->qmt_pool_list) {
- struct dt_object *obj;
- int pool_type, pool_id;
+ struct dt_object *obj;
+ int pool_type, pool_id;
+ struct lquota_entry *lqe;
pool = cfs_list_entry(pos, struct qmt_pool_info,
qpi_linkage);
qmt->qmt_svname, QTYPE_NAME(qtype), rc);
RETURN(rc);
}
+
+ /* Global grace time is stored in quota settings of
+ * ID 0. */
+ qti->qti_id.qid_uid = 0;
+
+ /* look-up quota entry storing grace time */
+ lqe = lqe_locate(env, pool->qpi_site[qtype],
+ &qti->qti_id);
+ if (IS_ERR(lqe))
+ RETURN(PTR_ERR(lqe));
+ pool->qpi_grace_lqe[qtype] = lqe;
#ifdef LPROCFS
/* add procfs file to dump the global index, mostly for
* debugging purpose */
if (IS_ERR(pool))
RETURN((void *)pool);
+ if (qid->qid_uid == 0) {
+ /* caller wants to access grace time, no need to look up the
+ * entry since we keep a reference on ID 0 all the time */
+ lqe = pool->qpi_grace_lqe[qtype];
+ lqe_getref(lqe);
+ GOTO(out, 0);
+ }
+
/* now that we have the pool, let's look-up the quota entry in the
* right quota site */
lqe = lqe_locate(env, pool->qpi_site[qtype], qid);
-
+out:
qpi_putref(env, pool);
RETURN(lqe);
}