lustre/quota/qsd_writeback.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2012, 2017, Intel Corporation.
 * Use is subject to license terms.
 *
 * Author: Johann Lombardi <johann.lombardi@intel.com>
 * Author: Niu    Yawei    <yawei.niu@intel.com>
 */

#define DEBUG_SUBSYSTEM S_LQUOTA

#include <linux/kthread.h>
#include "qsd_internal.h"

/*
 * Allocate and fill a qsd_upd_rec structure to be processed by the writeback
 * thread.
 *
 * \param qqi    - is the qsd_qtype_info structure relevant to the update
 * \param lqe    - is the lquota entry subject to the update
 * \param qid    - is the identifier subject to the update
 * \param rec    - is the record storing the new quota settings
 * \param ver    - is the version associated with the update
 * \param global - is a boolean set to true if this is an update of the global
 *                 index and false for a slave index.
 */
static struct qsd_upd_rec *qsd_upd_alloc(struct qsd_qtype_info *qqi,
                                         struct lquota_entry *lqe,
                                         union lquota_id *qid,
                                         union lquota_rec *rec, __u64 ver,
                                         bool global)
{
        struct qsd_upd_rec      *upd;

        OBD_SLAB_ALLOC_PTR_GFP(upd, upd_kmem, GFP_NOFS);
        if (upd == NULL)
                return NULL;

        /* fill it */
        INIT_LIST_HEAD(&upd->qur_link);
        upd->qur_qqi = qqi;
        upd->qur_lqe = lqe;
        if (lqe)
                lqe_getref(lqe);
        upd->qur_qid    = *qid;
        upd->qur_rec    = *rec;
        upd->qur_ver    = ver;
        upd->qur_global = global;

        return upd;
}

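/* Release the update record along with the lquota entry reference it may
 * hold */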
static void qsd_upd_free(struct qsd_upd_rec *upd)
{
        if (upd->qur_lqe)
                lqe_putref(upd->qur_lqe);
        OBD_SLAB_FREE_PTR(upd, upd_kmem);
}

/* must hold the qsd_lock */
static void qsd_upd_add(struct qsd_instance *qsd, struct qsd_upd_rec *upd)
{
        if (!qsd->qsd_stopping) {
                list_add_tail(&upd->qur_link, &qsd->qsd_upd_list);
                /* wake up the upd thread */
                wake_up(&qsd->qsd_upd_thread.t_ctl_waitq);
        } else {
                CWARN("%s: discard update.\n", qsd->qsd_svname);
                if (upd->qur_lqe)
                        LQUOTA_WARN(upd->qur_lqe, "discard update.");
                qsd_upd_free(upd);
        }
}

/* must hold the qsd_lock */
static void qsd_add_deferred(struct qsd_instance *qsd, struct list_head *list,
                             struct qsd_upd_rec *upd)
{
        struct qsd_upd_rec      *tmp, *n;

        if (qsd->qsd_stopping) {
                CWARN("%s: discard deferred update.\n", qsd->qsd_svname);
                if (upd->qur_lqe)
                        LQUOTA_WARN(upd->qur_lqe, "discard deferred update.");
                qsd_upd_free(upd);
                return;
        }

        /* Sort the updates in ascending version order */
        list_for_each_entry_safe_reverse(tmp, n, list, qur_link) {

                /* There could be some legacy records with duplicated
                 * versions. Imagine the following scenario: the slave
                 * received a global glimpse and queued a record in the
                 * deferred list, then the master crashed and rolled back to
                 * an earlier version, so the version of the queued record
                 * conflicts with later updates. Just delete the legacy
                 * record in such a case. */
                if (upd->qur_ver == tmp->qur_ver) {
                        if (tmp->qur_lqe)
                                LQUOTA_WARN(tmp->qur_lqe, "Found a conflict "
                                            "record with ver:%llu",
                                            tmp->qur_ver);
                        else
                                CWARN("%s: Found a conflict record with ver: "
                                      "%llu\n", qsd->qsd_svname, tmp->qur_ver);

                        list_del_init(&tmp->qur_link);
                        qsd_upd_free(tmp);
                } else if (upd->qur_ver < tmp->qur_ver) {
                        continue;
                } else {
                        list_add_tail(&upd->qur_link, &tmp->qur_link);
                        return;
                }
        }
        list_add(&upd->qur_link, list);
}

/* must hold the qsd_lock */
static void qsd_kickoff_deferred(struct qsd_qtype_info *qqi,
                                 struct list_head *list, __u64 ver)
{
        struct qsd_upd_rec      *upd, *tmp;
        ENTRY;

        /* The first update record in the list has the smallest version;
         * discard all records with versions no newer than the current
         * one */
        list_for_each_entry_safe(upd, tmp, list, qur_link) {
                if (upd->qur_ver <= ver) {
                        /* drop this update */
                        list_del_init(&upd->qur_link);
                        CDEBUG(D_QUOTA, "%s: skipping deferred update ver:"
                               "%llu/%llu, global:%d, qid:%llu\n",
                               qqi->qqi_qsd->qsd_svname, upd->qur_ver, ver,
                               upd->qur_global, upd->qur_qid.qid_uid);
                        qsd_upd_free(upd);
                } else {
                        break;
                }
        }

        /* No remaining deferred update */
        if (list_empty(list))
                RETURN_EXIT;

        CDEBUG(D_QUOTA, "%s: found deferred update record. "
               "version:%llu/%llu, global:%d, qid:%llu\n",
               qqi->qqi_qsd->qsd_svname, upd->qur_ver, ver,
               upd->qur_global, upd->qur_qid.qid_uid);

        LASSERTF(upd->qur_ver > ver, "lur_ver:%llu, cur_ver:%llu\n",
                 upd->qur_ver, ver);

        /* Kick off the deferred update */
        if (upd->qur_ver == ver + 1) {
                list_del_init(&upd->qur_link);
                qsd_upd_add(qqi->qqi_qsd, upd);
        }
        EXIT;
}

/* Bump version of global or slave index copy
 *
 * \param qqi    - qsd_qtype_info
 * \param ver    - version to be bumped to
 * \param global - global or slave index copy?
 */
void qsd_bump_version(struct qsd_qtype_info *qqi, __u64 ver, bool global)
{
        struct list_head *list;
        __u64            *idx_ver;

        idx_ver = global ? &qqi->qqi_glb_ver : &qqi->qqi_slv_ver;
        list    = global ? &qqi->qqi_deferred_glb : &qqi->qqi_deferred_slv;

        write_lock(&qqi->qqi_qsd->qsd_lock);
        *idx_ver = ver;
        if (global)
                qqi->qqi_glb_uptodate = 1;
        else
                qqi->qqi_slv_uptodate = 1;
        qsd_kickoff_deferred(qqi, list, ver);
        write_unlock(&qqi->qqi_qsd->qsd_lock);
}

/*
 * Schedule a commit of a lquota entry
 *
 * \param qqi    - qsd_qtype_info
 * \param lqe    - lquota_entry
 * \param qid    - quota id
 * \param rec    - global or slave record to be updated to disk
 * \param ver    - new index file version
 * \param global - true: master record; false: slave record
 */
void qsd_upd_schedule(struct qsd_qtype_info *qqi, struct lquota_entry *lqe,
                      union lquota_id *qid, union lquota_rec *rec, __u64 ver,
                      bool global)
{
        struct qsd_upd_rec      *upd;
        struct qsd_instance     *qsd = qqi->qqi_qsd;
        __u64                    cur_ver;
        ENTRY;

        CDEBUG(D_QUOTA, "%s: schedule update. global:%s, version:%llu\n",
               qsd->qsd_svname, global ? "true" : "false", ver);

        upd = qsd_upd_alloc(qqi, lqe, qid, rec, ver, global);
        if (upd == NULL)
                RETURN_EXIT;

        /* If we don't need to update the index version, there is no need to
         * sort the records in version order; schedule the update at once. */
        if (ver == 0) {
                write_lock(&qsd->qsd_lock);
                qsd_upd_add(qsd, upd);
                write_unlock(&qsd->qsd_lock);
                RETURN_EXIT;
        }

        write_lock(&qsd->qsd_lock);

        cur_ver = global ? qqi->qqi_glb_ver : qqi->qqi_slv_ver;

        if (ver <= cur_ver) {
                if (global)
                        /* legitimate race between glimpse AST and
                         * reintegration */
                        CDEBUG(D_QUOTA, "%s: discarding glb update from glimpse"
                               " ver:%llu local ver:%llu\n",
                               qsd->qsd_svname, ver, cur_ver);
                else
                        CERROR("%s: discard slv update, ver:%llu local ver:"
                               "%llu\n", qsd->qsd_svname, ver, cur_ver);
                qsd_upd_free(upd);
        } else if ((ver == cur_ver + 1) && qqi->qqi_glb_uptodate &&
                   qqi->qqi_slv_uptodate) {
                /* In-order update, and reintegration has been done. */
                qsd_upd_add(qsd, upd);
        } else {
                /* Out-of-order update (the one with the smaller version
                 * hasn't reached the slave or hasn't been flushed to disk
                 * yet), or reintegration is in progress. Defer the update. */
                struct list_head *list = global ? &qqi->qqi_deferred_glb :
                                                  &qqi->qqi_deferred_slv;
                qsd_add_deferred(qsd, list, upd);
        }

        write_unlock(&qsd->qsd_lock);

        EXIT;
}

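/* Process a record queued by the writeback thread: write the new quota
 * settings to the global or slave index copy and, for a global update,
 * refresh the in-memory usage and report it to the master asynchronously */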
static int qsd_process_upd(const struct lu_env *env, struct qsd_upd_rec *upd)
{
        struct lquota_entry     *lqe = upd->qur_lqe;
        struct qsd_qtype_info   *qqi = upd->qur_qqi;
        int                      rc;
        ENTRY;

        if (lqe == NULL) {
                lqe = lqe_locate(env, qqi->qqi_site, &upd->qur_qid);
                if (IS_ERR(lqe))
                        GOTO(out, rc = PTR_ERR(lqe));
        }

        /* The in-memory lqe update for the slave index copy isn't deferred;
         * we shouldn't touch it here. */
        if (upd->qur_global) {
                rc = qsd_update_lqe(env, lqe, upd->qur_global, &upd->qur_rec);
                if (rc)
                        GOTO(out, rc);
                /* refresh usage */
                qsd_refresh_usage(env, lqe);

                spin_lock(&qqi->qqi_qsd->qsd_adjust_lock);
                lqe->lqe_adjust_time = 0;
                spin_unlock(&qqi->qqi_qsd->qsd_adjust_lock);

                /* Report usage asynchronously */
                rc = qsd_adjust(env, lqe);
                if (rc)
                        LQUOTA_ERROR(lqe, "failed to report usage, rc:%d", rc);
        }

        rc = qsd_update_index(env, qqi, &upd->qur_qid, upd->qur_global,
                              upd->qur_ver, &upd->qur_rec);
out:
        if (upd->qur_global && rc == 0 &&
            upd->qur_rec.lqr_glb_rec.qbr_softlimit == 0 &&
            upd->qur_rec.lqr_glb_rec.qbr_hardlimit == 0 &&
            (LQUOTA_FLAG(upd->qur_rec.lqr_glb_rec.qbr_time) &
                                                        LQUOTA_FLAG_DEFAULT)) {
                lqe->lqe_is_default = true;
                if (qqi->qqi_default_softlimit == 0 &&
                    qqi->qqi_default_hardlimit == 0)
                        lqe->lqe_enforced = false;
                else
                        lqe->lqe_enforced = true;

                LQUOTA_DEBUG(lqe, "update to use default quota");
        }

        if (lqe && !IS_ERR(lqe)) {
                lqe_putref(lqe);
                upd->qur_lqe = NULL;
        }
        RETURN(rc);
}

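/* Schedule a quota adjustment (or a per-ID lock cancel) for an lquota entry
 * by queuing it on the adjust list processed by the writeback thread.
 *
 * \param lqe    - is the lquota entry to be adjusted
 * \param defer  - process the entry after QSD_WB_INTERVAL seconds instead of
 *                 immediately
 * \param cancel - queue the entry for a per-ID lock cancel rather than a
 *                 quota adjustment
 */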
void qsd_adjust_schedule(struct lquota_entry *lqe, bool defer, bool cancel)
{
        struct qsd_instance     *qsd = lqe2qqi(lqe)->qqi_qsd;
        bool                     added = false;

        read_lock(&qsd->qsd_lock);
        if (qsd->qsd_stopping) {
                read_unlock(&qsd->qsd_lock);
                return;
        }
        read_unlock(&qsd->qsd_lock);

        lqe_getref(lqe);
        spin_lock(&qsd->qsd_adjust_lock);

        /* the lqe is being queued for a per-ID lock cancel; cancel that
         * request and re-add the entry for quota adjustment */
        if (!list_empty(&lqe->lqe_link) &&
            lqe->lqe_adjust_time == 0) {
                list_del_init(&lqe->lqe_link);
                lqe_putref(lqe);
        }

        if (list_empty(&lqe->lqe_link)) {
                if (!cancel) {
                        lqe->lqe_adjust_time = ktime_get_seconds();
                        if (defer)
                                lqe->lqe_adjust_time += QSD_WB_INTERVAL;
                } else {
                        lqe->lqe_adjust_time = 0;
                }

                /* lqe reference transferred to list */
                if (defer)
                        list_add_tail(&lqe->lqe_link, &qsd->qsd_adjust_list);
                else
                        list_add(&lqe->lqe_link, &qsd->qsd_adjust_list);
                added = true;
        }
        spin_unlock(&qsd->qsd_adjust_lock);

        if (added)
                wake_up(&qsd->qsd_upd_thread.t_ctl_waitq);
        else
                lqe_putref(lqe);
}

/* return true if there are pending writeback records or pending adjust
 * requests */
static bool qsd_job_pending(struct qsd_instance *qsd, struct list_head *upd,
                            bool *uptodate)
{
        bool    job_pending = false;
        int     qtype;

        LASSERT(list_empty(upd));
        *uptodate = true;

        spin_lock(&qsd->qsd_adjust_lock);
        if (!list_empty(&qsd->qsd_adjust_list)) {
                struct lquota_entry *lqe;

                lqe = list_entry(qsd->qsd_adjust_list.next,
                                 struct lquota_entry, lqe_link);
                if (ktime_get_seconds() >= lqe->lqe_adjust_time)
                        job_pending = true;
        }
        spin_unlock(&qsd->qsd_adjust_lock);

        write_lock(&qsd->qsd_lock);
        if (!list_empty(&qsd->qsd_upd_list)) {
                list_splice_init(&qsd->qsd_upd_list, upd);
                job_pending = true;
        }

        for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++) {
                struct qsd_qtype_info *qqi = qsd->qsd_type_array[qtype];

                /* don't bother kicking off reintegration if space accounting
                 * failed to be enabled */
                if (qqi->qqi_acct_failed)
                        continue;

                if (!qsd_type_enabled(qsd, qtype))
                        continue;

                if ((!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) &&
                    !qqi->qqi_reint)
                        /* global or slave index not up to date and reint
                         * thread not running */
                        *uptodate = false;
        }

        write_unlock(&qsd->qsd_lock);
        return job_pending;
}

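/* Writeback thread: flush queued update records to disk, process entries of
 * the adjust list which are due and kick off reintegration when an index
 * copy is out of date */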
static int qsd_upd_thread(void *arg)
{
        struct qsd_instance     *qsd = (struct qsd_instance *)arg;
        struct ptlrpc_thread    *thread = &qsd->qsd_upd_thread;
        LIST_HEAD(queue);
        struct qsd_upd_rec      *upd, *n;
        struct lu_env           *env;
        int                      qtype, rc = 0;
        bool                     uptodate;
        struct lquota_entry     *lqe;
        time64_t                 cur_time;
        ENTRY;

        OBD_ALLOC_PTR(env);
        if (env == NULL)
                RETURN(-ENOMEM);

        rc = lu_env_init(env, LCT_DT_THREAD);
        if (rc) {
                CERROR("%s: cannot init env: rc = %d\n", qsd->qsd_svname, rc);
                OBD_FREE_PTR(env);
                RETURN(rc);
        }

        thread_set_flags(thread, SVC_RUNNING);
        wake_up(&thread->t_ctl_waitq);

        while (1) {
                wait_event_idle_timeout(
                        thread->t_ctl_waitq,
                        qsd_job_pending(qsd, &queue, &uptodate) ||
                        !thread_is_running(thread),
                        cfs_time_seconds(QSD_WB_INTERVAL));

                list_for_each_entry_safe(upd, n, &queue, qur_link) {
                        list_del_init(&upd->qur_link);
                        qsd_process_upd(env, upd);
                        qsd_upd_free(upd);
                }

                spin_lock(&qsd->qsd_adjust_lock);
                cur_time = ktime_get_seconds();
                while (!list_empty(&qsd->qsd_adjust_list)) {
                        lqe = list_entry(qsd->qsd_adjust_list.next,
                                         struct lquota_entry, lqe_link);
                        /* deferred items are sorted by time */
                        if (lqe->lqe_adjust_time > cur_time)
                                break;

                        list_del_init(&lqe->lqe_link);
                        spin_unlock(&qsd->qsd_adjust_lock);

                        if (thread_is_running(thread) && uptodate) {
                                qsd_refresh_usage(env, lqe);
                                if (lqe->lqe_adjust_time == 0)
                                        qsd_id_lock_cancel(env, lqe);
                                else
                                        qsd_adjust(env, lqe);
                        }

                        lqe_putref(lqe);
                        spin_lock(&qsd->qsd_adjust_lock);
                }
                spin_unlock(&qsd->qsd_adjust_lock);

                if (!thread_is_running(thread))
                        break;

                if (uptodate)
                        continue;

                for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++)
                        qsd_start_reint_thread(qsd->qsd_type_array[qtype]);
        }
        lu_env_fini(env);
        OBD_FREE_PTR(env);
        thread_set_flags(thread, SVC_STOPPED);
        wake_up(&thread->t_ctl_waitq);
        RETURN(rc);
}

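/* Spawn the writeback thread and wait for it to start (or fail to start) */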
int qsd_start_upd_thread(struct qsd_instance *qsd)
{
        struct ptlrpc_thread    *thread = &qsd->qsd_upd_thread;
        struct task_struct      *task;
        ENTRY;

        task = kthread_run(qsd_upd_thread, (void *)qsd,
                           "lquota_wb_%s", qsd->qsd_svname);
        if (IS_ERR(task)) {
                CERROR("failed to start quota update thread: rc = %ld\n",
                       PTR_ERR(task));
                thread_set_flags(thread, SVC_STOPPED);
                RETURN(PTR_ERR(task));
        }

        wait_event_idle(thread->t_ctl_waitq,
                        thread_is_running(thread) || thread_is_stopped(thread));
        RETURN(0);
}

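/* Free all deferred update records still sitting on the per-type global and
 * slave deferred lists */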
static void qsd_cleanup_deferred(struct qsd_instance *qsd)
{
        int     qtype;

        for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++) {
                struct qsd_upd_rec      *upd, *tmp;
                struct qsd_qtype_info   *qqi = qsd->qsd_type_array[qtype];

                if (qqi == NULL)
                        continue;

                write_lock(&qsd->qsd_lock);
                list_for_each_entry_safe(upd, tmp, &qqi->qqi_deferred_glb,
                                         qur_link) {
                        CWARN("%s: Free global deferred upd: ID:%llu, "
                              "ver:%llu/%llu\n", qsd->qsd_svname,
                              upd->qur_qid.qid_uid, upd->qur_ver,
                              qqi->qqi_glb_ver);
                        list_del_init(&upd->qur_link);
                        qsd_upd_free(upd);
                }
                list_for_each_entry_safe(upd, tmp, &qqi->qqi_deferred_slv,
                                         qur_link) {
                        CWARN("%s: Free slave deferred upd: ID:%llu, "
                              "ver:%llu/%llu\n", qsd->qsd_svname,
                              upd->qur_qid.qid_uid, upd->qur_ver,
                              qqi->qqi_slv_ver);
                        list_del_init(&upd->qur_link);
                        qsd_upd_free(upd);
                }
                write_unlock(&qsd->qsd_lock);
        }
}

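/* Drain the adjust list, dropping the list reference of each entry */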
static void qsd_cleanup_adjust(struct qsd_instance *qsd)
{
        struct lquota_entry     *lqe;

        spin_lock(&qsd->qsd_adjust_lock);
        while (!list_empty(&qsd->qsd_adjust_list)) {
                lqe = list_entry(qsd->qsd_adjust_list.next,
                                 struct lquota_entry, lqe_link);
                list_del_init(&lqe->lqe_link);
                lqe_putref(lqe);
        }
        spin_unlock(&qsd->qsd_adjust_lock);
}

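/* Stop the writeback thread, then release any pending deferred updates and
 * adjust requests */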
void qsd_stop_upd_thread(struct qsd_instance *qsd)
{
        struct ptlrpc_thread    *thread = &qsd->qsd_upd_thread;

        if (!thread_is_stopped(thread)) {
                thread_set_flags(thread, SVC_STOPPING);
                wake_up(&thread->t_ctl_waitq);

                wait_event_idle(thread->t_ctl_waitq,
                                thread_is_stopped(thread));
        }
        qsd_cleanup_deferred(qsd);
        qsd_cleanup_adjust(qsd);
}