Whamcloud - gitweb
LU-17871 ldlm: FLOCK ownlocks may be not set
[fs/lustre-release.git] / lustre / quota / qsd_writeback.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2017, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LQUOTA
32
33 #include <linux/kthread.h>
34 #include "qsd_internal.h"
35
36 /*
37  * Allocate and fill an qsd_upd_rec structure to be processed by the writeback
38  * thread.
39  *
40  * \param qqi - is the qsd_qtype_info structure relevant to the update
41  * \param lqe - is the lquota entry subject to the update
42  * \param qid - is the identifier subject to the update
43  * \param rec - is the record storing the new quota settings
44  * \param ver - is the version associated with the update
45  * \param global - is a boolean set to true if this is an update of the global
46  *                 index and false for a slave index.
47  */
48 static struct qsd_upd_rec *qsd_upd_alloc(struct qsd_qtype_info *qqi,
49                                          struct lquota_entry *lqe,
50                                          union lquota_id *qid,
51                                          union lquota_rec *rec, __u64 ver,
52                                          bool global)
53 {
54         struct qsd_upd_rec      *upd;
55
56         OBD_SLAB_ALLOC_PTR_GFP(upd, upd_kmem, GFP_NOFS);
57         if (upd == NULL) {
58                 return NULL;
59         }
60
61         /* fill it */
62         INIT_LIST_HEAD(&upd->qur_link);
63         upd->qur_qqi = qqi;
64         upd->qur_lqe = lqe;
65         if (lqe)
66                 lqe_getref(lqe);
67         upd->qur_qid    = *qid;
68         upd->qur_rec    = *rec;
69         upd->qur_ver    = ver;
70         upd->qur_global = global;
71
72         return upd;
73 }
74
75 static void qsd_upd_free(struct qsd_upd_rec *upd)
76 {
77         if (upd->qur_lqe)
78                 lqe_putref(upd->qur_lqe);
79         OBD_SLAB_FREE_PTR(upd, upd_kmem);
80 }
81
82 /* must hold the qsd_lock */
83 static void qsd_upd_add(struct qsd_instance *qsd, struct qsd_upd_rec *upd)
84 {
85         if (!qsd->qsd_stopping) {
86                 list_add_tail(&upd->qur_link, &qsd->qsd_upd_list);
87                 /* wake up the upd thread */
88                 if (qsd->qsd_upd_task)
89                         wake_up_process(qsd->qsd_upd_task);
90         } else {
91                 CWARN("%s: discard update.\n", qsd->qsd_svname);
92                 if (upd->qur_lqe)
93                         LQUOTA_WARN(upd->qur_lqe, "discard update.");
94                 qsd_upd_free(upd);
95         }
96 }
97
98 /* must hold the qsd_lock */
99 static void qsd_add_deferred(struct qsd_instance *qsd, struct list_head *list,
100                              struct qsd_upd_rec *upd)
101 {
102         struct qsd_upd_rec      *tmp, *n;
103
104         if (qsd->qsd_stopping) {
105                 CWARN("%s: discard deferred udpate.\n", qsd->qsd_svname);
106                 if (upd->qur_lqe)
107                         LQUOTA_WARN(upd->qur_lqe, "discard deferred update.");
108                 qsd_upd_free(upd);
109                 return;
110         }
111
112         /* Sort the updates in ascending order */
113         list_for_each_entry_safe_reverse(tmp, n, list, qur_link) {
114
115                 /* There could be some legacy records which have duplicated
116                  * version. Imagine following scenario: slave received global
117                  * glimpse and queued a record in the deferred list, then
118                  * master crash and rollback to an ealier version, then the
119                  * version of queued record will be conflicting with later
120                  * updates. We should just delete the legacy record in such
121                  * case. */
122                 if (upd->qur_ver == tmp->qur_ver) {
123                         if (tmp->qur_lqe)
124                                 LQUOTA_WARN(tmp->qur_lqe, "Found a conflict "
125                                             "record with ver:%llu",
126                                             tmp->qur_ver);
127                         else
128                                 CWARN("%s: Found a conflict record with ver: "
129                                       "%llu\n", qsd->qsd_svname, tmp->qur_ver);
130
131                         list_del_init(&tmp->qur_link);
132                         qsd_upd_free(tmp);
133                 } else if (upd->qur_ver < tmp->qur_ver) {
134                         continue;
135                 } else {
136                         list_add_tail(&upd->qur_link, &tmp->qur_link);
137                         return;
138                 }
139         }
140         list_add(&upd->qur_link, list);
141 }
142
/* Process a deferred list once the index version reached \ver: discard
 * records whose version is already covered, then requeue the next in-order
 * record (version == ver + 1, if any) on the writeback list.
 * Must hold the qsd_lock. */
