Whamcloud - gitweb
libcfs: fixes to CDEBUG(), LASSERT() and friends to reduce stack consumption
[fs/lustre-release.git] / lustre / quota / quota_context.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/quota/quota_context.c
5  *  Lustre Quota Context
6  *
7  *  Copyright (c) 2001-2005 Cluster File Systems, Inc.
8  *   Author: Niu YaWei <niu@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   No redistribution or use is permitted outside of Cluster File Systems, Inc.
13  *
14  */
15 #ifndef EXPORT_SYMTAB
16 # define EXPORT_SYMTAB
17 #endif
18
19 #define DEBUG_SUBSYSTEM S_MDS
20
21 #include <linux/version.h>
22 #include <linux/fs.h>
23 #include <asm/unistd.h>
24 #include <linux/slab.h>
25 #include <linux/quotaops.h>
26 #include <linux/module.h>
27 #include <linux/init.h>
28
29 #include <linux/obd_class.h>
30 #include <linux/lustre_quota.h>
31 #include <linux/lustre_fsfilt.h>
32 #include "quota_internal.h"
33
/* Tunable defaults for quota unit sizes and their pre-acquire thresholds.
 * A "qunit" is the granularity in which quota is acquired from / released
 * to the quota master; the "tune" ratio is the watermark (a percentage of
 * the unit size) at which a new acquire/release is scheduled. */
unsigned long default_bunit_sz = 100 * 1024 * 1024;       /* 100M bytes */
unsigned long default_btune_ratio = 50;                   /* 50 percentage */
unsigned long default_iunit_sz = 5000;       /* 5000 inodes */
unsigned long default_itune_ratio = 50;      /* 50 percentage */

/* slab cache for struct lustre_qunit allocations */
kmem_cache_t *qunit_cachep = NULL;
/* hash of all in-flight qunit requests; both the buckets and each
 * qunit's waiter list are protected by qunit_hash_lock */
struct list_head qunit_hash[NR_DQHASH];
spinlock_t qunit_hash_lock = SPIN_LOCK_UNLOCKED;
42
/* One in-flight quota acquire/release request, hashed in qunit_hash by
 * (context, id, type).  Refcounted: one reference is held by the hash
 * table and one per code path holding the pointer. */
struct lustre_qunit {
        struct list_head lq_hash;               /* Hash list in memory */
        atomic_t lq_refcnt;                     /* Use count */
        struct lustre_quota_ctxt *lq_ctxt;      /* Quota context this applies to */
        struct qunit_data lq_data;              /* See qunit_data */
        unsigned int lq_opc;                    /* QUOTA_DQACQ, QUOTA_DQREL */
        struct list_head lq_waiters;            /* All write threads waiting for this qunit */
};
51
52 void qunit_cache_cleanup(void)
53 {
54         int i;
55         ENTRY;
56
57         spin_lock(&qunit_hash_lock);
58         for (i = 0; i < NR_DQHASH; i++)
59                 LASSERT(list_empty(qunit_hash + i));
60         spin_unlock(&qunit_hash_lock);
61
62         if (qunit_cachep) {
63                 int rc;
64                 rc = kmem_cache_destroy(qunit_cachep);
65                 LASSERT(rc == 0);
66                 qunit_cachep = NULL;
67         }
68         EXIT;
69 }
70
71 int qunit_cache_init(void)
72 {
73         int i;
74         ENTRY;
75
76         LASSERT(qunit_cachep == NULL);
77         qunit_cachep = kmem_cache_create("ll_qunit_cache",
78                                          sizeof(struct lustre_qunit),
79                                          0, 0, NULL, NULL);
80         if (!qunit_cachep)
81                 RETURN(-ENOMEM);
82
83         spin_lock(&qunit_hash_lock);
84         for (i = 0; i < NR_DQHASH; i++)
85                 INIT_LIST_HEAD(qunit_hash + i);
86         spin_unlock(&qunit_hash_lock);
87         RETURN(0);
88 }
89
90 static inline int const
91 qunit_hashfn(struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata)
92 {
93         unsigned int id = qdata->qd_id;
94         unsigned int type = qdata->qd_type;
95
96         unsigned long tmp = ((unsigned long)qctxt >> L1_CACHE_SHIFT) ^ id;
97         tmp = (tmp * (MAXQUOTAS - type)) % NR_DQHASH;
98         return tmp;
99 }
100
101 /* caller must hold qunit_hash_lock */
102 static inline struct lustre_qunit *find_qunit(unsigned int hashent,
103                                               struct lustre_quota_ctxt *qctxt,
104                                               struct qunit_data *qdata)
105 {
106         struct lustre_qunit *qunit = NULL;
107         struct qunit_data *tmp;
108
109         LASSERT_SPIN_LOCKED(&qunit_hash_lock);
110         list_for_each_entry(qunit, qunit_hash + hashent, lq_hash) {
111                 tmp = &qunit->lq_data;
112                 if (qunit->lq_ctxt == qctxt &&
113                     qdata->qd_id == tmp->qd_id && qdata->qd_type == tmp->qd_type
114                     && qdata->qd_isblk == tmp->qd_isblk)
115                         return qunit;
116         }
117         return NULL;
118 }
119
/* check_cur_qunit - check the current usage of qunit.
 * @qctxt: quota context
 * @qdata: the type of quota unit to be checked; on entry qd_count must
 *         be 0, on a 1/2 return it holds the amount to acquire/release
 *
 * return: 1 - need acquire qunit;
 *         2 - need release qunit;
 *         0 - need do nothing.
 *       < 0 - error.
 */
