Whamcloud - gitweb
Branch b1_8
[fs/lustre-release.git] / lustre / quota / quota_context.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/quota/quota_context.c
37  *
38  * Lustre Quota Context
39  *
40  * Author: Niu YaWei <niu@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46
47 #define DEBUG_SUBSYSTEM S_LQUOTA
48
49 #include <linux/version.h>
50 #include <linux/fs.h>
51 #include <asm/unistd.h>
52 #include <linux/slab.h>
53 #include <linux/quotaops.h>
54 #include <linux/module.h>
55 #include <linux/init.h>
56
57 #include <obd_class.h>
58 #include <lustre_quota.h>
59 #include <lustre_fsfilt.h>
60 #include <class_hash.h>
61 #include <lprocfs_status.h>
62 #include "quota_internal.h"
63
64 #ifdef HAVE_QUOTA_SUPPORT
65
/* Operations table for the lustre_qunit_size (lqs) hash; the ops are
 * supplied later in this file. */
static lustre_hash_ops_t lqs_hash_ops;

/* Default qunit sizes and tune ratios (procfs-tunable elsewhere).
 * NOTE(review): the tune ratio appears to be the percentage of a qunit
 * at which acquire/release is triggered -- confirm against lqs users. */
unsigned long default_bunit_sz = 128 * 1024 * 1024; /* 128M bytes */
unsigned long default_btune_ratio = 50;             /* 50 percentage */
unsigned long default_iunit_sz = 5120;              /* 5120 inodes */
unsigned long default_itune_ratio = 50;             /* 50 percentage */

/* Slab cache for struct lustre_qunit allocations. */
cfs_mem_cache_t *qunit_cachep = NULL;
/* Global hash of in-flight qunits; all chains are protected by
 * qunit_hash_lock. */
struct list_head qunit_hash[NR_DQHASH];
spinlock_t qunit_hash_lock = SPIN_LOCK_UNLOCKED;
76
/* please sync qunit_state with qunit_state_names */
/* Lifecycle of an in-flight qunit; transitions are made under lq_lock
 * via QUNIT_SET_STATE()/QUNIT_SET_STATE_AND_RC(). */
enum qunit_state {
        QUNIT_CREATED      = 0,   /* a qunit is created */
        QUNIT_IN_HASH      = 1,   /* a qunit is added into qunit hash, that means
                                   * a quota req will be sent or is flying */
        QUNIT_RM_FROM_HASH = 2,   /* a qunit is removed from qunit hash, that
                                   * means a quota req is handled and comes
                                   * back */
        QUNIT_FINISHED     = 3,   /* qunit can wake up all threads waiting
                                   * for it */
};

/* Human-readable state names for QDATA_DEBUG transition logging,
 * indexed by enum qunit_state. */
static const char *qunit_state_names[] = {
        [QUNIT_CREATED]      = "CREATED",
        [QUNIT_IN_HASH]      = "IN_HASH",
        [QUNIT_RM_FROM_HASH] = "RM_FROM_HASH",
        [QUNIT_FINISHED]     = "FINISHED",
};
95
/* One in-flight quota acquire/release unit.  Lifetime is controlled by
 * lq_refcnt (see qunit_get()/qunit_put()); lq_hash membership is
 * protected by the global qunit_hash_lock, while lq_state and lq_rc are
 * protected by lq_lock. */
struct lustre_qunit {
        struct list_head lq_hash;          /* Hash list in memory */
        atomic_t lq_refcnt;                /* Use count */
        struct lustre_quota_ctxt *lq_ctxt; /* Quota context this applies to */
        struct qunit_data lq_data;         /* See qunit_data */
        unsigned int lq_opc;               /* QUOTA_DQACQ, QUOTA_DQREL */
        cfs_waitq_t lq_waitq;              /* Threads waiting for this qunit */
        spinlock_t lq_lock;                /* Protect the whole structure */
        enum qunit_state lq_state;         /* Present the status of qunit */
        int lq_rc;                         /* The rc of lq_data */
};
107
/* Transition @qunit to @state under lq_lock, logging the old and new
 * state names.  All macro arguments are parenthesized so arbitrary
 * expressions can be passed safely. */
#define QUNIT_SET_STATE(qunit, state)                                   \
do {                                                                    \
        spin_lock(&(qunit)->lq_lock);                                   \
        QDATA_DEBUG((&(qunit)->lq_data), "qunit(%p) lq_state(%s->%s), " \
                    "lq_rc(%d)\n",                                      \
                    (qunit), qunit_state_names[(qunit)->lq_state],      \
                    qunit_state_names[(state)], (qunit)->lq_rc);        \
        (qunit)->lq_state = (state);                                    \
        spin_unlock(&(qunit)->lq_lock);                                 \
} while (0)
118
/* Like QUNIT_SET_STATE() but also records @rc into lq_rc before the
 * state change, all under lq_lock.  Arguments are parenthesized so
 * arbitrary expressions can be passed safely. */
#define QUNIT_SET_STATE_AND_RC(qunit, state, rc)                        \
do {                                                                    \
        spin_lock(&(qunit)->lq_lock);                                   \
        (qunit)->lq_rc = (rc);                                          \
        QDATA_DEBUG((&(qunit)->lq_data), "qunit(%p) lq_state(%s->%s), " \
                    "lq_rc(%d)\n",                                      \
                    (qunit), qunit_state_names[(qunit)->lq_state],      \
                    qunit_state_names[(state)], (qunit)->lq_rc);        \
        (qunit)->lq_state = (state);                                    \
        spin_unlock(&(qunit)->lq_lock);                                 \
} while (0)
130
131
132 int should_translate_quota (struct obd_import *imp)
133 {
134         ENTRY;
135
136         LASSERT(imp);
137         if (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_QUOTA64)
138                 RETURN(0);
139         else
140                 RETURN(1);
141 }
142
143 void qunit_cache_cleanup(void)
144 {
145         int i;
146         ENTRY;
147
148         spin_lock(&qunit_hash_lock);
149         for (i = 0; i < NR_DQHASH; i++)
150                 LASSERT(list_empty(qunit_hash + i));
151         spin_unlock(&qunit_hash_lock);
152
153         if (qunit_cachep) {
154                 int rc;
155                 rc = cfs_mem_cache_destroy(qunit_cachep);
156                 LASSERTF(rc == 0, "couldn't destory qunit_cache slab\n");
157                 qunit_cachep = NULL;
158         }
159         EXIT;
160 }
161
162 int qunit_cache_init(void)
163 {
164         int i;
165         ENTRY;
166
167         LASSERT(qunit_cachep == NULL);
168         qunit_cachep = cfs_mem_cache_create("ll_qunit_cache",
169                                             sizeof(struct lustre_qunit),
170                                             0, 0);
171         if (!qunit_cachep)
172                 RETURN(-ENOMEM);
173
174         spin_lock(&qunit_hash_lock);
175         for (i = 0; i < NR_DQHASH; i++)
176                 INIT_LIST_HEAD(qunit_hash + i);
177         spin_unlock(&qunit_hash_lock);
178         RETURN(0);
179 }
180
/* Hash a (quota context, quota id, usr/grp type) triple into a
 * qunit_hash bucket index in [0, NR_DQHASH). */
static inline int
qunit_hashfn(struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata)
             __attribute__((__const__));
/* NOTE(review): __const__ promises the compiler this function reads no
 * memory besides its argument values, yet it dereferences @qdata;
 * __pure__ would describe it accurately -- confirm before changing. */