static void qsd_kickoff_deferred(struct qsd_qtype_info *qqi,
                                 struct list_head *list, __u64 ver)
{
        struct qsd_upd_rec      *upd, *tmp;
        ENTRY;

        /* Get the first update record in the list, which has the smallest
         * version, discard all records with versions smaller than the current
         * one */
        list_for_each_entry_safe(upd, tmp, list, qur_link) {
                if (upd->qur_ver <= ver) {
                        /* drop this update */
                        list_del_init(&upd->qur_link);
                        CDEBUG(D_QUOTA, "%s: skipping deferred update ver:"
                               "%llu/%llu, global:%d, qid:%llu\n",
                               qqi->qqi_qsd->qsd_svname, upd->qur_ver, ver,
                               upd->qur_global, upd->qur_qid.qid_uid);
                        qsd_upd_free(upd);
                } else {
                        /* list is sorted: the remaining records are newer */
                        break;
                }
        }

        /* No remaining deferred update */
        if (list_empty(list))
                RETURN_EXIT;

        /* NB: \upd still points at the first record not dropped above */
        CDEBUG(D_QUOTA, "%s: found deferred update record. "
               "version:%llu/%llu, global:%d, qid:%llu\n",
               qqi->qqi_qsd->qsd_svname, upd->qur_ver, ver,
               upd->qur_global, upd->qur_qid.qid_uid);

        LASSERTF(upd->qur_ver > ver, "lur_ver:%llu, cur_ver:%llu\n",
                 upd->qur_ver, ver);

