Whamcloud - gitweb
a2b01a4188abf6b42f1d1fd33b4be22589a04a64
[fs/lustre-release.git] / lustre / quota / qsd_writeback.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2013, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34
35 #define DEBUG_SUBSYSTEM S_LQUOTA
36
37 #include "qsd_internal.h"
38
39 extern struct kmem_cache *upd_kmem;
40
41 /*
42  * Allocate and fill an qsd_upd_rec structure to be processed by the writeback
43  * thread.
44  *
45  * \param qqi - is the qsd_qtype_info structure relevant to the update
46  * \param lqe - is the lquota entry subject to the update
47  * \param qid - is the identifier subject to the update
48  * \param rec - is the record storing the new quota settings
49  * \param ver - is the version associated with the update
50  * \param global - is a boolean set to true if this is an update of the global
51  *                 index and false for a slave index.
52  */
53 static struct qsd_upd_rec *qsd_upd_alloc(struct qsd_qtype_info *qqi,
54                                          struct lquota_entry *lqe,
55                                          union lquota_id *qid,
56                                          union lquota_rec *rec, __u64 ver,
57                                          bool global)
58 {
59         struct qsd_upd_rec      *upd;
60
61         OBD_SLAB_ALLOC_PTR_GFP(upd, upd_kmem, __GFP_IO);
62         if (upd == NULL) {
63                 CERROR("Failed to allocate upd");
64                 return NULL;
65         }
66
67         /* fill it */
68         CFS_INIT_LIST_HEAD(&upd->qur_link);
69         upd->qur_qqi = qqi;
70         upd->qur_lqe = lqe;
71         if (lqe)
72                 lqe_getref(lqe);
73         upd->qur_qid    = *qid;
74         upd->qur_rec    = *rec;
75         upd->qur_ver    = ver;
76         upd->qur_global = global;
77
78         return upd;
79 }
80
81 static void qsd_upd_free(struct qsd_upd_rec *upd)
82 {
83         if (upd->qur_lqe)
84                 lqe_putref(upd->qur_lqe);
85         OBD_SLAB_FREE_PTR(upd, upd_kmem);
86 }
87
88 /* must hold the qsd_lock */
89 static void qsd_upd_add(struct qsd_instance *qsd, struct qsd_upd_rec *upd)
90 {
91         if (!qsd->qsd_stopping) {
92                 list_add_tail(&upd->qur_link, &qsd->qsd_upd_list);
93                 /* wake up the upd thread */
94                 cfs_waitq_signal(&qsd->qsd_upd_thread.t_ctl_waitq);
95         } else {
96                 CWARN("%s: discard update.\n", qsd->qsd_svname);
97                 if (upd->qur_lqe)
98                         LQUOTA_WARN(upd->qur_lqe, "discard update.");
99                 qsd_upd_free(upd);
100         }
101 }
102
103 /* must hold the qsd_lock */
104 static void qsd_add_deferred(struct qsd_instance *qsd, cfs_list_t *list,
105                              struct qsd_upd_rec *upd)
106 {
107         struct qsd_upd_rec      *tmp, *n;
108
109         if (qsd->qsd_stopping) {
110                 CWARN("%s: discard deferred udpate.\n", qsd->qsd_svname);
111                 if (upd->qur_lqe)
112                         LQUOTA_WARN(upd->qur_lqe, "discard deferred update.");
113                 qsd_upd_free(upd);
114                 return;
115         }
116
117         /* Sort the updates in ascending order */
118         cfs_list_for_each_entry_safe_reverse(tmp, n, list, qur_link) {
119
120                 /* There could be some legacy records which have duplicated
121                  * version. Imagine following scenario: slave received global
122                  * glimpse and queued a record in the deferred list, then
123                  * master crash and rollback to an ealier version, then the
124                  * version of queued record will be conflicting with later
125                  * updates. We should just delete the legacy record in such
126                  * case. */
127                 if (upd->qur_ver == tmp->qur_ver) {
128                         LASSERT(tmp->qur_lqe);
129                         LQUOTA_ERROR(tmp->qur_lqe, "Found a conflict record "
130                                      "with ver:"LPU64"", tmp->qur_ver);
131                         cfs_list_del_init(&tmp->qur_link);
132                         qsd_upd_free(tmp);
133                 }
134
135                 if (upd->qur_ver < tmp->qur_ver) {
136                         continue;
137                 } else {
138                         cfs_list_add_tail(&upd->qur_link, &tmp->qur_link);
139                         return;
140                 }
141         }
142         cfs_list_add(&upd->qur_link, list);
143 }
144
/* Flush the deferred update list after the index version was bumped to
 * \a ver: records with a version <= \a ver are stale and dropped; if the
 * next record in line is exactly ver + 1, it is moved to the main update
 * list for processing by the writeback thread.
 * Must hold the qsd_lock. */