static inline int
qunit_hashfn(struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata)
{
        unsigned int id = qdata->qd_id;
        /* nonzero when this is a group-quota entry (see QDATA_IS_GRP) */
        unsigned int type = QDATA_IS_GRP(qdata);

        /* mix the context pointer into the hash so the same id in
         * different contexts lands in different buckets */
        unsigned long tmp = ((unsigned long)qctxt >> L1_CACHE_SHIFT) ^ id;
        tmp = (tmp * (MAXQUOTAS - type)) % NR_DQHASH;
        return tmp;
}
195
/* caller must hold qunit_hash_lock */
/* Scan hash bucket @hashent for a qunit belonging to @qctxt whose quota
 * id and usr/grp + blk/ino flag bits match @qdata.  Returns the matching
 * qunit WITHOUT taking a reference, or NULL if none is in flight. */
static inline struct lustre_qunit *find_qunit(unsigned int hashent,
                                              struct lustre_quota_ctxt *qctxt,
                                              struct qunit_data *qdata)
{
        struct lustre_qunit *qunit = NULL;
        struct qunit_data *tmp;

        LASSERT_SPIN_LOCKED(&qunit_hash_lock);
        list_for_each_entry(qunit, qunit_hash + hashent, lq_hash) {
                tmp = &qunit->lq_data;
                /* only the qunit-identifying flag bits participate in
                 * the match, not transient per-request flags */
                if (qunit->lq_ctxt == qctxt &&
                    qdata->qd_id == tmp->qd_id &&
                    (qdata->qd_flags & LQUOTA_QUNIT_FLAGS) ==
                    (tmp->qd_flags & LQUOTA_QUNIT_FLAGS))
                        return qunit;
        }
        return NULL;
}
215
216 /* check_cur_qunit - check the current usage of qunit.
217  * @qctxt: quota context
218  * @qdata: the type of quota unit to be checked
219  *
220  * return: 1 - need acquire qunit;
221  *         2 - need release qunit;
222  *         0 - need do nothing.
223  *       < 0 - error.
224  */
225 static int
226 check_cur_qunit(struct obd_device *obd,
227                 struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata)
228 {
229         struct super_block *sb = qctxt->lqc_sb;
230         unsigned long qunit_sz, tune_sz;
231         __u64 usage, limit, limit_org, pending_write = 0;
232         long long record = 0;
233         struct obd_quotactl *qctl;
234         struct lustre_qunit_size *lqs = NULL;
235         int ret = 0;
236         ENTRY;
237
238         if (!ll_sb_any_quota_active(sb))
239                 RETURN(0);
240
241         spin_lock(&qctxt->lqc_lock);
242         if (!qctxt->lqc_valid){
243                 spin_unlock(&qctxt->lqc_lock);
244                 RETURN(0);
245         }
246         spin_unlock(&qctxt->lqc_lock);
247
248         OBD_ALLOC_PTR(qctl);
249         if (qctl == NULL)
250                 RETURN(-ENOMEM);
251
252         /* get fs quota usage & limit */
253         qctl->qc_cmd = Q_GETQUOTA;
254         qctl->qc_id = qdata->qd_id;
255         qctl->qc_type = QDATA_IS_GRP(qdata);
256         ret = fsfilt_quotactl(obd, sb, qctl);
257         if (ret) {
258                 if (ret == -ESRCH)      /* no limit */
259                         ret = 0;
260                 else
261                         CERROR("can't get fs quota usage! (rc:%d)\n", ret);
262                 GOTO(out, ret);
263         }
264
265         if (QDATA_IS_BLK(qdata)) {
266                 usage = qctl->qc_dqblk.dqb_curspace;
267                 limit = qctl->qc_dqblk.dqb_bhardlimit << QUOTABLOCK_BITS;
268         } else {
269                 usage = qctl->qc_dqblk.dqb_curinodes;
270                 limit = qctl->qc_dqblk.dqb_ihardlimit;
271         }
272
273         /* ignore the no quota limit case; and it can avoid creating
274          * unnecessary lqs for uid/gid */
275         if (!limit)
276                 GOTO(out, ret = 0);
277
278         lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
279                                qctxt, 0);
280         if (IS_ERR(lqs) || lqs == NULL) {
281                 CDEBUG(D_ERROR, "fail to find a lqs(%s id: %u)!\n",
282                        QDATA_IS_GRP(qdata) ? "group" : "user", qdata->qd_id);
283                 GOTO (out, ret = 0);
284         }
285         spin_lock(&lqs->lqs_lock);
286
287         if (QDATA_IS_BLK(qdata)) {
288                 qunit_sz = lqs->lqs_bunit_sz;
289                 tune_sz  = lqs->lqs_btune_sz;
290                 pending_write = lqs->lqs_bwrite_pending;
291                 record   = lqs->lqs_blk_rec;
292                 LASSERT(!(qunit_sz % QUOTABLOCK_SIZE));
293         } else {
294                 /* we didn't need change inode qunit size now */
295                 qunit_sz = lqs->lqs_iunit_sz;
296                 tune_sz  = lqs->lqs_itune_sz;
297                 pending_write = lqs->lqs_iwrite_pending;
298                 record   = lqs->lqs_ino_rec;
299         }
300
301         /* we don't count the MIN_QLIMIT */
302         if ((limit == MIN_QLIMIT && !QDATA_IS_BLK(qdata)) ||
303             (toqb(limit) == MIN_QLIMIT && QDATA_IS_BLK(qdata)))
304                 limit = 0;
305
306         usage += pending_write;
307         limit_org = limit;
308         /* when a releasing quota req is sent, before it returned
309            limit is assigned a small value. limit will overflow */
310         if (limit + record < 0)
311                 usage -= record;
312         else
313                 limit += record;
314
315         LASSERT(qdata->qd_count == 0);
316         if (limit <= usage + tune_sz) {
317                 while (qdata->qd_count + limit <=
318                        usage + tune_sz)
319                         qdata->qd_count += qunit_sz;
320                 ret = 1;
321         } else if (limit > usage + qunit_sz + tune_sz &&
322                    limit_org > qdata->qd_count + qunit_sz) {
323                 while (limit - qdata->qd_count > usage + qunit_sz + tune_sz &&
324                        limit_org > qdata->qd_count + qunit_sz)
325                         qdata->qd_count += qunit_sz;
326                 ret = 2;
327                 /* if there are other pending writes for this uid/gid, releasing
328                  * quota is put off until the last pending write b=16645 */
329                 if (ret == 2 && pending_write) {
330                         CDEBUG(D_QUOTA, "delay quota release\n");
331                         ret = 0;
332                 }
333         }
334         CDEBUG(D_QUOTA, "type: %c, limit: "LPU64", usage: "LPU64
335                ", pending_write: "LPU64", record: "LPD64
336                ", qunit_sz: %lu, tune_sz: %lu, ret: %d.\n",
337                QDATA_IS_BLK(qdata) ? 'b' : 'i', limit, usage, pending_write,
338                (__s64)record, qunit_sz, tune_sz, ret);
339         LASSERT(ret == 0 || qdata->qd_count);
340
341         spin_unlock(&lqs->lqs_lock);
342         lqs_putref(lqs);
343
344         EXIT;
345  out:
346         OBD_FREE_PTR(qctl);
347         return ret;
348 }
349
350 /* compute the remaining quota for certain gid or uid b=11693 */
351 int compute_remquota(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
352                      struct qunit_data *qdata, int isblk)
353 {
354         struct super_block *sb = qctxt->lqc_sb;
355         __u64 usage, limit;
356         struct obd_quotactl *qctl;
357         int ret = QUOTA_RET_OK;
358         ENTRY;
359
360         if (!ll_sb_any_quota_active(sb))
361                 RETURN(QUOTA_RET_NOQUOTA);
362
363         /* ignore root user */
364         if (qdata->qd_id == 0 && QDATA_IS_GRP(qdata) == USRQUOTA)
365                 RETURN(QUOTA_RET_NOLIMIT);
366
367         OBD_ALLOC_PTR(qctl);
368         if (qctl == NULL)
369                 RETURN(-ENOMEM);
370
371         /* get fs quota usage & limit */
372         qctl->qc_cmd = Q_GETQUOTA;
373         qctl->qc_id = qdata->qd_id;
374         qctl->qc_type = QDATA_IS_GRP(qdata);
375         ret = fsfilt_quotactl(obd, sb, qctl);
376         if (ret) {
377                 if (ret == -ESRCH)      /* no limit */
378                         ret = QUOTA_RET_NOLIMIT;
379                 else
380                         CDEBUG(D_QUOTA, "can't get fs quota usage! (rc:%d)",
381                                ret);
382                 GOTO(out, ret);
383         }
384
385         usage = isblk ? qctl->qc_dqblk.dqb_curspace :
386                 qctl->qc_dqblk.dqb_curinodes;
387         limit = isblk ? qctl->qc_dqblk.dqb_bhardlimit << QUOTABLOCK_BITS :
388                 qctl->qc_dqblk.dqb_ihardlimit;
389         if (!limit){            /* no limit */
390                 ret = QUOTA_RET_NOLIMIT;
391                 GOTO(out, ret);
392         }
393
394         if (limit >= usage)
395                 qdata->qd_count = limit - usage;
396         else
397                 qdata->qd_count = 0;
398         EXIT;
399 out:
400         OBD_FREE_PTR(qctl);
401         return ret;
402 }
403
/* Allocate and initialize a qunit for operation @opc (QUOTA_DQACQ or
 * QUOTA_DQREL), copying @qdata into it.  The returned qunit carries an
 * initial reference; dqacq_completion() drops it when the request is
 * done.  Returns NULL on allocation failure. */
