}
lqe->lqe_pending_req--;
lqe->lqe_pending_rel = 0;
- wake_up_all(&lqe->lqe_waiters);
+ wake_up(&lqe->lqe_waiters);
}
/**
* \param space - is the amount of quota required for the operation
* \param ret - is the return code (-EDQUOT, -EINPROGRESS, ...)
*
- * \retval true - exit from l_wait_event and real return value in \a ret
+ * \retval true - stop waiting in wait_event_idle_timeout,
+ * and real return value in \a ret
* \retval false - continue waiting
*/
static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
long long space, int *ret)
{
int rc = 0, count;
+ int wait_pending = 0; /* set once the one-shot dt_sync() retry was done */
+ struct qsd_qtype_info *qqi = lqe2qqi(lqe);
+
ENTRY;
for (count = 0; rc == 0; count++) {
LQUOTA_DEBUG(lqe, "acquiring:%lld count=%d", space, count);
-
+again: /* re-check local space after the dt_sync() below */
if (lqe2qqi(lqe)->qqi_qsd->qsd_stopping) {
rc = -EINPROGRESS;
break;
/* rc == 0, Wouhou! enough local quota space
* rc < 0, something bad happened */
break;
+ /*
+ * There may be a window where a committed transaction has
+ * already updated the usage while the pending writes have
+ * not been accounted yet; sync once to let them settle
+ * before acquiring more quota from the master.
+ * NOTE(review): assumes dt_sync() flushes the writes counted
+ * in lqe_pending_write - confirm against osd implementation.
+ */
+ if (lqe->lqe_pending_write >= space && !wait_pending) {
+ wait_pending = 1;
+ dt_sync(env, qqi->qqi_qsd->qsd_dev);
+ goto again;
+ }
/* if we have gotten some quota and still wait more quota,
* it's better to give QMT some time to reclaim from clients */
- if (count > 0) {
- set_current_state(TASK_INTERRUPTIBLE);
- schedule_timeout(cfs_time_seconds(1));
- }
+ if (count > 0)
+ schedule_timeout_interruptible(cfs_time_seconds(1));
/* need to acquire more quota space from master */
rc = qsd_acquire_remote(env, lqe);
/* already a request in flight, continue waiting */
RETURN(false);
*ret = rc;
- RETURN(true); /* exit from l_wait_event */
+ RETURN(true);
}
/**
*/
static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
struct lquota_id_info *qid, long long space,
- int *flags)
+ enum osd_quota_local_flags *local_flags)
{
struct lquota_entry *lqe;
- struct l_wait_info lwi;
- int qtype_flag = 0;
+ enum osd_quota_local_flags qtype_flag = 0;
int rc, ret = -EINPROGRESS;
ENTRY;
* quota space. That said, we still want to perform space
* adjustments in qsd_op_end, so we return here, but with
* a reference on the lqe */
- if (flags != NULL) {
+ if (local_flags != NULL) {
rc = qsd_refresh_usage(env, lqe);
GOTO(out_flags, rc);
}
/* acquire quota space for the operation, cap overall wait time to
* prevent a service thread from being stuck for too long */
- lwi = LWI_TIMEOUT(cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)),
- NULL, NULL);
- rc = l_wait_event(lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
- &lwi);
+ rc = wait_event_idle_timeout(
+ lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
+ cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)));
+ /* unlike the old l_wait_event(), wait_event_idle_timeout() returns
+ * 0 on timeout and a positive remaining-time value once the
+ * condition became true, hence the rc remapping below */
- if (rc == 0 && ret == 0) {
+ if (rc > 0 && ret == 0) {
qid->lqi_space += space;
+ rc = 0;
} else {
- if (rc == 0)
+ if (rc > 0)
rc = ret;
+ else if (rc == 0)
+ rc = -ETIMEDOUT;
LQUOTA_DEBUG(lqe, "acquire quota failed:%d", rc);
lqe_write_lock(lqe);
lqe->lqe_waiting_write -= space;
- if (flags && lqe->lqe_pending_write != 0)
+ if (local_flags && lqe->lqe_pending_write != 0)
/* Inform OSD layer that there are pending writes.
* It might want to retry after a sync if appropriate */
- *flags |= QUOTA_FL_SYNC;
+ *local_flags |= QUOTA_FL_SYNC;
lqe_write_unlock(lqe);
/* convert recoverable error into -EINPROGRESS, client will
}
}
- if (flags != NULL) {
+ if (local_flags != NULL) {
out_flags:
LASSERT(qid->lqi_is_blk);
if (rc != 0) {
- *flags |= lquota_over_fl(qqi->qqi_qtype);
+ *local_flags |= lquota_over_fl(qqi->qqi_qtype);
} else {
__u64 usage;
lqe_read_lock(lqe);
usage = lqe->lqe_pending_write;
usage += lqe->lqe_waiting_write;
- if (lqe->lqe_qunit != 0 && (usage % lqe->lqe_qunit >
- qqi->qqi_qsd->qsd_sync_threshold))
+ /* There is a chance to successfully grant more quota
+ * but get edquot flag through glimpse. */
+ if (lqe->lqe_edquot || (lqe->lqe_qunit != 0 &&
+ (usage % lqe->lqe_qunit >
+ qqi->qqi_qsd->qsd_sync_threshold)))
usage += qqi->qqi_qsd->qsd_sync_threshold;
usage += lqe->lqe_usage;
qtype_flag = lquota_over_fl(qqi->qqi_qtype);
/* if we should notify client to start sync write */
if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
- *flags |= qtype_flag;
+ *local_flags |= qtype_flag;
else
- *flags &= ~qtype_flag;
+ *local_flags &= ~qtype_flag;
lqe_read_unlock(lqe);
}
}
+/* Return true when two quota id records refer to the same entity: same
+ * quota type, same id, and the same accounting kind (lqi_is_blk), so that
+ * block and inode records for one id are never merged together. */
static inline bool qid_equal(struct lquota_id_info *q1,
struct lquota id_info *q2)
{
- if (q1->lqi_type != q2->lqi_type)
+ if (q1->lqi_is_blk != q2->lqi_is_blk || q1->lqi_type != q2->lqi_type)
return false;
return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
}
*/
int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
struct lquota_trans *trans, struct lquota_id_info *qi,
- int *flags)
+ enum osd_quota_local_flags *local_flags)
{
int i, rc;
bool found = false;
for (i = 0; i < trans->lqt_id_cnt; i++) {
if (qid_equal(qi, &trans->lqt_ids[i])) {
found = true;
- /* make sure we are not mixing inodes & blocks */
- LASSERT(trans->lqt_ids[i].lqi_is_blk == qi->lqi_is_blk);
+ /* the removed LASSERT is now redundant: qid_equal()
+ * refuses to match records whose lqi_is_blk kinds
+ * differ, so inodes & blocks can no longer be mixed */
break;
}
}
/* manage quota enforcement for this ID */
rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type],
- &trans->lqt_ids[i], qi->lqi_space, flags);
+ &trans->lqt_ids[i], qi->lqi_space, local_flags);
RETURN(rc);
}
EXPORT_SYMBOL(qsd_op_begin);
}
EXPORT_SYMBOL(qsd_op_end);
+/* Simple wrapper on top of the qsd API which implements the quota transfer
+ * needed by osd setattr (ownership change): one inode plus \a bspace blocks
+ * of quota type \a qtype are charged to \a new_id and released from
+ * \a orig_id. As a reminder, only the root user can change ownership of
+ * a file, that's why EDQUOT & EINPROGRESS errors are discarded.
+ *
+ * \retval 0 - success (including the discarded quota errors above)
+ * \retval -ve - other errors, e.g. -EINVAL for an invalid project id
+ */
+int qsd_transfer(const struct lu_env *env, struct qsd_instance *qsd,
+ struct lquota_trans *trans, unsigned int qtype,
+ u64 orig_id, u64 new_id, u64 bspace,
+ struct lquota_id_info *qi)
+{
+ int rc;
+
+ if (unlikely(!qsd))
+ return 0;
+
+ LASSERT(qtype < LL_MAXQUOTAS);
+ /* only project ids are range-checked here before use */
+ if (qtype == PRJQUOTA)
+ if (!projid_valid(make_kprojid(&init_user_ns, new_id)))
+ return -EINVAL;
+
+ qi->lqi_type = qtype;
+
+ /* inode accounting */
+ qi->lqi_is_blk = false;
+
+ /* one more inode for the new owner ... */
+ qi->lqi_id.qid_uid = new_id;
+ qi->lqi_space = 1;
+ rc = qsd_op_begin(env, qsd, trans, qi, NULL);
+ if (rc == -EDQUOT || rc == -EINPROGRESS)
+ rc = 0;
+ if (rc)
+ return rc;
+
+ /* and one less inode for the current id */
+ qi->lqi_id.qid_uid = orig_id;
+ qi->lqi_space = -1;
+ /* can't get EDQUOT when reducing usage */
+ rc = qsd_op_begin(env, qsd, trans, qi, NULL);
+ if (rc == -EINPROGRESS)
+ rc = 0;
+ if (rc)
+ return rc;
+
+ /* block accounting */
+ qi->lqi_is_blk = true;
+
+ /* more blocks for the new owner ... */
+ qi->lqi_id.qid_uid = new_id;
+ qi->lqi_space = bspace;
+ rc = qsd_op_begin(env, qsd, trans, qi, NULL);
+ if (rc == -EDQUOT || rc == -EINPROGRESS)
+ rc = 0;
+ if (rc)
+ return rc;
+
+ /* and finally less blocks for the current owner */
+ qi->lqi_id.qid_uid = orig_id;
+ qi->lqi_space = -bspace;
+ rc = qsd_op_begin(env, qsd, trans, qi, NULL);
+ /* can't get EDQUOT when reducing usage */
+ if (rc == -EINPROGRESS)
+ rc = 0;
+ return rc;
+}
+EXPORT_SYMBOL(qsd_transfer);
+
/**
* Trigger pre-acquire/release if necessary.
* It's only used by ldiskfs osd so far. When unlink a file in ldiskfs, the
EXIT;
}
EXPORT_SYMBOL(qsd_op_adjust);
+
+/**
+ * Reserve or free quota.
+ *
+ * Currently, It's used to reserve quota space before changing the file's group
+ * for normal user and free the reserved quota after the group change.
+ *
+ * \param env - the environment passed by the caller
+ * \param qsd - is the qsd instance associated with the device in charge of
+ * the operation.
+ * \param qi - qid & space required by current operation
+ *
+ * \retval 0 - success
+ * \retval -EDQUOT - out of quota
+ * \retval -EINPROGRESS - inform client to retry write
+ * \retval -ve - other appropriate errors
+ */
+int qsd_reserve_or_free_quota(const struct lu_env *env,
+ struct qsd_instance *qsd,
+ struct lquota_id_info *qi)
+{
+ struct lquota_entry *lqe;
+ struct qsd_qtype_info *qqi;
+ int rc = 0;
+ bool is_free = qi->lqi_space < 0; /* negative space means "free" */
+
+ ENTRY;
+
+ if (unlikely(qsd == NULL))
+ RETURN(0);
+
+ if (qsd->qsd_dev->dd_rdonly)
+ RETURN(0);
+
+ /* NOTE(review): the sign flip below persists in *qi even on the
+ * early-return paths further down (qsd not started, type disabled)
+ * - confirm callers do not rely on lqi_space keeping its sign */
+ if (is_free)
+ qi->lqi_space *= -1;
+
+ /* We don't enforce quota until the qsd_instance is started */
+ read_lock(&qsd->qsd_lock);
+ if (!qsd->qsd_started) {
+ read_unlock(&qsd->qsd_lock);
+ RETURN(0);
+ }
+ read_unlock(&qsd->qsd_lock);
+
+ qqi = qsd->qsd_type_array[qi->lqi_type];
+ LASSERT(qqi);
+
+ CDEBUG(D_QUOTA, "type %s, acct %s, free %d, count %llu\n",
+ qsd_type_enabled(qsd, qi->lqi_type) ? "enabled" : "disabled",
+ (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed ? "failed" :
+ "succeed", is_free, qi->lqi_space);
+
+ /* ignore quota enforcement request when:
+ * - quota isn't enforced for this quota type
+ * or - the user/group is root
+ * or - quota accounting isn't enabled
+ */
+ if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0 ||
+ (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed)
+ RETURN(0);
+
+ if (is_free) {
+ /* look up lquota entry associated with qid */
+ lqe = lqe_locate(env, qqi->qqi_site, &qi->lqi_id);
+ if (IS_ERR(lqe))
+ RETURN(PTR_ERR(lqe));
+ if (!lqe->lqe_enforced) {
+ /* drop our reference: nothing to release */
+ lqe_putref(lqe);
+ RETURN(0);
+ }
+
+ /* hand the lqe reference over via lqi_qentry ... */
+ qi->lqi_qentry = lqe;
+
+ /* lqe will be put in qsd_op_end0 */
+ qsd_op_end0(env, qsd->qsd_type_array[qi->lqi_type], qi);
+ qi->lqi_qentry = NULL;
+ } else {
+ /* manage quota enforcement for this ID */
+ rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type], qi,
+ qi->lqi_space, NULL);
+
+ /* qsd_op_begin0 may have pinned an lqe in lqi_qentry;
+ * release it here since no qsd_op_end will follow */
+ if (qi->lqi_qentry != NULL) {
+ lqe_putref(qi->lqi_qentry);
+ qi->lqi_qentry = NULL;
+ }
+ }
+
+ CDEBUG(D_QUOTA, "%s quota: type %i, uid %llu, gid %llu, space %llu\n",
+ is_free ? "Free" : "Reserve", qi->lqi_type, qi->lqi_id.qid_uid,
+ qi->lqi_id.qid_gid, qi->lqi_space);
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(qsd_reserve_or_free_quota);