static int
check_cur_qunit(struct obd_device *obd,
                struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata)
{
        struct super_block *sb = qctxt->lqc_sb;
        unsigned long qunit_sz, tune_sz;
        __u64 usage, limit;
        struct obd_quotactl *qctl;
        int ret = 0;
        ENTRY;

        if (!sb_any_quota_enabled(sb))
                RETURN(0);

        /* ignore root user */
        if (qdata->qd_id == 0 && qdata->qd_type == USRQUOTA)
                RETURN(0);

        /* heap-allocate the quotactl buffer to keep stack usage down */
        OBD_ALLOC_PTR(qctl);
        if (qctl == NULL)
                RETURN(-ENOMEM);

        /* get fs quota usage & limit */
        qctl->qc_cmd = Q_GETQUOTA;
        qctl->qc_id = qdata->qd_id;
        qctl->qc_type = qdata->qd_type;
        ret = fsfilt_quotactl(obd, sb, qctl);
        if (ret) {
                if (ret == -ESRCH)      /* no limit */
                        ret = 0;
                else
                        CERROR("can't get fs quota usage! (rc:%d)\n", ret);
                GOTO(out, ret);
        }

        if (qdata->qd_isblk) {
                usage = qctl->qc_dqblk.dqb_curspace;
                /* block hard limit is in quota blocks; convert to bytes */
                limit = qctl->qc_dqblk.dqb_bhardlimit << QUOTABLOCK_BITS;
                qunit_sz = qctxt->lqc_bunit_sz;
                tune_sz = qctxt->lqc_btune_sz;

                LASSERT(!(qunit_sz % QUOTABLOCK_SIZE));
        } else {
                usage = qctl->qc_dqblk.dqb_curinodes;
                limit = qctl->qc_dqblk.dqb_ihardlimit;
                qunit_sz = qctxt->lqc_iunit_sz;
                tune_sz = qctxt->lqc_itune_sz;
        }

        /* ignore the no quota limit case */
        if (!limit)
                GOTO(out, ret = 0);

        /* we don't count the MIN_QLIMIT placeholder as a real limit; with
         * limit forced to 0 the branch below always schedules an acquire */
        if ((limit == MIN_QLIMIT && !qdata->qd_isblk) ||
            (toqb(limit) == MIN_QLIMIT && qdata->qd_isblk))
                limit = 0;

        LASSERT(qdata->qd_count == 0);
        if (limit <= usage + tune_sz) {
                /* usage is within tune_sz of the limit (or over it):
                 * acquire enough whole qunits to get back over the mark */
                while (qdata->qd_count + limit <= usage + tune_sz)
                        qdata->qd_count += qunit_sz;
                ret = 1;
        } else if (limit > usage + qunit_sz + tune_sz) {
                /* limit exceeds usage by more than a whole qunit plus the
                 * watermark: release the surplus back to the master */
                while (limit - qdata->qd_count > usage + qunit_sz + tune_sz)
                        qdata->qd_count += qunit_sz;
                ret = 2;
        }
        LASSERT(ret == 0 || qdata->qd_count);
        EXIT;
out:
        OBD_FREE_PTR(qctl);
        return ret;
}
203
204 /* caller must hold qunit_hash_lock */
205 static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt,
206                                             struct qunit_data *qdata)
207 {
208         unsigned int hashent = qunit_hashfn(qctxt, qdata);
209         struct lustre_qunit *qunit;
210         ENTRY;
211
212         LASSERT_SPIN_LOCKED(&qunit_hash_lock);
213         qunit = find_qunit(hashent, qctxt, qdata);
214         RETURN(qunit);
215 }
216
217 static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt,
218                                         struct qunit_data *qdata, int opc)
219 {
220         struct lustre_qunit *qunit = NULL;
221         ENTRY;
222
223         OBD_SLAB_ALLOC(qunit, qunit_cachep, SLAB_NOFS, sizeof(*qunit));
224         if (qunit == NULL)
225                 RETURN(NULL);
226
227         INIT_LIST_HEAD(&qunit->lq_hash);
228         INIT_LIST_HEAD(&qunit->lq_waiters);
229         atomic_set(&qunit->lq_refcnt, 1);
230         qunit->lq_ctxt = qctxt;
231         memcpy(&qunit->lq_data, qdata, sizeof(*qdata));
232         qunit->lq_opc = opc;
233
234         RETURN(qunit);
235 }
236
/* Return a qunit to the slab cache; called when the last reference is
 * dropped, or to discard a never-hashed duplicate (schedule_dqacq). */