static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt,
                                        struct qunit_data *qdata, int opc)
{
        struct lustre_qunit *qunit = NULL;
        ENTRY;

        OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit));
        if (qunit == NULL)
                RETURN(NULL);

        INIT_LIST_HEAD(&qunit->lq_hash);
        init_waitqueue_head(&qunit->lq_waitq);
        atomic_set(&qunit->lq_refcnt, 1);
        qunit->lq_ctxt = qctxt;
        memcpy(&qunit->lq_data, qdata, sizeof(*qdata));
        qunit->lq_opc = opc;
        qunit->lq_lock = SPIN_LOCK_UNLOCKED;
        QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0);
        RETURN(qunit);
}
424
/* Return @qunit's memory to the slab; called only when the last
 * reference is dropped (see qunit_put()). */
static inline void free_qunit(struct lustre_qunit *qunit)
{
        OBD_SLAB_FREE(qunit, qunit_cachep, sizeof(*qunit));
}
429
/* Take an additional reference on @qunit. */
static inline void qunit_get(struct lustre_qunit *qunit)
{
        atomic_inc(&qunit->lq_refcnt);
}
434
/* Drop a reference on @qunit, freeing it when the count reaches zero. */
static void qunit_put(struct lustre_qunit *qunit)
{
        LASSERT(atomic_read(&qunit->lq_refcnt));
        if (atomic_dec_and_test(&qunit->lq_refcnt))
                free_qunit(qunit);
}
441
442 /* caller must hold qunit_hash_lock and release ref of qunit after using it */
443 static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt,
444                                             struct qunit_data *qdata)
445 {
446         unsigned int hashent = qunit_hashfn(qctxt, qdata);
447         struct lustre_qunit *qunit;
448         ENTRY;
449
450         LASSERT_SPIN_LOCKED(&qunit_hash_lock);
451         qunit = find_qunit(hashent, qctxt, qdata);
452         if (qunit)
453                 qunit_get(qunit);
454         RETURN(qunit);
455 }
456
/* Add @qunit to the global hash (caller holds qunit_hash_lock).  Takes
 * a reference for the hash list; remove_qunit_nolock() drops it. */
static void
insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit)
{
        struct list_head *head;

        LASSERT(list_empty(&qunit->lq_hash));
        qunit_get(qunit);
        head = qunit_hash + qunit_hashfn(qctxt, &qunit->lq_data);
        list_add(&qunit->lq_hash, head);
        QUNIT_SET_STATE(qunit, QUNIT_IN_HASH);
}
468
/* Fold a completed acq/rel qunit back into the per-id lqs accounting.
 * Note the TWO lqs_putref() calls: one pairs with the lookup done here,
 * the other drops the reference schedule_dqacq() left for this
 * completion path. */
static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit)
{
        struct lustre_qunit_size *lqs;

        lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(&qunit->lq_data),
                                       qunit->lq_data.qd_id),
                               qunit->lq_ctxt, 0);
        if (lqs && !IS_ERR(lqs)) {
                spin_lock(&lqs->lqs_lock);
                if (qunit->lq_opc == QUOTA_DQACQ)
                        quota_compute_lqs(&qunit->lq_data, lqs, 0, 1);
                if (qunit->lq_opc == QUOTA_DQREL)
                        quota_compute_lqs(&qunit->lq_data, lqs, 0, 0);
                spin_unlock(&lqs->lqs_lock);
                /* this is for quota_search_lqs */
                lqs_putref(lqs);
                /* this is for schedule_dqacq */
                lqs_putref(lqs);
        }
}
489
/* Unhash @qunit (caller holds qunit_hash_lock) and drop the reference
 * taken by insert_qunit_nolock().  After this, new waiters can no
 * longer find the qunit. */
static void remove_qunit_nolock(struct lustre_qunit *qunit)
{
        LASSERT(!list_empty(&qunit->lq_hash));
        LASSERT_SPIN_LOCKED(&qunit_hash_lock);

        list_del_init(&qunit->lq_hash);
        QUNIT_SET_STATE(qunit, QUNIT_RM_FROM_HASH);
        qunit_put(qunit);
}
499
/* Raise @limit by @count; a limit still holding the MIN_QLIMIT
 * placeholder is replaced outright instead of incremented.  Rewritten
 * as a do/while statement macro with parenthesized arguments so it is
 * safe with expression arguments (e.g. *hardlimit) and behaves like a
 * single statement. */
#define INC_QLIMIT(limit, count)                                        \
do {                                                                    \
        if ((limit) == MIN_QLIMIT)                                      \
                (limit) = (count);                                      \
        else                                                            \
                (limit) += (count);                                     \
} while (0)
502
503
504 /* FIXME check if this mds is the master of specified id */
505 static int
506 is_master(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
507           unsigned int id, int type)
508 {
509         return qctxt->lqc_handler ? 1 : 0;
510 }
511
/* Forward declaration: send (or locally handle) a quota acquire/release
 * request; defined later in this file. */
static int
schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
               struct qunit_data *qdata, int opc, int wait,
               struct obd_trans_info *oti);
516
517 static inline void qdata_to_oqaq(struct qunit_data *qdata,
518                                  struct quota_adjust_qunit *oqaq)
519 {
520         LASSERT(qdata);
521         LASSERT(oqaq);
522
523         oqaq->qaq_flags = qdata->qd_flags;
524         oqaq->qaq_id    = qdata->qd_id;
525         if (QDATA_IS_ADJBLK(qdata))
526                 oqaq->qaq_bunit_sz = qdata->qd_qunit;
527         if (QDATA_IS_ADJINO(qdata))
528                 oqaq->qaq_iunit_sz = qdata->qd_qunit;
529 }
530
/* dqacq_completion - finish a quota acquire/release request.
 * @obd:   obd device the request ran on
 * @qctxt: quota context
 * @qdata: quota data carried by the request/reply
 * @rc:    result of the acq/rel operation itself
 * @opc:   QUOTA_DQACQ or QUOTA_DQREL
 *
 * On @rc == 0 the local operational quota file is updated with the
 * acquired/released amount.  In all cases the matching in-flight qunit
 * is unhashed and its waiters woken, then the slave qunit sizes are
 * adjusted and a follow-up acq/rel is scheduled if usage still demands
 * one.  Returns the local-update result (@err), not the wire @rc. */
