};
struct qsd_instance *qsd_init(const struct lu_env *, char *, struct dt_device *,
- struct proc_dir_entry *, bool is_md);
+ struct proc_dir_entry *, bool is_md, bool excl);
int qsd_prepare(const struct lu_env *, struct qsd_instance *);
int qsd_start(const struct lu_env *, struct qsd_instance *);
void qsd_fini(const struct lu_env *, struct qsd_instance *);
/* currently it's no need to prepare qsd_instance_md for OST */
if (!o->od_is_ost) {
o->od_quota_slave_md = qsd_init(env, o->od_svname,
- &o->od_dt_dev,
- o->od_proc_entry, true);
+ &o->od_dt_dev, o->od_proc_entry,
+ true, true);
if (IS_ERR(o->od_quota_slave_md)) {
rc = PTR_ERR(o->od_quota_slave_md);
o->od_quota_slave_md = NULL;
}
o->od_quota_slave_dt = qsd_init(env, o->od_svname, &o->od_dt_dev,
- o->od_proc_entry, false);
+ o->od_proc_entry, false, true);
if (IS_ERR(o->od_quota_slave_dt)) {
if (o->od_quota_slave_md != NULL) {
/* currently it's no need to prepare qsd_instance_md for OST */
if (!o->od_is_ost) {
o->od_quota_slave_md = qsd_init(env, o->od_svname,
- &o->od_dt_dev,
- o->od_proc_entry, true);
+ &o->od_dt_dev, o->od_proc_entry,
+ true, false);
if (IS_ERR(o->od_quota_slave_md)) {
rc = PTR_ERR(o->od_quota_slave_md);
o->od_quota_slave_md = NULL;
}
o->od_quota_slave_dt = qsd_init(env, o->od_svname, &o->od_dt_dev,
- o->od_proc_entry, false);
+ o->od_proc_entry, false, false);
if (IS_ERR(o->od_quota_slave_dt)) {
if (o->od_quota_slave_md != NULL) {
qsd_prepared:1, /* qsd_prepare() successfully
* called */
qsd_exp_valid:1,/* qsd_exp is now valid */
- qsd_stopping:1; /* qsd_instance is stopping */
+ qsd_stopping:1, /* qsd_instance is stopping */
+ qsd_updating:1, /* qsd is updating record */
+ qsd_exclusive:1; /* upd exclusive with reint */
};
*/
struct qsd_instance *qsd_init(const struct lu_env *env, char *svname,
struct dt_device *dev,
- struct proc_dir_entry *osd_proc, bool is_md)
+ struct proc_dir_entry *osd_proc,
+ bool is_md, bool excl)
{
struct qsd_thread_info *qti = qsd_info(env);
struct qsd_instance *qsd;
qsd->qsd_prepared = false;
qsd->qsd_started = false;
qsd->qsd_is_md = is_md;
+ qsd->qsd_updating = false;
+ qsd->qsd_exclusive = excl;
/* copy service name */
if (strlcpy(qsd->qsd_svname, svname, sizeof(qsd->qsd_svname))
return 0;
}
-static bool qsd_pending_updates(struct qsd_qtype_info *qqi)
+static bool qqi_reint_delayed(struct qsd_qtype_info *qqi)
{
struct qsd_instance *qsd = qqi->qqi_qsd;
struct qsd_upd_rec *upd;
struct lquota_entry *lqe, *n;
int dqacq = 0;
- bool updates = false;
+ bool delay = false;
ENTRY;
/* any pending quota adjust? */
}
spin_unlock(&qsd->qsd_adjust_lock);
+ /* any pending quota request? */
+ cfs_hash_for_each_safe(qqi->qqi_site->lqs_hash, qsd_entry_iter_cb,
+ &dqacq);
+ if (dqacq) {
+ CDEBUG(D_QUOTA, "%s: pending dqacq for type:%d.\n",
+ qsd->qsd_svname, qqi->qqi_qtype);
+ GOTO(out, delay = true);
+ }
+
/* any pending updates? */
- read_lock(&qsd->qsd_lock);
+ write_lock(&qsd->qsd_lock);
+
+ /* check if the reintegration has already started or finished */
+ if ((qqi->qqi_glb_uptodate && qqi->qqi_slv_uptodate) ||
+ qqi->qqi_reint || qsd->qsd_stopping || qsd->qsd_updating)
+ GOTO(out_lock, delay = true);
+
+ /* there could be some unfinished global or index entry updates
+ * (very unlikely), to avoid them messing up with the reint
+ * procedure, we just return and try to re-start reint later. */
list_for_each_entry(upd, &qsd->qsd_upd_list, qur_link) {
if (upd->qur_qqi == qqi) {
- read_unlock(&qsd->qsd_lock);
CDEBUG(D_QUOTA, "%s: pending %s updates for type:%d.\n",
qsd->qsd_svname,
upd->qur_global ? "global" : "slave",
qqi->qqi_qtype);
- GOTO(out, updates = true);
+ GOTO(out_lock, delay = true);
}
}
- read_unlock(&qsd->qsd_lock);
+ qqi->qqi_reint = 1;
- /* any pending quota request? */
- cfs_hash_for_each_safe(qqi->qqi_site->lqs_hash, qsd_entry_iter_cb,
- &dqacq);
- if (dqacq) {
- CDEBUG(D_QUOTA, "%s: pending dqacq for type:%d.\n",
- qsd->qsd_svname, qqi->qqi_qtype);
- updates = true;
- }
EXIT;
+out_lock:
+ write_unlock(&qsd->qsd_lock);
out:
- if (updates)
+ if (delay)
CERROR("%s: Delaying reintegration for qtype:%d until pending "
"updates are flushed.\n",
qsd->qsd_svname, qqi->qqi_qtype);
- return updates;
+ return delay;
}
int qsd_start_reint_thread(struct qsd_qtype_info *qqi)
/* no space accounting support, can't enable enforcement */
RETURN(0);
- /* check if the reintegration has already started or finished */
- write_lock(&qsd->qsd_lock);
-
- if ((qqi->qqi_glb_uptodate && qqi->qqi_slv_uptodate) ||
- qqi->qqi_reint || qsd->qsd_stopping) {
- write_unlock(&qsd->qsd_lock);
+ if (qqi_reint_delayed(qqi))
RETURN(0);
- }
- qqi->qqi_reint = 1;
-
- write_unlock(&qsd->qsd_lock);
-
- /* there could be some unfinished global or index entry updates
- * (very unlikely), to avoid them messing up with the reint
- * procedure, we just return and try to re-start reint later. */
- if (qsd_pending_updates(qqi)) {
- write_lock(&qsd->qsd_lock);
- qqi->qqi_reint = 0;
- write_unlock(&qsd->qsd_lock);
- RETURN(0);
- }
-
OBD_ALLOC_PTR(args);
if (args == NULL)
{
struct lquota_entry *lqe = upd->qur_lqe;
struct qsd_qtype_info *qqi = upd->qur_qqi;
+ struct qsd_instance *qsd = qqi->qqi_qsd;
int rc;
ENTRY;
+ if (qsd->qsd_exclusive) { /* It could be deadlock running with reint */
+ read_lock(&qsd->qsd_lock);
+ rc = qqi->qqi_reint;
+ read_unlock(&qsd->qsd_lock);
+ if (rc)
+ return 1;
+ }
+
if (lqe == NULL) {
lqe = lqe_locate(env, qqi->qqi_site, &upd->qur_qid);
if (IS_ERR(lqe))
/* refresh usage */
qsd_refresh_usage(env, lqe);
- spin_lock(&qqi->qqi_qsd->qsd_adjust_lock);
+ spin_lock(&qsd->qsd_adjust_lock);
lqe->lqe_adjust_time = 0;
- spin_unlock(&qqi->qqi_qsd->qsd_adjust_lock);
+ spin_unlock(&qsd->qsd_adjust_lock);
/* Report usage asynchronously */
rc = qsd_adjust(env, lqe);
list_splice_init(&qsd->qsd_upd_list, upd);
job_pending = true;
}
+ if (qsd->qsd_exclusive)
+ qsd->qsd_updating = job_pending;
for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++) {
struct qsd_qtype_info *qqi = qsd->qsd_type_array[qtype];
complete(args->qua_started);
while (({set_current_state(TASK_IDLE);
!kthread_should_stop(); })) {
+ int count = 0;
if (!qsd_job_pending(qsd, &queue, &uptodate))
schedule_timeout(cfs_time_seconds(QSD_WB_INTERVAL));
__set_current_state(TASK_RUNNING);
- list_for_each_entry_safe(upd, n, &queue, qur_link) {
- list_del_init(&upd->qur_link);
- qsd_process_upd(env, upd);
- qsd_upd_free(upd);
+ while (1) {
+ list_for_each_entry_safe(upd, n, &queue, qur_link) {
+ if (qsd_process_upd(env, upd) <= 0) {
+ list_del_init(&upd->qur_link);
+ qsd_upd_free(upd);
+ }
+ }
+ if (list_empty(&queue))
+ break;
+ count++;
+ if (count % 7 == 0) {
+ n = list_entry(&queue, struct qsd_upd_rec,
+ qur_link);
+ CWARN("%s: The reintegration thread [%d] "
+ "blocked more than %ld seconds\n",
+ n->qur_qqi->qqi_qsd->qsd_svname,
+ n->qur_qqi->qqi_qtype, count *
+ cfs_time_seconds(QSD_WB_INTERVAL) / 10);
+ }
+ schedule_timeout_interruptible(
+ cfs_time_seconds(QSD_WB_INTERVAL) / 10);
+ }
+ if (qsd->qsd_exclusive) {
+ write_lock(&qsd->qsd_lock);
+ qsd->qsd_updating = false;
+ write_unlock(&qsd->qsd_lock);
}
spin_lock(&qsd->qsd_adjust_lock);