static void qsd_kickoff_deferred(struct qsd_qtype_info *qqi, cfs_list_t *list,
                                 __u64 ver)
{
        struct qsd_upd_rec      *upd, *tmp;
        ENTRY;

        /* Get the first update record in the list, which has the smallest
         * version, discard all records with versions smaller than the current
         * one */
        cfs_list_for_each_entry_safe(upd, tmp, list, qur_link) {
                if (upd->qur_ver <= ver) {
                        /* drop this update */
                        cfs_list_del_init(&upd->qur_link);
                        CDEBUG(D_QUOTA, "%s: skipping deferred update ver:"
                               LPU64"/"LPU64", global:%d, qid:"LPU64"\n",
                               qqi->qqi_qsd->qsd_svname, upd->qur_ver, ver,
                               upd->qur_global, upd->qur_qid.qid_uid);
                        qsd_upd_free(upd);
                } else {
                        /* list is sorted in ascending version order (see
                         * qsd_add_deferred), all remaining records are
                         * newer than \a ver */
                        break;
                }
        }

        /* No remaining deferred update */
        if (cfs_list_empty(list))
                RETURN_EXIT;

        /* upd still points at the first surviving record (version > ver) */
        CDEBUG(D_QUOTA, "%s: found deferred update record. "
               "version:"LPU64"/"LPU64", global:%d, qid:"LPU64"\n",
               qqi->qqi_qsd->qsd_svname, upd->qur_ver, ver,
               upd->qur_global, upd->qur_qid.qid_uid);

        LASSERTF(upd->qur_ver > ver, "lur_ver:"LPU64", cur_ver:"LPU64"\n",
                 upd->qur_ver, ver);

        /* Kick off the deferred update */
        if (upd->qur_ver == ver + 1) {
                list_del_init(&upd->qur_link);
                qsd_upd_add(qqi->qqi_qsd, upd);
        }
        EXIT;
}
188
189 /* Bump version of global or slave index copy
190  *
191  * \param qqi    - qsd_qtype_info
192  * \param ver    - version to be bumped to
193  * \param global - global or slave index copy?
194  */
195 void qsd_bump_version(struct qsd_qtype_info *qqi, __u64 ver, bool global)
196 {
197         cfs_list_t      *list;
198         __u64           *idx_ver;
199
200         idx_ver = global ? &qqi->qqi_glb_ver : &qqi->qqi_slv_ver;
201         list    = global ? &qqi->qqi_deferred_glb : &qqi->qqi_deferred_slv;
202
203         write_lock(&qqi->qqi_qsd->qsd_lock);
204         *idx_ver = ver;
205         if (global)
206                 qqi->qqi_glb_uptodate = 1;
207         else
208                 qqi->qqi_slv_uptodate = 1;
209         qsd_kickoff_deferred(qqi, list, ver);
210         write_unlock(&qqi->qqi_qsd->qsd_lock);
211 }
212
/*
 * Schedule a commit of a lquota entry
 *
 * \param  qqi   - qsd_qtype_info
 * \param  lqe   - lquota_entry
 * \param  qid   - quota id
 * \param  rec   - global or slave record to be updated to disk
 * \param  ver   - new index file version
 * \param  global- true: master record; false: slave record
 */
