* GPL HEADER END
*/
/*
- * Copyright (c) 2012 Intel, Inc.
+ * Copyright (c) 2012, Intel Corporation.
* Use is subject to license terms.
*
* Author: Johann Lombardi <johann.lombardi@intel.com>
* Author: Niu Yawei <yawei.niu@intel.com>
*/
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-
#define DEBUG_SUBSYSTEM S_LQUOTA
#include "qsd_internal.h"
}
lqe->lqe_pending_req--;
lqe->lqe_pending_rel = 0;
- cfs_waitq_broadcast(&lqe->lqe_waiters);
+ wake_up_all(&lqe->lqe_waiters);
}
/**
struct ldlm_lock *lock;
ENTRY;
- cfs_read_lock(&qsd->qsd_lock);
+ read_lock(&qsd->qsd_lock);
/* is the qsd about to shut down? */
if (qsd->qsd_stopping) {
- cfs_read_unlock(&qsd->qsd_lock);
+ read_unlock(&qsd->qsd_lock);
LQUOTA_DEBUG(lqe, "dropping quota req since qsd is stopping");
/* Target is about to shut down, client will retry */
RETURN(-EINPROGRESS);
if (qsd->qsd_exp_valid)
imp = class_exp2cliimp(qsd->qsd_exp);
if (imp == NULL || imp->imp_invalid) {
- cfs_read_unlock(&qsd->qsd_lock);
+ read_unlock(&qsd->qsd_lock);
LQUOTA_DEBUG(lqe, "connection to master not ready");
RETURN(-ENOTCONN);
}
* If the previous reintegration failed for some reason, we'll
* re-trigger it here as well. */
if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
- cfs_read_unlock(&qsd->qsd_lock);
+ read_unlock(&qsd->qsd_lock);
LQUOTA_DEBUG(lqe, "not up-to-date, dropping request and "
"kicking off reintegration");
qsd_start_reint_thread(qqi);
/* Fill the remote global lock handle, master will check this handle
* to see if the slave is sending request with stale lock */
lustre_handle_copy(lockh, &qqi->qqi_lockh);
- cfs_read_unlock(&qsd->qsd_lock);
+ read_unlock(&qsd->qsd_lock);
if (!lustre_handle_is_used(lockh))
RETURN(-ENOLCK);
struct quota_body *reqbody,
struct quota_body *repbody,
struct lustre_handle *lockh,
- union ldlm_wire_lvb *lvb,
+ struct lquota_lvb *lvb,
void *arg, int ret)
{
struct lquota_entry *lqe = (struct lquota_entry *)arg;
/* extract information from lvb */
if (ret == 0 && lvb != 0) {
- if (lvb->l_lquota.lvb_id_qunit != 0)
- qsd_set_qunit(lqe, lvb->l_lquota.lvb_id_qunit);
- if (lvb->l_lquota.lvb_flags & LQUOTA_FL_EDQUOT)
- lqe->lqe_edquot = true;
- else
- lqe->lqe_edquot = false;
+ if (lvb->lvb_id_qunit != 0)
+ qsd_set_qunit(lqe, lvb->lvb_id_qunit);
+ qsd_set_edquot(lqe, !!(lvb->lvb_flags & LQUOTA_FL_EDQUOT));
} else if (repbody != NULL && repbody->qb_qunit != 0) {
qsd_set_qunit(lqe, repbody->qb_qunit);
}
lqe->lqe_nopreacq = true;
out:
adjust = qsd_adjust_needed(lqe);
+ if (reqbody && req_is_acq(reqbody->qb_flags) && ret != -EDQUOT) {
+ lqe->lqe_acq_rc = ret;
+ lqe->lqe_acq_time = cfs_time_current_64();
+ }
out_noadjust:
qsd_request_exit(lqe);
lqe_write_unlock(lqe);
lqe->lqe_pending_write += space;
lqe->lqe_waiting_write -= space;
rc = 0;
- } else if (lqe->lqe_edquot) {
+ /* lqe_edquot flag is used to avoid flooding dqacq requests when
+ * the user is over quota, however, the lqe_edquot could be stale
+ * sometimes due to the race reply of dqacq vs. id lock glimpse
+ * (see LU-4505), so we revalidate it every 5 seconds. */
+ } else if (lqe->lqe_edquot &&
+ cfs_time_before_64(cfs_time_shift_64(-5),
+ lqe->lqe_edquot_time)) {
rc = -EDQUOT;
- } else {
+	} else {
rc = -EAGAIN;
}
lqe_write_unlock(lqe);
RETURN(0);
}
+ /* check whether an acquire request completed recently */
+ if (lqe->lqe_acq_rc != 0 &&
+ cfs_time_before_64(cfs_time_shift_64(-1), lqe->lqe_acq_time)) {
+ lqe_write_unlock(lqe);
+ LQUOTA_DEBUG(lqe, "using cached return code %d", lqe->lqe_acq_rc);
+ RETURN(lqe->lqe_acq_rc);
+ }
+
/* only 1 quota request in flight for a given ID is allowed */
rc = qsd_request_enter(lqe);
if (rc) {
/* check whether we already own a valid lock for this ID */
rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
if (rc) {
- union ldlm_wire_lvb *lvb;
+ struct lquota_lvb *lvb;
OBD_ALLOC_PTR(lvb);
if (lvb == NULL) {
RETURN(0);
/* We don't enforce quota until the qsd_instance is started */
- cfs_read_lock(&qsd->qsd_lock);
+ read_lock(&qsd->qsd_lock);
if (!qsd->qsd_started) {
- cfs_read_unlock(&qsd->qsd_lock);
+ read_unlock(&qsd->qsd_lock);
RETURN(0);
}
- cfs_read_unlock(&qsd->qsd_lock);
+ read_unlock(&qsd->qsd_lock);
/* ignore block quota on MDTs, ignore inode quota on OSTs */
if ((!qsd->qsd_is_md && !qi->lqi_is_blk) ||
/* ignore quota enforcement request when:
* - quota isn't enforced for this quota type
- * or - the user/group is root */
- if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0)
+ * or - the user/group is root
+ * or - quota accounting isn't enabled */
+ if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0 ||
+ qsd->qsd_acct_failed)
RETURN(0);
LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d",
memset(qbody, 0, sizeof(*qbody));
rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
- if (rc)
- RETURN(rc);
+ if (rc) {
+ /* add to adjust list again to trigger adjustment later when
+ * slave is ready */
+ LQUOTA_DEBUG(lqe, "delaying adjustment since qsd isn't ready");
+ qsd_adjust_schedule(lqe, true, false);
+ RETURN(0);
+ }
qqi = lqe2qqi(lqe);
qsd = qqi->qqi_qsd;
/* fill qb_count & qb_flags */
if (!qsd_calc_adjust(lqe, qbody)) {
lqe_write_unlock(lqe);
- LQUOTA_DEBUG(lqe, "No adjust required");
+ LQUOTA_DEBUG(lqe, "no adjustment required");
RETURN(0);
}
qsd_req_completion, qqi, &qti->qti_lockh,
lqe);
} else {
- union ldlm_wire_lvb *lvb;
+ struct lquota_lvb *lvb;
OBD_ALLOC_PTR(lvb);
if (lvb == NULL)
RETURN_EXIT;
/* We don't enforce quota until the qsd_instance is started */
- cfs_read_lock(&qsd->qsd_lock);
+ read_lock(&qsd->qsd_lock);
if (!qsd->qsd_started) {
- cfs_read_unlock(&qsd->qsd_lock);
+ read_unlock(&qsd->qsd_lock);
RETURN_EXIT;
}
- cfs_read_unlock(&qsd->qsd_lock);
+ read_unlock(&qsd->qsd_lock);
LASSERT(trans != NULL);
RETURN_EXIT;
/* We don't enforce quota until the qsd_instance is started */
- cfs_read_lock(&qsd->qsd_lock);
+ read_lock(&qsd->qsd_lock);
if (!qsd->qsd_started) {
- cfs_read_unlock(&qsd->qsd_lock);
+ read_unlock(&qsd->qsd_lock);
RETURN_EXIT;
}
- cfs_read_unlock(&qsd->qsd_lock);
+ read_unlock(&qsd->qsd_lock);
qqi = qsd->qsd_type_array[qtype];
LASSERT(qqi);
qid->qid_uid == 0)
RETURN_EXIT;
- cfs_read_lock(&qsd->qsd_lock);
+ read_lock(&qsd->qsd_lock);
if (!qsd->qsd_started) {
- cfs_read_unlock(&qsd->qsd_lock);
+ read_unlock(&qsd->qsd_lock);
RETURN_EXIT;
}
- cfs_read_unlock(&qsd->qsd_lock);
+ read_unlock(&qsd->qsd_lock);
lqe = lqe_locate(env, qqi->qqi_site, qid);
if (IS_ERR(lqe)) {