        /* Kick off the deferred update if it is the next in-order one */
        if (upd->qur_ver == ver + 1) {
                list_del_init(&upd->qur_link);
                qsd_upd_add(qqi->qqi_qsd, upd);
        }
        EXIT;
}
186
187 /* Bump version of global or slave index copy
188  *
189  * \param qqi    - qsd_qtype_info
190  * \param ver    - version to be bumped to
191  * \param global - global or slave index copy?
192  */
193 void qsd_bump_version(struct qsd_qtype_info *qqi, __u64 ver, bool global)
194 {
195         struct list_head *list;
196         __u64            *idx_ver;
197
198         idx_ver = global ? &qqi->qqi_glb_ver : &qqi->qqi_slv_ver;
199         list    = global ? &qqi->qqi_deferred_glb : &qqi->qqi_deferred_slv;
200
201         write_lock(&qqi->qqi_qsd->qsd_lock);
202         *idx_ver = ver;
203         if (global)
204                 qqi->qqi_glb_uptodate = 1;
205         else
206                 qqi->qqi_slv_uptodate = 1;
207         qsd_kickoff_deferred(qqi, list, ver);
208         write_unlock(&qqi->qqi_qsd->qsd_lock);
209 }
210
211 /*
212  * Schedule a commit of a lquota entry
213  *
214  * \param  qqi   - qsd_qtype_info
215  * \param  lqe   - lquota_entry
216  * \param  qid   - quota id
217  * \param  rec   - global or slave record to be updated to disk
218  * \param  ver   - new index file version
219  * \param  global- true: master record; false: slave record
220  */
221 void qsd_upd_schedule(struct qsd_qtype_info *qqi, struct lquota_entry *lqe,
222                       union lquota_id *qid, union lquota_rec *rec, __u64 ver,
223                       bool global)
224 {
225         struct qsd_upd_rec      *upd;
226         struct qsd_instance     *qsd = qqi->qqi_qsd;
227         __u64                    cur_ver;
228         ENTRY;
229
230         CDEBUG(D_QUOTA, "%s: schedule update. global:%s, version:%llu\n",
231                qsd->qsd_svname, global ? "true" : "false", ver);
232
233         upd = qsd_upd_alloc(qqi, lqe, qid, rec, ver, global);
234         if (upd == NULL)
235                 RETURN_EXIT;
236
237         /* If we don't want update index version, no need to sort the
238          * records in version order, just schedule the updates instantly. */
239         if (ver == 0) {
240                 write_lock(&qsd->qsd_lock);
241                 qsd_upd_add(qsd, upd);
242                 write_unlock(&qsd->qsd_lock);
243                 RETURN_EXIT;
244         }
245
246         write_lock(&qsd->qsd_lock);
247
248         cur_ver = global ? qqi->qqi_glb_ver : qqi->qqi_slv_ver;
249
250         if (ver <= cur_ver) {
251                 if (global)
252                         /* legitimate race between glimpse AST and
253                          * reintegration */
254                         CDEBUG(D_QUOTA, "%s: discarding glb update from glimpse"
255                                " ver:%llu local ver:%llu\n",
256                                qsd->qsd_svname, ver, cur_ver);
257                 else
258                         CERROR("%s: discard slv update, ver:%llu local ver:"
259                                "%llu\n", qsd->qsd_svname, ver, cur_ver);
260                 qsd_upd_free(upd);
261         } else if ((ver == cur_ver + 1) && qqi->qqi_glb_uptodate &&
262                    qqi->qqi_slv_uptodate) {
263                 /* In order update, and reintegration has been done. */
264                 qsd_upd_add(qsd, upd);
265         } else {
266                 /* Out of order update (the one with smaller version hasn't
267                  * reached slave or hasn't been flushed to disk yet), or
268                  * the reintegration is in progress. Defer the update. */
269                 struct list_head *list = global ? &qqi->qqi_deferred_glb :
270                                                   &qqi->qqi_deferred_slv;
271                 qsd_add_deferred(qsd, list, upd);
272         }
273
274         write_unlock(&qsd->qsd_lock);
275
276         EXIT;
277 }
278
/*
 * Process a single update record: apply the new quota settings to the
 * in-memory lquota entry (global updates only) and flush them to the index
 * copy on disk. A global record flagged LQUOTA_FLAG_DELETED removes the ID
 * from the global index copy instead of updating it.
 *
 * \param env - execution environment
 * \param upd - update record to process
 *
 * \retval 1 to request a retry later (reintegration running while
 *           qsd_exclusive is set), 0 on success, negative errno on failure
 */
