4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 021110-1307, USA
24 * Copyright (c) 2012, 2017, Intel Corporation.
25 * Use is subject to license terms.
27 * Author: Johann Lombardi <johann.lombardi@intel.com>
28 * Author: Niu Yawei <yawei.niu@intel.com>
31 #define DEBUG_SUBSYSTEM S_LQUOTA
33 #include <linux/kthread.h>
34 #include "qsd_internal.h"
37 * Allocate and fill a qsd_upd_rec structure to be processed by the writeback
40 * \param qqi - is the qsd_qtype_info structure relevant to the update
41 * \param lqe - is the lquota entry subject to the update
42 * \param qid - is the identifier subject to the update
43 * \param rec - is the record storing the new quota settings
44 * \param ver - is the version associated with the update
45 * \param global - is a boolean set to true if this is an update of the global
46 * index and false for a slave index.
48 static struct qsd_upd_rec *qsd_upd_alloc(struct qsd_qtype_info *qqi,
49 struct lquota_entry *lqe,
51 union lquota_rec *rec, __u64 ver,
54 struct qsd_upd_rec *upd;
/* slab allocation with GFP_NOFS: this path can run under filesystem
 * I/O, so fs reclaim must not be re-entered.
 * NOTE(review): the allocation-failure check and the remaining field
 * assignments (qqi/qid/rec/ver copies, lqe reference) are not visible
 * in this extract — confirm NULL is handled before the stores below. */
56 OBD_SLAB_ALLOC_PTR_GFP(upd, upd_kmem, GFP_NOFS);
/* link used to queue the record on the writeback or deferred lists */
62 INIT_LIST_HEAD(&upd->qur_link);
70 upd->qur_global = global;
/* Release a writeback record: drop the lquota entry reference held by the
 * record (presumably taken at allocation time — not visible here) and return
 * the record to the upd_kmem slab cache. */
75 static void qsd_upd_free(struct qsd_upd_rec *upd)
78 lqe_putref(upd->qur_lqe);
79 OBD_SLAB_FREE_PTR(upd, upd_kmem);
/* Queue a writeback record on qsd_upd_list and wake the writeback thread.
 * If the qsd instance is stopping, the record is discarded instead.
 * must hold the qsd_lock */
82 /* must hold the qsd_lock */
83 static void qsd_upd_add(struct qsd_instance *qsd, struct qsd_upd_rec *upd)
85 if (!qsd->qsd_stopping) {
86 list_add_tail(&upd->qur_link, &qsd->qsd_upd_list);
87 /* wake up the upd thread */
88 wake_up(&qsd->qsd_upd_thread.t_ctl_waitq);
/* instance is shutting down: warn and drop the update (the record's
 * free path is on lines not visible in this extract) */
90 CWARN("%s: discard update.\n", qsd->qsd_svname);
92 LQUOTA_WARN(upd->qur_lqe, "discard update.");
/* Insert a writeback record into a per-type deferred list, kept sorted by
 * ascending version so qsd_kickoff_deferred() can replay updates in order.
 * Records with a duplicated version (left over from a master rollback) are
 * removed in favour of the newer record.
 * must hold the qsd_lock */
97 /* must hold the qsd_lock */
98 static void qsd_add_deferred(struct qsd_instance *qsd, struct list_head *list,
99 struct qsd_upd_rec *upd)
101 struct qsd_upd_rec *tmp, *n;
103 if (qsd->qsd_stopping) {
/* NOTE(review): "udpate" typo in the console message below (runtime
 * string, left as-is here) */
104 CWARN("%s: discard deferred udpate.\n", qsd->qsd_svname);
106 LQUOTA_WARN(upd->qur_lqe, "discard deferred update.");
111 /* Sort the updates in ascending order */
/* walk backwards: new records usually carry the highest version, so the
 * insertion point is typically found immediately at the tail */
112 list_for_each_entry_safe_reverse(tmp, n, list, qur_link) {
114 /* There could be some legacy records which have duplicated
115 * version. Imagine following scenario: slave received global
116 * glimpse and queued a record in the deferred list, then
117 * master crash and rollback to an earlier version, then the
118 * version of queued record will be conflicting with later
119 * updates. We should just delete the legacy record in such
121 if (upd->qur_ver == tmp->qur_ver) {
123 LQUOTA_WARN(tmp->qur_lqe, "Found a conflict "
124 "record with ver:%llu",
127 CWARN("%s: Found a conflict record with ver: "
128 "%llu\n", qsd->qsd_svname, tmp->qur_ver);
/* drop the stale duplicate; its free path is on lines not
 * visible in this extract */
130 list_del_init(&tmp->qur_link);
132 } else if (upd->qur_ver < tmp->qur_ver) {
/* insert before the first record with a larger version */
135 list_add_tail(&upd->qur_link, &tmp->qur_link);
/* smallest version seen so far: becomes the new list head */
139 list_add(&upd->qur_link, list);
/* Replay deferred updates once the index version has been bumped to \a ver:
 * discard records already covered by \a ver, then schedule the next record
 * iff it is exactly the successor version (ver + 1) so updates stay in order.
 * must hold the qsd_lock */
142 /* must hold the qsd_lock */
143 static void qsd_kickoff_deferred(struct qsd_qtype_info *qqi,
144 struct list_head *list, __u64 ver)
146 struct qsd_upd_rec *upd, *tmp;
149 /* Get the first update record in the list, which has the smallest
150 * version, discard all records with versions smaller than the current
152 list_for_each_entry_safe(upd, tmp, list, qur_link) {
153 if (upd->qur_ver <= ver) {
154 /* drop this update */
155 list_del_init(&upd->qur_link);
156 CDEBUG(D_QUOTA, "%s: skipping deferred update ver:"
157 "%llu/%llu, global:%d, qid:%llu\n",
158 qqi->qqi_qsd->qsd_svname, upd->qur_ver, ver,
159 upd->qur_global, upd->qur_qid.qid_uid);
166 /* No remaining deferred update */
167 if (list_empty(list))
170 CDEBUG(D_QUOTA, "%s: found deferred update record. "
171 "version:%llu/%llu, global:%d, qid:%llu\n",
172 qqi->qqi_qsd->qsd_svname, upd->qur_ver, ver,
173 upd->qur_global, upd->qur_qid.qid_uid);
/* the loop above removed everything <= ver, so the head must be newer */
175 LASSERTF(upd->qur_ver > ver, "lur_ver:%llu, cur_ver:%llu\n",
178 /* Kick off the deferred update */
179 if (upd->qur_ver == ver + 1) {
180 list_del_init(&upd->qur_link);
/* hand the record to the writeback thread (qsd_lock already held,
 * as qsd_upd_add requires) */
181 qsd_upd_add(qqi->qqi_qsd, upd);
186 /* Bump version of global or slave index copy
188 * \param qqi - qsd_qtype_info
189 * \param ver - version to be bumped to
190 * \param global - global or slave index copy?
192 void qsd_bump_version(struct qsd_qtype_info *qqi, __u64 ver, bool global)
194 struct list_head *list;
/* pick the version counter and deferred list matching the index copy */
197 idx_ver = global ? &qqi->qqi_glb_ver : &qqi->qqi_slv_ver;
198 list = global ? &qqi->qqi_deferred_glb : &qqi->qqi_deferred_slv;
/* qsd_lock serializes the version bump, the uptodate flags and the
 * deferred-list replay below */
200 write_lock(&qqi->qqi_qsd->qsd_lock);
203 qqi->qqi_glb_uptodate = 1;
205 qqi->qqi_slv_uptodate = 1;
/* schedule any deferred update whose turn has now come */
206 qsd_kickoff_deferred(qqi, list, ver);
207 write_unlock(&qqi->qqi_qsd->qsd_lock);
211 * Schedule a commit of a lquota entry
213 * \param qqi - qsd_qtype_info
214 * \param lqe - lquota_entry
215 * \param qid - quota id
216 * \param rec - global or slave record to be updated to disk
217 * \param ver - new index file version
218 * \param global- true: master record; false: slave record
220 void qsd_upd_schedule(struct qsd_qtype_info *qqi, struct lquota_entry *lqe,
221 union lquota_id *qid, union lquota_rec *rec, __u64 ver,
224 struct qsd_upd_rec *upd;
225 struct qsd_instance *qsd = qqi->qqi_qsd;
229 CDEBUG(D_QUOTA, "%s: schedule update. global:%s, version:%llu\n",
230 qsd->qsd_svname, global ? "true" : "false", ver);
/* package the update in a writeback record; NOTE(review): allocation
 * failure handling is on lines not visible in this extract */
232 upd = qsd_upd_alloc(qqi, lqe, qid, rec, ver, global);
236 /* If we don't want update index version, no need to sort the
237 * records in version order, just schedule the updates instantly. */
239 write_lock(&qsd->qsd_lock);
240 qsd_upd_add(qsd, upd);
241 write_unlock(&qsd->qsd_lock);
245 write_lock(&qsd->qsd_lock);
/* compare against the current version of the index copy being updated */
247 cur_ver = global ? qqi->qqi_glb_ver : qqi->qqi_slv_ver;
249 if (ver <= cur_ver) {
251 /* legitimate race between glimpse AST and
253 CDEBUG(D_QUOTA, "%s: discarding glb update from glimpse"
254 " ver:%llu local ver:%llu\n",
255 qsd->qsd_svname, ver, cur_ver);
/* a stale slave update is unexpected, hence the louder CERROR */
257 CERROR("%s: discard slv update, ver:%llu local ver:"
258 "%llu\n", qsd->qsd_svname, ver, cur_ver);
260 } else if ((ver == cur_ver + 1) && qqi->qqi_glb_uptodate &&
261 qqi->qqi_slv_uptodate) {
262 /* In order update, and reintegration has been done. */
263 qsd_upd_add(qsd, upd);
265 /* Out of order update (the one with smaller version hasn't
266 * reached slave or hasn't been flushed to disk yet), or
267 * the reintegration is in progress. Defer the update. */
268 struct list_head *list = global ? &qqi->qqi_deferred_glb :
269 &qqi->qqi_deferred_slv;
270 qsd_add_deferred(qsd, list, upd);
273 write_unlock(&qsd->qsd_lock);
/* Apply one writeback record: update the in-memory lqe (global updates
 * only), refresh usage and trigger an asynchronous quota adjust, then flush
 * the record to the on-disk index copy via qsd_update_index(). */
278 static int qsd_process_upd(const struct lu_env *env, struct qsd_upd_rec *upd)
280 struct lquota_entry *lqe = upd->qur_lqe;
281 struct qsd_qtype_info *qqi = upd->qur_qqi;
/* record carried no lqe reference: look the entry up (or create it) by id */
286 lqe = lqe_locate(env, qqi->qqi_site, &upd->qur_qid);
288 GOTO(out, rc = PTR_ERR(lqe));
291 /* The in-memory lqe update for slave index copy isn't deferred,
292 * we shouldn't touch it here. */
293 if (upd->qur_global) {
294 rc = qsd_update_lqe(env, lqe, upd->qur_global, &upd->qur_rec);
298 qsd_refresh_usage(env, lqe);
/* reset the deferred-adjust timestamp so the entry is handled promptly */
300 spin_lock(&qqi->qqi_qsd->qsd_adjust_lock);
301 lqe->lqe_adjust_time = 0;
302 spin_unlock(&qqi->qqi_qsd->qsd_adjust_lock);
304 /* Report usage asynchronously */
305 rc = qsd_adjust(env, lqe);
307 LQUOTA_ERROR(lqe, "failed to report usage, rc:%d", rc);
/* persist the record in the global or slave index copy */
310 rc = qsd_update_index(env, qqi, &upd->qur_qid, upd->qur_global,
311 upd->qur_ver, &upd->qur_rec);
/* a global record with no limits and the DEFAULT flag set means the id
 * falls back to default quota; enforcement then depends on whether a
 * non-zero default limit is configured */
313 if (upd->qur_global && rc == 0 &&
314 upd->qur_rec.lqr_glb_rec.qbr_softlimit == 0 &&
315 upd->qur_rec.lqr_glb_rec.qbr_hardlimit == 0 &&
316 (LQUOTA_FLAG(upd->qur_rec.lqr_glb_rec.qbr_time) &
317 LQUOTA_FLAG_DEFAULT)) {
318 lqe->lqe_is_default = true;
319 if (qqi->qqi_default_softlimit == 0 &&
320 qqi->qqi_default_hardlimit == 0)
321 lqe->lqe_enforced = false;
323 lqe->lqe_enforced = true;
325 LQUOTA_DEBUG(lqe, "update to use default quota");
/* drop the reference from lqe_locate()/the record (exact put call is on
 * lines not visible in this extract) */
328 if (lqe && !IS_ERR(lqe)) {
/* Queue \a lqe on the per-instance adjust list so the writeback thread will
 * run a quota adjust (or a per-ID lock cancel when lqe_adjust_time stays 0)
 * for it later.
 * \param defer - delay handling by QSD_WB_INTERVAL instead of acting now
 * \param cancel - queue for per-ID lock cancel rather than quota adjust */
335 void qsd_adjust_schedule(struct lquota_entry *lqe, bool defer, bool cancel)
337 struct qsd_instance *qsd = lqe2qqi(lqe)->qqi_qsd;
/* bail out early (without queuing) if the instance is being stopped */
340 read_lock(&qsd->qsd_lock);
341 if (qsd->qsd_stopping) {
342 read_unlock(&qsd->qsd_lock);
345 read_unlock(&qsd->qsd_lock);
348 spin_lock(&qsd->qsd_adjust_lock);
350 /* the lqe is being queued for the per-ID lock cancel, we should
351 * cancel the lock cancel and re-add it for quota adjust */
352 if (!list_empty(&lqe->lqe_link) &&
353 lqe->lqe_adjust_time == 0) {
354 list_del_init(&lqe->lqe_link);
358 if (list_empty(&lqe->lqe_link)) {
/* adjust_time encodes the earliest time the writeback thread may
 * handle this entry; 0 marks a lock-cancel request */
360 lqe->lqe_adjust_time = ktime_get_seconds();
362 lqe->lqe_adjust_time += QSD_WB_INTERVAL;
364 lqe->lqe_adjust_time = 0;
367 /* lqe reference transferred to list */
/* deferred entries go to the tail to keep the list time-sorted */
369 list_add_tail(&lqe->lqe_link,
370 &qsd->qsd_adjust_list);
372 list_add(&lqe->lqe_link, &qsd->qsd_adjust_list);
375 spin_unlock(&qsd->qsd_adjust_lock);
/* nudge the writeback thread to pick the entry up */
378 wake_up(&qsd->qsd_upd_thread.t_ctl_waitq);
/* Poll predicate for the writeback thread: splices any pending writeback
 * records into \a upd, reports whether an adjust-list entry is due, and
 * kicks off reintegration for quota types whose index copies are stale. */
383 /* return true if there is pending writeback records or the pending
385 static bool qsd_job_pending(struct qsd_instance *qsd, struct list_head *upd,
388 bool job_pending = false;
/* caller must pass an empty list; pending records are spliced into it */
391 LASSERT(list_empty(upd));
394 spin_lock(&qsd->qsd_adjust_lock);
395 if (!list_empty(&qsd->qsd_adjust_list)) {
396 struct lquota_entry *lqe;
/* list is time-sorted, so only the head needs checking */
397 lqe = list_entry(qsd->qsd_adjust_list.next,
398 struct lquota_entry, lqe_link);
399 if (ktime_get_seconds() >= lqe->lqe_adjust_time)
402 spin_unlock(&qsd->qsd_adjust_lock);
404 write_lock(&qsd->qsd_lock);
405 if (!list_empty(&qsd->qsd_upd_list)) {
/* move the whole batch to the caller's private list in one shot */
406 list_splice_init(&qsd->qsd_upd_list, upd);
410 for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++) {
411 struct qsd_qtype_info *qqi = qsd->qsd_type_array[qtype];
413 /* don't bother kicking off reintegration if space accounting
414 * failed to be enabled */
415 if (qqi->qqi_acct_failed)
418 if (!qsd_type_enabled(qsd, qtype))
421 if ((!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) &&
423 /* global or slave index not up to date and reint
424 * thread not running */
428 write_unlock(&qsd->qsd_lock);
/* Main loop of the per-instance writeback thread ("lquota_wb_<svname>"):
 * waits for work (or QSD_WB_INTERVAL timeout), flushes queued writeback
 * records, then walks the time-sorted adjust list running usage refresh,
 * per-ID lock cancels and quota adjusts until asked to stop. */
432 static int qsd_upd_thread(void *arg)
434 struct qsd_instance *qsd = (struct qsd_instance *)arg;
435 struct ptlrpc_thread *thread = &qsd->qsd_upd_thread;
436 struct list_head queue;
437 struct qsd_upd_rec *upd, *n;
441 struct lquota_entry *lqe;
449 rc = lu_env_init(env, LCT_DT_THREAD);
451 CERROR("%s: cannot init env: rc = %d\n", qsd->qsd_svname, rc);
/* signal qsd_start_upd_thread() that startup completed */
456 thread_set_flags(thread, SVC_RUNNING);
457 wake_up(&thread->t_ctl_waitq);
459 INIT_LIST_HEAD(&queue);
/* sleep until there is pending work or a stop request, at most
 * QSD_WB_INTERVAL seconds */
461 wait_event_idle_timeout(
463 qsd_job_pending(qsd, &queue, &uptodate) ||
464 !thread_is_running(thread),
465 cfs_time_seconds(QSD_WB_INTERVAL));
/* flush the batch of writeback records spliced in by qsd_job_pending() */
467 list_for_each_entry_safe(upd, n, &queue, qur_link) {
468 list_del_init(&upd->qur_link);
469 qsd_process_upd(env, upd);
473 spin_lock(&qsd->qsd_adjust_lock);
474 cur_time = ktime_get_seconds();
475 while (!list_empty(&qsd->qsd_adjust_list)) {
476 lqe = list_entry(qsd->qsd_adjust_list.next,
477 struct lquota_entry, lqe_link);
478 /* deferred items are sorted by time */
479 if (lqe->lqe_adjust_time > cur_time)
482 list_del_init(&lqe->lqe_link);
/* drop the spinlock while doing the (potentially blocking) work */
483 spin_unlock(&qsd->qsd_adjust_lock);
485 if (thread_is_running(thread) && uptodate) {
486 qsd_refresh_usage(env, lqe);
/* adjust_time == 0 marks a per-ID lock-cancel request
 * (see qsd_adjust_schedule) */
487 if (lqe->lqe_adjust_time == 0)
488 qsd_id_lock_cancel(env, lqe);
490 qsd_adjust(env, lqe);
494 spin_lock(&qsd->qsd_adjust_lock);
496 spin_unlock(&qsd->qsd_adjust_lock);
498 if (!thread_is_running(thread))
/* make sure reintegration threads are running for all quota types */
504 for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++)
505 qsd_start_reint_thread(qsd->qsd_type_array[qtype]);
/* signal qsd_stop_upd_thread() that we are done */
509 thread_set_flags(thread, SVC_STOPPED);
510 wake_up(&thread->t_ctl_waitq);
/* Spawn the writeback thread for \a qsd and wait until it reports either
 * SVC_RUNNING or SVC_STOPPED (i.e. startup success or failure).
 * \retval 0 on success, negative errno if kthread_run() failed */
514 int qsd_start_upd_thread(struct qsd_instance *qsd)
516 struct ptlrpc_thread *thread = &qsd->qsd_upd_thread;
517 struct task_struct *task;
520 task = kthread_run(qsd_upd_thread, (void *)qsd,
521 "lquota_wb_%s", qsd->qsd_svname);
/* kthread_run() returns ERR_PTR on failure, hence PTR_ERR below */
523 CERROR("fail to start quota update thread: rc = %ld\n",
525 thread_set_flags(thread, SVC_STOPPED);
526 RETURN(PTR_ERR(task));
529 wait_event_idle(thread->t_ctl_waitq,
530 thread_is_running(thread) || thread_is_stopped(thread));
/* Drain the per-type deferred update lists at shutdown, warning about and
 * unlinking every record still queued (their free calls are on lines not
 * visible in this extract). */
534 static void qsd_cleanup_deferred(struct qsd_instance *qsd)
538 for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++) {
539 struct qsd_upd_rec *upd, *tmp;
540 struct qsd_qtype_info *qqi = qsd->qsd_type_array[qtype];
/* qsd_lock guards both deferred lists */
545 write_lock(&qsd->qsd_lock);
546 list_for_each_entry_safe(upd, tmp, &qqi->qqi_deferred_glb,
548 CWARN("%s: Free global deferred upd: ID:%llu, "
549 "ver:%llu/%llu\n", qsd->qsd_svname,
550 upd->qur_qid.qid_uid, upd->qur_ver,
552 list_del_init(&upd->qur_link);
555 list_for_each_entry_safe(upd, tmp, &qqi->qqi_deferred_slv,
557 CWARN("%s: Free slave deferred upd: ID:%llu, "
558 "ver:%llu/%llu\n", qsd->qsd_svname,
559 upd->qur_qid.qid_uid, upd->qur_ver,
561 list_del_init(&upd->qur_link);
564 write_unlock(&qsd->qsd_lock);
/* Empty the adjust list at shutdown, unlinking every queued lquota entry
 * (the matching lqe reference drop is on lines not visible in this extract). */
568 static void qsd_cleanup_adjust(struct qsd_instance *qsd)
570 struct lquota_entry *lqe;
572 spin_lock(&qsd->qsd_adjust_lock);
573 while (!list_empty(&qsd->qsd_adjust_list)) {
574 lqe = list_entry(qsd->qsd_adjust_list.next,
575 struct lquota_entry, lqe_link);
576 list_del_init(&lqe->lqe_link);
579 spin_unlock(&qsd->qsd_adjust_lock);
/* Stop the writeback thread (request SVC_STOPPING, wake it, wait for
 * SVC_STOPPED) and then release anything left on the deferred and adjust
 * lists. */
582 void qsd_stop_upd_thread(struct qsd_instance *qsd)
584 struct ptlrpc_thread *thread = &qsd->qsd_upd_thread;
586 if (!thread_is_stopped(thread)) {
587 thread_set_flags(thread, SVC_STOPPING);
588 wake_up(&thread->t_ctl_waitq);
590 wait_event_idle(thread->t_ctl_waitq, thread_is_stopped(thread));
/* safe to drain now that the thread can no longer touch the lists */
592 qsd_cleanup_deferred(qsd);
593 qsd_cleanup_adjust(qsd);