void qsd_upd_schedule(struct qsd_qtype_info *qqi, struct lquota_entry *lqe,
                      union lquota_id *qid, union lquota_rec *rec, __u64 ver,
                      bool global)
{
        struct qsd_upd_rec      *upd;
        struct qsd_instance     *qsd = qqi->qqi_qsd;
        __u64                    cur_ver;
        ENTRY;

        CDEBUG(D_QUOTA, "%s: schedule update. global:%s, version:"LPU64"\n",
               qsd->qsd_svname, global ? "true" : "false", ver);

        upd = qsd_upd_alloc(qqi, lqe, qid, rec, ver, global);
        if (upd == NULL)
                RETURN_EXIT;

        /* If we don't want update index version, no need to sort the
         * records in version order, just schedule the updates instantly. */
        if (ver == 0) {
                write_lock(&qsd->qsd_lock);
                qsd_upd_add(qsd, upd);
                write_unlock(&qsd->qsd_lock);
                RETURN_EXIT;
        }

        write_lock(&qsd->qsd_lock);

        /* current on-disk version of the relevant index copy */
        cur_ver = global ? qqi->qqi_glb_ver : qqi->qqi_slv_ver;

        if (ver <= cur_ver) {
                /* stale update, the index copy is already at this version
                 * or newer */
                if (global)
                        /* legitimate race between glimpse AST and
                         * reintegration */
                        CDEBUG(D_QUOTA, "%s: discarding glb update from glimpse"
                               " ver:"LPU64" local ver:"LPU64"\n",
                               qsd->qsd_svname, ver, cur_ver);
                else
                        CERROR("%s: discard slv update, ver:"LPU64" local ver:"
                               LPU64"\n", qsd->qsd_svname, ver, cur_ver);
                qsd_upd_free(upd);
        } else if ((ver == cur_ver + 1) && qqi->qqi_glb_uptodate &&
                   qqi->qqi_slv_uptodate) {
                /* In order update, and reintegration has been done. */
                qsd_upd_add(qsd, upd);
        } else {
                /* Out of order update (the one with smaller version hasn't
                 * reached slave or hasn't been flushed to disk yet), or
                 * the reintegration is in progress. Defer the update. */
                cfs_list_t *list = global ? &qqi->qqi_deferred_glb :
                                            &qqi->qqi_deferred_slv;
                qsd_add_deferred(qsd, list, upd);
        }

        write_unlock(&qsd->qsd_lock);

        EXIT;
}
280
/* Apply one update record: for global records, update the in-memory lquota
 * entry, refresh usage and report it asynchronously; then, in all cases,
 * write the record to the on-disk index copy.
 *
 * \param env - the execution environment
 * \param upd - the update record to process
 *
 * \retval 0 on success, negative errno on failure. The lqe reference held
 *         by the record (or taken here via lqe_locate) is always released.
 */
static int qsd_process_upd(const struct lu_env *env, struct qsd_upd_rec *upd)
{
        struct lquota_entry     *lqe = upd->qur_lqe;
        struct qsd_qtype_info   *qqi = upd->qur_qqi;
        int                      rc;
        ENTRY;

        if (lqe == NULL) {
                /* no entry attached to the record, look it up by quota ID */
                lqe = lqe_locate(env, qqi->qqi_site, &upd->qur_qid);
                if (IS_ERR(lqe))
                        GOTO(out, rc = PTR_ERR(lqe));
        }

        /* The in-memory lqe update for slave index copy isn't deferred,
         * we shouldn't touch it here. */
        if (upd->qur_global) {
                rc = qsd_update_lqe(env, lqe, upd->qur_global, &upd->qur_rec);
                if (rc)
                        GOTO(out, rc);
                /* refresh usage */
                qsd_refresh_usage(env, lqe);
                /* Report usage asynchronously */
                rc = qsd_adjust(env, lqe);
                if (rc)
                        LQUOTA_ERROR(lqe, "failed to report usage, rc:%d", rc);
        }

        /* flush the new quota settings to the index file */
        rc = qsd_update_index(env, qqi, &upd->qur_qid, upd->qur_global,
                              upd->qur_ver, &upd->qur_rec);
out:
        if (lqe && !IS_ERR(lqe)) {
                lqe_putref(lqe);
                upd->qur_lqe = NULL;
        }
        RETURN(rc);
}
317
/* Schedule a quota adjust (or a per-ID lock cancel) for \a lqe by queuing it
 * on the adjust list serviced by the writeback thread.
 *
 * \param lqe    - lquota entry to queue; a reference is taken here and
 *                 transferred to the list when the entry is queued
 * \param defer  - true to delay processing by QSD_WB_INTERVAL and queue at
 *                 the tail, false to queue at the head for ASAP processing
 * \param cancel - true to request a per-ID lock cancel, encoded as
 *                 lqe_adjust_time == 0, instead of a quota adjust
 */