static int qsd_process_upd(const struct lu_env *env, struct qsd_upd_rec *upd)
{
        struct lquota_entry     *lqe = upd->qur_lqe;
        struct qsd_qtype_info   *qqi = upd->qur_qqi;
        struct qsd_instance     *qsd = qqi->qqi_qsd;
        int                      rc;
        ENTRY;

        if (qsd->qsd_exclusive) { /* It could be deadlock running with reint */
                read_lock(&qsd->qsd_lock);
                rc = qqi->qqi_reint;
                read_unlock(&qsd->qsd_lock);
                if (rc)
                        return 1;
        }

        if (upd->qur_global &&
            (LQUOTA_FLAG(upd->qur_rec.lqr_glb_rec.qbr_time) &
                                                        LQUOTA_FLAG_DELETED)) {
                struct thandle          *th = NULL;
                struct dt_object        *obj;

                obj = qqi->qqi_glb_obj;

                th = dt_trans_create(env, qqi->qqi_qsd->qsd_dev);
                if (IS_ERR(th))
                        RETURN(PTR_ERR(th));

                rc = lquota_disk_declare_write(env, th, obj, &upd->qur_qid);
                if (rc)
                        GOTO(out_del, rc);

                rc = dt_trans_start_local(env, qqi->qqi_qsd->qsd_dev, th);
                if (rc)
                        GOTO(out_del, rc);

                /* a missing entry is fine: the ID may never have been
                 * written to the index copy */
                rc = lquota_disk_delete(env, th, obj, upd->qur_qid.qid_uid,
                                        NULL);
                if (rc == -ENOENT)
                        rc = 0;

out_del:
                dt_trans_stop(env, qqi->qqi_qsd->qsd_dev, th);
                if (lqe != NULL)
                        lqe_set_deleted(lqe);

                /* bump the version even on failure so deferred updates can
                 * still be kicked off */
                qsd_bump_version(qqi, upd->qur_ver, true);
                RETURN(rc);
        }

        /* look up (or create) the in-memory entry if the record doesn't
         * carry one */
        if (lqe == NULL) {
                lqe = lqe_locate(env, qqi->qqi_site, &upd->qur_qid);
                if (IS_ERR(lqe))
                        GOTO(out, rc = PTR_ERR(lqe));
        }

        lqe->lqe_is_reset = false;
        lqe->lqe_is_deleted = 0;

        /* The in-memory lqe update for slave index copy isn't deferred,
         * we shouldn't touch it here. */
        if (upd->qur_global) {
                rc = qsd_update_lqe(env, lqe, upd->qur_global, &upd->qur_rec);
                if (rc)
                        GOTO(out, rc);
                /* refresh usage */
                qsd_refresh_usage(env, lqe);

                /* settings changed: any pending deferred adjustment should
                 * now run immediately */
                spin_lock(&qsd->qsd_adjust_lock);
                lqe->lqe_adjust_time = 0;
                spin_unlock(&qsd->qsd_adjust_lock);

                if (LQUOTA_FLAG(upd->qur_rec.lqr_glb_rec.qbr_time) &
                                                        LQUOTA_FLAG_REVOKE)
                        lqe->lqe_revoke = 1;

                /* Report usage asynchronously */
                rc = qsd_adjust(env, lqe);
                lqe->lqe_revoke = 0;
                if (rc)
                        LQUOTA_ERROR(lqe, "failed to report usage, rc:%d", rc);
        }

        rc = qsd_update_index(env, qqi, &upd->qur_qid, upd->qur_global,
                              upd->qur_ver, &upd->qur_rec);
out:
        /* no limits plus the DEFAULT flag means this ID falls back to the
         * default quota settings */
        if (upd->qur_global && rc == 0 &&
            upd->qur_rec.lqr_glb_rec.qbr_softlimit == 0 &&
            upd->qur_rec.lqr_glb_rec.qbr_hardlimit == 0 &&
            (LQUOTA_FLAG(upd->qur_rec.lqr_glb_rec.qbr_time) &
                                                        LQUOTA_FLAG_DEFAULT)) {
                lqe->lqe_is_default = true;
                if (qqi->qqi_default_softlimit == 0 &&
                    qqi->qqi_default_hardlimit == 0)
                        lqe->lqe_enforced = false;
                else
                        lqe->lqe_enforced = true;

                LQUOTA_DEBUG(lqe, "update to use default quota");
        }
        /* the RESET flag clears limits/grant and wipes the slave record */
        if (upd->qur_global && rc == 0 &&
            (LQUOTA_FLAG(upd->qur_rec.lqr_glb_rec.qbr_time) &
                                                        LQUOTA_FLAG_RESET)) {
                struct lquota_slv_rec srec;

                lqe->lqe_granted = 0;
                lqe->lqe_softlimit = 0;
                lqe->lqe_hardlimit = 0;
                lqe->lqe_is_default = false;
                lqe->lqe_is_reset = true;

                memset(&srec, 0, sizeof(srec));
                rc = qsd_update_index(env, qqi, &upd->qur_qid, false, 0, &srec);
        }

        if (lqe && !IS_ERR(lqe)) {
                lqe_putref(lqe);
                upd->qur_lqe = NULL;
        }
        RETURN(rc);
}
400
/*
 * Queue an lquota entry on the adjust list for deferred quota adjustment or
 * per-ID lock cancel, and wake the writeback thread.
 *
 * \param lqe    - lquota entry to queue; a reference is taken and transferred
 *                 to the list when the entry is actually added
 * \param defer  - delay the adjustment by QSD_WB_INTERVAL seconds and queue
 *                 at the tail (list is sorted by adjust time)
 * \param cancel - request a per-ID lock cancel (marked by adjust_time == 0)
 *                 instead of a quota adjustment
 */