static int
dqacq_completion(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                 struct qunit_data *qdata, int rc, int opc)
{
        struct lustre_qunit *qunit = NULL;
        struct super_block *sb = qctxt->lqc_sb;
        int err = 0;
        struct quota_adjust_qunit *oqaq = NULL;
        int rc1 = 0;
        ENTRY;

        LASSERT(qdata);
        QDATA_DEBUG(qdata, "obd(%s): complete %s quota req\n",
                    obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel");

        /* do it only when a releasing quota req more than 5MB b=18491 */
        if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880)
                OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_REL, 5);

        /* update local operational quota file */
        if (rc == 0) {
                __u64 count = QUSG(qdata->qd_count, QDATA_IS_BLK(qdata));
                struct obd_quotactl *qctl;
                __u64 *hardlimit;

                OBD_ALLOC_PTR(qctl);
                if (qctl == NULL)
                        GOTO(out, err = -ENOMEM);

                /* acq/rel qunit for specified uid/gid is serialized,
                 * so there is no race between get fs quota limit and
                 * set fs quota limit */
                qctl->qc_cmd = Q_GETQUOTA;
                qctl->qc_id = qdata->qd_id;
                qctl->qc_type = QDATA_IS_GRP(qdata);
                err = fsfilt_quotactl(obd, sb, qctl);
                if (err) {
                        CERROR("error get quota fs limit! (rc:%d)\n", err);
                        GOTO(out_mem, err);
                }

                if (QDATA_IS_BLK(qdata)) {
                        qctl->qc_dqblk.dqb_valid = QIF_BLIMITS;
                        hardlimit = &qctl->qc_dqblk.dqb_bhardlimit;
                } else {
                        qctl->qc_dqblk.dqb_valid = QIF_ILIMITS;
                        hardlimit = &qctl->qc_dqblk.dqb_ihardlimit;
                }

                CDEBUG(D_QUOTA, "hardlimt: "LPU64"\n", *hardlimit);

                /* a zero hard limit means quota was cleared while this
                 * request was in flight: leave the file untouched */
                if (*hardlimit == 0)
                        goto out_mem;

                switch (opc) {
                case QUOTA_DQACQ:
                        INC_QLIMIT(*hardlimit, count);
                        break;
                case QUOTA_DQREL:
                        LASSERTF(count < *hardlimit,
                                 "id(%u) flag(%u) type(%c) isblk(%c) "
                                 "count("LPU64") qd_qunit("LPU64") "
                                 "hardlimit("LPU64").\n",
                                 qdata->qd_id, qdata->qd_flags,
                                 QDATA_IS_GRP(qdata) ? 'g' : 'u',
                                 QDATA_IS_BLK(qdata) ? 'b': 'i',
                                 qdata->qd_count, qdata->qd_qunit, *hardlimit);
                        *hardlimit -= count;
                        break;
                default:
                        LBUG();
                }

                /* clear quota limit */
                if (count == 0)
                        *hardlimit = 0;

                qctl->qc_cmd = Q_SETQUOTA;
                err = fsfilt_quotactl(obd, sb, qctl);
                if (err)
                        CERROR("error set quota fs limit! (rc:%d)\n", err);

                QDATA_DEBUG(qdata, "%s completion\n",
                            opc == QUOTA_DQACQ ? "DQACQ" : "DQREL");
out_mem:
                OBD_FREE_PTR(qctl);
        } else if (rc == -EDQUOT) {
                QDATA_DEBUG(qdata, "acquire qunit got EDQUOT.\n");
        } else if (rc == -EBUSY) {
                QDATA_DEBUG(qdata, "it's is recovering, got EBUSY.\n");
        } else {
                CERROR("acquire qunit got error! (rc:%d)\n", rc);
        }
out:
        /* remove the qunit from hash */
        spin_lock(&qunit_hash_lock);

        qunit = dqacq_in_flight(qctxt, qdata);
        /* this qunit has been removed by qctxt_cleanup() */
        if (!qunit) {
                spin_unlock(&qunit_hash_lock);
                QDATA_DEBUG(qdata, "%s is discarded because qunit isn't found\n",
                            opc == QUOTA_DQACQ ? "DQACQ" : "DQREL");
                RETURN(err);
        }

        LASSERT(opc == qunit->lq_opc);
        /* remove this qunit from lq_hash so that new processes cannot be added
         * to qunit->lq_waiters */
        remove_qunit_nolock(qunit);
        spin_unlock(&qunit_hash_lock);

        compute_lqs_after_removing_qunit(qunit);


        /* QUOTA_REQ_RETURNED lets waiters distinguish "request really
         * completed" from the untouched lq_rc of 0 */
        if (rc == 0)
                rc = QUOTA_REQ_RETURNED;
        QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, rc);
        /* wake up all waiters */
        wake_up(&qunit->lq_waitq);

        /* this is for dqacq_in_flight() */
        qunit_put(qunit);
        /* this is for alloc_qunit() */
        qunit_put(qunit);
        if (rc < 0 && rc != -EDQUOT)
                 RETURN(err);

        /* don't reschedule in such cases:
         *   - acq/rel failure and qunit isn't changed,
         *     but not for quota recovery.
         *   - local dqacq/dqrel.
         *   - local disk io failure.
         */
         OBD_ALLOC_PTR(oqaq);
         if (!oqaq)
                 RETURN(-ENOMEM);
         qdata_to_oqaq(qdata, oqaq);
         /* adjust the qunit size in slaves */
         rc1 = quota_adjust_slave_lqs(oqaq, qctxt);
         OBD_FREE_PTR(oqaq);
         if (rc1 < 0) {
                 CERROR("adjust slave's qunit size failed!(rc:%d)\n", rc1);
                 RETURN(rc1);
         }
         if (err || (rc < 0 && rc != -EBUSY && rc1 == 0) ||
             is_master(obd, qctxt, qdata->qd_id, QDATA_IS_GRP(qdata)))
                RETURN(err);

         if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880)
                 OBD_FAIL_RETURN(OBD_FAIL_QUOTA_DELAY_REL, err);

        /* reschedule another dqacq/dqrel if needed */
        qdata->qd_count = 0;
        qdata->qd_flags &= LQUOTA_QUNIT_FLAGS;
        rc1 = check_cur_qunit(obd, qctxt, qdata);
        if (rc1 > 0) {
                int opc;
                opc = rc1 == 1 ? QUOTA_DQACQ : QUOTA_DQREL;
                rc1 = schedule_dqacq(obd, qctxt, qdata, opc, 0, NULL);
                QDATA_DEBUG(qdata, "reschedudle opc(%d) rc(%d)\n", opc, rc1);
        }
        RETURN(err);
}
695
/* Async-RPC state carried from schedule_dqacq() to dqacq_interpret(). */
struct dqacq_async_args {
        struct lustre_quota_ctxt *aa_ctxt; /* quota context of the request */
        struct lustre_qunit *aa_qunit;     /* the in-flight qunit */
};
700
/* dqacq_interpret - ptlrpc interpret callback for DQACQ/DQREL requests.
 * Unpacks qunit_data from the reply (or from the request when the RPC
 * failed, so statistics still get updated), sanity-checks it against
 * the originally-sent lq_data, then hands everything to
 * dqacq_completion().  @rc is the RPC result. */