static inline void free_qunit(struct lustre_qunit *qunit)
{
        OBD_SLAB_FREE(qunit, qunit_cachep, sizeof(*qunit));
}
241
/* Take an additional reference on @qunit. */
static inline void qunit_get(struct lustre_qunit *qunit)
{
        atomic_inc(&qunit->lq_refcnt);
}
246
/* Drop a reference on @qunit, freeing it when the count reaches zero. */
static void qunit_put(struct lustre_qunit *qunit)
{
        LASSERT(atomic_read(&qunit->lq_refcnt));
        if (atomic_dec_and_test(&qunit->lq_refcnt))
                free_qunit(qunit);
}
253
254 static void
255 insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit)
256 {
257         struct list_head *head;
258
259         LASSERT(list_empty(&qunit->lq_hash));
260         head = qunit_hash + qunit_hashfn(qctxt, &qunit->lq_data);
261         list_add(&qunit->lq_hash, head);
262 }
263
/* Unhash @qunit.  Caller must hold qunit_hash_lock. */
static void remove_qunit_nolock(struct lustre_qunit *qunit)
{
        LASSERT(!list_empty(&qunit->lq_hash));
        list_del_init(&qunit->lq_hash);
}
269
/* A thread blocked until an in-flight qunit request completes.  Lives
 * on the waiter's stack; linked/unlinked under qunit_hash_lock. */
struct qunit_waiter {
        struct list_head qw_entry;      /* link into lustre_qunit::lq_waiters */
        wait_queue_head_t qw_waitq;     /* waiter sleeps here */
        int qw_rc;                      /* completion status; 0 = discarded */
};
275
/* Log a qunit_data with a D_QUOTA debug message.
 * Fixes: the original macro carried a stray line-continuation after the
 * closing `);`, silently splicing the following source line into the
 * macro body; it is now wrapped in do { } while (0) so it behaves as a
 * single statement, and @qd is parenthesized. */
