* GPL HEADER END
*/
/*
- * Copyright (c) 2012, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
* Use is subject to license terms.
*
* Author: Johann Lombardi <johann.lombardi@intel.com>
}
if (lqe->lqe_pending_rel != 0) {
- LQUOTA_ERROR(lqe, "no request in flight with pending_rel="LPU64,
+ LQUOTA_ERROR(lqe, "no request in flight with pending_rel=%llu",
lqe->lqe_pending_rel);
LBUG();
}
* the DQACQ since the limit for this ID has been removed, so we
* should not update quota entry & slave index copy neither. */
if (repbody != NULL && repbody->qb_count != 0) {
- LQUOTA_DEBUG(lqe, "DQACQ qb_count:"LPU64, repbody->qb_count);
+ LQUOTA_DEBUG(lqe, "DQACQ qb_count:%llu", repbody->qb_count);
if (req_is_rel(reqbody->qb_flags)) {
if (lqe->lqe_granted < repbody->qb_count) {
LQUOTA_ERROR(lqe, "can't release more space "
- "than owned "LPU64"<"LPU64,
+ "than owned %llu<%llu",
lqe->lqe_granted,
repbody->qb_count);
lqe->lqe_granted = 0;
}
/* extract information from lvb */
- if (ret == 0 && lvb != 0) {
+ if (ret == 0 && lvb != NULL) {
if (lvb->lvb_id_qunit != 0)
qsd_set_qunit(lqe, lvb->lvb_id_qunit);
qsd_set_edquot(lqe, !!(lvb->lvb_flags & LQUOTA_FL_EDQUOT));
adjust = qsd_adjust_needed(lqe);
if (reqbody && req_is_acq(reqbody->qb_flags) && ret != -EDQUOT) {
lqe->lqe_acq_rc = ret;
- lqe->lqe_acq_time = cfs_time_current_64();
+ lqe->lqe_acq_time = ktime_get_seconds();
}
out_noadjust:
qsd_request_exit(lqe);
* sometimes due to the race reply of dqacq vs. id lock glimpse
* (see LU-4505), so we revalidate it every 5 seconds. */
} else if (lqe->lqe_edquot &&
- cfs_time_before_64(cfs_time_shift_64(-5),
- lqe->lqe_edquot_time)) {
+ (lqe->lqe_edquot_time > ktime_get_seconds() - 5)) {
rc = -EDQUOT;
}else {
rc = -EAGAIN;
/* check whether an acquire request completed recently */
if (lqe->lqe_acq_rc != 0 &&
- cfs_time_before_64(cfs_time_shift_64(-1), lqe->lqe_acq_time)) {
+ lqe->lqe_acq_time > ktime_get_seconds() - 1) {
lqe_write_unlock(lqe);
LQUOTA_DEBUG(lqe, "using cached return code %d", lqe->lqe_acq_rc);
RETURN(lqe->lqe_acq_rc);
* \param space - is the amount of quota required for the operation
* \param ret - is the return code (-EDQUOT, -EINPROGRESS, ...)
*
- * \retval true - exit from l_wait_event and real return value in \a ret
+ * \retval true - stop waiting in wait_event_idle_timeout,
+ * and real return value in \a ret
* \retval false - continue waiting
*/
static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
ENTRY;
for (count = 0; rc == 0; count++) {
- LQUOTA_DEBUG(lqe, "acquiring:"LPD64 " count=%d", space, count);
+ LQUOTA_DEBUG(lqe, "acquiring:%lld count=%d", space, count);
if (lqe2qqi(lqe)->qqi_qsd->qsd_stopping) {
rc = -EINPROGRESS;
* rc < 0, something bad happened */
break;
+ /* if we have gotten some quota and still wait for more quota,
+ * it's better to give QMT some time to reclaim from clients */
+ if (count > 0)
+ schedule_timeout_interruptible(cfs_time_seconds(1));
+
/* need to acquire more quota space from master */
rc = qsd_acquire_remote(env, lqe);
}
/* already a request in flight, continue waiting */
RETURN(false);
*ret = rc;
- RETURN(true); /* exit from l_wait_event */
+ RETURN(true);
}
/**
*/
static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
struct lquota_id_info *qid, long long space,
- int *flags)
+ enum osd_quota_local_flags *local_flags)
{
- struct lquota_entry *lqe;
- int rc, ret = -EINPROGRESS;
- struct l_wait_info lwi;
+ struct lquota_entry *lqe;
+ enum osd_quota_local_flags qtype_flag = 0;
+ int rc, ret = -EINPROGRESS;
ENTRY;
if (qid->lqi_qentry != NULL) {
* quota space. That said, we still want to perform space
* adjustments in qsd_op_end, so we return here, but with
* a reference on the lqe */
- if (flags != NULL) {
+ if (local_flags != NULL) {
rc = qsd_refresh_usage(env, lqe);
GOTO(out_flags, rc);
}
RETURN(0);
}
- LQUOTA_DEBUG(lqe, "op_begin space:"LPD64, space);
+ LQUOTA_DEBUG(lqe, "op_begin space:%lld", space);
lqe_write_lock(lqe);
lqe->lqe_waiting_write += space;
/* acquire quota space for the operation, cap overall wait time to
* prevent a service thread from being stuck for too long */
- lwi = LWI_TIMEOUT(cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)),
- NULL, NULL);
- rc = l_wait_event(lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
- &lwi);
+ rc = wait_event_idle_timeout(
+ lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
+ cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)));
- if (rc == 0 && ret == 0) {
+ if (rc > 0 && ret == 0) {
qid->lqi_space += space;
+ rc = 0;
} else {
- if (rc == 0)
+ if (rc > 0)
rc = ret;
+ else if (rc == 0)
+ rc = -ETIMEDOUT;
LQUOTA_DEBUG(lqe, "acquire quota failed:%d", rc);
lqe_write_lock(lqe);
lqe->lqe_waiting_write -= space;
- if (flags && lqe->lqe_pending_write != 0)
+ if (local_flags && lqe->lqe_pending_write != 0)
/* Inform OSD layer that there are pending writes.
* It might want to retry after a sync if appropriate */
- *flags |= QUOTA_FL_SYNC;
+ *local_flags |= QUOTA_FL_SYNC;
lqe_write_unlock(lqe);
/* convert recoverable error into -EINPROGRESS, client will
}
}
- if (flags != NULL) {
+ if (local_flags != NULL) {
out_flags:
LASSERT(qid->lqi_is_blk);
if (rc != 0) {
- *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
+ *local_flags |= lquota_over_fl(qqi->qqi_qtype);
} else {
__u64 usage;
lqe_read_lock(lqe);
- usage = lqe->lqe_usage;
- usage += lqe->lqe_pending_write;
+ usage = lqe->lqe_pending_write;
usage += lqe->lqe_waiting_write;
- usage += qqi->qqi_qsd->qsd_sync_threshold;
+ /* There is a chance to successfully grant more quota
+ * but get edquot flag through glimpse. */
+ if (lqe->lqe_edquot || (lqe->lqe_qunit != 0 &&
+ (usage % lqe->lqe_qunit >
+ qqi->qqi_qsd->qsd_sync_threshold)))
+ usage += qqi->qqi_qsd->qsd_sync_threshold;
+ usage += lqe->lqe_usage;
+
+ qtype_flag = lquota_over_fl(qqi->qqi_qtype);
/* if we should notify client to start sync write */
if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
- *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
+ *local_flags |= qtype_flag;
else
- *flags &= ~LQUOTA_OVER_FL(qqi->qqi_qtype);
+ *local_flags &= ~qtype_flag;
lqe_read_unlock(lqe);
}
}
static inline bool qid_equal(struct lquota_id_info *q1,
struct lquota_id_info *q2)
{
- if (q1->lqi_type != q2->lqi_type)
+ if (q1->lqi_is_blk != q2->lqi_is_blk || q1->lqi_type != q2->lqi_type)
return false;
return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
}
*/
int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
struct lquota_trans *trans, struct lquota_id_info *qi,
- int *flags)
+ enum osd_quota_local_flags *local_flags)
{
int i, rc;
bool found = false;
if (unlikely(qsd == NULL))
RETURN(0);
+ if (qsd->qsd_dev->dd_rdonly)
+ RETURN(0);
+
/* We don't enforce quota until the qsd_instance is started */
read_lock(&qsd->qsd_lock);
if (!qsd->qsd_started) {
* or - the user/group is root
* or - quota accounting isn't enabled */
if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0 ||
- qsd->qsd_acct_failed)
+ (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed)
RETURN(0);
LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d\n",
for (i = 0; i < trans->lqt_id_cnt; i++) {
if (qid_equal(qi, &trans->lqt_ids[i])) {
found = true;
- /* make sure we are not mixing inodes & blocks */
- LASSERT(trans->lqt_ids[i].lqi_is_blk == qi->lqi_is_blk);
break;
}
}
/* manage quota enforcement for this ID */
rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type],
- &trans->lqt_ids[i], qi->lqi_space, flags);
+ &trans->lqt_ids[i], qi->lqi_space, local_flags);
RETURN(rc);
}
EXPORT_SYMBOL(qsd_op_begin);
qqi = lqe2qqi(lqe);
qsd = qqi->qqi_qsd;
+ if (qsd->qsd_dev->dd_rdonly)
+ RETURN(0);
+
lqe_write_lock(lqe);
/* fill qb_count & qb_flags */
if (unlikely(qsd == NULL))
RETURN_EXIT;
+ if (qsd->qsd_dev->dd_rdonly)
+ RETURN_EXIT;
+
/* We don't enforce quota until the qsd_instance is started */
read_lock(&qsd->qsd_lock);
if (!qsd->qsd_started) {
lqe = lqe_locate(env, qqi->qqi_site, qid);
if (IS_ERR(lqe)) {
- CERROR("%s: fail to locate lqe for id:"LPU64", type:%d\n",
+ CERROR("%s: fail to locate lqe for id:%llu, type:%d\n",
qsd->qsd_svname, qid->qid_uid, qtype);
RETURN_EXIT;
}