Whamcloud - gitweb
a25dbc51b143faa1a2f8934cf16e8f3ff2d4c305
[fs/lustre-release.git] / lustre / quota / qsd_writeback.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2014, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LQUOTA
32
33 #include "qsd_internal.h"
34
35 extern struct kmem_cache *upd_kmem;
36
37 /*
38  * Allocate and fill an qsd_upd_rec structure to be processed by the writeback
39  * thread.
40  *
41  * \param qqi - is the qsd_qtype_info structure relevant to the update
42  * \param lqe - is the lquota entry subject to the update
43  * \param qid - is the identifier subject to the update
44  * \param rec - is the record storing the new quota settings
45  * \param ver - is the version associated with the update
46  * \param global - is a boolean set to true if this is an update of the global
47  *                 index and false for a slave index.
48  */
49 static struct qsd_upd_rec *qsd_upd_alloc(struct qsd_qtype_info *qqi,
50                                          struct lquota_entry *lqe,
51                                          union lquota_id *qid,
52                                          union lquota_rec *rec, __u64 ver,
53                                          bool global)
54 {
55         struct qsd_upd_rec      *upd;
56
57         OBD_SLAB_ALLOC_PTR_GFP(upd, upd_kmem, GFP_NOFS);
58         if (upd == NULL) {
59                 return NULL;
60         }
61
62         /* fill it */
63         INIT_LIST_HEAD(&upd->qur_link);
64         upd->qur_qqi = qqi;
65         upd->qur_lqe = lqe;
66         if (lqe)
67                 lqe_getref(lqe);
68         upd->qur_qid    = *qid;
69         upd->qur_rec    = *rec;
70         upd->qur_ver    = ver;
71         upd->qur_global = global;
72
73         return upd;
74 }
75
76 static void qsd_upd_free(struct qsd_upd_rec *upd)
77 {
78         if (upd->qur_lqe)
79                 lqe_putref(upd->qur_lqe);
80         OBD_SLAB_FREE_PTR(upd, upd_kmem);
81 }
82
83 /* must hold the qsd_lock */
84 static void qsd_upd_add(struct qsd_instance *qsd, struct qsd_upd_rec *upd)
85 {
86         if (!qsd->qsd_stopping) {
87                 list_add_tail(&upd->qur_link, &qsd->qsd_upd_list);
88                 /* wake up the upd thread */
89                 wake_up(&qsd->qsd_upd_thread.t_ctl_waitq);
90         } else {
91                 CWARN("%s: discard update.\n", qsd->qsd_svname);
92                 if (upd->qur_lqe)
93                         LQUOTA_WARN(upd->qur_lqe, "discard update.");
94                 qsd_upd_free(upd);
95         }
96 }
97
98 /* must hold the qsd_lock */
99 static void qsd_add_deferred(struct qsd_instance *qsd, struct list_head *list,
100                              struct qsd_upd_rec *upd)
101 {
102         struct qsd_upd_rec      *tmp, *n;
103
104         if (qsd->qsd_stopping) {
105                 CWARN("%s: discard deferred udpate.\n", qsd->qsd_svname);
106                 if (upd->qur_lqe)
107                         LQUOTA_WARN(upd->qur_lqe, "discard deferred update.");
108                 qsd_upd_free(upd);
109                 return;
110         }
111
112         /* Sort the updates in ascending order */
113         list_for_each_entry_safe_reverse(tmp, n, list, qur_link) {
114
115                 /* There could be some legacy records which have duplicated
116                  * version. Imagine following scenario: slave received global
117                  * glimpse and queued a record in the deferred list, then
118                  * master crash and rollback to an ealier version, then the
119                  * version of queued record will be conflicting with later
120                  * updates. We should just delete the legacy record in such
121                  * case. */
122                 if (upd->qur_ver == tmp->qur_ver) {
123                         LASSERT(tmp->qur_lqe);
124                         LQUOTA_ERROR(tmp->qur_lqe, "Found a conflict record "
125                                      "with ver:"LPU64"", tmp->qur_ver);
126                         list_del_init(&tmp->qur_link);
127                         qsd_upd_free(tmp);
128                 } else if (upd->qur_ver < tmp->qur_ver) {
129                         continue;
130                 } else {
131                         list_add_tail(&upd->qur_link, &tmp->qur_link);
132                         return;
133                 }
134         }
135         list_add(&upd->qur_link, list);
136 }
137
/*
 * Process the deferred update list after the index version was bumped:
 * records with a version <= the new version are stale and freed; if the
 * first remaining record is exactly ver + 1 it is handed over to the
 * writeback thread.  The caller must hold the qsd_lock.
 *
 * \param qqi  - qsd_qtype_info owning the deferred list
 * \param list - deferred update list, sorted by ascending version
 * \param ver  - current version of the index copy
 */