#define QDATA_DEBUG(qd, fmt, arg...)                                    \
do {                                                                    \
        CDEBUG(D_QUOTA, "id(%u) type(%u) count(%u) isblk(%u):"          \
               fmt, (qd)->qd_id, (qd)->qd_type, (qd)->qd_count,         \
               (qd)->qd_isblk, ## arg);                                 \
} while (0)

/* Raise @limit by @count, treating the MIN_QLIMIT placeholder as zero:
 * a MIN_QLIMIT limit is replaced outright rather than incremented.
 * Fix: arguments are now fully parenthesized so expression arguments
 * expand safely.  Note @limit is still evaluated more than once, so it
 * must be a side-effect-free lvalue. */
#define INC_QLIMIT(limit, count)                                        \
        ((limit) == MIN_QLIMIT ? ((limit) = (count)) : ((limit) += (count)))
283
284
285 /* FIXME check if this mds is the master of specified id */
286 static int
287 is_master(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
288           unsigned int id, int type)
289 {
290         return qctxt->lqc_handler ? 1 : 0;
291 }
292
293 static int
294 schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
295                struct qunit_data *qdata, int opc, int wait);
296
/* Apply the outcome of a dqacq/dqrel request: on success (rc == 0)
 * adjust the hard limit in the local operational quota file by the
 * acquired/released amount, then unhash the in-flight qunit, wake all
 * waiters with @rc, and — on a slave, when the local update succeeded —
 * reschedule a follow-up request if usage moved meanwhile.
 * Returns the local-update status (@err), not the RPC status @rc. */
static int
dqacq_completion(struct obd_device *obd,
                 struct lustre_quota_ctxt *qctxt,
                 struct qunit_data *qdata, int rc, int opc)
{
        struct lustre_qunit *qunit = NULL;
        struct super_block *sb = qctxt->lqc_sb;
        unsigned long qunit_sz;
        struct qunit_waiter *qw, *tmp;
        int err = 0;
        ENTRY;

        LASSERT(qdata);
        qunit_sz = qdata->qd_isblk ? qctxt->lqc_bunit_sz : qctxt->lqc_iunit_sz;
        LASSERT(!(qdata->qd_count % qunit_sz));

        /* update local operational quota file */
        if (rc == 0) {
                __u32 count = QUSG(qdata->qd_count, qdata->qd_isblk);
                struct obd_quotactl *qctl;
                __u64 *hardlimit;

                OBD_ALLOC_PTR(qctl);
                if (qctl == NULL)
                        GOTO(out, err = -ENOMEM);

                /* acq/rel qunit for specified uid/gid is serialized,
                 * so there is no race between get fs quota limit and
                 * set fs quota limit */
                qctl->qc_cmd = Q_GETQUOTA;
                qctl->qc_id = qdata->qd_id;
                qctl->qc_type = qdata->qd_type;
                err = fsfilt_quotactl(obd, sb, qctl);
                if (err) {
                        CERROR("error get quota fs limit! (rc:%d)\n", err);
                        GOTO(out_mem, err);
                }

                /* pick the block or inode hard limit to adjust */
                if (qdata->qd_isblk) {
                        qctl->qc_dqblk.dqb_valid = QIF_BLIMITS;
                        hardlimit = &qctl->qc_dqblk.dqb_bhardlimit;
                } else {
                        qctl->qc_dqblk.dqb_valid = QIF_ILIMITS;
                        hardlimit = &qctl->qc_dqblk.dqb_ihardlimit;
                }

                switch (opc) {
                case QUOTA_DQACQ:
                        /* acquired quota raises the local hard limit */
                        INC_QLIMIT(*hardlimit, count);
                        break;
                case QUOTA_DQREL:
                        /* a release must leave a positive limit behind */
                        LASSERT(count < *hardlimit);
                        *hardlimit -= count;
                        break;
                default:
                        LBUG();
                }

                /* clear quota limit */
                if (count == 0)
                        *hardlimit = 0;

                qctl->qc_cmd = Q_SETQUOTA;
                err = fsfilt_quotactl(obd, sb, qctl);
                if (err)
                        CERROR("error set quota fs limit! (rc:%d)\n", err);

                QDATA_DEBUG(qdata, "%s completion\n",
                            opc == QUOTA_DQACQ ? "DQACQ" : "DQREL");
out_mem:
                OBD_FREE_PTR(qctl);
        } else if (rc == -EDQUOT) {
                QDATA_DEBUG(qdata, "acquire qunit got EDQUOT.\n");
        } else if (rc == -EBUSY) {
                QDATA_DEBUG(qdata, "it's is recovering, got EBUSY.\n");
        } else {
                CERROR("acquire qunit got error! (rc:%d)\n", rc);
        }
out:
        /* remove the qunit from hash */
        spin_lock(&qunit_hash_lock);

