lustre/quota/qsd_writeback.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2012, 2017, Intel Corporation.
 * Use is subject to license terms.
 *
 * Author: Johann Lombardi <johann.lombardi@intel.com>
 * Author: Niu    Yawei    <yawei.niu@intel.com>
 */

#define DEBUG_SUBSYSTEM S_LQUOTA

#include <linux/kthread.h>
#include "qsd_internal.h"

extern struct kmem_cache *upd_kmem;

/*
 * Allocate and fill a qsd_upd_rec structure to be processed by the writeback
 * thread.
 *
 * \param qqi - is the qsd_qtype_info structure relevant to the update
 * \param lqe - is the lquota entry subject to the update
 * \param qid - is the identifier subject to the update
 * \param rec - is the record storing the new quota settings
 * \param ver - is the version associated with the update
 * \param global - is a boolean set to true if this is an update of the global
 *                 index and false for a slave index.
 */
static struct qsd_upd_rec *qsd_upd_alloc(struct qsd_qtype_info *qqi,
                                         struct lquota_entry *lqe,
                                         union lquota_id *qid,
                                         union lquota_rec *rec, __u64 ver,
                                         bool global)
{
        struct qsd_upd_rec      *upd;

        OBD_SLAB_ALLOC_PTR_GFP(upd, upd_kmem, GFP_NOFS);
        if (upd == NULL)
                return NULL;

        /* fill it */
        INIT_LIST_HEAD(&upd->qur_link);
        upd->qur_qqi = qqi;
        upd->qur_lqe = lqe;
        if (lqe)
                lqe_getref(lqe);
        upd->qur_qid    = *qid;
        upd->qur_rec    = *rec;
        upd->qur_ver    = ver;
        upd->qur_global = global;

        return upd;
}

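/* Free an update record: drop the lquota entry reference taken in
 * qsd_upd_alloc() (if any) and return the record to the slab cache. */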
static void qsd_upd_free(struct qsd_upd_rec *upd)
{
        if (upd->qur_lqe)
                lqe_putref(upd->qur_lqe);
        OBD_SLAB_FREE_PTR(upd, upd_kmem);
}

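/* Queue an update record on the writeback list and wake up the writeback
 * thread; if the qsd instance is stopping, the update is discarded instead. */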
/* must hold the qsd_lock */
static void qsd_upd_add(struct qsd_instance *qsd, struct qsd_upd_rec *upd)
{
        if (!qsd->qsd_stopping) {
                list_add_tail(&upd->qur_link, &qsd->qsd_upd_list);
                /* wake up the upd thread */
                wake_up(&qsd->qsd_upd_thread.t_ctl_waitq);
        } else {
                CWARN("%s: discard update.\n", qsd->qsd_svname);
                if (upd->qur_lqe)
                        LQUOTA_WARN(upd->qur_lqe, "discard update.");
                qsd_upd_free(upd);
        }
}

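/* Insert an update record into a deferred list, keeping the list sorted by
 * ascending version; a stale record carrying a duplicate version (left behind
 * after a master failover to an older index version) is dropped and replaced
 * by the new one. */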
/* must hold the qsd_lock */
static void qsd_add_deferred(struct qsd_instance *qsd, struct list_head *list,
                             struct qsd_upd_rec *upd)
{
        struct qsd_upd_rec      *tmp, *n;

        if (qsd->qsd_stopping) {
                CWARN("%s: discard deferred update.\n", qsd->qsd_svname);
                if (upd->qur_lqe)
                        LQUOTA_WARN(upd->qur_lqe, "discard deferred update.");
                qsd_upd_free(upd);
                return;
        }

        /* Sort the updates in ascending order */
        list_for_each_entry_safe_reverse(tmp, n, list, qur_link) {

                /* There could be some legacy records with duplicated
                 * versions. Imagine the following scenario: the slave
                 * received a global glimpse and queued a record in the
                 * deferred list, then the master crashed and rolled back
                 * to an earlier version, so the version of the queued
                 * record conflicts with later updates. Just delete the
                 * legacy record in such a case. */
                if (upd->qur_ver == tmp->qur_ver) {
                        if (tmp->qur_lqe)
                                LQUOTA_WARN(tmp->qur_lqe, "Found a conflicting "
                                            "record with ver:%llu",
                                            tmp->qur_ver);
                        else
                                CWARN("%s: Found a conflicting record with "
                                      "ver: %llu\n", qsd->qsd_svname,
                                      tmp->qur_ver);

                        list_del_init(&tmp->qur_link);
                        qsd_upd_free(tmp);
                } else if (upd->qur_ver < tmp->qur_ver) {
                        continue;
                } else {
                        list_add_tail(&upd->qur_link, &tmp->qur_link);
                        return;
                }
        }
        list_add(&upd->qur_link, list);
}

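/* Called once the index copy has been bumped to version "ver": discard all
 * deferred records with a version lower than or equal to "ver" and, if the
 * first remaining record is exactly "ver" + 1, move it over to the writeback
 * list. */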
/* must hold the qsd_lock */
static void qsd_kickoff_deferred(struct qsd_qtype_info *qqi,
                                 struct list_head *list, __u64 ver)
{
        struct qsd_upd_rec      *upd, *tmp;
        ENTRY;

        /* The first update record in the list has the smallest version;
         * discard all records with versions smaller than or equal to the
         * current one */
        list_for_each_entry_safe(upd, tmp, list, qur_link) {
                if (upd->qur_ver <= ver) {
                        /* drop this update */
                        list_del_init(&upd->qur_link);
                        CDEBUG(D_QUOTA, "%s: skipping deferred update ver:"
                               "%llu/%llu, global:%d, qid:%llu\n",
                               qqi->qqi_qsd->qsd_svname, upd->qur_ver, ver,
                               upd->qur_global, upd->qur_qid.qid_uid);
                        qsd_upd_free(upd);
                } else {
                        break;
                }
        }

        /* No remaining deferred update */
        if (list_empty(list))
                RETURN_EXIT;

        CDEBUG(D_QUOTA, "%s: found deferred update record. "
               "version:%llu/%llu, global:%d, qid:%llu\n",
               qqi->qqi_qsd->qsd_svname, upd->qur_ver, ver,
               upd->qur_global, upd->qur_qid.qid_uid);

        LASSERTF(upd->qur_ver > ver, "qur_ver:%llu, cur_ver:%llu\n",
                 upd->qur_ver, ver);

        /* Kick off the deferred update */
        if (upd->qur_ver == ver + 1) {
                list_del_init(&upd->qur_link);
                qsd_upd_add(qqi->qqi_qsd, upd);
        }
        EXIT;
}

/* Bump version of global or slave index copy
 *
 * \param qqi    - qsd_qtype_info
 * \param ver    - version to be bumped to
 * \param global - global or slave index copy?
 */
void qsd_bump_version(struct qsd_qtype_info *qqi, __u64 ver, bool global)
{
        struct list_head *list;
        __u64            *idx_ver;

        idx_ver = global ? &qqi->qqi_glb_ver : &qqi->qqi_slv_ver;
        list    = global ? &qqi->qqi_deferred_glb : &qqi->qqi_deferred_slv;

        write_lock(&qqi->qqi_qsd->qsd_lock);
        *idx_ver = ver;
        if (global)
                qqi->qqi_glb_uptodate = 1;
        else
                qqi->qqi_slv_uptodate = 1;
        qsd_kickoff_deferred(qqi, list, ver);
        write_unlock(&qqi->qqi_qsd->qsd_lock);
}

/*
 * Schedule a commit of a lquota entry
 *
 * \param  qqi   - qsd_qtype_info
 * \param  lqe   - lquota_entry
 * \param  qid   - quota id
 * \param  rec   - global or slave record to be updated to disk
 * \param  ver   - new index file version
 * \param  global - true: master record; false: slave record
 */
void qsd_upd_schedule(struct qsd_qtype_info *qqi, struct lquota_entry *lqe,
                      union lquota_id *qid, union lquota_rec *rec, __u64 ver,
                      bool global)
{
        struct qsd_upd_rec      *upd;
        struct qsd_instance     *qsd = qqi->qqi_qsd;
        __u64                    cur_ver;
        ENTRY;

        CDEBUG(D_QUOTA, "%s: schedule update. global:%s, version:%llu\n",
               qsd->qsd_svname, global ? "true" : "false", ver);

        upd = qsd_upd_alloc(qqi, lqe, qid, rec, ver, global);
        if (upd == NULL)
                RETURN_EXIT;

        /* If we don't need to update the index version, there is no need to
         * sort the records in version order; just schedule the update
         * instantly. */
        if (ver == 0) {
                write_lock(&qsd->qsd_lock);
                qsd_upd_add(qsd, upd);
                write_unlock(&qsd->qsd_lock);
                RETURN_EXIT;
        }

        write_lock(&qsd->qsd_lock);

        cur_ver = global ? qqi->qqi_glb_ver : qqi->qqi_slv_ver;

        if (ver <= cur_ver) {
                if (global)
                        /* legitimate race between glimpse AST and
                         * reintegration */
                        CDEBUG(D_QUOTA, "%s: discarding glb update from glimpse"
                               " ver:%llu local ver:%llu\n",
                               qsd->qsd_svname, ver, cur_ver);
                else
                        CERROR("%s: discard slv update, ver:%llu local ver:"
                               "%llu\n", qsd->qsd_svname, ver, cur_ver);
                qsd_upd_free(upd);
        } else if ((ver == cur_ver + 1) && qqi->qqi_glb_uptodate &&
                   qqi->qqi_slv_uptodate) {
                /* In-order update, and reintegration has been done. */
                qsd_upd_add(qsd, upd);
        } else {
                /* Out-of-order update (the one with the smaller version
                 * hasn't reached the slave or hasn't been flushed to disk
                 * yet), or reintegration is in progress. Defer the update. */
                struct list_head *list = global ? &qqi->qqi_deferred_glb :
                                                  &qqi->qqi_deferred_slv;
                qsd_add_deferred(qsd, list, upd);
        }

        write_unlock(&qsd->qsd_lock);

        EXIT;
}

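/* Apply one update record: for a global update, refresh the in-memory lquota
 * entry and report usage to the master, then (in all cases) write the new
 * quota settings to the on-disk index copy. */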
static int qsd_process_upd(const struct lu_env *env, struct qsd_upd_rec *upd)
{
        struct lquota_entry     *lqe = upd->qur_lqe;
        struct qsd_qtype_info   *qqi = upd->qur_qqi;
        int                      rc;
        ENTRY;

        if (lqe == NULL) {
                lqe = lqe_locate(env, qqi->qqi_site, &upd->qur_qid);
                if (IS_ERR(lqe))
                        GOTO(out, rc = PTR_ERR(lqe));
        }

        /* The in-memory lqe update for the slave index copy isn't deferred,
         * so we shouldn't touch it here. */
        if (upd->qur_global) {
                rc = qsd_update_lqe(env, lqe, upd->qur_global, &upd->qur_rec);
                if (rc)
                        GOTO(out, rc);
                /* refresh usage */
                qsd_refresh_usage(env, lqe);

                spin_lock(&qqi->qqi_qsd->qsd_adjust_lock);
                lqe->lqe_adjust_time = 0;
                spin_unlock(&qqi->qqi_qsd->qsd_adjust_lock);

                /* Report usage asynchronously */
                rc = qsd_adjust(env, lqe);
                if (rc)
                        LQUOTA_ERROR(lqe, "failed to report usage, rc:%d", rc);
        }

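        /* Flush the new quota settings to the global or slave index copy
         * on disk */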
        rc = qsd_update_index(env, qqi, &upd->qur_qid, upd->qur_global,
                              upd->qur_ver, &upd->qur_rec);
out:
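        /* A global record with no hard/soft limit but with the DEFAULT flag
         * set means this ID now falls back to the default quota settings. */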
        if (upd->qur_global && rc == 0 &&
            upd->qur_rec.lqr_glb_rec.qbr_softlimit == 0 &&
            upd->qur_rec.lqr_glb_rec.qbr_hardlimit == 0 &&
            (LQUOTA_FLAG(upd->qur_rec.lqr_glb_rec.qbr_time) &
                                                        LQUOTA_FLAG_DEFAULT)) {
                lqe->lqe_is_default = true;
                if (qqi->qqi_default_softlimit == 0 &&
                    qqi->qqi_default_hardlimit == 0)
                        lqe->lqe_enforced = false;
                else
                        lqe->lqe_enforced = true;

                LQUOTA_DEBUG(lqe, "update to use default quota");
        }

        if (lqe && !IS_ERR(lqe)) {
                lqe_putref(lqe);
                upd->qur_lqe = NULL;
        }
        RETURN(rc);
}

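/* Schedule a quota adjustment (or a per-ID lock cancel when "cancel" is true)
 * for the given lqe by queueing it on the adjust list processed by the
 * writeback thread; when "defer" is true, the adjustment is postponed by
 * QSD_WB_INTERVAL seconds. */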
void qsd_adjust_schedule(struct lquota_entry *lqe, bool defer, bool cancel)
{
        struct qsd_instance     *qsd = lqe2qqi(lqe)->qqi_qsd;
        bool                     added = false;

        read_lock(&qsd->qsd_lock);
        if (qsd->qsd_stopping) {
                read_unlock(&qsd->qsd_lock);
                return;
        }
        read_unlock(&qsd->qsd_lock);

        lqe_getref(lqe);
        spin_lock(&qsd->qsd_adjust_lock);

        /* if the lqe is already queued for a per-ID lock cancel, drop the
         * pending cancel and re-add the entry for quota adjustment */
        if (!list_empty(&lqe->lqe_link) &&
            lqe->lqe_adjust_time == 0) {
                list_del_init(&lqe->lqe_link);
                lqe_putref(lqe);
        }

        if (list_empty(&lqe->lqe_link)) {
                if (!cancel) {
                        lqe->lqe_adjust_time = ktime_get_seconds();
                        if (defer)
                                lqe->lqe_adjust_time += QSD_WB_INTERVAL;
                } else {
                        lqe->lqe_adjust_time = 0;
                }

                /* lqe reference transferred to list */
                if (defer)
                        list_add_tail(&lqe->lqe_link,
                                      &qsd->qsd_adjust_list);
                else
                        list_add(&lqe->lqe_link, &qsd->qsd_adjust_list);
                added = true;
        }
        spin_unlock(&qsd->qsd_adjust_lock);

        if (added)
                wake_up(&qsd->qsd_upd_thread.t_ctl_waitq);
        else
                lqe_putref(lqe);
}

/* return true if there are pending writeback records or pending adjust
 * requests */
static bool qsd_job_pending(struct qsd_instance *qsd, struct list_head *upd,
                            bool *uptodate)
{
        bool    job_pending = false;
        int     qtype;

        LASSERT(list_empty(upd));
        *uptodate = true;

        spin_lock(&qsd->qsd_adjust_lock);
        if (!list_empty(&qsd->qsd_adjust_list)) {
                struct lquota_entry *lqe;
                lqe = list_entry(qsd->qsd_adjust_list.next,
                                 struct lquota_entry, lqe_link);
                if (ktime_get_seconds() >= lqe->lqe_adjust_time)
                        job_pending = true;
        }
        spin_unlock(&qsd->qsd_adjust_lock);

        write_lock(&qsd->qsd_lock);
        if (!list_empty(&qsd->qsd_upd_list)) {
                list_splice_init(&qsd->qsd_upd_list, upd);
                job_pending = true;
        }

        for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++) {
                struct qsd_qtype_info *qqi = qsd->qsd_type_array[qtype];

                /* don't bother kicking off reintegration if space accounting
                 * failed to be enabled */
                if (qqi->qqi_acct_failed)
                        continue;

                if (!qsd_type_enabled(qsd, qtype))
                        continue;

                if ((!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) &&
                     !qqi->qqi_reint)
                        /* global or slave index not up to date and reint
                         * thread not running */
                        *uptodate = false;
        }

        write_unlock(&qsd->qsd_lock);
        return job_pending;
}

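/* Main loop of the writeback thread: flush queued update records to disk,
 * process adjust-list entries whose deadline has passed (either cancelling
 * the per-ID lock or adjusting quota space), and kick off reintegration
 * whenever an index copy is found to be out of date. */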
static int qsd_upd_thread(void *arg)
{
        struct qsd_instance     *qsd = (struct qsd_instance *)arg;
        struct ptlrpc_thread    *thread = &qsd->qsd_upd_thread;
        struct l_wait_info       lwi;
        struct list_head         queue;
        struct qsd_upd_rec      *upd, *n;
        struct lu_env           *env;
        int                      qtype, rc = 0;
        bool                     uptodate;
        struct lquota_entry     *lqe;
        time64_t cur_time;
        ENTRY;

        OBD_ALLOC_PTR(env);
        if (env == NULL)
                RETURN(-ENOMEM);

        rc = lu_env_init(env, LCT_DT_THREAD);
        if (rc) {
                CERROR("%s: cannot init env: rc = %d\n", qsd->qsd_svname, rc);
                OBD_FREE_PTR(env);
                RETURN(rc);
        }

        thread_set_flags(thread, SVC_RUNNING);
        wake_up(&thread->t_ctl_waitq);

        INIT_LIST_HEAD(&queue);
        lwi = LWI_TIMEOUT(cfs_time_seconds(QSD_WB_INTERVAL), NULL, NULL);
        while (1) {
                l_wait_event(thread->t_ctl_waitq,
                             qsd_job_pending(qsd, &queue, &uptodate) ||
                             !thread_is_running(thread), &lwi);

                list_for_each_entry_safe(upd, n, &queue, qur_link) {
                        list_del_init(&upd->qur_link);
                        qsd_process_upd(env, upd);
                        qsd_upd_free(upd);
                }

                spin_lock(&qsd->qsd_adjust_lock);
                cur_time = ktime_get_seconds();
                while (!list_empty(&qsd->qsd_adjust_list)) {
                        lqe = list_entry(qsd->qsd_adjust_list.next,
                                         struct lquota_entry, lqe_link);
                        /* deferred items are sorted by time */
                        if (lqe->lqe_adjust_time > cur_time)
                                break;

                        list_del_init(&lqe->lqe_link);
                        spin_unlock(&qsd->qsd_adjust_lock);

                        if (thread_is_running(thread) && uptodate) {
                                qsd_refresh_usage(env, lqe);
                                if (lqe->lqe_adjust_time == 0)
                                        qsd_id_lock_cancel(env, lqe);
                                else
                                        qsd_adjust(env, lqe);
                        }

                        lqe_putref(lqe);
                        spin_lock(&qsd->qsd_adjust_lock);
                }
                spin_unlock(&qsd->qsd_adjust_lock);

                if (!thread_is_running(thread))
                        break;

                if (uptodate)
                        continue;

                for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++)
                        qsd_start_reint_thread(qsd->qsd_type_array[qtype]);
        }
        lu_env_fini(env);
        OBD_FREE_PTR(env);
        thread_set_flags(thread, SVC_STOPPED);
        wake_up(&thread->t_ctl_waitq);
        RETURN(rc);
}

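/* Spawn the writeback thread for this qsd instance during setup and wait
 * until it is either running or has failed to start. */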
int qsd_start_upd_thread(struct qsd_instance *qsd)
{
        struct ptlrpc_thread    *thread = &qsd->qsd_upd_thread;
        struct l_wait_info       lwi = { 0 };
        struct task_struct      *task;
        ENTRY;

        task = kthread_run(qsd_upd_thread, (void *)qsd,
                           "lquota_wb_%s", qsd->qsd_svname);
        if (IS_ERR(task)) {
                CERROR("failed to start quota update thread: rc = %ld\n",
                       PTR_ERR(task));
                thread_set_flags(thread, SVC_STOPPED);
                RETURN(PTR_ERR(task));
        }

        l_wait_event(thread->t_ctl_waitq,
                     thread_is_running(thread) || thread_is_stopped(thread),
                     &lwi);
        RETURN(0);
}

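/* Release all update records still sitting on the per-type deferred lists;
 * called once the writeback thread has stopped. */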
static void qsd_cleanup_deferred(struct qsd_instance *qsd)
{
        int     qtype;

        for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++) {
                struct qsd_upd_rec      *upd, *tmp;
                struct qsd_qtype_info   *qqi = qsd->qsd_type_array[qtype];

                if (qqi == NULL)
                        continue;

                write_lock(&qsd->qsd_lock);
                list_for_each_entry_safe(upd, tmp, &qqi->qqi_deferred_glb,
                                         qur_link) {
                        CWARN("%s: Free global deferred upd: ID:%llu, "
                              "ver:%llu/%llu\n", qsd->qsd_svname,
                              upd->qur_qid.qid_uid, upd->qur_ver,
                              qqi->qqi_glb_ver);
                        list_del_init(&upd->qur_link);
                        qsd_upd_free(upd);
                }
                list_for_each_entry_safe(upd, tmp, &qqi->qqi_deferred_slv,
                                         qur_link) {
                        CWARN("%s: Free slave deferred upd: ID:%llu, "
                              "ver:%llu/%llu\n", qsd->qsd_svname,
                              upd->qur_qid.qid_uid, upd->qur_ver,
                              qqi->qqi_slv_ver);
                        list_del_init(&upd->qur_link);
                        qsd_upd_free(upd);
                }
                write_unlock(&qsd->qsd_lock);
        }
}

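/* Drain the adjust list, dropping the lqe reference held by each queued
 * entry. */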
static void qsd_cleanup_adjust(struct qsd_instance *qsd)
{
        struct lquota_entry     *lqe;

        spin_lock(&qsd->qsd_adjust_lock);
        while (!list_empty(&qsd->qsd_adjust_list)) {
                lqe = list_entry(qsd->qsd_adjust_list.next,
                                 struct lquota_entry, lqe_link);
                list_del_init(&lqe->lqe_link);
                lqe_putref(lqe);
        }
        spin_unlock(&qsd->qsd_adjust_lock);
}

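/* Stop the writeback thread and free whatever is left on the deferred and
 * adjust lists. */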
void qsd_stop_upd_thread(struct qsd_instance *qsd)
{
        struct ptlrpc_thread    *thread = &qsd->qsd_upd_thread;
        struct l_wait_info       lwi    = { 0 };

        if (!thread_is_stopped(thread)) {
                thread_set_flags(thread, SVC_STOPPING);
                wake_up(&thread->t_ctl_waitq);

                l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread),
                             &lwi);
        }
        qsd_cleanup_deferred(qsd);
        qsd_cleanup_adjust(qsd);
}