static void qsd_kickoff_deferred(struct qsd_qtype_info *qqi,
                                 struct list_head *list, __u64 ver)
{
        struct qsd_upd_rec      *upd, *tmp;
        ENTRY;

        /* Get the first update record in the list, which has the smallest
         * version, discard all records with versions smaller than the current
         * one */
        list_for_each_entry_safe(upd, tmp, list, qur_link) {
                if (upd->qur_ver <= ver) {
                        /* drop this update */
                        list_del_init(&upd->qur_link);
                        CDEBUG(D_QUOTA, "%s: skipping deferred update ver:"
                               LPU64"/"LPU64", global:%d, qid:"LPU64"\n",
                               qqi->qqi_qsd->qsd_svname, upd->qur_ver, ver,
                               upd->qur_global, upd->qur_qid.qid_uid);
                        qsd_upd_free(upd);
                } else {
                        /* list is sorted, all remaining records are newer */
                        break;
                }
        }

        /* No remaining deferred update */
        if (list_empty(list))
                RETURN_EXIT;

        CDEBUG(D_QUOTA, "%s: found deferred update record. "
               "version:"LPU64"/"LPU64", global:%d, qid:"LPU64"\n",
               qqi->qqi_qsd->qsd_svname, upd->qur_ver, ver,
               upd->qur_global, upd->qur_qid.qid_uid);

        LASSERTF(upd->qur_ver > ver, "lur_ver:"LPU64", cur_ver:"LPU64"\n",
                 upd->qur_ver, ver);