static int dqacq_interpret(struct ptlrpc_request *req, void *data, int rc)
{
        struct dqacq_async_args *aa = (struct dqacq_async_args *)data;
        struct lustre_quota_ctxt *qctxt = aa->aa_ctxt;
        struct lustre_qunit *qunit = aa->aa_qunit;
        struct obd_device *obd = req->rq_import->imp_obd;
        struct qunit_data *qdata = NULL;
        int rc1 = 0;
        ENTRY;

        LASSERT(req);
        LASSERT(req->rq_import);

        /* there are several forms of qunit(historic causes), so we need to
         * adjust qunit from slaves to the same form here */
        OBD_ALLOC(qdata, sizeof(struct qunit_data));
        if (!qdata)
                RETURN(-ENOMEM);

        /* if a quota req timeouts or is dropped, we should update quota
         * statistics which will be handled in dqacq_completion. And in
         * this situation we should get qdata from request instead of
         * reply */
        rc1 = quota_get_qdata(req, qdata,
                              (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY,
                              QUOTA_IMPORT);
        if (rc1 < 0) {
                DEBUG_REQ(D_ERROR, req,
                          "error unpacking qunit_data(rc: %d)\n", rc1);
                GOTO(exit, rc = rc1);
        }

        QDATA_DEBUG(qdata, "qdata: interpret rc(%d).\n", rc);
        QDATA_DEBUG((&qunit->lq_data), "lq_data: \n");

        /* each mismatch below is repaired in place from lq_data so
         * completion can proceed, but the RPC is flagged -EPROTO */
        if (qdata->qd_id != qunit->lq_data.qd_id ||
            OBD_FAIL_CHECK_ONCE(OBD_FAIL_QUOTA_RET_QDATA)) {
                CDEBUG(D_ERROR, "the returned qd_id isn't expected!"
                       "(qdata: %u, lq_data: %u)\n", qdata->qd_id,
                       qunit->lq_data.qd_id);
                qdata->qd_id = qunit->lq_data.qd_id;
                rc = -EPROTO;
        }
        if (QDATA_IS_GRP(qdata) != QDATA_IS_GRP(&qunit->lq_data)) {
                CDEBUG(D_ERROR, "the returned grp/usr isn't expected!"
                       "(qdata: %u, lq_data: %u)\n", qdata->qd_flags,
                       qunit->lq_data.qd_flags);
                if (QDATA_IS_GRP(&qunit->lq_data))
                        QDATA_SET_GRP(qdata);
                else
                        QDATA_CLR_GRP(qdata);
                rc = -EPROTO;
        }
        if (qdata->qd_count > qunit->lq_data.qd_count) {
                CDEBUG(D_ERROR, "the returned qd_count isn't expected!"
                       "(qdata: "LPU64", lq_data: "LPU64")\n", qdata->qd_count,
                       qunit->lq_data.qd_count);
                rc = -EPROTO;
        }

        rc = dqacq_completion(obd, qctxt, qdata, rc,
                              lustre_msg_get_opc(req->rq_reqmsg));

exit:
        OBD_FREE(qdata, sizeof(struct qunit_data));

        RETURN(rc);
}
769
770 /* check if quota master is online */
771 int check_qm(struct lustre_quota_ctxt *qctxt)
772 {
773         int rc;
774         ENTRY;
775
776         spin_lock(&qctxt->lqc_lock);
777         /* quit waiting when mds is back or qctxt is cleaned up */
778         rc = qctxt->lqc_import || !qctxt->lqc_valid;
779         spin_unlock(&qctxt->lqc_lock);
780
781         RETURN(rc);
782 }
783
/* wake up all waiting threads when lqc_import is NULL */
/* Called when the master import goes away: every waiter on every
 * in-flight qunit belonging to @qctxt is woken so it can notice the
 * dead import and bail out. */
void dqacq_interrupt(struct lustre_quota_ctxt *qctxt)
{
        struct lustre_qunit *qunit, *tmp;
        int i;
        ENTRY;

        spin_lock(&qunit_hash_lock);
        for (i = 0; i < NR_DQHASH; i++) {
                list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], lq_hash) {
                        if (qunit->lq_ctxt != qctxt)
                                continue;

                        /* Wake up all waiters. Do not change lq_state.
                         * The waiters will check lq_rc which is kept as 0
                         * if no others change it, then the waiters will return
                         * -EAGAIN to caller who can perform related quota
                         * acq/rel if necessary. */
                        wake_up_all(&qunit->lq_waitq);
                }
        }
        spin_unlock(&qunit_hash_lock);
        EXIT;
}
808
/* Decide whether a thread waiting on @qunit should stop sleeping.
 * Returns 1 when the qunit reached QUNIT_FINISHED, or when the quota
 * context lost its import / became invalid (the waiter must give up);
 * returns 0 to keep waiting. */
static int got_qunit(struct lustre_qunit *qunit)
{
        struct lustre_quota_ctxt *qctxt = qunit->lq_ctxt;
        int rc = 0;
        ENTRY;

        spin_lock(&qunit->lq_lock);
        switch (qunit->lq_state) {
        case QUNIT_IN_HASH:
        case QUNIT_RM_FROM_HASH:
                /* request still in progress: keep waiting */
                break;
        case QUNIT_FINISHED:
                rc = 1;
                break;
        default:
                CERROR("invalid qunit state %d\n", qunit->lq_state);
        }
        spin_unlock(&qunit->lq_lock);

        if (!rc) {
                /* also stop waiting if the context can no longer
                 * complete the request */
                spin_lock(&qctxt->lqc_lock);
                rc = !qctxt->lqc_import || !qctxt->lqc_valid;
                spin_unlock(&qctxt->lqc_lock);
        }

        RETURN(rc);
}
836
/**
 * Schedule a quota acquire (DQACQ) or release (DQREL) for \a qdata.
 *
 * If an identical request is already in flight, just wait for its
 * completion instead of issuing a second one.  On the quota master the
 * request is handled locally through qctxt->lqc_handler; on a slave it
 * is packed into a ptlrpc request and sent to the master via
 * qctxt->lqc_import, with the reply handled by dqacq_interpret().
 *
 * \param obd    obd device issuing the request
 * \param qctxt  quota context this request belongs to
 * \param qdata  quota id/type/count of the operation
 * \param opc    QUOTA_DQACQ or QUOTA_DQREL
 * \param wait   non-zero: wait synchronously for the answer
 * \param oti    transaction info, used to quiesce the service watchdog
 *               while blocking for the quota master to come back
 *
 * \retval 0 or QUOTA_REQ_RETURNED on success; -EAGAIN when the master
 *         is not ready, -EDQUOT when out of quota, -EBUSY during
 *         recovery, other negative values for real errors (see the
 *         comment at the wait_completion label below)
 */
static int
schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
               struct qunit_data *qdata, int opc, int wait,
               struct obd_trans_info *oti)
{
        struct lustre_qunit *qunit, *empty;
        struct l_wait_info lwi = { 0 };
        struct ptlrpc_request *req;
        struct dqacq_async_args *aa;
        int size[2] = { sizeof(struct ptlrpc_body), 0 };
        struct obd_import *imp = NULL;
        struct lustre_qunit_size *lqs = NULL;
        struct timeval work_start;
        struct timeval work_end;
        long timediff;
        int rc = 0;
        ENTRY;

