Whamcloud - gitweb
LU-1346 libcfs: cleanup libcfs primitive (linux-prim.h)
[fs/lustre-release.git] / lustre / quota / qsd_writeback.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2013, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LQUOTA
32
33 #include "qsd_internal.h"
34
35 extern struct kmem_cache *upd_kmem;
36
37 /*
38  * Allocate and fill an qsd_upd_rec structure to be processed by the writeback
39  * thread.
40  *
41  * \param qqi - is the qsd_qtype_info structure relevant to the update
42  * \param lqe - is the lquota entry subject to the update
43  * \param qid - is the identifier subject to the update
44  * \param rec - is the record storing the new quota settings
45  * \param ver - is the version associated with the update
46  * \param global - is a boolean set to true if this is an update of the global
47  *                 index and false for a slave index.
48  */
49 static struct qsd_upd_rec *qsd_upd_alloc(struct qsd_qtype_info *qqi,
50                                          struct lquota_entry *lqe,
51                                          union lquota_id *qid,
52                                          union lquota_rec *rec, __u64 ver,
53                                          bool global)
54 {
55         struct qsd_upd_rec      *upd;
56
57         OBD_SLAB_ALLOC_PTR_GFP(upd, upd_kmem, __GFP_IO);
58         if (upd == NULL) {
59                 CERROR("Failed to allocate upd");
60                 return NULL;
61         }
62
63         /* fill it */
64         CFS_INIT_LIST_HEAD(&upd->qur_link);
65         upd->qur_qqi = qqi;
66         upd->qur_lqe = lqe;
67         if (lqe)
68                 lqe_getref(lqe);
69         upd->qur_qid    = *qid;
70         upd->qur_rec    = *rec;
71         upd->qur_ver    = ver;
72         upd->qur_global = global;
73
74         return upd;
75 }
76
77 static void qsd_upd_free(struct qsd_upd_rec *upd)
78 {
79         if (upd->qur_lqe)
80                 lqe_putref(upd->qur_lqe);
81         OBD_SLAB_FREE_PTR(upd, upd_kmem);
82 }
83
84 /* must hold the qsd_lock */
85 static void qsd_upd_add(struct qsd_instance *qsd, struct qsd_upd_rec *upd)
86 {
87         if (!qsd->qsd_stopping) {
88                 list_add_tail(&upd->qur_link, &qsd->qsd_upd_list);
89                 /* wake up the upd thread */
90                 wake_up(&qsd->qsd_upd_thread.t_ctl_waitq);
91         } else {
92                 CWARN("%s: discard update.\n", qsd->qsd_svname);
93                 if (upd->qur_lqe)
94                         LQUOTA_WARN(upd->qur_lqe, "discard update.");
95                 qsd_upd_free(upd);
96         }
97 }
98
99 /* must hold the qsd_lock */
100 static void qsd_add_deferred(struct qsd_instance *qsd, cfs_list_t *list,
101                              struct qsd_upd_rec *upd)
102 {
103         struct qsd_upd_rec      *tmp, *n;
104
105         if (qsd->qsd_stopping) {
106                 CWARN("%s: discard deferred udpate.\n", qsd->qsd_svname);
107                 if (upd->qur_lqe)
108                         LQUOTA_WARN(upd->qur_lqe, "discard deferred update.");
109                 qsd_upd_free(upd);
110                 return;
111         }
112
113         /* Sort the updates in ascending order */
114         cfs_list_for_each_entry_safe_reverse(tmp, n, list, qur_link) {
115
116                 /* There could be some legacy records which have duplicated
117                  * version. Imagine following scenario: slave received global
118                  * glimpse and queued a record in the deferred list, then
119                  * master crash and rollback to an ealier version, then the
120                  * version of queued record will be conflicting with later
121                  * updates. We should just delete the legacy record in such
122                  * case. */
123                 if (upd->qur_ver == tmp->qur_ver) {
124                         LASSERT(tmp->qur_lqe);
125                         LQUOTA_ERROR(tmp->qur_lqe, "Found a conflict record "
126                                      "with ver:"LPU64"", tmp->qur_ver);
127                         cfs_list_del_init(&tmp->qur_link);
128                         qsd_upd_free(tmp);
129                 }
130
131                 if (upd->qur_ver < tmp->qur_ver) {
132                         continue;
133                 } else {
134                         cfs_list_add_tail(&upd->qur_link, &tmp->qur_link);
135                         return;
136                 }
137         }
138         cfs_list_add(&upd->qur_link, list);
139 }
140
141 /* must hold the qsd_lock */
142 static void qsd_kickoff_deferred(struct qsd_qtype_info *qqi, cfs_list_t *list,
143                                  __u64 ver)
144 {
145         struct qsd_upd_rec      *upd, *tmp;
146         ENTRY;
147
148         /* Get the first update record in the list, which has the smallest
149          * version, discard all records with versions smaller than the current
150          * one */
151         cfs_list_for_each_entry_safe(upd, tmp, list, qur_link) {
152                 if (upd->qur_ver <= ver) {
153                         /* drop this update */
154                         cfs_list_del_init(&upd->qur_link);
155                         CDEBUG(D_QUOTA, "%s: skipping deferred update ver:"
156                                LPU64"/"LPU64", global:%d, qid:"LPU64"\n",
157                                qqi->qqi_qsd->qsd_svname, upd->qur_ver, ver,
158                                upd->qur_global, upd->qur_qid.qid_uid);
159                         qsd_upd_free(upd);
160                 } else {
161                         break;
162                 }
163         }
164
165         /* No remaining deferred update */
166         if (cfs_list_empty(list))
167                 RETURN_EXIT;
168
169         CDEBUG(D_QUOTA, "%s: found deferred update record. "
170                "version:"LPU64"/"LPU64", global:%d, qid:"LPU64"\n",
171                qqi->qqi_qsd->qsd_svname, upd->qur_ver, ver,
172                upd->qur_global, upd->qur_qid.qid_uid);
173
174         LASSERTF(upd->qur_ver > ver, "lur_ver:"LPU64", cur_ver:"LPU64"\n",
175                  upd->qur_ver, ver);
176
177         /* Kick off the deferred udpate */
178         if (upd->qur_ver == ver + 1) {
179                 list_del_init(&upd->qur_link);
180                 qsd_upd_add(qqi->qqi_qsd, upd);
181         }
182         EXIT;
183 }
184
185 /* Bump version of global or slave index copy
186  *
187  * \param qqi    - qsd_qtype_info
188  * \param ver    - version to be bumped to
189  * \param global - global or slave index copy?
190  */
191 void qsd_bump_version(struct qsd_qtype_info *qqi, __u64 ver, bool global)
192 {
193         cfs_list_t      *list;
194         __u64           *idx_ver;
195
196         idx_ver = global ? &qqi->qqi_glb_ver : &qqi->qqi_slv_ver;
197         list    = global ? &qqi->qqi_deferred_glb : &qqi->qqi_deferred_slv;
198
199         write_lock(&qqi->qqi_qsd->qsd_lock);
200         *idx_ver = ver;
201         if (global)
202                 qqi->qqi_glb_uptodate = 1;
203         else
204                 qqi->qqi_slv_uptodate = 1;
205         qsd_kickoff_deferred(qqi, list, ver);
206         write_unlock(&qqi->qqi_qsd->qsd_lock);
207 }
208
209 /*
210  * Schedule a commit of a lquota entry
211  *
212  * \param  qqi   - qsd_qtype_info
213  * \param  lqe   - lquota_entry
214  * \param  qid   - quota id
215  * \param  rec   - global or slave record to be updated to disk
216  * \param  ver   - new index file version
217  * \param  global- true: master record; false: slave record
218  */
219 void qsd_upd_schedule(struct qsd_qtype_info *qqi, struct lquota_entry *lqe,
220                       union lquota_id *qid, union lquota_rec *rec, __u64 ver,
221                       bool global)
222 {
223         struct qsd_upd_rec      *upd;
224         struct qsd_instance     *qsd = qqi->qqi_qsd;
225         __u64                    cur_ver;
226         ENTRY;
227
228         CDEBUG(D_QUOTA, "%s: schedule update. global:%s, version:"LPU64"\n",
229                qsd->qsd_svname, global ? "true" : "false", ver);
230
231         upd = qsd_upd_alloc(qqi, lqe, qid, rec, ver, global);
232         if (upd == NULL)
233                 RETURN_EXIT;
234
235         /* If we don't want update index version, no need to sort the
236          * records in version order, just schedule the updates instantly. */
237         if (ver == 0) {
238                 write_lock(&qsd->qsd_lock);
239                 qsd_upd_add(qsd, upd);
240                 write_unlock(&qsd->qsd_lock);
241                 RETURN_EXIT;
242         }
243
244         write_lock(&qsd->qsd_lock);
245
246         cur_ver = global ? qqi->qqi_glb_ver : qqi->qqi_slv_ver;
247
248         if (ver <= cur_ver) {
249                 if (global)
250                         /* legitimate race between glimpse AST and
251                          * reintegration */
252                         CDEBUG(D_QUOTA, "%s: discarding glb update from glimpse"
253                                " ver:"LPU64" local ver:"LPU64"\n",
254                                qsd->qsd_svname, ver, cur_ver);
255                 else
256                         CERROR("%s: discard slv update, ver:"LPU64" local ver:"
257                                LPU64"\n", qsd->qsd_svname, ver, cur_ver);
258                 qsd_upd_free(upd);
259         } else if ((ver == cur_ver + 1) && qqi->qqi_glb_uptodate &&
260                    qqi->qqi_slv_uptodate) {
261                 /* In order update, and reintegration has been done. */
262                 qsd_upd_add(qsd, upd);
263         } else {
264                 /* Out of order update (the one with smaller version hasn't
265                  * reached slave or hasn't been flushed to disk yet), or
266                  * the reintegration is in progress. Defer the update. */
267                 cfs_list_t *list = global ? &qqi->qqi_deferred_glb :
268                                             &qqi->qqi_deferred_slv;
269                 qsd_add_deferred(qsd, list, upd);
270         }
271
272         write_unlock(&qsd->qsd_lock);
273
274         EXIT;
275 }
276
277 static int qsd_process_upd(const struct lu_env *env, struct qsd_upd_rec *upd)
278 {
279         struct lquota_entry     *lqe = upd->qur_lqe;
280         struct qsd_qtype_info   *qqi = upd->qur_qqi;
281         int                      rc;
282         ENTRY;
283
284         if (lqe == NULL) {
285                 lqe = lqe_locate(env, qqi->qqi_site, &upd->qur_qid);
286                 if (IS_ERR(lqe))
287                         GOTO(out, rc = PTR_ERR(lqe));
288         }
289
290         /* The in-memory lqe update for slave index copy isn't deferred,
291          * we shouldn't touch it here. */
292         if (upd->qur_global) {
293                 rc = qsd_update_lqe(env, lqe, upd->qur_global, &upd->qur_rec);
294                 if (rc)
295                         GOTO(out, rc);
296                 /* refresh usage */
297                 qsd_refresh_usage(env, lqe);
298                 /* Report usage asynchronously */
299                 rc = qsd_adjust(env, lqe);
300                 if (rc)
301                         LQUOTA_ERROR(lqe, "failed to report usage, rc:%d", rc);
302         }
303
304         rc = qsd_update_index(env, qqi, &upd->qur_qid, upd->qur_global,
305                               upd->qur_ver, &upd->qur_rec);
306 out:
307         if (lqe && !IS_ERR(lqe)) {
308                 lqe_putref(lqe);
309                 upd->qur_lqe = NULL;
310         }
311         RETURN(rc);
312 }
313
314 void qsd_adjust_schedule(struct lquota_entry *lqe, bool defer, bool cancel)
315 {
316         struct qsd_instance     *qsd = lqe2qqi(lqe)->qqi_qsd;
317         bool                     added = false;
318
319         read_lock(&qsd->qsd_lock);
320         if (qsd->qsd_stopping) {
321                 read_unlock(&qsd->qsd_lock);
322                 return;
323         }
324         read_unlock(&qsd->qsd_lock);
325
326         lqe_getref(lqe);
327         spin_lock(&qsd->qsd_adjust_lock);
328
329         /* the lqe is being queued for the per-ID lock cancel, we should
330          * cancel the lock cancel and re-add it for quota adjust */
331         if (!cfs_list_empty(&lqe->lqe_link) &&
332             lqe->lqe_adjust_time == 0) {
333                 cfs_list_del_init(&lqe->lqe_link);
334                 lqe_putref(lqe);
335         }
336
337         if (cfs_list_empty(&lqe->lqe_link)) {
338                 if (cancel)
339                         lqe->lqe_adjust_time = 0;
340                 else
341                         lqe->lqe_adjust_time = defer ?
342                                 cfs_time_shift_64(QSD_WB_INTERVAL) :
343                                 cfs_time_current_64();
344                 /* lqe reference transfered to list */
345                 if (defer)
346                         cfs_list_add_tail(&lqe->lqe_link,
347                                           &qsd->qsd_adjust_list);
348                 else
349                         cfs_list_add(&lqe->lqe_link, &qsd->qsd_adjust_list);
350                 added = true;
351         }
352         spin_unlock(&qsd->qsd_adjust_lock);
353
354         if (added)
355                 wake_up(&qsd->qsd_upd_thread.t_ctl_waitq);
356         else
357                 lqe_putref(lqe);
358 }
359
360 /* return true if there is pending writeback records or the pending
361  * adjust requests */
362 static bool qsd_job_pending(struct qsd_instance *qsd, cfs_list_t *upd,
363                             bool *uptodate)
364 {
365         bool    job_pending = false;
366         int     qtype;
367
368         LASSERT(cfs_list_empty(upd));
369         *uptodate = true;
370
371         spin_lock(&qsd->qsd_adjust_lock);
372         if (!cfs_list_empty(&qsd->qsd_adjust_list)) {
373                 struct lquota_entry *lqe;
374                 lqe = cfs_list_entry(qsd->qsd_adjust_list.next,
375                                      struct lquota_entry, lqe_link);
376                 if (cfs_time_beforeq_64(lqe->lqe_adjust_time,
377                                         cfs_time_current_64()))
378                         job_pending = true;
379         }
380         spin_unlock(&qsd->qsd_adjust_lock);
381
382         write_lock(&qsd->qsd_lock);
383         if (!cfs_list_empty(&qsd->qsd_upd_list)) {
384                 cfs_list_splice_init(&qsd->qsd_upd_list, upd);
385                 job_pending = true;
386         }
387
388         if (qsd->qsd_acct_failed) {
389                 /* don't bother kicking off reintegration if space accounting
390                  * failed to be enabled */
391                 write_unlock(&qsd->qsd_lock);
392                 return job_pending;
393         }
394
395         for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) {
396                 struct qsd_qtype_info *qqi = qsd->qsd_type_array[qtype];
397
398                 if (!qsd_type_enabled(qsd, qtype))
399                         continue;
400
401                 if ((!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) &&
402                      !qqi->qqi_reint)
403                         /* global or slave index not up to date and reint
404                          * thread not running */
405                         *uptodate = false;
406         }
407
408         write_unlock(&qsd->qsd_lock);
409         return job_pending;
410 }
411
412 static int qsd_upd_thread(void *arg)
413 {
414         struct qsd_instance     *qsd = (struct qsd_instance *)arg;
415         struct ptlrpc_thread    *thread = &qsd->qsd_upd_thread;
416         struct l_wait_info       lwi;
417         cfs_list_t               queue;
418         struct qsd_upd_rec      *upd, *n;
419         struct lu_env           *env;
420         int                      qtype, rc = 0;
421         bool                     uptodate;
422         struct lquota_entry     *lqe, *tmp;
423         __u64                    cur_time;
424         ENTRY;
425
426         OBD_ALLOC_PTR(env);
427         if (env == NULL)
428                 RETURN(-ENOMEM);
429
430         rc = lu_env_init(env, LCT_DT_THREAD);
431         if (rc) {
432                 CERROR("%s: Fail to init env.", qsd->qsd_svname);
433                 OBD_FREE_PTR(env);
434                 RETURN(rc);
435         }
436
437         thread_set_flags(thread, SVC_RUNNING);
438         wake_up(&thread->t_ctl_waitq);
439
440         CFS_INIT_LIST_HEAD(&queue);
441         lwi = LWI_TIMEOUT(cfs_time_seconds(QSD_WB_INTERVAL), NULL, NULL);
442         while (1) {
443                 l_wait_event(thread->t_ctl_waitq,
444                              qsd_job_pending(qsd, &queue, &uptodate) ||
445                              !thread_is_running(thread), &lwi);
446
447                 cfs_list_for_each_entry_safe(upd, n, &queue, qur_link) {
448                         cfs_list_del_init(&upd->qur_link);
449                         qsd_process_upd(env, upd);
450                         qsd_upd_free(upd);
451                 }
452
453                 spin_lock(&qsd->qsd_adjust_lock);
454                 cur_time = cfs_time_current_64();
455                 cfs_list_for_each_entry_safe(lqe, tmp, &qsd->qsd_adjust_list,
456                                              lqe_link) {
457                         /* deferred items are sorted by time */
458                         if (!cfs_time_beforeq_64(lqe->lqe_adjust_time,
459                                                  cur_time))
460                                 break;
461
462                         cfs_list_del_init(&lqe->lqe_link);
463                         spin_unlock(&qsd->qsd_adjust_lock);
464
465                         if (thread_is_running(thread) && uptodate) {
466                                 qsd_refresh_usage(env, lqe);
467                                 if (lqe->lqe_adjust_time == 0)
468                                         qsd_id_lock_cancel(env, lqe);
469                                 else
470                                         qsd_adjust(env, lqe);
471                         }
472
473                         lqe_putref(lqe);
474                         spin_lock(&qsd->qsd_adjust_lock);
475                 }
476                 spin_unlock(&qsd->qsd_adjust_lock);
477
478                 if (!thread_is_running(thread))
479                         break;
480
481                 if (uptodate)
482                         continue;
483
484                 for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++)
485                         qsd_start_reint_thread(qsd->qsd_type_array[qtype]);
486         }
487         lu_env_fini(env);
488         OBD_FREE_PTR(env);
489         thread_set_flags(thread, SVC_STOPPED);
490         wake_up(&thread->t_ctl_waitq);
491         RETURN(rc);
492 }
493
494 int qsd_start_upd_thread(struct qsd_instance *qsd)
495 {
496         struct ptlrpc_thread    *thread = &qsd->qsd_upd_thread;
497         struct l_wait_info       lwi = { 0 };
498         struct task_struct              *task;
499         ENTRY;
500
501         task = kthread_run(qsd_upd_thread, (void *)qsd,
502                            "lquota_wb_%s", qsd->qsd_svname);
503         if (IS_ERR(task)) {
504                 CERROR("Fail to start quota update thread. rc: %ld\n",
505                         PTR_ERR(task));
506                 thread_set_flags(thread, SVC_STOPPED);
507                 RETURN(PTR_ERR(task));
508         }
509
510         l_wait_event(thread->t_ctl_waitq,
511                      thread_is_running(thread) || thread_is_stopped(thread),
512                      &lwi);
513         RETURN(0);
514 }
515
516 static void qsd_cleanup_deferred(struct qsd_instance *qsd)
517 {
518         int     qtype;
519
520         for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) {
521                 struct qsd_upd_rec      *upd, *tmp;
522                 struct qsd_qtype_info   *qqi = qsd->qsd_type_array[qtype];
523
524                 if (qqi == NULL)
525                         continue;
526
527                 write_lock(&qsd->qsd_lock);
528                 cfs_list_for_each_entry_safe(upd, tmp, &qqi->qqi_deferred_glb,
529                                              qur_link) {
530                         CWARN("%s: Free global deferred upd: ID:"LPU64", "
531                               "ver:"LPU64"/"LPU64"\n", qsd->qsd_svname,
532                               upd->qur_qid.qid_uid, upd->qur_ver,
533                               qqi->qqi_glb_ver);
534                         list_del_init(&upd->qur_link);
535                         qsd_upd_free(upd);
536                 }
537                 cfs_list_for_each_entry_safe(upd, tmp, &qqi->qqi_deferred_slv,
538                                              qur_link) {
539                         CWARN("%s: Free slave deferred upd: ID:"LPU64", "
540                               "ver:"LPU64"/"LPU64"\n", qsd->qsd_svname,
541                               upd->qur_qid.qid_uid, upd->qur_ver,
542                               qqi->qqi_slv_ver);
543                         list_del_init(&upd->qur_link);
544                         qsd_upd_free(upd);
545                 }
546                 write_unlock(&qsd->qsd_lock);
547         }
548 }
549
550 static void qsd_cleanup_adjust(struct qsd_instance *qsd)
551 {
552         struct lquota_entry     *lqe;
553
554         spin_lock(&qsd->qsd_adjust_lock);
555         while (!cfs_list_empty(&qsd->qsd_adjust_list)) {
556                 lqe = cfs_list_entry(qsd->qsd_adjust_list.next,
557                                      struct lquota_entry, lqe_link);
558                 cfs_list_del_init(&lqe->lqe_link);
559                 lqe_putref(lqe);
560         }
561         spin_unlock(&qsd->qsd_adjust_lock);
562 }
563
564 void qsd_stop_upd_thread(struct qsd_instance *qsd)
565 {
566         struct ptlrpc_thread    *thread = &qsd->qsd_upd_thread;
567         struct l_wait_info       lwi    = { 0 };
568
569         if (!thread_is_stopped(thread)) {
570                 thread_set_flags(thread, SVC_STOPPING);
571                 wake_up(&thread->t_ctl_waitq);
572
573                 l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread),
574                              &lwi);
575         }
576         qsd_cleanup_deferred(qsd);
577         qsd_cleanup_adjust(qsd);
578 }