        /* Kick off the deferred update; records with a larger gap remain
         * queued until the intermediate versions have been applied */
        if (upd->qur_ver == ver + 1) {
                list_del_init(&upd->qur_link);
                qsd_upd_add(qqi->qqi_qsd, upd);
        }
        EXIT;
}
181
182 /* Bump version of global or slave index copy
183  *
184  * \param qqi    - qsd_qtype_info
185  * \param ver    - version to be bumped to
186  * \param global - global or slave index copy?
187  */
188 void qsd_bump_version(struct qsd_qtype_info *qqi, __u64 ver, bool global)
189 {
190         struct list_head *list;
191         __u64            *idx_ver;
192
193         idx_ver = global ? &qqi->qqi_glb_ver : &qqi->qqi_slv_ver;
194         list    = global ? &qqi->qqi_deferred_glb : &qqi->qqi_deferred_slv;
195
196         write_lock(&qqi->qqi_qsd->qsd_lock);
197         *idx_ver = ver;
198         if (global)
199                 qqi->qqi_glb_uptodate = 1;
200         else
201                 qqi->qqi_slv_uptodate = 1;
202         qsd_kickoff_deferred(qqi, list, ver);
203         write_unlock(&qqi->qqi_qsd->qsd_lock);
204 }
205
206 /*
207  * Schedule a commit of a lquota entry
208  *
209  * \param  qqi   - qsd_qtype_info
210  * \param  lqe   - lquota_entry
211  * \param  qid   - quota id
212  * \param  rec   - global or slave record to be updated to disk
213  * \param  ver   - new index file version
214  * \param  global- true: master record; false: slave record
215  */
216 void qsd_upd_schedule(struct qsd_qtype_info *qqi, struct lquota_entry *lqe,
217                       union lquota_id *qid, union lquota_rec *rec, __u64 ver,
218                       bool global)
219 {
220         struct qsd_upd_rec      *upd;
221         struct qsd_instance     *qsd = qqi->qqi_qsd;
222         __u64                    cur_ver;
223         ENTRY;
224
225         CDEBUG(D_QUOTA, "%s: schedule update. global:%s, version:"LPU64"\n",
226                qsd->qsd_svname, global ? "true" : "false", ver);
227
228         upd = qsd_upd_alloc(qqi, lqe, qid, rec, ver, global);
229         if (upd == NULL)
230                 RETURN_EXIT;
231
232         /* If we don't want update index version, no need to sort the
233          * records in version order, just schedule the updates instantly. */
234         if (ver == 0) {
235                 write_lock(&qsd->qsd_lock);
236                 qsd_upd_add(qsd, upd);
237                 write_unlock(&qsd->qsd_lock);
238                 RETURN_EXIT;
239         }
240
241         write_lock(&qsd->qsd_lock);
242
243         cur_ver = global ? qqi->qqi_glb_ver : qqi->qqi_slv_ver;
244
245         if (ver <= cur_ver) {
246                 if (global)
247                         /* legitimate race between glimpse AST and
248                          * reintegration */
249                         CDEBUG(D_QUOTA, "%s: discarding glb update from glimpse"
250                                " ver:"LPU64" local ver:"LPU64"\n",
251                                qsd->qsd_svname, ver, cur_ver);
252                 else
253                         CERROR("%s: discard slv update, ver:"LPU64" local ver:"
254                                LPU64"\n", qsd->qsd_svname, ver, cur_ver);
255                 qsd_upd_free(upd);
256         } else if ((ver == cur_ver + 1) && qqi->qqi_glb_uptodate &&
257                    qqi->qqi_slv_uptodate) {
258                 /* In order update, and reintegration has been done. */
259                 qsd_upd_add(qsd, upd);
260         } else {
261                 /* Out of order update (the one with smaller version hasn't
262                  * reached slave or hasn't been flushed to disk yet), or
263                  * the reintegration is in progress. Defer the update. */
264                 struct list_head *list = global ? &qqi->qqi_deferred_glb :
265                                                   &qqi->qqi_deferred_slv;
266                 qsd_add_deferred(qsd, list, upd);
267         }
268
269         write_unlock(&qsd->qsd_lock);
270
271         EXIT;
272 }
273
274 static int qsd_process_upd(const struct lu_env *env, struct qsd_upd_rec *upd)
275 {
276         struct lquota_entry     *lqe = upd->qur_lqe;
277         struct qsd_qtype_info   *qqi = upd->qur_qqi;
278         int                      rc;
279         ENTRY;
280
281         if (lqe == NULL) {
282                 lqe = lqe_locate(env, qqi->qqi_site, &upd->qur_qid);
283                 if (IS_ERR(lqe))
284                         GOTO(out, rc = PTR_ERR(lqe));
285         }
286
287         /* The in-memory lqe update for slave index copy isn't deferred,
288          * we shouldn't touch it here. */
289         if (upd->qur_global) {
290                 rc = qsd_update_lqe(env, lqe, upd->qur_global, &upd->qur_rec);
291                 if (rc)
292                         GOTO(out, rc);
293                 /* refresh usage */
294                 qsd_refresh_usage(env, lqe);
295                 /* Report usage asynchronously */
296                 rc = qsd_adjust(env, lqe);
297                 if (rc)
298                         LQUOTA_ERROR(lqe, "failed to report usage, rc:%d", rc);
299         }
300
301         rc = qsd_update_index(env, qqi, &upd->qur_qid, upd->qur_global,
302                               upd->qur_ver, &upd->qur_rec);
303 out:
304         if (lqe && !IS_ERR(lqe)) {
305                 lqe_putref(lqe);
306                 upd->qur_lqe = NULL;
307         }
308         RETURN(rc);
309 }
310
/*
 * Schedule a quota adjustment (or a per-ID lock cancel) for \a lqe by
 * (re-)inserting it into qsd_adjust_list, which is processed by the
 * writeback thread.
 *
 * \param lqe    - entry to be adjusted; a reference is taken here and
 *                 transferred to the list (dropped by the consumer)
 * \param defer  - process after QSD_WB_INTERVAL rather than right away
 * \param cancel - queue for per-ID lock cancel (lqe_adjust_time == 0)
 *                 instead of a quota adjust
 */
