* GPL HEADER END
*/
/*
- * Copyright (c) 2012 Intel, Inc.
+ * Copyright (c) 2012, Intel Corporation.
* Use is subject to license terms.
*
* Author: Johann Lombardi <johann.lombardi@intel.com>
* Author: Niu Yawei <yawei.niu@intel.com>
*/
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-
#define DEBUG_SUBSYSTEM S_LQUOTA
-#include <linux/version.h>
-#include <linux/fs.h>
-#include <asm/unistd.h>
-#include <linux/quotaops.h>
-#include <linux/init.h>
-
-#include <obd_class.h>
-#include <lustre_param.h>
-#include <lprocfs_status.h>
-
#include "qsd_internal.h"
-/*
- * helper function returning how much space is currently reserved for requests
- * in flight.
+/**
+ * helper function bumping lqe_pending_req if there is no quota request in
+ * flight for the lquota entry \a lqe. Otherwise, EBUSY is returned.
*/
-static inline int lqe_pending_dqacq(struct lquota_entry *lqe)
+static inline int qsd_request_enter(struct lquota_entry *lqe)
{
- int pending;
+ /* is there already a quota request in flight? */
+ if (lqe->lqe_pending_req != 0) {
+ LQUOTA_DEBUG(lqe, "already a request in flight");
+ return -EBUSY;
+ }
- lqe_read_lock(lqe);
- pending = lqe->lqe_pending_req;
- lqe_read_unlock(lqe);
+ if (lqe->lqe_pending_rel != 0) {
+ LQUOTA_ERROR(lqe, "no request in flight with pending_rel="LPU64,
+ lqe->lqe_pending_rel);
+ LBUG();
+ }
- return pending;
+ lqe->lqe_pending_req++;
+ return 0;
}
-/*
- * helper function returning true when the connection to master is ready to be
- * used.
+/**
+ * Companion of qsd_request_enter() dropping lqe_pending_req to 0.
*/
-static inline int qsd_ready(struct qsd_instance *qsd)
+static inline void qsd_request_exit(struct lquota_entry *lqe)
{
+ if (lqe->lqe_pending_req != 1) {
+ LQUOTA_ERROR(lqe, "lqe_pending_req != 1!!!");
+ LBUG();
+ }
+ lqe->lqe_pending_req--;
+ lqe->lqe_pending_rel = 0;
+ cfs_waitq_broadcast(&lqe->lqe_waiters);
+}
+
+/**
+ * Check whether a qsd instance is all set to send quota request to master.
+ * This includes checking whether:
+ * - the connection to master is set up and usable,
+ * - the qsd isn't stopping
+ * - reintegration has been successfully completed and all indexes are
+ * up-to-date
+ *
+ * \param lqe - is the lquota entry for which we would like to send an quota
+ * request
+ * \param lockh - is the remote handle of the global lock returned on success
+ *
+ * \retval 0 on success, appropriate error on failure
+ */
+static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
+{
+ struct qsd_qtype_info *qqi = lqe2qqi(lqe);
+ struct qsd_instance *qsd = qqi->qqi_qsd;
struct obd_import *imp = NULL;
+ struct ldlm_lock *lock;
+ ENTRY;
+
+ read_lock(&qsd->qsd_lock);
+ /* is the qsd about to shut down? */
+ if (qsd->qsd_stopping) {
+ read_unlock(&qsd->qsd_lock);
+ LQUOTA_DEBUG(lqe, "dropping quota req since qsd is stopping");
+ /* Target is about to shut down, client will retry */
+ RETURN(-EINPROGRESS);
+ }
- cfs_read_lock(&qsd->qsd_lock);
+ /* is the connection to the quota master ready? */
if (qsd->qsd_exp_valid)
imp = class_exp2cliimp(qsd->qsd_exp);
- cfs_read_unlock(&qsd->qsd_lock);
+ if (imp == NULL || imp->imp_invalid) {
+ read_unlock(&qsd->qsd_lock);
+ LQUOTA_DEBUG(lqe, "connection to master not ready");
+ RETURN(-ENOTCONN);
+ }
+
+ /* In most case, reintegration must have been triggered (when enable
+ * quota or on OST start), however, in rare race condition (enabling
+ * quota when starting OSTs), we might miss triggering reintegration
+ * for some qqi.
+ *
+ * If the previous reintegration failed for some reason, we'll
+ * re-trigger it here as well. */
+ if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
+ read_unlock(&qsd->qsd_lock);
+ LQUOTA_DEBUG(lqe, "not up-to-date, dropping request and "
+ "kicking off reintegration");
+ qsd_start_reint_thread(qqi);
+ RETURN(-EINPROGRESS);
+ }
- return (imp == NULL || imp->imp_invalid) ? false : true;
+ /* Fill the remote global lock handle, master will check this handle
+ * to see if the slave is sending request with stale lock */
+ lustre_handle_copy(lockh, &qqi->qqi_lockh);
+ read_unlock(&qsd->qsd_lock);
+
+ if (!lustre_handle_is_used(lockh))
+ RETURN(-ENOLCK);
+
+ lock = ldlm_handle2lock(lockh);
+ if (lock == NULL)
+ RETURN(-ENOLCK);
+
+ /* return remote lock handle to be packed in quota request */
+ lustre_handle_copy(lockh, &lock->l_remote_handle);
+ LDLM_LOCK_PUT(lock);
+
+ RETURN(0);
}
-/*
- * Helper function returning true when quota space need to be adjusted (some
- * unused space should be free or pre-acquire) and false otherwise.
+/**
+ * Check whether any quota space adjustment (pre-acquire/release/report) is
+ * needed for a given quota ID. If a non-null \a qbody is passed, then the
+ * \a qbody structure (qb_count/flags/usage) is filled with appropriate data
+ * to be packed in the quota request.
+ *
+ * \param lqe - is the lquota entry for which we would like to adjust quota
+ * space.
+ * \param qbody - is the quota body to fill, if not NULL.
+ *
+ * \retval true - space adjustment is required and \a qbody is filled, if not
+ * NULL
+ * \retval false - no space adjustment required
*/
-static bool qsd_adjust_needed(struct lquota_entry *lqe)
+static bool qsd_calc_adjust(struct lquota_entry *lqe, struct quota_body *qbody)
{
- struct qsd_qtype_info *qqi;
- __u64 usage, granted;
+ __u64 usage, granted;
+ ENTRY;
- qqi = lqe2qqi(lqe);
+ usage = lqe->lqe_usage;
+ usage += lqe->lqe_pending_write + lqe->lqe_waiting_write;
+ granted = lqe->lqe_granted;
- if (!lqe->lqe_enforced || qqi->qqi_qsd->qsd_stopping)
- /* if quota isn't enforced for this id, no need to adjust
- * Similarly, no need to perform adjustment if the target is in
- * the process of shutting down. */
- return false;
+ if (qbody != NULL)
+ qbody->qb_flags = 0;
+
+ if (!lqe->lqe_enforced) {
+ /* quota not enforced any more for this ID */
+ if (granted != 0) {
+ /* release all quota space unconditionally */
+ LQUOTA_DEBUG(lqe, "not enforced, releasing all space");
+ if (qbody != NULL) {
+ qbody->qb_count = granted;
+ qbody->qb_flags = QUOTA_DQACQ_FL_REL;
+ }
+ RETURN(true);
+ }
+ RETURN(false);
+ }
- usage = lqe->lqe_usage;
- usage += lqe->lqe_pending_write + lqe->lqe_waiting_write;
- granted = lqe->lqe_granted - lqe->lqe_pending_rel;
+ if (!lustre_handle_is_used(&lqe->lqe_lockh)) {
+ /* No valid per-ID lock
+ * When reporting quota (during reintegration or on setquota
+ * glimpse), we should release granted space if usage is 0.
+ * Otherwise, if the usage is less than granted, we need to
+ * acquire the per-ID lock to make sure the unused grant can be
+ * reclaimed by per-ID lock glimpse. */
+ if (usage == 0) {
+ /* no on-disk usage and no outstanding activity, release
+ * space */
+ if (granted != 0) {
+ LQUOTA_DEBUG(lqe, "no usage, releasing all "
+ "space");
+ if (qbody != NULL) {
+ qbody->qb_count = granted;
+ qbody->qb_flags = QUOTA_DQACQ_FL_REL;
+ }
+ RETURN(true);
+ }
+ LQUOTA_DEBUG(lqe, "no usage + no granted, nothing to "
+ "do");
+ RETURN(false);
+ }
- /* need to re-acquire per-ID lock or release all grant */
- if (!lustre_handle_is_used(&lqe->lqe_lockh) &&
- lqe->lqe_granted > lqe->lqe_usage)
- return true;
+ if (lqe->lqe_usage < lqe->lqe_granted) {
+ /* holding quota space w/o any lock, enqueue per-ID lock
+ * again */
+ LQUOTA_DEBUG(lqe, "(re)acquiring per-ID lock");
+ if (qbody != NULL) {
+ qbody->qb_count = 0;
+ qbody->qb_flags = QUOTA_DQACQ_FL_ACQ;
+ }
+ RETURN(true);
+ }
- /* good old quota qunit adjustment logic which has been around since
- * lustre 1.4:
- * 1. Need to release some space? */
- if (granted > usage + lqe->lqe_qunit)
- return true;
+ if (lqe->lqe_usage > lqe->lqe_granted) {
+ /* quota overrun, report usage */
+ LQUOTA_DEBUG(lqe, "overrun, reporting usage");
+ if (qbody != NULL) {
+ qbody->qb_usage = lqe->lqe_usage;
+ qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
+ }
+ RETURN(true);
+ }
+ LQUOTA_DEBUG(lqe, "granted matches usage, nothing to do");
+ RETURN(false);
+ }
+
+ /* valid per-ID lock
+ * Apply good old quota qunit adjustment logic which has been around
+ * since lustre 1.4:
+ * 1. release spare quota space? */
+ if (granted > usage + lqe->lqe_qunit) {
+ /* pre-release quota space */
+ if (qbody == NULL)
+ RETURN(true);
+ qbody->qb_count = granted - usage;
+ /* if usage == 0, release all granted space */
+ if (usage) {
+ /* try to keep one qunit of quota space */
+ qbody->qb_count -= lqe->lqe_qunit;
+ /* but don't release less than qtune to avoid releasing
+ * space too often */
+ if (qbody->qb_count < lqe->lqe_qtune)
+ qbody->qb_count = lqe->lqe_qtune;
+ }
+ qbody->qb_flags = QUOTA_DQACQ_FL_REL;
+ RETURN(true);
+ }
/* 2. Any quota overrun? */
- if (lqe->lqe_usage > lqe->lqe_granted)
- /* we ended up consuming more than we own, we need to have this
- * fixed ASAP */
- return true;
+ if (lqe->lqe_usage > lqe->lqe_granted) {
+ /* we overconsumed quota space, we report usage in request so
+ * that master can adjust it unconditionally */
+ if (qbody == NULL)
+ RETURN(true);
+ qbody->qb_usage = lqe->lqe_usage;
+ granted = lqe->lqe_usage;
+ qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
+ }
/* 3. Time to pre-acquire? */
- if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && lqe->lqe_qunit != 0 &&
- granted < usage + lqe->lqe_qtune)
- /* need to pre-acquire some space if we don't want to block
- * client's requests */
- return true;
+ if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && usage > 0 &&
+ lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) {
+ /* To pre-acquire quota space, we report how much spare quota
+ * space the slave currently owns, then the master will grant us
+ * back how much we can pretend given the current state of
+ * affairs */
+ if (qbody == NULL)
+ RETURN(true);
+ if (granted <= usage)
+ qbody->qb_count = 0;
+ else
+ qbody->qb_count = granted - usage;
+ qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ;
+ RETURN(true);
+ }
- return false;
+ if (qbody != NULL)
+ RETURN(qbody->qb_flags != 0);
+ else
+ RETURN(false);
}
-/*
+/**
+ * Helper function returning true when quota space need to be adjusted (some
+ * unused space should be free or pre-acquire) and false otherwise.
+ */
+static inline bool qsd_adjust_needed(struct lquota_entry *lqe)
+{
+ return qsd_calc_adjust(lqe, NULL);
+}
+
+/**
* Callback function called when an acquire/release request sent to the master
* is completed
*/
-static void qsd_dqacq_completion(const struct lu_env *env,
- struct qsd_qtype_info *qqi,
- struct quota_body *reqbody,
- struct quota_body *repbody,
- struct lustre_handle *lockh,
- union ldlm_wire_lvb *lvb,
- void *arg, int ret)
+static void qsd_req_completion(const struct lu_env *env,
+ struct qsd_qtype_info *qqi,
+ struct quota_body *reqbody,
+ struct quota_body *repbody,
+ struct lustre_handle *lockh,
+ struct lquota_lvb *lvb,
+ void *arg, int ret)
{
struct lquota_entry *lqe = (struct lquota_entry *)arg;
struct qsd_thread_info *qti;
qti = qsd_info(env);
lqe_write_lock(lqe);
-
- LQUOTA_DEBUG(lqe, "DQACQ returned %d, flags:%x", ret,
+ LQUOTA_DEBUG(lqe, "DQACQ returned %d, flags:0x%x", ret,
reqbody->qb_flags);
/* despite -EDQUOT & -EINPROGRESS errors, the master might still
if (ret != -ETIMEDOUT && ret != -ENOTCONN &&
ret != -ESHUTDOWN && ret != -EAGAIN)
/* print errors only if return code is unexpected */
- LQUOTA_ERROR(lqe, "DQACQ failed with %d, flags:%x", ret,
- reqbody->qb_flags);
+ LQUOTA_ERROR(lqe, "DQACQ failed with %d, flags:0x%x",
+ ret, reqbody->qb_flags);
GOTO(out, ret);
}
/* extract information from lvb */
if (ret == 0 && lvb != 0) {
- if (lvb->l_lquota.lvb_id_qunit != 0)
- qsd_set_qunit(lqe, lvb->l_lquota.lvb_id_qunit);
- if (lvb->l_lquota.lvb_flags & LQUOTA_FL_EDQUOT)
+ if (lvb->lvb_id_qunit != 0)
+ qsd_set_qunit(lqe, lvb->lvb_id_qunit);
+ if (lvb->lvb_flags & LQUOTA_FL_EDQUOT)
lqe->lqe_edquot = true;
else
lqe->lqe_edquot = false;
lqe->lqe_nopreacq = true;
out:
adjust = qsd_adjust_needed(lqe);
+ if (reqbody && req_is_acq(reqbody->qb_flags) && ret != -EDQUOT) {
+ lqe->lqe_acq_rc = ret;
+ lqe->lqe_acq_time = cfs_time_current_64();
+ }
out_noadjust:
- lqe->lqe_pending_req--;
- lqe->lqe_pending_rel = 0;
+ qsd_request_exit(lqe);
lqe_write_unlock(lqe);
- cfs_waitq_broadcast(&lqe->lqe_waiters);
-
/* release reference on per-ID lock */
if (lustre_handle_is_used(lockh))
ldlm_lock_decref(lockh, qsd_id_einfo.ei_mode);
else
qsd_adjust_schedule(lqe, true, false);
}
+ lqe_putref(lqe);
if (lvb)
- /* free lvb allocated in qsd_dqacq */
OBD_FREE_PTR(lvb);
-
- lqe_putref(lqe);
EXIT;
}
+/**
+ * Try to consume local quota space.
+ *
+ * \param lqe - is the qid entry to be processed
+ * \param space - is the amount of quota space needed to complete the operation
+ *
+ * \retval 0 - success
+ * \retval -EDQUOT - out of quota
+ * \retval -EAGAIN - need to acquire space from master
+ */
static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
{
__u64 usage;
if (!lqe->lqe_enforced)
/* not enforced any more, we are good */
- RETURN(0);
+ RETURN(-ESRCH);
lqe_write_lock(lqe);
/* use latest usage */
RETURN(rc);
}
-static bool qsd_calc_space(struct lquota_entry *lqe, enum qsd_ops op,
- struct quota_body *qbody)
+/**
+ * Compute how much quota space should be acquire from the master based
+ * on how much is currently granted to this slave and pending/waiting
+ * operations.
+ *
+ * \param lqe - is the lquota entry for which we would like to adjust quota
+ * space.
+ * \param qbody - is the quota body of the acquire request to fill
+ *
+ * \retval true - space acquisition is needed and qbody is filled
+ * \retval false - no space acquisition required
+ */
+static inline bool qsd_calc_acquire(struct lquota_entry *lqe,
+ struct quota_body *qbody)
{
- struct qsd_qtype_info *qqi;
- __u64 usage, granted;
-
- if (!lqe->lqe_enforced && op != QSD_REL)
- return 0;
-
- qqi = lqe2qqi(lqe);
+ __u64 usage, granted;
- LASSERT(lqe->lqe_pending_rel == 0);
usage = lqe->lqe_usage;
usage += lqe->lqe_pending_write + lqe->lqe_waiting_write;
granted = lqe->lqe_granted;
qbody->qb_flags = 0;
-again:
- switch (op) {
- case QSD_ACQ:
- /* if we overconsumed quota space, we report usage in request
- * so that master can adjust it unconditionally */
- if (lqe->lqe_usage > lqe->lqe_granted) {
- qbody->qb_usage = lqe->lqe_usage;
- qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
- granted = lqe->lqe_usage;
- }
- /* acquire as much as needed, but not more */
- if (usage > granted) {
- qbody->qb_count = usage - granted;
- qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ;
- }
- break;
- case QSD_REP:
- /* When reporting quota (during reintegration or on setquota
- * glimpse), we should release granted space if usage is 0.
- * Otherwise, if the usage is less than granted, we need to
- * acquire the per-ID lock to make sure the unused grant can be
- * reclaimed by per-ID lock glimpse. */
- if (lqe->lqe_usage == 0 && lqe->lqe_granted != 0) {
- LQUOTA_DEBUG(lqe, "Release on report!");
- GOTO(again, op = QSD_REL);
- } else if (lqe->lqe_usage == lqe->lqe_granted) {
- LQUOTA_DEBUG(lqe, "Usage matches granted, needn't do "
- "anything on report!");
- } else if (lqe->lqe_usage < lqe->lqe_granted) {
- LQUOTA_DEBUG(lqe, "Acquire per-ID lock on report!");
- qbody->qb_count = 0;
- qbody->qb_flags = QUOTA_DQACQ_FL_ACQ;
- } else {
- LASSERT(lqe->lqe_usage > lqe->lqe_granted);
- LQUOTA_DEBUG(lqe, "Reporting usage");
- qbody->qb_usage = lqe->lqe_usage;
- qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
- }
- break;
- case QSD_REL:
- /* release unused quota space unconditionally */
- if (lqe->lqe_granted > lqe->lqe_usage) {
- qbody->qb_count = lqe->lqe_granted - lqe->lqe_usage;
- qbody->qb_flags = QUOTA_DQACQ_FL_REL;
- }
- break;
- case QSD_ADJ: {
- /* need to re-acquire per-ID lock or release all grant */
- if (!lustre_handle_is_used(&lqe->lqe_lockh) &&
- lqe->lqe_granted > lqe->lqe_usage)
- GOTO(again, op = QSD_REP);
-
- /* release spare grant */
- if (granted > usage + lqe->lqe_qunit) {
- /* pre-release quota space */
- qbody->qb_count = granted - usage;
- /* if usage == 0, release all granted space */
- if (usage) {
- /* try to keep one qunit of quota space */
- qbody->qb_count -= lqe->lqe_qunit;
- /* but don't release less than qtune to avoid
- * releasing space too often */
- if (qbody->qb_count < lqe->lqe_qtune)
- qbody->qb_count = lqe->lqe_qtune;
- }
- qbody->qb_flags = QUOTA_DQACQ_FL_REL;
- break;
- }
- /* if we overconsumed quota space, we report usage in request
- * so that master can adjust it unconditionally */
- if (lqe->lqe_usage > lqe->lqe_granted) {
- qbody->qb_usage = lqe->lqe_usage;
- qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
- granted = lqe->lqe_usage;
- }
-
- if (!lqe->lqe_edquot && !lqe->lqe_nopreacq &&
- lustre_handle_is_used(&lqe->lqe_lockh) &&
- lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) {
- /* To pre-acquire quota space, we report how much spare
- * quota space the slave currently owns, then the master
- * will grant us back how much we can pretend given the
- * current state of affairs */
- if (granted <= usage)
- qbody->qb_count = 0;
- else
- qbody->qb_count = granted - usage;
- qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ;
- }
- break;
+ /* if we overconsumed quota space, we report usage in request so that
+ * master can adjust it unconditionally */
+ if (lqe->lqe_usage > lqe->lqe_granted) {
+ qbody->qb_usage = lqe->lqe_usage;
+ qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
+ granted = lqe->lqe_usage;
}
- default:
- CERROR("Invalid qsd operation:%u\n", op);
- LBUG();
- break;
+
+ /* acquire as much as needed, but not more */
+ if (usage > granted) {
+ qbody->qb_count = usage - granted;
+ qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ;
}
+
return qbody->qb_flags != 0;
}
-/*
- * Acquire/release quota space from master.
+/**
+ * Acquire quota space from master.
* There are at most 1 in-flight dqacq/dqrel.
*
* \param env - the environment passed by the caller
* \param lqe - is the qid entry to be processed
- * \param op - operation that want to be performed by the caller
*
- * \retval 0 - success
- * \retval -EDQUOT : out of quota
- * -EINPROGRESS : inform client to retry write/create
- * -ve : other appropriate errors
+ * \retval 0 - success
+ * \retval -EDQUOT - out of quota
+ * \retval -EINPROGRESS - inform client to retry write/create
+ * \retval -EBUSY - already a quota request in flight
+ * \retval -ve - other appropriate errors
*/
-int qsd_dqacq(const struct lu_env *env, struct lquota_entry *lqe,
- enum qsd_ops op)
+static int qsd_acquire_remote(const struct lu_env *env,
+ struct lquota_entry *lqe)
{
struct qsd_thread_info *qti = qsd_info(env);
struct quota_body *qbody = &qti->qti_body;
struct qsd_instance *qsd;
struct qsd_qtype_info *qqi;
- struct ldlm_lock *lock;
int rc;
- bool intent = false, sync;
ENTRY;
+ memset(qbody, 0, sizeof(*qbody));
+ rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
+ if (rc)
+ RETURN(rc);
+
qqi = lqe2qqi(lqe);
qsd = qqi->qqi_qsd;
- if (qsd->qsd_stopping) {
- LQUOTA_DEBUG(lqe, "Dropping quota req since qsd is stopping");
- /* Target is about to shut down, client will retry */
- RETURN(-EINPROGRESS);
- }
-
- if (!qsd_ready(qsd)) {
- LQUOTA_DEBUG(lqe, "Connection to master not ready");
- RETURN(-ENOTCONN);
- }
-
- /* In most case, reintegration must have been triggered (when enable
- * quota or on OST start), however, in rare race condition (enabling
- * quota when starting OSTs), we might miss triggering reintegration
- * for some qqi.
- *
- * If the previous reintegration failed for some reason, we'll
- * re-trigger it here as well. */
- if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
- LQUOTA_DEBUG(lqe, "Not up-to-date, dropping request and kicking"
- " off reintegration");
- qsd_start_reint_thread(qqi);
- RETURN(-EINPROGRESS);
- }
-
- LQUOTA_DEBUG(lqe, "DQACQ starts op=%u", op);
-
- /* Fill the remote global lock handle, master will check this handle
- * to see if the slave is sending request with stale lock */
- cfs_read_lock(&qsd->qsd_lock);
- lustre_handle_copy(&qbody->qb_glb_lockh, &qqi->qqi_lockh);
- cfs_read_unlock(&qsd->qsd_lock);
-
- if (!lustre_handle_is_used(&qbody->qb_glb_lockh))
- RETURN(-ENOLCK);
-
- lock = ldlm_handle2lock(&qbody->qb_glb_lockh);
- if (lock == NULL)
- RETURN(-ENOLCK);
- lustre_handle_copy(&qbody->qb_glb_lockh, &lock->l_remote_handle);
- LDLM_LOCK_PUT(lock);
-
- /* We allow only one in-flight dqacq/dqrel for specified qid, if
- * there is already in-flight dqacq/dqrel:
- *
- * - For QSD_ADJ: we should just abort it, since local limit is going
- * to be changed soon;
- * - For QSD_ACQ & QSD_REL: we just wait for the in-flight dqacq/dqrel
- * finished, and return success to the caller. The caller is
- * responsible for retrying;
- * - For QSD_REP: we should just abort it, since slave has already
- * acquired/released grant; */
- sync = (op == QSD_ACQ || op == QSD_REL) ? true : false;
- LASSERTF(lqe->lqe_pending_req <= 1, "pending dqacq/dqrel:%d",
- lqe->lqe_pending_req);
-
lqe_write_lock(lqe);
- if (lqe->lqe_pending_req != 0) {
- struct l_wait_info lwi = { 0 };
+ /* is quota really enforced for this id? */
+ if (!lqe->lqe_enforced) {
lqe_write_unlock(lqe);
- if (!sync) {
- LQUOTA_DEBUG(lqe, "Abort DQACQ, op=%d", op);
- RETURN(0);
- }
-
- LQUOTA_DEBUG(lqe, "waiting for in-flight dqacq/dqrel");
- l_wait_event(lqe->lqe_waiters,
- !lqe_pending_dqacq(lqe) || qsd->qsd_stopping,
- &lwi);
+ LQUOTA_DEBUG(lqe, "quota not enforced any more");
RETURN(0);
}
/* fill qb_count & qb_flags */
- if (!qsd_calc_space(lqe, op, qbody)) {
+ if (!qsd_calc_acquire(lqe, qbody)) {
lqe_write_unlock(lqe);
- LQUOTA_DEBUG(lqe, "No DQACQ required, op=%u", op);
+ LQUOTA_DEBUG(lqe, "No acquire required");
RETURN(0);
}
- lqe->lqe_pending_req++;
- lqe_write_unlock(lqe);
- /* fill other quota body fields */
- qbody->qb_fid = qqi->qqi_fid;
- qbody->qb_id = lqe->lqe_id;
- memset(&qbody->qb_lockh, 0, sizeof(qbody->qb_lockh));
- memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
+ /* check whether an acquire request completed recently */
+ if (lqe->lqe_acq_rc != 0 &&
+ cfs_time_before_64(cfs_time_shift_64(-1), lqe->lqe_acq_time)) {
+ lqe_write_unlock(lqe);
+ LQUOTA_DEBUG(lqe, "using cached return code %d", lqe->lqe_acq_rc);
+ RETURN(lqe->lqe_acq_rc);
+ }
+
+ /* only 1 quota request in flight for a given ID is allowed */
+ rc = qsd_request_enter(lqe);
+ if (rc) {
+ lqe_write_unlock(lqe);
+ RETURN(rc);
+ }
+
+ lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
+ lqe_write_unlock(lqe);
/* hold a refcount until completion */
lqe_getref(lqe);
- if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) {
- /* check whether we already own a lock for this ID */
- lqe_read_lock(lqe);
- lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
- lqe_read_unlock(lqe);
-
- rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
- if (rc) {
- memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
- if (req_is_preacq(qbody->qb_flags)) {
- if (req_has_rep(qbody->qb_flags))
- /* still want to report usage */
- qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
- else
- /* no pre-acquire if no per-ID lock */
- GOTO(out, rc = -ENOLCK);
- } else {
- /* no lock found, should use intent */
- intent = true;
- }
- } else if (req_is_acq(qbody->qb_flags) &&
- qbody->qb_count == 0) {
- /* found cached lock, no need to acquire */
- GOTO(out, rc = 0);
- }
- }
+ /* fill other quota body fields */
+ qbody->qb_fid = qqi->qqi_fid;
+ qbody->qb_id = lqe->lqe_id;
- if (!intent) {
- rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, sync,
- qsd_dqacq_completion, qqi, &qti->qti_lockh,
- lqe);
- } else {
- union ldlm_wire_lvb *lvb;
+ /* check whether we already own a valid lock for this ID */
+ rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
+ if (rc) {
+ struct lquota_lvb *lvb;
OBD_ALLOC_PTR(lvb);
- if (lvb == NULL)
- GOTO(out, rc = -ENOMEM);
-
- rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, sync,
- IT_QUOTA_DQACQ, qsd_dqacq_completion,
+ if (lvb == NULL) {
+ rc = -ENOMEM;
+ qsd_req_completion(env, qqi, qbody, NULL,
+ &qti->qti_lockh, NULL, lqe, rc);
+ RETURN(rc);
+ }
+ /* no lock found, should use intent */
+ rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, true,
+ IT_QUOTA_DQACQ, qsd_req_completion,
qqi, lvb, (void *)lqe);
+ } else {
+ /* lock found, should use regular dqacq */
+ rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, true,
+ qsd_req_completion, qqi, &qti->qti_lockh,
+ lqe);
}
+
/* the completion function will be called by qsd_send_dqacq or
* qsd_intent_lock */
RETURN(rc);
-out:
- qsd_dqacq_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe,
- rc);
- return rc;
}
-/*
+/**
+ * Acquire \a space of quota space in order to complete an operation.
+ * Try to consume local quota space first and send acquire request to quota
+ * master if required.
+ *
+ * \param env - the environment passed by the caller
+ * \param lqe - is the qid entry to be processed
+ * \param space - is the amount of quota required for the operation
+ * \param ret - is the return code (-EDQUOT, -EINPROGRESS, ...)
+ *
+ * \retval true - exit from l_wait_event and real return value in \a ret
+ * \retval false - continue waiting
+ */
+static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
+ long long space, int *ret)
+{
+ int rc = 0, count;
+ ENTRY;
+
+ for (count = 0; rc == 0; count++) {
+ LQUOTA_DEBUG(lqe, "acquiring:"LPD64 " count=%d", space, count);
+
+ if (lqe2qqi(lqe)->qqi_qsd->qsd_stopping) {
+ rc = -EINPROGRESS;
+ break;
+ }
+
+ /* refresh disk usage */
+ rc = qsd_refresh_usage(env, lqe);
+ if (rc)
+ break;
+
+ /* try to consume local quota space first */
+ rc = qsd_acquire_local(lqe, space);
+ if (rc != -EAGAIN)
+ /* rc == 0, Wouhou! enough local quota space
+ * rc < 0, something bad happened */
+ break;
+
+ /* need to acquire more quota space from master */
+ rc = qsd_acquire_remote(env, lqe);
+ }
+
+ if (rc == -EBUSY)
+ /* already a request in flight, continue waiting */
+ RETURN(false);
+ *ret = rc;
+ RETURN(true); /* exit from l_wait_event */
+}
+
+/**
* Quota enforcement handler. If local quota can satisfy this operation,
* return success, otherwise, acquire more quota from master.
* (for write operation, if master isn't available at this moment, return
* \param flags - if the operation is write, return caller no user/group
* and sync commit flags
*
- * \retval 0 - success
- * \retval -EDQUOT : out of quota
- * -EINPROGRESS : inform client to retry write
- * -ve : other appropriate errors
+ * \retval 0 - success
+ * \retval -EDQUOT - out of quota
+ * \retval -EINPROGRESS - inform client to retry write
+ * \retval -ve - other appropriate errors
*/
static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
struct lquota_id_info *qid, long long space,
int *flags)
{
- struct lquota_entry *lqe;
- int rc = 0, retry_cnt;
+ struct lquota_entry *lqe;
+ int rc, ret = -EINPROGRESS;
+ struct l_wait_info lwi;
ENTRY;
if (qid->lqi_qentry != NULL) {
lqe->lqe_waiting_write += space;
lqe_write_unlock(lqe);
- for (retry_cnt = 0; rc == 0; retry_cnt++) {
- /* refresh disk usage if required */
- rc = qsd_refresh_usage(env, lqe);
- if (rc)
- break;
+ /* acquire quota space for the operation, cap overall wait time to
+ * prevent a service thread from being stuck for too long */
+ lwi = LWI_TIMEOUT(cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)),
+ NULL, NULL);
+ rc = l_wait_event(lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
+ &lwi);
- /* try to consume local quota space */
- rc = qsd_acquire_local(lqe, space);
- if (rc != -EAGAIN)
- /* rc == 0, Wouhou! enough local quota space
- * rc < 0, something bad happened */
- break;
-
- /* need to acquire more quota space from master, this is done
- * synchronously */
- rc = qsd_dqacq(env, lqe, QSD_ACQ);
- LQUOTA_DEBUG(lqe, "Acquired quota space, retry cnt:%d rc:%d",
- retry_cnt, rc);
- }
-
- if (rc == 0) {
+ if (rc == 0 && ret == 0) {
qid->lqi_space += space;
} else {
- LQUOTA_DEBUG(lqe, "Acquire quota failed:%d", rc);
+ if (rc == 0)
+ rc = ret;
+
+ LQUOTA_DEBUG(lqe, "acquire quota failed:%d", rc);
lqe_write_lock(lqe);
lqe->lqe_waiting_write -= space;
*flags |= QUOTA_FL_SYNC;
lqe_write_unlock(lqe);
- /* convert recoverable error into -EINPROGRESS, and client will
- * retry write on -EINPROGRESS. */
+ /* convert recoverable error into -EINPROGRESS, client will
+ * retry */
if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ENOLCK ||
- rc == -EAGAIN || rc == -EINTR)
+ rc == -EAGAIN || rc == -EINTR) {
rc = -EINPROGRESS;
+ } else if (rc == -ESRCH) {
+ rc = 0;
+ LQUOTA_ERROR(lqe, "ID isn't enforced on master, it "
+ "probably due to a legeal race, if this "
+ "message is showing up constantly, there "
+ "could be some inconsistence between "
+ "master & slave, and quota reintegration "
+ "needs be re-triggered.");
+ }
}
if (flags != NULL) {
RETURN(rc);
}
+/**
+ * helper function comparing two lquota_id_info structures
+ */
static inline bool qid_equal(struct lquota_id_info *q1,
struct lquota_id_info *q2)
{
return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
}
-/*
+/**
* Enforce quota, it's called in the declaration of each operation.
* qsd_op_end() will then be called later once all the operations have been
* completed in order to release/adjust the quota space.
*
- * \param env - the environment passed by the caller
- * \param qsd - is the qsd instance associated with the device in charge
- * of the operation.
- * \param trans - is the quota transaction information
- * \param qi - qid & space required by current operation
- * \param flags - if the operation is write, return caller no user/group
- * and sync commit flags
+ * \param env - the environment passed by the caller
+ * \param qsd - is the qsd instance associated with the device in charge of
+ * the operation.
+ * \param trans - is the quota transaction information
+ * \param qi - qid & space required by current operation
+ * \param flags - if the operation is write, return caller no user/group and
+ * sync commit flags
*
- * \retval 0 - success
- * \retval -EDQUOT : out of quota
- * -EINPROGRESS : inform client to retry write
- * -ve : other appropriate errors
+ * \retval 0 - success
+ * \retval -EDQUOT - out of quota
+ * \retval -EINPROGRESS - inform client to retry write
+ * \retval -ve - other appropriate errors
*/
int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
struct lquota_trans *trans, struct lquota_id_info *qi,
RETURN(0);
/* We don't enforce quota until the qsd_instance is started */
- cfs_read_lock(&qsd->qsd_lock);
+ read_lock(&qsd->qsd_lock);
if (!qsd->qsd_started) {
- cfs_read_unlock(&qsd->qsd_lock);
+ read_unlock(&qsd->qsd_lock);
RETURN(0);
}
- cfs_read_unlock(&qsd->qsd_lock);
+ read_unlock(&qsd->qsd_lock);
/* ignore block quota on MDTs, ignore inode quota on OSTs */
if ((!qsd->qsd_is_md && !qi->lqi_is_blk) ||
/* ignore quota enforcement request when:
* - quota isn't enforced for this quota type
- * or - the user/group is root */
- if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0)
+ * or - the user/group is root
+ * or - quota accounting isn't enabled */
+ if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0 ||
+ qsd->qsd_acct_failed)
RETURN(0);
LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d",
EXPORT_SYMBOL(qsd_op_begin);
/**
+ * Adjust quota space (by acquiring or releasing) hold by the quota slave.
+ * This function is called after each quota request completion and during
+ * reintegration in order to report usage or re-acquire quota locks.
+ * Space adjustment is aborted if there is already a quota request in flight
+ * for this ID.
+ *
+ * \param env - the environment passed by the caller
+ * \param lqe - is the qid entry to be processed
+ *
+ * \retval 0 on success, appropriate errors on failure
+ */
+int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe)
+{
+ struct qsd_thread_info *qti = qsd_info(env);
+ struct quota_body *qbody = &qti->qti_body;
+ struct qsd_instance *qsd;
+ struct qsd_qtype_info *qqi;
+ int rc;
+ bool intent = false;
+ ENTRY;
+
+ memset(qbody, 0, sizeof(*qbody));
+ rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
+ if (rc) {
+ /* add to adjust list again to trigger adjustment later when
+ * slave is ready */
+ LQUOTA_DEBUG(lqe, "delaying adjustment since qsd isn't ready");
+ qsd_adjust_schedule(lqe, true, false);
+ RETURN(0);
+ }
+
+ qqi = lqe2qqi(lqe);
+ qsd = qqi->qqi_qsd;
+
+ lqe_write_lock(lqe);
+
+ /* fill qb_count & qb_flags */
+ if (!qsd_calc_adjust(lqe, qbody)) {
+ lqe_write_unlock(lqe);
+ LQUOTA_DEBUG(lqe, "no adjustment required");
+ RETURN(0);
+ }
+
+ /* only 1 quota request in flight for a given ID is allowed */
+ rc = qsd_request_enter(lqe);
+ if (rc) {
+ /* already a request in flight, space adjustment will be run
+ * again on request completion */
+ lqe_write_unlock(lqe);
+ RETURN(0);
+ }
+
+ if (req_is_rel(qbody->qb_flags))
+ lqe->lqe_pending_rel = qbody->qb_count;
+ lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
+ lqe_write_unlock(lqe);
+
+ /* hold a refcount until completion */
+ lqe_getref(lqe);
+
+ /* fill other quota body fields */
+ qbody->qb_fid = qqi->qqi_fid;
+ qbody->qb_id = lqe->lqe_id;
+
+ if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) {
+ /* check whether we own a valid lock for this ID */
+ rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
+ if (rc) {
+ memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
+ if (req_is_preacq(qbody->qb_flags)) {
+ if (req_has_rep(qbody->qb_flags))
+ /* still want to report usage */
+ qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
+ else
+ /* no pre-acquire if no per-ID lock */
+ GOTO(out, rc = -ENOLCK);
+ } else {
+ /* no lock found, should use intent */
+ intent = true;
+ }
+ } else if (req_is_acq(qbody->qb_flags) &&
+ qbody->qb_count == 0) {
+ /* found cached lock, no need to acquire */
+ GOTO(out, rc = 0);
+ }
+ } else {
+ /* release and report don't need a per-ID lock */
+ memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
+ }
+
+ if (!intent) {
+ rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, false,
+ qsd_req_completion, qqi, &qti->qti_lockh,
+ lqe);
+ } else {
+ struct lquota_lvb *lvb;
+
+ OBD_ALLOC_PTR(lvb);
+ if (lvb == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, false,
+ IT_QUOTA_DQACQ, qsd_req_completion,
+ qqi, lvb, (void *)lqe);
+ }
+ /* the completion function will be called by qsd_send_dqacq or
+ * qsd_intent_lock */
+ RETURN(rc);
+out:
+ qsd_req_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe,
+ rc);
+ return rc;
+}
+
+/**
* Post quota operation, pre-acquire/release quota from master.
*
* \param env - the environment passed by the caller
if (adjust) {
/* pre-acquire/release quota space is needed */
if (env != NULL)
- qsd_dqacq(env, lqe, QSD_ADJ);
+ qsd_adjust(env, lqe);
else
/* no suitable environment, handle adjustment in
* separate thread context */
EXIT;
}
-/*
+/**
* Post quota operation. It's called after each operation transaction stopped.
*
* \param env - the environment passed by the caller
RETURN_EXIT;
/* We don't enforce quota until the qsd_instance is started */
- cfs_read_lock(&qsd->qsd_lock);
+ read_lock(&qsd->qsd_lock);
if (!qsd->qsd_started) {
- cfs_read_unlock(&qsd->qsd_lock);
+ read_unlock(&qsd->qsd_lock);
RETURN_EXIT;
}
- cfs_read_unlock(&qsd->qsd_lock);
+ read_unlock(&qsd->qsd_lock);
LASSERT(trans != NULL);
}
EXPORT_SYMBOL(qsd_op_end);
-void qsd_adjust_quota(const struct lu_env *env, struct qsd_instance *qsd,
- union lquota_id *qid, int qtype)
+/**
+ * Trigger pre-acquire/release if necessary.
+ * It's only used by ldiskfs osd so far. When unlink a file in ldiskfs, the
+ * quota accounting isn't updated when the transaction stopped. Instead, it'll
+ * be updated on the final iput, so qsd_op_adjust() will be called then (in
+ * osd_object_delete()) to trigger quota release if necessary.
+ *
+ * \param env - the environment passed by the caller
+ * \param qsd - is the qsd instance associated with the device in charge
+ * of the operation.
+ * \param qid - is the lquota ID of the user/group for which to trigger
+ * quota space adjustment
+ * \param qtype - is the quota type (USRQUOTA or GRPQUOTA)
+ */
+void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd,
+ union lquota_id *qid, int qtype)
{
struct lquota_entry *lqe;
struct qsd_qtype_info *qqi;
RETURN_EXIT;
/* We don't enforce quota until the qsd_instance is started */
- cfs_read_lock(&qsd->qsd_lock);
+ read_lock(&qsd->qsd_lock);
if (!qsd->qsd_started) {
- cfs_read_unlock(&qsd->qsd_lock);
+ read_unlock(&qsd->qsd_lock);
RETURN_EXIT;
}
- cfs_read_unlock(&qsd->qsd_lock);
+ read_unlock(&qsd->qsd_lock);
qqi = qsd->qsd_type_array[qtype];
LASSERT(qqi);
qid->qid_uid == 0)
RETURN_EXIT;
- cfs_read_lock(&qsd->qsd_lock);
+ read_lock(&qsd->qsd_lock);
if (!qsd->qsd_started) {
- cfs_read_unlock(&qsd->qsd_lock);
+ read_unlock(&qsd->qsd_lock);
RETURN_EXIT;
}
- cfs_read_unlock(&qsd->qsd_lock);
+ read_unlock(&qsd->qsd_lock);
lqe = lqe_locate(env, qqi->qqi_site, qid);
if (IS_ERR(lqe)) {
lqe_read_unlock(lqe);
if (adjust)
- qsd_dqacq(env, lqe, QSD_ADJ);
+ qsd_adjust(env, lqe);
lqe_putref(lqe);
EXIT;
}
-EXPORT_SYMBOL(qsd_adjust_quota);
+EXPORT_SYMBOL(qsd_op_adjust);