        LASSERT(opc == QUOTA_DQACQ || opc == QUOTA_DQREL);
        do_gettimeofday(&work_start);
        if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL)
                RETURN(-ENOMEM);

        spin_lock(&qunit_hash_lock);
        /* an equivalent request already in flight?  then piggyback on it */
        qunit = dqacq_in_flight(qctxt, qdata);
        if (qunit) {
                spin_unlock(&qunit_hash_lock);
                qunit_put(empty);

                goto wait_completion;
        }
        /* no request in flight: publish ours in the hash.  The extra
         * qunit_get() below is dropped on every exit path of this branch */
        qunit = empty;
        qunit_get(qunit);
        insert_qunit_nolock(qctxt, qunit);
        spin_unlock(&qunit_hash_lock);

        lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
                               qctxt, 0);
        if (lqs && !IS_ERR(lqs)) {
                spin_lock(&lqs->lqs_lock);
                quota_compute_lqs(qdata, lqs, 1, (opc == QUOTA_DQACQ) ? 1 : 0);
                /* when this qdata returned from mds, it will call lqs_putref */
                lqs_getref(lqs);
                spin_unlock(&lqs->lqs_lock);
                /* this is for quota_search_lqs */
                lqs_putref(lqs);
        } else {
                CDEBUG(D_ERROR, "Can't find the lustre qunit size!\n");
        }

        QDATA_DEBUG(qdata, "obd(%s): send %s quota req\n",
                    obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel");
        /* master is going to dqacq/dqrel from itself */
        if (is_master(obd, qctxt, qdata->qd_id, QDATA_IS_GRP(qdata))) {
                int rc2;
                QDATA_DEBUG(qdata, "local %s.\n",
                            opc == QUOTA_DQACQ ? "DQACQ" : "DQREL");
                QDATA_SET_CHANGE_QS(qdata);
                rc = qctxt->lqc_handler(obd, qdata, opc);
                rc2 = dqacq_completion(obd, qctxt, qdata, rc, opc);
                /* this is for qunit_get() */
                qunit_put(qunit);

                do_gettimeofday(&work_end);
                timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
                if (opc == QUOTA_DQACQ)
                        lprocfs_counter_add(qctxt->lqc_stats,
                                            wait ? LQUOTA_SYNC_ACQ : LQUOTA_ASYNC_ACQ,
                                            timediff);
                else
                        lprocfs_counter_add(qctxt->lqc_stats,
                                            wait ? LQUOTA_SYNC_REL : LQUOTA_ASYNC_REL,
                                            timediff);
                RETURN(rc ? rc : rc2);
        }

        spin_lock(&qctxt->lqc_lock);
        if (!qctxt->lqc_import) {
                /* no connection to the quota master: abort this qunit,
                 * wake its waiters with -EAGAIN, and optionally block
                 * here until the master is back */
                spin_unlock(&qctxt->lqc_lock);
                QDATA_DEBUG(qdata, "lqc_import is invalid.\n");

                spin_lock(&qunit_hash_lock);
                remove_qunit_nolock(qunit);
                spin_unlock(&qunit_hash_lock);

                compute_lqs_after_removing_qunit(qunit);

                QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, -EAGAIN);
                wake_up(&qunit->lq_waitq);

                /* this is for qunit_get() */
                qunit_put(qunit);
                /* this for alloc_qunit() */
                qunit_put(qunit);
                spin_lock(&qctxt->lqc_lock);
                if (wait && !qctxt->lqc_import) {
                        spin_unlock(&qctxt->lqc_lock);

                        LASSERT(oti && oti->oti_thread &&
                                oti->oti_thread->t_watchdog);

                        /* we may sleep for an unbounded time; keep the
                         * service watchdog from firing meanwhile */
                        lc_watchdog_disable(oti->oti_thread->t_watchdog);
                        CDEBUG(D_QUOTA, "sleep for quota master\n");
                        l_wait_event(qctxt->lqc_wait_for_qmaster,
                                     check_qm(qctxt), &lwi);
                        CDEBUG(D_QUOTA, "wake up when quota master is back\n");
                        lc_watchdog_touch(oti->oti_thread->t_watchdog,
                                 GET_TIMEOUT(oti->oti_thread->t_svc));
                } else {
                        spin_unlock(&qctxt->lqc_lock);
                }

                RETURN(-EAGAIN);
        }
        imp = class_import_get(qctxt->lqc_import);
        spin_unlock(&qctxt->lqc_lock);

        /* build dqacq/dqrel request */
        LASSERT(imp);
        size[1] = quota_get_qunit_data_size(imp->
                                            imp_connect_data.ocd_connect_flags);

        req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, opc, 2,
                              size, NULL);
        if (!req) {
                dqacq_completion(obd, qctxt, qdata, -ENOMEM, opc);
                class_import_put(imp);
                /* this is for qunit_get() */
                qunit_put(qunit);
                RETURN(-ENOMEM);
        }

        rc = quota_copy_qdata(req, qdata, QUOTA_REQUEST, QUOTA_IMPORT);
        if (rc < 0) {
                CDEBUG(D_ERROR, "Can't pack qunit_data(rc: %d)\n", rc);
                dqacq_completion(obd, qctxt, qdata, rc, opc);
                class_import_put(imp);
                /* this is for qunit_get() */
                qunit_put(qunit);
                RETURN(rc);
        }
        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_no_resend = req->rq_no_delay = 1;
        class_import_put(imp);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_ctxt = qctxt;
        aa->aa_qunit = qunit;

        /* the reply is processed asynchronously by dqacq_interpret(),
         * which releases the qunit_get() reference taken above */
        req->rq_interpret_reply = dqacq_interpret;
        ptlrpcd_add_req(req);

        QDATA_DEBUG(qdata, "%s scheduled.\n",
                    opc == QUOTA_DQACQ ? "DQACQ" : "DQREL");