void qsd_adjust_schedule(struct lquota_entry *lqe, bool defer, bool cancel)
{
        struct qsd_instance     *qsd = lqe2qqi(lqe)->qqi_qsd;
        bool                     added = false;

        /* nothing to do if the qsd instance is being shut down */
        read_lock(&qsd->qsd_lock);
        if (qsd->qsd_stopping) {
                read_unlock(&qsd->qsd_lock);
                return;
        }
        read_unlock(&qsd->qsd_lock);

        lqe_getref(lqe);
        spin_lock(&qsd->qsd_adjust_lock);

        /* the lqe is being queued for the per-ID lock cancel, we should
         * cancel the lock cancel and re-add it for quota adjust */
        if (!list_empty(&lqe->lqe_link) &&
            lqe->lqe_adjust_time == 0) {
                list_del_init(&lqe->lqe_link);
                lqe_putref(lqe);
        }

        if (list_empty(&lqe->lqe_link)) {
                if (!cancel) {
                        lqe->lqe_adjust_time = ktime_get_seconds();
                        if (defer)
                                lqe->lqe_adjust_time += QSD_WB_INTERVAL;
                } else {
                        /* adjust_time == 0 marks a lock cancel request */
                        lqe->lqe_adjust_time = 0;
                }

                /* lqe reference transferred to list */
                if (defer)
                        list_add_tail(&lqe->lqe_link,
                                          &qsd->qsd_adjust_list);
                else
                        list_add(&lqe->lqe_link, &qsd->qsd_adjust_list);
                added = true;
        }
        spin_unlock(&qsd->qsd_adjust_lock);

        if (!added)
                lqe_putref(lqe);
        else {
                /* wake the writeback thread to process the request */
                read_lock(&qsd->qsd_lock);
                if (qsd->qsd_upd_task)
                        wake_up_process(qsd->qsd_upd_task);
                read_unlock(&qsd->qsd_lock);
        }
}
452
453 /* return true if there is pending writeback records or the pending
454  * adjust requests */
455 static bool qsd_job_pending(struct qsd_instance *qsd, struct list_head *upd,
456                             bool *uptodate)
457 {
458         bool    job_pending = false;
459         int     qtype;
460
461         LASSERT(list_empty(upd));
462         *uptodate = true;
463
464         spin_lock(&qsd->qsd_adjust_lock);
465         if (!list_empty(&qsd->qsd_adjust_list)) {
466                 struct lquota_entry *lqe;
467                 lqe = list_first_entry(&qsd->qsd_adjust_list,
468                                        struct lquota_entry, lqe_link);
469                 if (ktime_get_seconds() >= lqe->lqe_adjust_time)
470                         job_pending = true;
471         }
472         spin_unlock(&qsd->qsd_adjust_lock);
473
474         write_lock(&qsd->qsd_lock);
475         if (!list_empty(&qsd->qsd_upd_list)) {
476                 list_splice_init(&qsd->qsd_upd_list, upd);
477                 job_pending = true;
478         }
479         if (qsd->qsd_exclusive)
480                 qsd->qsd_updating = job_pending;
481
482         for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++) {
483                 struct qsd_qtype_info *qqi = qsd->qsd_type_array[qtype];
484
485                 /* don't bother kicking off reintegration if space accounting
486                  * failed to be enabled */
487                 if (qqi->qqi_acct_failed)
488                         continue;
489
490                 if (!qsd_type_enabled(qsd, qtype))
491                         continue;
492
493                 if ((!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) &&
494                      !qqi->qqi_reint)
495                         /* global or slave index not up to date and reint
496                          * thread not running */
497                         *uptodate = false;
498         }
499
500         write_unlock(&qsd->qsd_lock);
501         return job_pending;
502 }
503
/* Arguments handed to the writeback thread at creation time;
 * freed by the thread itself on exit. */
struct qsd_upd_args {
        struct qsd_instance     *qua_inst;    /* qsd instance served */
        struct lu_env            qua_env;     /* env used by the thread */
        struct completion       *qua_started; /* completed once running */
};
509
#ifndef TASK_IDLE
/* This identity is only safe inside kernel threads, or other places where
 * all signals are disabled.  So it is placed here rather than in an include
 * file.
 * TASK_IDLE was added in v4.1-rc4-43-g80ed87c8a9ca so this fallback can be
 * removed once kernels older than that are no longer supported.
 */