void qsd_adjust_schedule(struct lquota_entry *lqe, bool defer, bool cancel)
{
        struct qsd_instance     *qsd = lqe2qqi(lqe)->qqi_qsd;
        bool                     added = false;

        /* bail out early when the qsd instance is shutting down */
        read_lock(&qsd->qsd_lock);
        if (qsd->qsd_stopping) {
                read_unlock(&qsd->qsd_lock);
                return;
        }
        read_unlock(&qsd->qsd_lock);

        lqe_getref(lqe);
        spin_lock(&qsd->qsd_adjust_lock);

        /* the lqe is being queued for the per-ID lock cancel, we should
         * cancel the lock cancel and re-add it for quota adjust */
        if (!list_empty(&lqe->lqe_link) &&
            lqe->lqe_adjust_time == 0) {
                list_del_init(&lqe->lqe_link);
                /* drop the reference held by the list entry just removed */
                lqe_putref(lqe);
        }

        if (list_empty(&lqe->lqe_link)) {
                if (cancel)
                        lqe->lqe_adjust_time = 0;
                else
                        lqe->lqe_adjust_time = defer ?
                                cfs_time_shift_64(QSD_WB_INTERVAL) :
                                cfs_time_current_64();
                /* lqe reference transferred to list */
                if (defer)
                        /* deferred items go to the tail: list stays sorted
                         * by adjust time */
                        list_add_tail(&lqe->lqe_link,
                                          &qsd->qsd_adjust_list);
                else
                        list_add(&lqe->lqe_link, &qsd->qsd_adjust_list);
                added = true;
        }
        spin_unlock(&qsd->qsd_adjust_lock);

        if (added)
                wake_up(&qsd->qsd_upd_thread.t_ctl_waitq);
        else
                /* entry was already queued, drop the reference taken above */
                lqe_putref(lqe);
}
356
/*
 * Check whether the writeback thread has work to do: pending writeback
 * records or due adjust requests.  Any queued update records are spliced
 * onto \a upd for processing by the caller.
 *
 * \param qsd      - qsd instance to check
 * \param upd      - output list, must be empty on entry; receives all
 *                   queued qsd_upd_rec's
 * \param uptodate - set to false when a global or slave index copy isn't
 *                   up to date and no reintegration thread is running
 *
 * \retval true if there is at least one job to process
 */
static bool qsd_job_pending(struct qsd_instance *qsd, struct list_head *upd,
                            bool *uptodate)
{
        bool    job_pending = false;
        int     qtype;

        LASSERT(list_empty(upd));
        *uptodate = true;

        /* check whether the oldest deferred adjust request is already due */
        spin_lock(&qsd->qsd_adjust_lock);
        if (!list_empty(&qsd->qsd_adjust_list)) {
                struct lquota_entry *lqe;
                lqe = list_entry(qsd->qsd_adjust_list.next,
                                     struct lquota_entry, lqe_link);
                if (cfs_time_beforeq_64(lqe->lqe_adjust_time,
                                        cfs_time_current_64()))
                        job_pending = true;
        }
        spin_unlock(&qsd->qsd_adjust_lock);

        /* write lock: the update list is modified (spliced away) below */
        write_lock(&qsd->qsd_lock);
        if (!list_empty(&qsd->qsd_upd_list)) {
                list_splice_init(&qsd->qsd_upd_list, upd);
                job_pending = true;
        }

        if (qsd->qsd_acct_failed) {
                /* don't bother kicking off reintegration if space accounting
                 * failed to be enabled */
                write_unlock(&qsd->qsd_lock);
                return job_pending;
        }

        for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) {
                struct qsd_qtype_info *qqi = qsd->qsd_type_array[qtype];

                if (!qsd_type_enabled(qsd, qtype))
                        continue;

                if ((!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) &&
                     !qqi->qqi_reint)
                        /* global or slave index not up to date and reint
                         * thread not running */
                        *uptodate = false;
        }

        write_unlock(&qsd->qsd_lock);
        return job_pending;
}
408
/*
 * Main loop of the quota writeback thread: flushes queued update records to
 * disk, processes due entries of the adjust list, and starts reintegration
 * when an index copy isn't up to date.
 *
 * \param arg - the qsd_instance this thread works for
 *
 * \retval 0 on normal termination, negative errno if env setup failed
 */