void qsd_adjust_schedule(struct lquota_entry *lqe, bool defer, bool cancel)
{
        struct qsd_instance     *qsd = lqe2qqi(lqe)->qqi_qsd;
        bool                     added = false;

        /* nothing to schedule if the qsd instance is shutting down */
        read_lock(&qsd->qsd_lock);
        if (qsd->qsd_stopping) {
                read_unlock(&qsd->qsd_lock);
                return;
        }
        read_unlock(&qsd->qsd_lock);

        lqe_getref(lqe);
        spin_lock(&qsd->qsd_adjust_lock);

        /* the lqe is being queued for the per-ID lock cancel, we should
         * cancel the lock cancel and re-add it for quota adjust */
        if (!cfs_list_empty(&lqe->lqe_link) &&
            lqe->lqe_adjust_time == 0) {
                cfs_list_del_init(&lqe->lqe_link);
                /* drop the reference that was held by the list */
                lqe_putref(lqe);
        }

        if (cfs_list_empty(&lqe->lqe_link)) {
                if (cancel)
                        lqe->lqe_adjust_time = 0;
                else
                        lqe->lqe_adjust_time = defer ?
                                cfs_time_shift_64(QSD_WB_INTERVAL) :
                                cfs_time_current_64();
                /* lqe reference transfered to list */
                if (defer)
                        cfs_list_add_tail(&lqe->lqe_link,
                                          &qsd->qsd_adjust_list);
                else
                        cfs_list_add(&lqe->lqe_link, &qsd->qsd_adjust_list);
                added = true;
        }
        spin_unlock(&qsd->qsd_adjust_lock);

        if (added)
                cfs_waitq_signal(&qsd->qsd_upd_thread.t_ctl_waitq);
        else
                /* entry was already queued; drop the extra reference */
                lqe_putref(lqe);
}
363
/* return true if there is pending writeback records or the pending
 * adjust requests
 *
 * \param qsd      - qsd instance to poll
 * \param upd      - caller-provided empty list; any queued writeback
 *                   records are spliced onto it and owned by the caller
 * \param uptodate - set to false if some enabled quota type has a stale
 *                   index copy and no reintegration thread running
 */
static bool qsd_job_pending(struct qsd_instance *qsd, cfs_list_t *upd,
                            bool *uptodate)
{
        bool    job_pending = false;
        int     qtype;

        LASSERT(cfs_list_empty(upd));
        *uptodate = true;

        /* an adjust request is pending once the scheduled time of the head
         * entry (list is sorted by time) is due */
        spin_lock(&qsd->qsd_adjust_lock);
        if (!cfs_list_empty(&qsd->qsd_adjust_list)) {
                struct lquota_entry *lqe;
                lqe = cfs_list_entry(qsd->qsd_adjust_list.next,
                                     struct lquota_entry, lqe_link);
                if (cfs_time_beforeq_64(lqe->lqe_adjust_time,
                                        cfs_time_current_64()))
                        job_pending = true;
        }
        spin_unlock(&qsd->qsd_adjust_lock);

        /* grab all queued writeback records at once */
        write_lock(&qsd->qsd_lock);
        if (!cfs_list_empty(&qsd->qsd_upd_list)) {
                cfs_list_splice_init(&qsd->qsd_upd_list, upd);
                job_pending = true;
        }

        if (qsd->qsd_acct_failed) {
                /* don't bother kicking off reintegration if space accounting
                 * failed to be enabled */
                write_unlock(&qsd->qsd_lock);
                return job_pending;
        }

        for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) {
                struct qsd_qtype_info *qqi = qsd->qsd_type_array[qtype];

                if (!qsd_type_enabled(qsd, qtype))
                        continue;

                if ((!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) &&
                     !qqi->qqi_reint)
                        /* global or slave index not up to date and reint
                         * thread not running */
                        *uptodate = false;
        }

        write_unlock(&qsd->qsd_lock);
        return job_pending;
}
415
/* Main loop of the writeback thread: flushes queued update records, runs
 * due quota adjust / per-ID lock cancel requests, and kicks off
 * reintegration when an index copy is stale. Wakes up on demand through
 * t_ctl_waitq or at most every QSD_WB_INTERVAL seconds.
 *
 * \param arg - the qsd_instance this thread services
 *
 * \retval 0 on normal exit, negative errno if env initialization failed
 */