wait_completion:
        if (wait && qunit) {
                struct qunit_data *p = &qunit->lq_data;

                QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
                l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
                /* rc = -EAGAIN, it means the quota master isn't ready yet
                 * rc = QUOTA_REQ_RETURNED, it means a quota req is finished;
                 * rc = -EDQUOT, it means out of quota
                 * rc = -EBUSY, it means recovery is happening
                 * other rc < 0, it means real errors, functions who call
                 * schedule_dqacq should take care of this */
                spin_lock(&qunit->lq_lock);
                rc = qunit->lq_rc;
                spin_unlock(&qunit->lq_lock);
                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
                       qunit, rc);
        }

        qunit_put(qunit);
        do_gettimeofday(&work_end);
        timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
        if (opc == QUOTA_DQACQ)
                lprocfs_counter_add(qctxt->lqc_stats,
                                    wait ? LQUOTA_SYNC_ACQ : LQUOTA_ASYNC_ACQ,
                                    timediff);
        else
                lprocfs_counter_add(qctxt->lqc_stats,
                                    wait ? LQUOTA_SYNC_REL : LQUOTA_ASYNC_REL,
                                    timediff);

        RETURN(rc);
}
1025
1026 int
1027 qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
1028                    uid_t uid, gid_t gid, __u32 isblk, int wait,
1029                    struct obd_trans_info *oti)
1030 {
1031         int rc = 0, i = USRQUOTA;
1032         __u32 id[MAXQUOTAS] = { uid, gid };
1033         struct qunit_data qdata[MAXQUOTAS];
1034         ENTRY;
1035
1036         if (quota_is_set(obd, uid, gid, isblk ? QB_SET : QI_SET) == 0)
1037                 RETURN(0);
1038
1039         for (i = 0; i < MAXQUOTAS; i++) {
1040                 qdata[i].qd_id = id[i];
1041                 qdata[i].qd_flags = i;
1042                 if (isblk)
1043                         QDATA_SET_BLK(&qdata[i]);
1044                 qdata[i].qd_count = 0;
1045
1046                 rc = check_cur_qunit(obd, qctxt, &qdata[i]);
1047                 if (rc > 0) {
1048                         int opc;
1049                         /* need acquire or release */
1050                         opc = rc == 1 ? QUOTA_DQACQ : QUOTA_DQREL;
1051                         rc = schedule_dqacq(obd, qctxt, &qdata[i], opc,
1052                                             wait,oti);
1053                         if (rc < 0)
1054                                 RETURN(rc);
1055                 } else if (wait == 1) {
1056                         /* when wait equates 1, that means mds_quota_acquire
1057                          * or filter_quota_acquire is calling it. */
1058                         rc = qctxt_wait_pending_dqacq(qctxt, id[i], i, isblk);
1059                         if (rc < 0)
1060                                 RETURN(rc);
1061                 }
1062         }
1063
1064         RETURN(rc);
1065 }
1066
1067 int
1068 qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
1069                          unsigned short type, int isblk)
1070 {
1071         struct lustre_qunit *qunit = NULL;
1072         struct qunit_data qdata;
1073         struct timeval work_start;
1074         struct timeval work_end;
1075         long timediff;
1076         struct l_wait_info lwi = { 0 };
1077         int rc = 0;
1078         ENTRY;
1079
1080         do_gettimeofday(&work_start);
1081         qdata.qd_id = id;
1082         qdata.qd_flags = type;
1083         if (isblk)
1084                 QDATA_SET_BLK(&qdata);
1085         qdata.qd_count = 0;
1086
1087         spin_lock(&qunit_hash_lock);
1088         qunit = dqacq_in_flight(qctxt, &qdata);
1089         spin_unlock(&qunit_hash_lock);
1090
1091         if (qunit) {
1092                 struct qunit_data *p = &qunit->lq_data;
1093
1094                 QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
1095                 l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
1096                 CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
1097                        qunit, qunit->lq_rc);
1098                 /* keep same as schedule_dqacq() b=17030 */
1099                 spin_lock(&qunit->lq_lock);
1100                 rc = qunit->lq_rc;
1101                 spin_unlock(&qunit->lq_lock);
1102                 /* this is for dqacq_in_flight() */
1103                 qunit_put(qunit);
1104                 do_gettimeofday(&work_end);
1105                 timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
1106                 lprocfs_counter_add(qctxt->lqc_stats,
1107                                     isblk ? LQUOTA_WAIT_PENDING_BLK_QUOTA :
1108                                             LQUOTA_WAIT_PENDING_INO_QUOTA,
1109                                     timediff);
1110         } else {
1111                 do_gettimeofday(&work_end);
1112                 timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
1113                 lprocfs_counter_add(qctxt->lqc_stats,
1114                                     isblk ? LQUOTA_NOWAIT_PENDING_BLK_QUOTA :
1115                                             LQUOTA_NOWAIT_PENDING_INO_QUOTA,
1116                                     timediff);
1117         }
1118
1119         RETURN(rc);
1120 }
1121
1122 int
1123 qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
1124 {
1125         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
1126         struct super_block *sb = obd->u.obt.obt_sb;
1127         int rc = 0;
1128         ENTRY;
1129
1130         LASSERT(qctxt);
1131
1132         rc = ptlrpcd_addref();
1133         if (rc)
1134                 RETURN(rc);
1135
1136         cfs_waitq_init(&qctxt->lqc_wait_for_qmaster);
1137         cfs_waitq_init(&qctxt->lqc_lqs_waitq);
1138         atomic_set(&qctxt->lqc_lqs, 0);
1139         spin_lock_init(&qctxt->lqc_lock);
1140         spin_lock(&qctxt->lqc_lock);
1141         qctxt->lqc_handler = handler;
1142         qctxt->lqc_sb = sb;
1143         qctxt->lqc_import = NULL;
1144         qctxt->lqc_recovery = 0;
1145         qctxt->lqc_switch_qs = 1; /* Change qunit size in default setting */
1146         qctxt->lqc_valid = 1;
1147         qctxt->lqc_cqs_boundary_factor = 4;
1148         qctxt->lqc_cqs_least_bunit = PTLRPC_MAX_BRW_SIZE;
1149         qctxt->lqc_cqs_least_iunit = 2;
1150         qctxt->lqc_cqs_qs_factor = 2;
1151         qctxt->lqc_flags = 0;
1152         QUOTA_MASTER_UNREADY(qctxt);
1153         qctxt->lqc_bunit_sz = default_bunit_sz;
1154         qctxt->lqc_btune_sz = default_bunit_sz / 100 * default_btune_ratio;
1155         qctxt->lqc_iunit_sz = default_iunit_sz;
1156         qctxt->lqc_itune_sz = default_iunit_sz * default_itune_ratio / 100;
1157         qctxt->lqc_switch_seconds = 300; /* enlarging will wait 5 minutes
1158                                           * after the last shrinking */
1159         qctxt->lqc_sync_blk = 0;
1160         spin_unlock(&qctxt->lqc_lock);
1161
1162         qctxt->lqc_lqs_hash = lustre_hash_init("LQS_HASH",
1163                                                HASH_LQS_CUR_BITS,
1164                                                HASH_LQS_MAX_BITS,
1165                                                &lqs_hash_ops, 0);
1166         if (!qctxt->lqc_lqs_hash)
1167                 CERROR("initialize hash lqs for %s error!\n", obd->obd_name);
1168
1169 #ifdef LPROCFS
1170         if (lquota_proc_setup(obd, is_master(obd, qctxt, 0, 0)))
1171                 CERROR("initialize proc for %s error!\n", obd->obd_name);
1172 #endif
1173
1174         RETURN(rc);
1175 }
1176
1177 static int check_lqs(struct lustre_quota_ctxt *qctxt)
1178 {
1179         int rc;
1180         ENTRY;
1181
1182         rc = !atomic_read(&qctxt->lqc_lqs);
1183
1184         RETURN(rc);
1185 }
1186
1187
/* lustre_hash_for_each_safe() callback: drop the hash table's reference
 * on one lustre_qunit_size entry (the @data cookie is unused) */
void hash_put_lqs(void *obj, void *data)
{
        struct lustre_qunit_size *lqs = obj;

        lqs_putref(lqs);
}
1192
/**
 * Tear down quota context \a qctxt.
 *
 * Invalidates the context, aborts every pending qunit owned by it
 * (waiters are woken with lq_rc 0), waits until no thread still sleeps
 * on lqc_wait_for_qmaster, then releases all lustre_qunit_size entries
 * and destroys the lqs hash table and the lprocfs entries.
 *
 * \param force  unused in this function body
 */
void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
{
        struct lustre_qunit *qunit, *tmp;
        struct list_head tmp_list;
        struct l_wait_info lwi = { 0 };
        int i;
        ENTRY;

        INIT_LIST_HEAD(&tmp_list);

        /* stop new dqacq/dqrel requests from being scheduled */
        spin_lock(&qctxt->lqc_lock);
        qctxt->lqc_valid = 0;
        spin_unlock(&qctxt->lqc_lock);

        /* move every qunit belonging to this context off the global hash
         * onto a private list so it can be aborted without the lock held */
        spin_lock(&qunit_hash_lock);
        for (i = 0; i < NR_DQHASH; i++) {
                list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], lq_hash) {
                        if (qunit->lq_ctxt != qctxt)
                                continue;
                        remove_qunit_nolock(qunit);
                        list_add(&qunit->lq_hash, &tmp_list);
                }
        }
        spin_unlock(&qunit_hash_lock);

        list_for_each_entry_safe(qunit, tmp, &tmp_list, lq_hash) {
                list_del_init(&qunit->lq_hash);
                compute_lqs_after_removing_qunit(qunit);

                /* wake up all waiters */
                QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, 0);
                wake_up(&qunit->lq_waitq);
                /* drop the hash reference taken when the qunit was inserted */
                qunit_put(qunit);
        }

        /* after qctxt_cleanup, qctxt might be freed, then check_qm() is
         * unpredicted. So we must wait until lqc_wait_for_qmaster is empty */
        while (cfs_waitq_active(&qctxt->lqc_wait_for_qmaster)) {
                cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster);
                cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE,
                                     cfs_time_seconds(1));
        }

        /* drop the table's reference on every lqs, wait for outside users
         * to drop theirs (lqs_put() signals lqc_lqs_waitq when lqc_lqs
         * reaches zero), then destroy the hash table itself */
        lustre_hash_for_each_safe(qctxt->lqc_lqs_hash, hash_put_lqs, NULL);
        l_wait_event(qctxt->lqc_lqs_waitq, check_lqs(qctxt), &lwi);
        lustre_hash_exit(qctxt->lqc_lqs_hash);

        ptlrpcd_decref();

#ifdef LPROCFS
        if (lquota_proc_cleanup(qctxt))
                CERROR("cleanup proc error!\n");
#endif

        EXIT;
}
1249
/* argument bundle handed to the qslave recovery thread; lives on the
 * starter's stack and is only valid until @comp is completed */