        qunit = dqacq_in_flight(qctxt, qdata);
        /* this qunit has been removed by qctxt_cleanup() */
        if (!qunit) {
                spin_unlock(&qunit_hash_lock);
                RETURN(err);
        }

        LASSERT(opc == qunit->lq_opc);
        remove_qunit_nolock(qunit);

        /* wake up all waiters */
        list_for_each_entry_safe(qw, tmp, &qunit->lq_waiters, qw_entry) {
                list_del_init(&qw->qw_entry);
                qw->qw_rc = rc;
                wake_up(&qw->qw_waitq);
        }

        spin_unlock(&qunit_hash_lock);

        /* drop the reference the hash table held */
        qunit_put(qunit);

        /* don't reschedule in such cases:
         *   - acq/rel failure, but not for quota recovery.
         *   - local dqacq/dqrel.
         *   - local disk io failure.
         */
        if (err || (rc && rc != -EBUSY) ||
            is_master(obd, qctxt, qdata->qd_id, qdata->qd_type))
                RETURN(err);

        /* reschedule another dqacq/dqrel if needed */
        qdata->qd_count = 0;
        rc = check_cur_qunit(obd, qctxt, qdata);
        if (rc > 0) {
                int opc;
                opc = rc == 1 ? QUOTA_DQACQ : QUOTA_DQREL;
                rc = schedule_dqacq(obd, qctxt, qdata, opc, 0);
                QDATA_DEBUG(qdata, "reschedudle opc(%d) rc(%d)\n", opc, rc);
        }
        RETURN(err);
}
420
/* Context carried across an async dqacq/dqrel RPC for the reply
 * interpreter; stored in req->rq_async_args. */
struct dqacq_async_args {
        struct lustre_quota_ctxt *aa_ctxt;      /* quota context of request */
        struct lustre_qunit *aa_qunit;          /* the in-flight qunit */
};
425
426 static int dqacq_interpret(struct ptlrpc_request *req, void *data, int rc)
427 {
428         struct dqacq_async_args *aa = (struct dqacq_async_args *)data;
429         struct lustre_quota_ctxt *qctxt = aa->aa_ctxt;
430         struct lustre_qunit *qunit = aa->aa_qunit;
431         struct obd_device *obd = req->rq_import->imp_obd;
432         struct qunit_data *qdata = NULL;
433         ENTRY;
434
435         qdata = lustre_swab_repbuf(req, 0, sizeof(*qdata), lustre_swab_qdata);
436         if (rc == 0 && qdata == NULL)
437                 RETURN(-EPROTO);
438
439         LASSERT(qdata->qd_id == qunit->lq_data.qd_id &&
440                 qdata->qd_type == qunit->lq_data.qd_type &&
441                 (qdata->qd_count == qunit->lq_data.qd_count ||
442                  qdata->qd_count == 0));
443
444         QDATA_DEBUG(qdata, "%s interpret rc(%d).\n",
445                     req->rq_reqmsg->opc == QUOTA_DQACQ ? "DQACQ" : "DQREL", rc);
446
447         rc = dqacq_completion(obd, qctxt, qdata, rc, req->rq_reqmsg->opc);
448
449         RETURN(rc);
450 }
451
452 static int got_qunit(struct qunit_waiter *waiter)
453 {
454         int rc = 0;
455         ENTRY;
456         spin_lock(&qunit_hash_lock);
457         rc = list_empty(&waiter->qw_entry);
458         spin_unlock(&qunit_hash_lock);
459         RETURN(rc);
460 }
461
/* Schedule a quota acquire (QUOTA_DQACQ) or release (QUOTA_DQREL) for
 * @qdata.  If an identical request is already in flight no new one is
 * issued.  On the quota master the request is handled locally through
 * lqc_handler; on a slave it is sent asynchronously via ptlrpcd.
 * When @wait is set, block until the (possibly pre-existing) request
 * completes; a wakeup with qw_rc == 0 means the qunit was discarded
 * rather than completed and is reported as -EAGAIN. */