static int qsd_upd_thread(void *arg)
{
        struct qsd_instance     *qsd = (struct qsd_instance *)arg;
        struct ptlrpc_thread    *thread = &qsd->qsd_upd_thread;
        struct l_wait_info       lwi;
        cfs_list_t               queue;
        struct qsd_upd_rec      *upd, *n;
        struct lu_env           *env;
        int                      qtype, rc = 0;
        bool                     uptodate;
        struct lquota_entry     *lqe, *tmp;
        __u64                    cur_time;
        ENTRY;

        OBD_ALLOC_PTR(env);
        if (env == NULL)
                RETURN(-ENOMEM);

        rc = lu_env_init(env, LCT_DT_THREAD);
        if (rc) {
                /* NOTE(review): message lacks a trailing "\n" — confirm
                 * against logging convention used elsewhere in this file */
                CERROR("%s: Fail to init env.", qsd->qsd_svname);
                OBD_FREE_PTR(env);
                RETURN(rc);
        }

        /* tell qsd_start_upd_thread() that we are up and running */
        thread_set_flags(thread, SVC_RUNNING);
        cfs_waitq_signal(&thread->t_ctl_waitq);

        CFS_INIT_LIST_HEAD(&queue);
        lwi = LWI_TIMEOUT(cfs_time_seconds(QSD_WB_INTERVAL), NULL, NULL);
        while (1) {
                l_wait_event(thread->t_ctl_waitq,
                             qsd_job_pending(qsd, &queue, &uptodate) ||
                             !thread_is_running(thread), &lwi);

                /* process the records spliced onto our private queue by
                 * qsd_job_pending() */
                cfs_list_for_each_entry_safe(upd, n, &queue, qur_link) {
                        cfs_list_del_init(&upd->qur_link);
                        qsd_process_upd(env, upd);
                        qsd_upd_free(upd);
                }

                spin_lock(&qsd->qsd_adjust_lock);
                cur_time = cfs_time_current_64();
                cfs_list_for_each_entry_safe(lqe, tmp, &qsd->qsd_adjust_list,
                                             lqe_link) {
                        /* deferred items are sorted by time */
                        if (!cfs_time_beforeq_64(lqe->lqe_adjust_time,
                                                 cur_time))
                                break;

                        /* drop the spinlock while doing the (potentially
                         * blocking) adjust work for this entry */
                        cfs_list_del_init(&lqe->lqe_link);
                        spin_unlock(&qsd->qsd_adjust_lock);

                        if (thread_is_running(thread) && uptodate) {
                                qsd_refresh_usage(env, lqe);
                                /* adjust_time == 0 marks a per-ID lock
                                 * cancel request, see qsd_adjust_schedule */
                                if (lqe->lqe_adjust_time == 0)
                                        qsd_id_lock_cancel(env, lqe);
                                else
                                        qsd_adjust(env, lqe);
                        }

                        /* drop the reference that was transferred to the
                         * adjust list */
                        lqe_putref(lqe);
                        spin_lock(&qsd->qsd_adjust_lock);
                }
                spin_unlock(&qsd->qsd_adjust_lock);

                if (!thread_is_running(thread))
                        break;

                if (uptodate)
                        continue;

                /* some index copy is stale, restart reintegration */
                for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++)
                        qsd_start_reint_thread(qsd->qsd_type_array[qtype]);
        }
        lu_env_fini(env);
        OBD_FREE_PTR(env);
        thread_set_flags(thread, SVC_STOPPED);
        cfs_waitq_signal(&thread->t_ctl_waitq);
        RETURN(rc);
}
497
498 int qsd_start_upd_thread(struct qsd_instance *qsd)
499 {
500         struct ptlrpc_thread    *thread = &qsd->qsd_upd_thread;
501         struct l_wait_info       lwi = { 0 };
502         cfs_task_t              *task;
503         ENTRY;
504
505         task = kthread_run(qsd_upd_thread, (void *)qsd,
506                            "lquota_wb_%s", qsd->qsd_svname);
507         if (IS_ERR(task)) {
508                 CERROR("Fail to start quota update thread. rc: %ld\n",
509                         PTR_ERR(task));
510                 thread_set_flags(thread, SVC_STOPPED);
511                 RETURN(PTR_ERR(task));
512         }
513
514         l_wait_event(thread->t_ctl_waitq,
515                      thread_is_running(thread) || thread_is_stopped(thread),
516                      &lwi);
517         RETURN(0);
518 }
519
520 static void qsd_cleanup_deferred(struct qsd_instance *qsd)
521 {
522         int     qtype;
523
524         for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) {
525                 struct qsd_upd_rec      *upd, *tmp;
526                 struct qsd_qtype_info   *qqi = qsd->qsd_type_array[qtype];
527
528                 if (qqi == NULL)
529                         continue;
530
531                 write_lock(&qsd->qsd_lock);
532                 cfs_list_for_each_entry_safe(upd, tmp, &qqi->qqi_deferred_glb,
533                                              qur_link) {
534                         CWARN("%s: Free global deferred upd: ID:"LPU64", "
535                               "ver:"LPU64"/"LPU64"\n", qsd->qsd_svname,
536                               upd->qur_qid.qid_uid, upd->qur_ver,
537                               qqi->qqi_glb_ver);
538                         list_del_init(&upd->qur_link);
539                         qsd_upd_free(upd);
540                 }
541                 cfs_list_for_each_entry_safe(upd, tmp, &qqi->qqi_deferred_slv,
542                                              qur_link) {
543                         CWARN("%s: Free slave deferred upd: ID:"LPU64", "
544                               "ver:"LPU64"/"LPU64"\n", qsd->qsd_svname,
545                               upd->qur_qid.qid_uid, upd->qur_ver,
546                               qqi->qqi_slv_ver);
547                         list_del_init(&upd->qur_link);
548                         qsd_upd_free(upd);
549                 }
550                 write_unlock(&qsd->qsd_lock);
551         }
552 }
553
554 static void qsd_cleanup_adjust(struct qsd_instance *qsd)
555 {
556         struct lquota_entry     *lqe;
557
558         spin_lock(&qsd->qsd_adjust_lock);
559         while (!cfs_list_empty(&qsd->qsd_adjust_list)) {
560                 lqe = cfs_list_entry(qsd->qsd_adjust_list.next,
561                                      struct lquota_entry, lqe_link);
562                 cfs_list_del_init(&lqe->lqe_link);
563                 lqe_putref(lqe);
564         }
565         spin_unlock(&qsd->qsd_adjust_lock);
566 }
567
568 void qsd_stop_upd_thread(struct qsd_instance *qsd)
569 {
570         struct ptlrpc_thread    *thread = &qsd->qsd_upd_thread;
571         struct l_wait_info       lwi    = { 0 };
572
573         if (!thread_is_stopped(thread)) {
574                 thread_set_flags(thread, SVC_STOPPING);
575                 cfs_waitq_signal(&thread->t_ctl_waitq);
576
577                 l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread),
578                              &lwi);
579         }
580         qsd_cleanup_deferred(qsd);
581         qsd_cleanup_adjust(qsd);
582 }