struct qslave_recov_thread_data {
        struct obd_device *obd;          /* device being recovered */
        struct lustre_quota_ctxt *qctxt; /* its quota context */
        struct completion comp;          /* completed once the thread
                                          * has copied the arguments */
};
1255
/* FIXME only recovery block quota by now */
/**
 * Quota slave recovery thread body: for every active quota type, read
 * all quota ids from the slave's local quota file and re-balance each
 * one against the master via schedule_dqacq() where needed.
 *
 * \param arg  struct qslave_recov_thread_data on the starter's stack;
 *             must not be touched after complete(&data->comp)
 *
 * \retval 0, or the last failure from fsfilt_qids()/schedule_dqacq()
 */
static int qslave_recovery_main(void *arg)
{
        struct qslave_recov_thread_data *data = arg;
        struct obd_device *obd = data->obd;
        struct lustre_quota_ctxt *qctxt = data->qctxt;
        unsigned int type;
        int rc = 0;
        ENTRY;

        ptlrpc_daemonize("qslave_recovd");

        /* let qslave_start_recovery() return; @data is on its stack */
        complete(&data->comp);

        /* NOTE(review): lqc_recovery is tested and set without a lock --
         * presumably concurrent starters cannot race here; verify */
        if (qctxt->lqc_recovery)
                RETURN(0);
        qctxt->lqc_recovery = 1;

        for (type = USRQUOTA; type < MAXQUOTAS; type++) {
                struct qunit_data qdata;
                struct quota_info *dqopt = sb_dqopt(qctxt->lqc_sb);
                struct list_head id_list;
                struct dquot_id *dqid, *tmp;
                int ret;

                /* hold the quota on/off mutex while reading ids so quota
                 * cannot be turned off underneath us */
                LOCK_DQONOFF_MUTEX(dqopt);
                if (!ll_sb_has_quota_active(qctxt->lqc_sb, type)) {
                        UNLOCK_DQONOFF_MUTEX(dqopt);
                        break;
                }

                LASSERT(dqopt->files[type] != NULL);
                INIT_LIST_HEAD(&id_list);
#ifndef KERNEL_SUPPORTS_QUOTA_READ
                rc = fsfilt_qids(obd, dqopt->files[type], NULL, type, &id_list);
#else
                rc = fsfilt_qids(obd, NULL, dqopt->files[type], type, &id_list);
#endif
                UNLOCK_DQONOFF_MUTEX(dqopt);
                if (rc)
                        CERROR("Get ids from quota file failed. (rc:%d)\n", rc);

                list_for_each_entry_safe(dqid, tmp, &id_list, di_link) {
                        list_del_init(&dqid->di_link);
                        /* skip slave recovery on itself */
                        if (is_master(obd, qctxt, dqid->di_id, type))
                                goto free;
                        if (rc && rc != -EBUSY)
                                goto free;

                        qdata.qd_id = dqid->di_id;
                        qdata.qd_flags = type;
                        QDATA_SET_BLK(&qdata);
                        qdata.qd_count = 0;

                        /* >0 means out of balance: 1 = acquire, 2 = release */
                        ret = check_cur_qunit(obd, qctxt, &qdata);
                        if (ret > 0) {
                                int opc;
                                opc = ret == 1 ? QUOTA_DQACQ : QUOTA_DQREL;
                                rc = schedule_dqacq(obd, qctxt, &qdata, opc,
                                                    0, NULL);
                                /* running out of quota is not a recovery
                                 * failure for this id */
                                if (rc == -EDQUOT)
                                        rc = 0;
                        } else {
                                rc = 0;
                        }

                        if (rc)
                                CDEBUG(rc == -EBUSY ? D_QUOTA : D_ERROR,
                                       "qslave recovery failed! (id:%d type:%d "
                                       " rc:%d)\n", dqid->di_id, type, rc);
free:
                        OBD_FREE_PTR(dqid);
                }
        }

        qctxt->lqc_recovery = 0;
        RETURN(rc);
}
1335
1336 void
1337 qslave_start_recovery(struct obd_device *obd, struct lustre_quota_ctxt *qctxt)
1338 {
1339         struct qslave_recov_thread_data data;
1340         int rc;
1341         ENTRY;
1342
1343         if (!ll_sb_any_quota_active(qctxt->lqc_sb))
1344                 goto exit;
1345
1346         data.obd = obd;
1347         data.qctxt = qctxt;
1348         init_completion(&data.comp);
1349
1350         rc = kernel_thread(qslave_recovery_main, &data, CLONE_VM|CLONE_FILES);
1351         if (rc < 0) {
1352                 CERROR("Cannot start quota recovery thread: rc %d\n", rc);
1353                 goto exit;
1354         }
1355         wait_for_completion(&data.comp);
1356 exit:
1357         EXIT;
1358 }
1359
1360
/*
 * lqs<->qctxt hash operations
 */

/* simple multiplicative hash of the quota id; a different prime multiplier
 * is used for group (5381) vs user (5387) ids so the two id namespaces
 * spread differently across the buckets */
static unsigned
lqs_hash(lustre_hash_t *lh, void *key, unsigned mask)
{
        struct quota_adjust_qunit *lqs_key;
        unsigned hash;
        ENTRY;

        LASSERT(key);
        lqs_key = (struct quota_adjust_qunit *)key;
        hash = (QAQ_IS_GRP(lqs_key) ? 5381 : 5387) * lqs_key->qaq_id;

        RETURN(hash & mask);
}
1379
1380 static int
1381 lqs_compare(void *key, struct hlist_node *hnode)
1382 {
1383         struct lustre_qunit_size *q;
1384         int rc;
1385         ENTRY;
1386
1387         LASSERT(key);
1388         q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
1389
1390         spin_lock(&q->lqs_lock);
1391         rc = (q->lqs_key == *((unsigned long long *)key));
1392         spin_unlock(&q->lqs_lock);
1393
1394         RETURN(rc);
1395 }
1396
1397 static void *
1398 lqs_get(struct hlist_node *hnode)
1399 {
1400         struct lustre_qunit_size *q =
1401             hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
1402         ENTRY;
1403
1404         if (atomic_inc_return(&q->lqs_refcount) == 2) /* quota_search_lqs */
1405                 atomic_inc(&q->lqs_ctxt->lqc_lqs);
1406         CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
1407                q, atomic_read(&q->lqs_refcount));
1408
1409         RETURN(q);
1410 }
1411
1412 static void *
1413 lqs_put(struct hlist_node *hnode)
1414 {
1415         struct lustre_qunit_size *q =
1416             hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
1417         ENTRY;
1418
1419         LASSERT(atomic_read(&q->lqs_refcount) > 0);
1420
1421         if (atomic_dec_return(&q->lqs_refcount) == 1)
1422                 if (atomic_dec_and_test(&q->lqs_ctxt->lqc_lqs))
1423                         cfs_waitq_signal(&q->lqs_ctxt->lqc_lqs_waitq);
1424
1425         CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
1426                q, atomic_read(&q->lqs_refcount));
1427
1428         RETURN(q);
1429 }
1430
1431 static void
1432 lqs_exit(struct hlist_node *hnode)
1433 {
1434         struct lustre_qunit_size *q;
1435         ENTRY;
1436
1437         q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
1438         /*
1439          * Nothing should be left. User of lqs put it and
1440          * lqs also was deleted from table by this time
1441          * so we should have 0 refs.
1442          */
1443         LASSERTF(atomic_read(&q->lqs_refcount) == 0,
1444                  "Busy lqs %p with %d refs\n", q,
1445                  atomic_read(&q->lqs_refcount));
1446         OBD_FREE_PTR(q);
1447         EXIT;
1448 }
1449
/* operations vector for the lqs hash table created in qctxt_init() */
static lustre_hash_ops_t lqs_hash_ops = {
        .lh_hash    = lqs_hash,
        .lh_compare = lqs_compare,
        .lh_get     = lqs_get,
        .lh_put     = lqs_put,
        .lh_exit    = lqs_exit
};
1457 #endif /* HAVE_QUOTA_SUPPORT */