static int
schedule_dqacq(struct obd_device *obd,
               struct lustre_quota_ctxt *qctxt,
               struct qunit_data *qdata, int opc, int wait)
{
        struct lustre_qunit *qunit, *empty;
        struct qunit_waiter qw;
        struct l_wait_info lwi = { 0 };
        struct ptlrpc_request *req;
        struct qunit_data *reqdata;
        struct dqacq_async_args *aa;
        int size = sizeof(*reqdata);
        int rc = 0;
        ENTRY;

        INIT_LIST_HEAD(&qw.qw_entry);
        init_waitqueue_head(&qw.qw_waitq);
        qw.qw_rc = 0;

        /* allocate before taking the spinlock; may be discarded below */
        if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL)
                RETURN(-ENOMEM);

        spin_lock(&qunit_hash_lock);

        qunit = dqacq_in_flight(qctxt, qdata);
        if (qunit) {
                /* same request already in flight: piggyback on it */
                if (wait)
                        list_add_tail(&qw.qw_entry, &qunit->lq_waiters);
                spin_unlock(&qunit_hash_lock);

                free_qunit(empty);
                goto wait_completion;
        }
        qunit = empty;
        insert_qunit_nolock(qctxt, qunit);
        if (wait)
                list_add_tail(&qw.qw_entry, &qunit->lq_waiters);
        spin_unlock(&qunit_hash_lock);

        LASSERT(qunit);

        /* master is going to dqacq/dqrel from itself */
        if (is_master(obd, qctxt, qdata->qd_id, qdata->qd_type)) {
                int rc2;
                QDATA_DEBUG(qdata, "local %s.\n",
                            opc == QUOTA_DQACQ ? "DQACQ" : "DQREL");
                rc = qctxt->lqc_handler(obd, qdata, opc);
                rc2 = dqacq_completion(obd, qctxt, qdata, rc, opc);
                RETURN((rc && rc != -EDQUOT) ? rc : rc2);
        }

        /* build dqacq/dqrel request */
        LASSERT(qctxt->lqc_import);
        req = ptlrpc_prep_req(qctxt->lqc_import, LUSTRE_MDS_VERSION, opc, 1,
                              &size, NULL);
        if (!req) {
                /* completion unhashes the qunit and wakes any waiters */
                dqacq_completion(obd, qctxt, qdata, -ENOMEM, opc);
                RETURN(-ENOMEM);
        }

        reqdata = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*reqdata));
        *reqdata = *qdata;
        size = sizeof(*reqdata);
        req->rq_replen = lustre_msg_size(1, &size);

        /* async args must fit in the request's embedded scratch space */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct dqacq_async_args *)&req->rq_async_args;
        aa->aa_ctxt = qctxt;
        aa->aa_qunit = qunit;

        req->rq_interpret_reply = dqacq_interpret;
        ptlrpcd_add_req(req);

        QDATA_DEBUG(qdata, "%s scheduled.\n",
                    opc == QUOTA_DQACQ ? "DQACQ" : "DQREL");