static int qsd_upd_thread(void *arg)
{
        struct qsd_instance     *qsd = (struct qsd_instance *)arg;
        struct ptlrpc_thread    *thread = &qsd->qsd_upd_thread;
        struct l_wait_info       lwi;
        struct list_head         queue;
        struct qsd_upd_rec      *upd, *n;
        struct lu_env           *env;
        int                      qtype, rc = 0;
        bool                     uptodate;
        struct lquota_entry     *lqe;
        __u64                    cur_time;
        ENTRY;

        OBD_ALLOC_PTR(env);
        if (env == NULL)
                RETURN(-ENOMEM);

        rc = lu_env_init(env, LCT_DT_THREAD);
        if (rc) {
                CERROR("%s: cannot init env: rc = %d\n", qsd->qsd_svname, rc);
                OBD_FREE_PTR(env);
                RETURN(rc);
        }

        /* notify the starter that we are up and running */
        thread_set_flags(thread, SVC_RUNNING);
        wake_up(&thread->t_ctl_waitq);

        INIT_LIST_HEAD(&queue);
        /* wake up periodically even without explicit work, to re-check the
         * adjust list and the reintegration state */
        lwi = LWI_TIMEOUT(cfs_time_seconds(QSD_WB_INTERVAL), NULL, NULL);
        while (1) {
                l_wait_event(thread->t_ctl_waitq,
                             qsd_job_pending(qsd, &queue, &uptodate) ||
                             !thread_is_running(thread), &lwi);

                /* flush the update records spliced off qsd_upd_list */
                list_for_each_entry_safe(upd, n, &queue, qur_link) {
                        list_del_init(&upd->qur_link);
                        qsd_process_upd(env, upd);
                        qsd_upd_free(upd);
                }

                spin_lock(&qsd->qsd_adjust_lock);
                cur_time = cfs_time_current_64();
                while (!list_empty(&qsd->qsd_adjust_list)) {
                        lqe = list_entry(qsd->qsd_adjust_list.next,
                                         struct lquota_entry, lqe_link);
                        /* deferred items are sorted by time */
                        if (!cfs_time_beforeq_64(lqe->lqe_adjust_time,
                                                 cur_time))
                                break;

                        list_del_init(&lqe->lqe_link);
                        /* drop the spinlock before the adjust work, which may
                         * involve disk or network I/O */
                        spin_unlock(&qsd->qsd_adjust_lock);

                        if (thread_is_running(thread) && uptodate) {
                                qsd_refresh_usage(env, lqe);
                                /* adjust_time == 0 marks a per-ID lock cancel
                                 * request rather than a quota adjust */
                                if (lqe->lqe_adjust_time == 0)
                                        qsd_id_lock_cancel(env, lqe);
                                else
                                        qsd_adjust(env, lqe);
                        }

                        /* release the reference the list was holding */
                        lqe_putref(lqe);
                        spin_lock(&qsd->qsd_adjust_lock);
                }
                spin_unlock(&qsd->qsd_adjust_lock);

                if (!thread_is_running(thread))
                        break;

                if (uptodate)
                        continue;

                /* some index copy is stale: (re)start reintegration */
                for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++)
                        qsd_start_reint_thread(qsd->qsd_type_array[qtype]);
        }
        lu_env_fini(env);
        OBD_FREE_PTR(env);
        thread_set_flags(thread, SVC_STOPPED);
        wake_up(&thread->t_ctl_waitq);
        RETURN(rc);
}
491
492 int qsd_start_upd_thread(struct qsd_instance *qsd)
493 {
494         struct ptlrpc_thread    *thread = &qsd->qsd_upd_thread;
495         struct l_wait_info       lwi = { 0 };
496         struct task_struct              *task;
497         ENTRY;
498
499         task = kthread_run(qsd_upd_thread, (void *)qsd,
500                            "lquota_wb_%s", qsd->qsd_svname);
501         if (IS_ERR(task)) {
502                 CERROR("fail to start quota update thread: rc = %ld\n",
503                         PTR_ERR(task));
504                 thread_set_flags(thread, SVC_STOPPED);
505                 RETURN(PTR_ERR(task));
506         }
507
508         l_wait_event(thread->t_ctl_waitq,
509                      thread_is_running(thread) || thread_is_stopped(thread),
510                      &lwi);
511         RETURN(0);
512 }
513
514 static void qsd_cleanup_deferred(struct qsd_instance *qsd)
515 {
516         int     qtype;
517
518         for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) {
519                 struct qsd_upd_rec      *upd, *tmp;
520                 struct qsd_qtype_info   *qqi = qsd->qsd_type_array[qtype];
521
522                 if (qqi == NULL)
523                         continue;
524
525                 write_lock(&qsd->qsd_lock);
526                 list_for_each_entry_safe(upd, tmp, &qqi->qqi_deferred_glb,
527                                          qur_link) {
528                         CWARN("%s: Free global deferred upd: ID:"LPU64", "
529                               "ver:"LPU64"/"LPU64"\n", qsd->qsd_svname,
530                               upd->qur_qid.qid_uid, upd->qur_ver,
531                               qqi->qqi_glb_ver);
532                         list_del_init(&upd->qur_link);
533                         qsd_upd_free(upd);
534                 }
535                 list_for_each_entry_safe(upd, tmp, &qqi->qqi_deferred_slv,
536                                          qur_link) {
537                         CWARN("%s: Free slave deferred upd: ID:"LPU64", "
538                               "ver:"LPU64"/"LPU64"\n", qsd->qsd_svname,
539                               upd->qur_qid.qid_uid, upd->qur_ver,
540                               qqi->qqi_slv_ver);
541                         list_del_init(&upd->qur_link);
542                         qsd_upd_free(upd);
543                 }
544                 write_unlock(&qsd->qsd_lock);
545         }
546 }
547
548 static void qsd_cleanup_adjust(struct qsd_instance *qsd)
549 {
550         struct lquota_entry     *lqe;
551
552         spin_lock(&qsd->qsd_adjust_lock);
553         while (!list_empty(&qsd->qsd_adjust_list)) {
554                 lqe = list_entry(qsd->qsd_adjust_list.next,
555                                  struct lquota_entry, lqe_link);
556                 list_del_init(&lqe->lqe_link);
557                 lqe_putref(lqe);
558         }
559         spin_unlock(&qsd->qsd_adjust_lock);
560 }
561
562 void qsd_stop_upd_thread(struct qsd_instance *qsd)
563 {
564         struct ptlrpc_thread    *thread = &qsd->qsd_upd_thread;
565         struct l_wait_info       lwi    = { 0 };
566
567         if (!thread_is_stopped(thread)) {
568                 thread_set_flags(thread, SVC_STOPPING);
569                 wake_up(&thread->t_ctl_waitq);
570
571                 l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread),
572                              &lwi);
573         }
574         qsd_cleanup_deferred(qsd);
575         qsd_cleanup_adjust(qsd);
576 }