#define TASK_IDLE TASK_INTERRUPTIBLE
#endif
519
/*
 * Main loop of the quota writeback thread: flush queued update records to
 * disk, run due entries on the adjust list (quota adjustment or per-ID lock
 * cancel) and restart reintegration when an index copy is found stale.
 *
 * \param _args - struct qsd_upd_args allocated by qsd_start_upd_thread();
 *                the env is finalized and the args freed here on exit
 *
 * \retval 0 (always)
 */
static int qsd_upd_thread(void *_args)
{
        struct qsd_upd_args     *args = _args;
        struct qsd_instance     *qsd = args->qua_inst;
        LIST_HEAD(queue);
        struct qsd_upd_rec      *upd, *n;
        struct lu_env           *env = &args->qua_env;
        int                      qtype, rc = 0;
        bool                     uptodate;
        struct lquota_entry     *lqe;
        time64_t cur_time;
        ENTRY;

        complete(args->qua_started);
        /* state must be set before the pending-work check to avoid losing a
         * wake-up between qsd_job_pending() and schedule_timeout() */
        while (({set_current_state(TASK_IDLE);
                 !kthread_should_stop(); })) {
                int count = 0;

                if (!qsd_job_pending(qsd, &queue, &uptodate))
                        schedule_timeout(cfs_time_seconds(QSD_WB_INTERVAL));
                __set_current_state(TASK_RUNNING);

                /* flush the spliced update records; records returning > 0
                 * (reintegration in progress) stay queued and are retried */
                while (1) {
                        list_for_each_entry_safe(upd, n, &queue, qur_link) {
                                if (qsd_process_upd(env, upd) <= 0) {
                                        list_del_init(&upd->qur_link);
                                        qsd_upd_free(upd);
                                }
                        }
                        if (list_empty(&queue))
                                break;
                        count++;
                        /* complain periodically while blocked on reint */
                        if (count % 7 == 0) {
                                n = list_first_entry(&queue, struct qsd_upd_rec,
                                                     qur_link);
                                CWARN("%s: The reintegration thread [%d] "
                                      "blocked more than %ld seconds\n",
                                      n->qur_qqi->qqi_qsd->qsd_svname,
                                      n->qur_qqi->qqi_qtype, count *
                                      cfs_time_seconds(QSD_WB_INTERVAL) / 10);
                        }
                        schedule_timeout_interruptible(
                                cfs_time_seconds(QSD_WB_INTERVAL) / 10);
                }
                if (qsd->qsd_exclusive) {
                        write_lock(&qsd->qsd_lock);
                        qsd->qsd_updating = false;
                        write_unlock(&qsd->qsd_lock);
                }

                /* process due entries on the adjust list; the lock is
                 * dropped around the adjustment itself */
                spin_lock(&qsd->qsd_adjust_lock);
                cur_time = ktime_get_seconds();
                while (!list_empty(&qsd->qsd_adjust_list)) {
                        lqe = list_first_entry(&qsd->qsd_adjust_list,
                                               struct lquota_entry, lqe_link);
                        /* deferred items are sorted by time */
                        if (lqe->lqe_adjust_time > cur_time)
                                break;

                        list_del_init(&lqe->lqe_link);
                        spin_unlock(&qsd->qsd_adjust_lock);

                        if (!kthread_should_stop() && uptodate) {
                                qsd_refresh_usage(env, lqe);
                                /* adjust_time == 0 marks a per-ID lock
                                 * cancel request */
                                if (lqe->lqe_adjust_time == 0)
                                        qsd_id_lock_cancel(env, lqe);
                                else
                                        qsd_adjust(env, lqe);
                        }

                        /* drop the reference transferred to the list */
                        lqe_putref(lqe);
                        spin_lock(&qsd->qsd_adjust_lock);
                }
                spin_unlock(&qsd->qsd_adjust_lock);

                if (uptodate || kthread_should_stop())
                        continue;

                /* some index copy is stale: restart reintegration */
                for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++)
                        qsd_start_reint_thread(qsd->qsd_type_array[qtype]);
        }
        __set_current_state(TASK_RUNNING);

        lu_env_fini(env);
        OBD_FREE_PTR(args);

        RETURN(rc);
}
608
609 int qsd_start_upd_thread(struct qsd_instance *qsd)
610 {
611         struct qsd_upd_args *args;
612         struct task_struct *task;
613         DECLARE_COMPLETION_ONSTACK(started);
614         int rc;
615         ENTRY;
616
617         OBD_ALLOC_PTR(args);
618         if (args == NULL)
619                 RETURN(-ENOMEM);
620
621         rc = lu_env_init(&args->qua_env, LCT_DT_THREAD);
622         if (rc) {
623                 CERROR("%s: cannot init env: rc = %d\n", qsd->qsd_svname, rc);
624                 goto out_free;
625         }
626         args->qua_inst = qsd;
627         args->qua_started = &started;
628
629         task = kthread_create(qsd_upd_thread, args,
630                               "lquota_wb_%s", qsd->qsd_svname);
631         if (IS_ERR(task)) {
632                 rc = PTR_ERR(task);
633                 CERROR("fail to start quota update thread: rc = %d\n", rc);
634                 goto out_fini;
635         }
636         qsd->qsd_upd_task = task;
637         wake_up_process(task);
638         wait_for_completion(&started);
639
640         RETURN(0);
641
642 out_fini:
643         lu_env_fini(&args->qua_env);
644 out_free:
645         OBD_FREE_PTR(args);
646         RETURN(rc);
647 }
648
649 static void qsd_cleanup_deferred(struct qsd_instance *qsd)
650 {
651         int     qtype;
652
653         for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++) {
654                 struct qsd_upd_rec      *upd, *tmp;
655                 struct qsd_qtype_info   *qqi = qsd->qsd_type_array[qtype];
656
657                 if (qqi == NULL)
658                         continue;
659
660                 write_lock(&qsd->qsd_lock);
661                 list_for_each_entry_safe(upd, tmp, &qqi->qqi_deferred_glb,
662                                          qur_link) {
663                         CWARN("%s: Free global deferred upd: ID:%llu, "
664                               "ver:%llu/%llu\n", qsd->qsd_svname,
665                               upd->qur_qid.qid_uid, upd->qur_ver,
666                               qqi->qqi_glb_ver);
667                         list_del_init(&upd->qur_link);
668                         qsd_upd_free(upd);
669                 }
670                 list_for_each_entry_safe(upd, tmp, &qqi->qqi_deferred_slv,
671                                          qur_link) {
672                         CWARN("%s: Free slave deferred upd: ID:%llu, "
673                               "ver:%llu/%llu\n", qsd->qsd_svname,
674                               upd->qur_qid.qid_uid, upd->qur_ver,
675                               qqi->qqi_slv_ver);
676                         list_del_init(&upd->qur_link);
677                         qsd_upd_free(upd);
678                 }
679                 write_unlock(&qsd->qsd_lock);
680         }
681 }
682
683 static void qsd_cleanup_adjust(struct qsd_instance *qsd)
684 {
685         struct lquota_entry     *lqe;
686
687         spin_lock(&qsd->qsd_adjust_lock);
688         while (!list_empty(&qsd->qsd_adjust_list)) {
689                 lqe = list_first_entry(&qsd->qsd_adjust_list,
690                                        struct lquota_entry, lqe_link);
691                 list_del_init(&lqe->lqe_link);
692                 lqe_putref(lqe);
693         }
694         spin_unlock(&qsd->qsd_adjust_lock);
695 }
696
697 void qsd_stop_upd_thread(struct qsd_instance *qsd)
698 {
699         struct task_struct *task;
700
701         write_lock(&qsd->qsd_lock);
702         task = qsd->qsd_upd_task;
703         qsd->qsd_upd_task = NULL;
704         write_unlock(&qsd->qsd_lock);
705         if (task)
706                 kthread_stop(task);
707
708         qsd_cleanup_deferred(qsd);
709         qsd_cleanup_adjust(qsd);
710 }