wait_completion:
        if (wait && qunit) {
                struct qunit_data *p = &qunit->lq_data;
                QDATA_DEBUG(p, "wait for dqacq.\n");

                l_wait_event(qw.qw_waitq, got_qunit(&qw), &lwi);
                /* qw_rc == 0 means the qunit was discarded (e.g. by
                 * qctxt_cleanup()) rather than completed */
                if (qw.qw_rc == 0)
                        rc = -EAGAIN;

                CDEBUG(D_QUOTA, "wait dqacq done. (rc:%d)\n", qw.qw_rc);
        }
        RETURN(rc);
}
550
551 int
552 qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
553                    uid_t uid, gid_t gid, __u32 isblk, int wait)
554 {
555         int ret, rc = 0, i = USRQUOTA;
556         __u32 id[MAXQUOTAS] = { uid, gid };
557         struct qunit_data qdata[MAXQUOTAS];
558         ENTRY;
559
560         CLASSERT(MAXQUOTAS < 4);
561         if (!sb_any_quota_enabled(qctxt->lqc_sb))
562                 RETURN(0);
563
564         for (i = 0; i < MAXQUOTAS; i++) {
565                 qdata[i].qd_id = id[i];
566                 qdata[i].qd_type = i;
567                 qdata[i].qd_isblk = isblk;
568                 qdata[i].qd_count = 0;
569
570                 ret = check_cur_qunit(obd, qctxt, &qdata[i]);
571                 if (ret > 0) {
572                         int opc;
573                         /* need acquire or release */
574                         opc = ret == 1 ? QUOTA_DQACQ : QUOTA_DQREL;
575                         ret = schedule_dqacq(obd, qctxt, &qdata[i], opc, wait);
576                         if (!rc)
577                                 rc = ret;
578                 }
579         }
580
581         RETURN(rc);
582 }
583
584 int
585 qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
586                          unsigned short type, int isblk)
587 {
588         struct lustre_qunit *qunit = NULL;
589         struct qunit_waiter qw;
590         struct qunit_data qdata;
591         struct l_wait_info lwi = { 0 };
592         ENTRY;
593
594         INIT_LIST_HEAD(&qw.qw_entry);
595         init_waitqueue_head(&qw.qw_waitq);
596         qw.qw_rc = 0;
597
598         qdata.qd_id = id;
599         qdata.qd_type = type;
600         qdata.qd_isblk = isblk;
601         qdata.qd_count = 0;
602
603         spin_lock(&qunit_hash_lock);
604
605         qunit = dqacq_in_flight(qctxt, &qdata);
606         if (qunit)
607                 list_add_tail(&qw.qw_entry, &qunit->lq_waiters);
608
609         spin_unlock(&qunit_hash_lock);
610
611         if (qunit) {
612                 struct qunit_data *p = &qdata;
613                 QDATA_DEBUG(p, "wait for dqacq completion.\n");
614                 l_wait_event(qw.qw_waitq, got_qunit(&qw), &lwi);
615                 QDATA_DEBUG(p, "wait dqacq done. (rc:%d)\n", qw.qw_rc);
616         }
617         RETURN(0);
618 }
619
/* Initialize quota context @qctxt for superblock @sb.  @handler is the
 * master-side dqacq handler; NULL on a pure slave (see is_master()).
 * Takes a ptlrpcd reference for async request processing; released in
 * qctxt_cleanup(). */
int
qctxt_init(struct lustre_quota_ctxt *qctxt, struct super_block *sb,
           dqacq_handler_t handler)
{
        int rc = 0;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        qctxt->lqc_handler = handler;
        qctxt->lqc_sb = sb;
        qctxt->lqc_import = NULL;
        qctxt->lqc_recovery = 0;
        /* unit sizes and tune watermarks start at the module defaults */
        qctxt->lqc_bunit_sz = default_bunit_sz;
        qctxt->lqc_btune_sz = default_bunit_sz / 100 * default_btune_ratio;
        qctxt->lqc_iunit_sz = default_iunit_sz;
        qctxt->lqc_itune_sz = default_iunit_sz * default_itune_ratio / 100;

        RETURN(0);
}
642
/* Tear down @qctxt: unhash every qunit belonging to it, wake any
 * waiters with qw_rc == 0 (seen as -EAGAIN by schedule_dqacq), and drop
 * the ptlrpcd reference taken in qctxt_init().
 * NOTE: @force is currently unused. */
void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
{
        struct lustre_qunit *qunit, *tmp;
        struct qunit_waiter *qw, *tmp2;
        int i;
        ENTRY;

        spin_lock(&qunit_hash_lock);

        for (i = 0; i < NR_DQHASH; i++) {
                list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], lq_hash) {
                        /* buckets are shared by all contexts: skip others */
                        if (qunit->lq_ctxt != qctxt)
                                continue;

                        remove_qunit_nolock(qunit);
                        /* wake up all waiters */
                        list_for_each_entry_safe(qw, tmp2, &qunit->lq_waiters,
                                                 qw_entry) {
                                list_del_init(&qw->qw_entry);
                                qw->qw_rc = 0;
                                wake_up(&qw->qw_waitq);
                        }
                        /* drop the hash table's reference */
                        qunit_put(qunit);
                }
        }

        spin_unlock(&qunit_hash_lock);

        ptlrpcd_decref();

        EXIT;
}
675
/* Arguments handed to the quota slave recovery kernel thread.  Lives on
 * the starter's stack, so the thread signals @comp as soon as it no
 * longer needs the data (see qslave_start_recovery()). */
struct qslave_recov_thread_data {
        struct obd_device *obd;
        struct lustre_quota_ctxt *qctxt;
        struct completion comp;
};
681
/* FIXME only recovery block quota by now */
/* Kernel-thread body for quota slave recovery: walk every id found in
 * the local quota files and re-run the acquire/release check for its
 * block quota so local limits converge with the master after a restart. */
static int qslave_recovery_main(void *arg)
{
        struct qslave_recov_thread_data *data = arg;
        struct obd_device *obd = data->obd;
        struct lustre_quota_ctxt *qctxt = data->qctxt;
        unsigned int type;
        int rc = 0;
        ENTRY;

        ptlrpc_daemonize("qslave_recovd");

        /* everything needed from the starter's stack is copied above;
         * let qslave_start_recovery() return */
        complete(&data->comp);

        /* only one recovery pass per context at a time */
        if (qctxt->lqc_recovery)
                RETURN(0);
        qctxt->lqc_recovery = 1;

        for (type = USRQUOTA; type < MAXQUOTAS; type++) {
                struct qunit_data qdata;
                struct quota_info *dqopt = sb_dqopt(qctxt->lqc_sb);
                struct list_head id_list;
                struct dquot_id *dqid, *tmp;
                int ret;

                /* hold dqonoff_sem so quota cannot be turned off while the
                 * id list is read from the quota file */
                down(&dqopt->dqonoff_sem);
                if (!sb_has_quota_enabled(qctxt->lqc_sb, type)) {
                        up(&dqopt->dqonoff_sem);
                        break;
                }

                LASSERT(dqopt->files[type] != NULL);
                INIT_LIST_HEAD(&id_list);
                /* fsfilt_qids() argument order differs across the 2.6.12
                 * quota interface change */
#if LINUX_VERSION_CODE < KERNEL_VERSION(2,6,12)
                rc = fsfilt_qids(obd, dqopt->files[type], NULL, type, &id_list);
#else
                rc = fsfilt_qids(obd, NULL, dqopt->files[type], type, &id_list);
#endif
                up(&dqopt->dqonoff_sem);
                if (rc)
                        CERROR("Get ids from quota file failed. (rc:%d)\n", rc);

                list_for_each_entry_safe(dqid, tmp, &id_list, di_link) {
                        list_del_init(&dqid->di_link);
                        /* skip slave recovery on itself */
                        if (is_master(obd, qctxt, dqid->di_id, type))
                                goto free;
                        /* after a hard failure, stop issuing requests but
                         * keep draining (and freeing) the id list */
                        if (rc && rc != -EBUSY)
                                goto free;

                        qdata.qd_id = dqid->di_id;
                        qdata.qd_type = type;
                        qdata.qd_isblk = 1;
                        qdata.qd_count = 0;

                        ret = check_cur_qunit(obd, qctxt, &qdata);
                        if (ret > 0) {
                                int opc;
                                opc = ret == 1 ? QUOTA_DQACQ : QUOTA_DQREL;
                                rc = schedule_dqacq(obd, qctxt, &qdata, opc, 0);
                        } else
                                rc = 0;

                        if (rc)
                                CDEBUG_EX(rc == -EBUSY ? D_QUOTA : D_ERROR,
                                       "qslave recovery failed! (id:%d type:%d "
                                       " rc:%d)\n", dqid->di_id, type, rc);
free:
                        kfree(dqid);
                }
        }

        qctxt->lqc_recovery = 0;
        RETURN(rc);
}
757
758 void
759 qslave_start_recovery(struct obd_device *obd, struct lustre_quota_ctxt *qctxt)
760 {
761         struct qslave_recov_thread_data data;
762         int rc;
763         ENTRY;
764
765         if (!sb_any_quota_enabled(qctxt->lqc_sb))
766                 goto exit;
767
768         data.obd = obd;
769         data.qctxt = qctxt;
770         init_completion(&data.comp);
771
772         rc = kernel_thread(qslave_recovery_main, &data, CLONE_VM|CLONE_FILES);
773         if (rc < 0) {
774                 CERROR("Cannot start quota recovery thread: rc %d\n", rc);
775                 goto exit;
776         }
777         wait_for_completion(&data.comp);
778 exit:
779         EXIT;
780 }
781