Whamcloud - gitweb
Branch b1_6
[fs/lustre-release.git] / lustre / quota / quota_master.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/quota/quota_master.c
5  *  Lustre Quota Master request handler
6  *
7  *  Copyright (c) 2001-2005 Cluster File Systems, Inc.
8  *   Author: Niu YaWei <niu@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   No redistribution or use is permitted outside of Cluster File Systems, Inc.
13  *
14  */
15 #ifndef EXPORT_SYMTAB
16 # define EXPORT_SYMTAB
17 #endif
18
19 #define DEBUG_SUBSYSTEM S_MDS
20
21 #include <linux/version.h>
22 #include <linux/fs.h>
23 #include <asm/unistd.h>
24 #include <linux/slab.h>
25 #include <linux/quotaops.h>
26 #include <linux/module.h>
27 #include <linux/init.h>
28 #include <linux/quota.h>
29
30 #include <obd_class.h>
31 #include <lustre_quota.h>
32 #include <lustre_fsfilt.h>
33 #include <lustre_mds.h>
34
35 #include "quota_internal.h"
36
/* lock ordering: mds->mds_qonoff_sem > dquot->dq_sem */

/* Hash table of in-memory lustre_dquot objects; buckets are selected by
 * dquot_hashfn(). */
static struct list_head lustre_dquot_hash[NR_DQHASH];
/* Taken around every hash-chain walk/insert/remove and around every
 * dq_refcnt update in this file. */
static spinlock_t dquot_hash_lock = SPIN_LOCK_UNLOCKED;

/* Slab cache lustre_dquot objects are allocated from; NULL until
 * lustre_dquot_init() has run and again after lustre_dquot_exit(). */
cfs_mem_cache_t *lustre_dquot_cachep;
42
43 int lustre_dquot_init(void)
44 {
45         int i;
46         ENTRY;
47
48         LASSERT(lustre_dquot_cachep == NULL);
49         lustre_dquot_cachep = cfs_mem_cache_create("lustre_dquot_cache",
50                                                    sizeof(struct lustre_dquot),
51                                                    0, 0);
52         if (!lustre_dquot_cachep)
53                 return (-ENOMEM);
54
55         for (i = 0; i < NR_DQHASH; i++) {
56                 INIT_LIST_HEAD(lustre_dquot_hash + i);
57         }
58         RETURN(0);
59 }
60
61 void lustre_dquot_exit(void)
62 {
63         int i;
64         ENTRY;
65         /* FIXME cleanup work ?? */
66
67         for (i = 0; i < NR_DQHASH; i++) {
68                 LASSERT(list_empty(lustre_dquot_hash + i));
69         }
70         if (lustre_dquot_cachep) {
71                 int rc;
72                 rc = cfs_mem_cache_destroy(lustre_dquot_cachep);
73                 LASSERTF(rc == 0,"couldn't destroy lustre_dquot_cachep slab\n");
74                 lustre_dquot_cachep = NULL;
75         }
76         EXIT;
77 }
78
79 static inline int
80 dquot_hashfn(struct lustre_quota_info *info, unsigned int id, int type)
81              __attribute__((__const__));
82
83 static inline int
84 dquot_hashfn(struct lustre_quota_info *info, unsigned int id, int type)
85 {
86         unsigned long tmp = ((unsigned long)info >> L1_CACHE_SHIFT) ^ id;
87         tmp = (tmp * (MAXQUOTAS - type)) % NR_DQHASH;
88         return tmp;
89 }
90
91 /* caller must hold dquot_hash_lock */
92 static struct lustre_dquot *find_dquot(int hashent,
93                                        struct lustre_quota_info *lqi, qid_t id,
94                                        int type)
95 {
96         struct lustre_dquot *dquot;
97         ENTRY;
98
99         LASSERT_SPIN_LOCKED(&dquot_hash_lock);
100         list_for_each_entry(dquot, &lustre_dquot_hash[hashent], dq_hash) {
101                 if (dquot->dq_info == lqi &&
102                     dquot->dq_id == id && dquot->dq_type == type)
103                         RETURN(dquot);
104         }
105         RETURN(NULL);
106 }
107
108 static struct lustre_dquot *alloc_dquot(struct lustre_quota_info *lqi,
109                                         qid_t id, int type)
110 {
111         struct lustre_dquot *dquot = NULL;
112         ENTRY;
113
114         OBD_SLAB_ALLOC(dquot, lustre_dquot_cachep, CFS_ALLOC_IO, sizeof(*dquot));
115         if (dquot == NULL)
116                 RETURN(NULL);
117
118         INIT_LIST_HEAD(&dquot->dq_hash);
119         init_mutex_locked(&dquot->dq_sem);
120         dquot->dq_refcnt = 1;
121         dquot->dq_info = lqi;
122         dquot->dq_id = id;
123         dquot->dq_type = type;
124         dquot->dq_status = DQ_STATUS_AVAIL;
125
126         RETURN(dquot);
127 }
128
129 static void free_dquot(struct lustre_dquot *dquot)
130 {
131         OBD_SLAB_FREE(dquot, lustre_dquot_cachep, sizeof(*dquot));
132 }
133
134 static void insert_dquot_nolock(struct lustre_dquot *dquot)
135 {
136         struct list_head *head = lustre_dquot_hash +
137             dquot_hashfn(dquot->dq_info, dquot->dq_id, dquot->dq_type);
138         LASSERT(list_empty(&dquot->dq_hash));
139         list_add(&dquot->dq_hash, head);
140 }
141
142 static void remove_dquot_nolock(struct lustre_dquot *dquot)
143 {
144         LASSERT(!list_empty(&dquot->dq_hash));
145         list_del_init(&dquot->dq_hash);
146 }
147
148 static void lustre_dqput(struct lustre_dquot *dquot)
149 {
150         ENTRY;
151         spin_lock(&dquot_hash_lock);
152         LASSERT(dquot->dq_refcnt);
153         dquot->dq_refcnt--;
154         if (!dquot->dq_refcnt) {
155                 remove_dquot_nolock(dquot);
156                 free_dquot(dquot);
157         }
158         spin_unlock(&dquot_hash_lock);
159         EXIT;
160 }
161
162 static struct lustre_dquot *lustre_dqget(struct obd_device *obd,
163                                          struct lustre_quota_info *lqi,
164                                          qid_t id, int type)
165 {
166         unsigned int hashent = dquot_hashfn(lqi, id, type);
167         struct lustre_dquot *dquot, *empty;
168         ENTRY;
169
170         if ((empty = alloc_dquot(lqi, id, type)) == NULL)
171                 RETURN(ERR_PTR(-ENOMEM));
172
173         spin_lock(&dquot_hash_lock);
174         if ((dquot = find_dquot(hashent, lqi, id, type)) != NULL) {
175                 dquot->dq_refcnt++;
176                 spin_unlock(&dquot_hash_lock);
177                 free_dquot(empty);
178         } else {
179                 int rc;
180
181                 dquot = empty;
182                 insert_dquot_nolock(dquot);
183                 spin_unlock(&dquot_hash_lock);
184
185                 rc = fsfilt_dquot(obd, dquot, QFILE_RD_DQUOT);
186                 up(&dquot->dq_sem);
187                 if (rc) {
188                         CERROR("can't read dquot from admin quotafile! "
189                                "(rc:%d)\n", rc);
190                         lustre_dqput(dquot);
191                         RETURN(ERR_PTR(rc));
192                 }
193
194         }
195
196         LASSERT(dquot);
197         RETURN(dquot);
198 }
199
200 static void init_oqaq(struct quota_adjust_qunit *oqaq,
201                       struct lustre_quota_ctxt *qctxt,
202                       qid_t id, int type)
203 {
204         struct lustre_qunit_size *lqs = NULL;
205
206         oqaq->qaq_id = id;
207         oqaq->qaq_flags = type;
208         quota_search_lqs(NULL, oqaq, qctxt, &lqs);
209         if (lqs) {
210                 spin_lock(&lqs->lqs_lock);
211                 oqaq->qaq_bunit_sz = lqs->lqs_bunit_sz;
212                 oqaq->qaq_iunit_sz = lqs->lqs_iunit_sz;
213                 oqaq->qaq_flags    = lqs->lqs_flags;
214                 spin_unlock(&lqs->lqs_lock);
215                 lqs_putref(lqs);
216         } else {
217                 CDEBUG(D_QUOTA, "Can't find the lustre qunit size!\n");
218                 oqaq->qaq_bunit_sz = qctxt->lqc_bunit_sz;
219                 oqaq->qaq_iunit_sz = qctxt->lqc_iunit_sz;
220         }
221 }
222
223 int dqacq_adjust_qunit_sz(struct obd_device *obd, qid_t id, int type,
224                           __u32 is_blk)
225 {
226         struct mds_obd *mds = &obd->u.mds;
227         struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt;
228         struct obd_device *lov_mds_obd = class_exp2obd(mds->mds_osc_exp);
229         struct lov_obd *lov = &lov_mds_obd->u.lov;
230         __u32 ost_num = lov->desc.ld_tgt_count, mdt_num = 1;
231         struct quota_adjust_qunit *oqaq = NULL;
232         unsigned int uid = 0, gid = 0;
233         struct lustre_quota_info *info = &mds->mds_quota_info;
234         struct lustre_dquot *dquot = NULL;
235         int adjust_res = 0;
236         int rc = 0;
237         ENTRY;
238
239         LASSERT(mds);
240         dquot = lustre_dqget(obd, info, id, type);
241         if (IS_ERR(dquot))
242                 RETURN(PTR_ERR(dquot));
243
244         OBD_ALLOC_PTR(oqaq);
245         if (!oqaq)
246                 GOTO(out, rc = -ENOMEM);
247
248         down(&dquot->dq_sem);
249         init_oqaq(oqaq, qctxt, id, type);
250
251         rc = dquot_create_oqaq(qctxt, dquot, ost_num, mdt_num,
252                                is_blk ? LQUOTA_FLAGS_ADJBLK :
253                                LQUOTA_FLAGS_ADJINO, oqaq);
254
255         if (rc < 0) {
256                 CDEBUG(D_ERROR, "create oqaq failed! (rc:%d)\n", rc);
257                 GOTO(out_sem, rc);
258         }
259         QAQ_DEBUG(oqaq, "show oqaq.\n")
260
261         if (!QAQ_IS_ADJBLK(oqaq) && !QAQ_IS_ADJINO(oqaq))
262                 GOTO(out_sem, rc);
263
264         /* adjust the mds slave qunit size */
265         adjust_res = quota_adjust_slave_lqs(oqaq, qctxt);
266         if (adjust_res <= 0) {
267                 if (adjust_res < 0) {
268                         rc = adjust_res;
269                         CDEBUG(D_ERROR, "adjust mds slave's qunit size failed! \
270                                (rc:%d)\n", rc);
271                 } else {
272                         CDEBUG(D_QUOTA, "qunit doesn't need to be adjusted.\n");
273                 }
274                 GOTO(out_sem, rc);
275         }
276
277         if (type)
278                 gid = dquot->dq_id;
279         else
280                 uid = dquot->dq_id;
281
282         up(&dquot->dq_sem);
283
284         rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, is_blk, 0);
285         if (rc == -EDQUOT || rc == -EBUSY) {
286                 CDEBUG(D_QUOTA, "rc: %d.\n", rc);
287                 rc = 0;
288         }
289         if (rc) {
290                 CDEBUG(D_ERROR, "mds fail to adjust file quota! \
291                                (rc:%d)\n", rc);
292                 GOTO(out, rc);
293         }
294
295         /* only when block qunit is reduced, boardcast to osts */
296         if ((adjust_res & LQS_BLK_DECREASE) && QAQ_IS_ADJBLK(oqaq))
297                 rc = obd_quota_adjust_qunit(mds->mds_osc_exp, oqaq);
298
299 out:
300         lustre_dqput(dquot);
301         if (oqaq)
302                 OBD_FREE_PTR(oqaq);
303
304         RETURN(rc);
305 out_sem:
306         up(&dquot->dq_sem);
307         goto out;
308 }
309
310 int dqacq_handler(struct obd_device *obd, struct qunit_data *qdata, int opc)
311 {
312         struct mds_obd *mds = &obd->u.mds;
313         struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt;
314         struct lustre_quota_info *info = &mds->mds_quota_info;
315         struct lustre_dquot *dquot = NULL;
316         __u64 *usage = NULL;
317         __u32 hlimit = 0, slimit = 0;
318         time_t *time = NULL;
319         unsigned int grace = 0;
320         struct lustre_qunit_size *lqs = NULL;
321         int rc = 0;
322         ENTRY;
323
324         OBD_FAIL_RETURN(OBD_FAIL_OBD_DQACQ, -EIO);
325
326         dquot = lustre_dqget(obd, info, qdata->qd_id, QDATA_IS_GRP(qdata));
327         if (IS_ERR(dquot))
328                 RETURN(PTR_ERR(dquot));
329
330         DQUOT_DEBUG(dquot, "get dquot in dqacq_handler\n");
331         QINFO_DEBUG(dquot->dq_info, "get dquot in dqadq_handler\n");
332
333         down(&mds->mds_qonoff_sem);
334         down(&dquot->dq_sem);
335
336         if (dquot->dq_status & DQ_STATUS_RECOVERY) {
337                 DQUOT_DEBUG(dquot, "this dquot is under recovering.\n");
338                 GOTO(out, rc = -EBUSY);
339         }
340
341         if (QDATA_IS_BLK(qdata)) {
342                 grace = info->qi_info[QDATA_IS_GRP(qdata)].dqi_bgrace;
343                 usage = &dquot->dq_dqb.dqb_curspace;
344                 hlimit = dquot->dq_dqb.dqb_bhardlimit;
345                 slimit = dquot->dq_dqb.dqb_bsoftlimit;
346                 time = &dquot->dq_dqb.dqb_btime;
347         } else {
348                 grace = info->qi_info[QDATA_IS_GRP(qdata)].dqi_igrace;
349                 usage = (__u64 *) & dquot->dq_dqb.dqb_curinodes;
350                 hlimit = dquot->dq_dqb.dqb_ihardlimit;
351                 slimit = dquot->dq_dqb.dqb_isoftlimit;
352                 time = &dquot->dq_dqb.dqb_itime;
353         }
354
355         /* if the quota limit in admin quotafile is zero, we just inform
356          * slave to clear quota limit with zero qd_count */
357         if (hlimit == 0 && slimit == 0) {
358                 qdata->qd_count = 0;
359                 GOTO(out, rc);
360         }
361
362         switch (opc) {
363         case QUOTA_DQACQ:
364                 if (hlimit &&
365                     QUSG(*usage + qdata->qd_count, QDATA_IS_BLK(qdata)) > hlimit)
366                 {
367                         if (QDATA_IS_CHANGE_QS(qdata) &&
368                             QUSG(*usage, QDATA_IS_BLK(qdata)) < hlimit)
369                                 qdata->qd_count = (hlimit -
370                                         QUSG(*usage, QDATA_IS_BLK(qdata)))
371                                         * QUOTABLOCK_SIZE;
372                         else
373                                 GOTO(out, rc = -EDQUOT);
374                 }
375
376                 if (slimit &&
377                     QUSG(*usage + qdata->qd_count, QDATA_IS_BLK(qdata)) > slimit) {
378                         if (*time && cfs_time_current_sec() >= *time)
379                                 GOTO(out, rc = -EDQUOT);
380                         else if (!*time)
381                                 *time = cfs_time_current_sec() + grace;
382                 }
383
384                 *usage += qdata->qd_count;
385                 break;
386         case QUOTA_DQREL:
387                 /* The usage in administrative file might be incorrect before
388                  * recovery done */
389                 if (*usage - qdata->qd_count < 0)
390                         *usage = 0;
391                 else
392                         *usage -= qdata->qd_count;
393
394                 /* (usage <= soft limit) but not (usage < soft limit) */
395                 if (!slimit || QUSG(*usage, QDATA_IS_BLK(qdata)) <= slimit)
396                         *time = 0;
397                 break;
398         default:
399                 LBUG();
400         }
401
402         rc = fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT);
403         EXIT;
404 out:
405         up(&dquot->dq_sem);
406         up(&mds->mds_qonoff_sem);
407         lustre_dqput(dquot);
408         if (rc != -EDQUOT)
409                 dqacq_adjust_qunit_sz(obd, qdata->qd_id, QDATA_IS_GRP(qdata),
410                                       QDATA_IS_BLK(qdata));
411
412         quota_search_lqs(qdata, NULL, qctxt, &lqs);
413         if (QDATA_IS_BLK(qdata)) {
414                 if (!lqs) {
415                         CDEBUG(D_INFO, "Can't find the lustre qunit size!\n");
416                         qdata->qd_qunit  = qctxt->lqc_bunit_sz;
417                 } else {
418                         spin_lock(&lqs->lqs_lock);
419                         qdata->qd_qunit  = lqs->lqs_bunit_sz;
420                         spin_unlock(&lqs->lqs_lock);
421                 }
422                 QDATA_SET_ADJBLK(qdata);
423         } else {
424                 if (!lqs) {
425                         CDEBUG(D_INFO, "Can't find the lustre qunit size!\n");
426                         qdata->qd_qunit  = qctxt->lqc_iunit_sz;
427                 } else {
428                         spin_lock(&lqs->lqs_lock);
429                         qdata->qd_qunit  = lqs->lqs_iunit_sz;
430                         spin_unlock(&lqs->lqs_lock);
431                 }
432                 QDATA_SET_ADJINO(qdata);
433         }
434
435         QDATA_DEBUG(qdata, "alloc/release qunit in dqacq_handler\n");
436         if (lqs)
437                 lqs_putref(lqs);
438
439         return rc;
440 }
441
442 int mds_quota_adjust(struct obd_device *obd, unsigned int qcids[],
443                      unsigned int qpids[], int rc, int opc)
444 {
445         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
446         int rc2 = 0;
447         ENTRY;
448
449         if (rc && rc != -EDQUOT && rc != ENOLCK)
450                 RETURN(0);
451
452         switch (opc) {
453         case FSFILT_OP_RENAME:
454                 /* acquire/release block quota on owner of original parent */
455                 rc2 = qctxt_adjust_qunit(obd, qctxt, qpids[2], qpids[3], 1, 0);
456                 /* fall-through */
457         case FSFILT_OP_SETATTR:
458                 /* acquire/release file quota on original owner */
459                 rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 0, 0);
460                 /* fall-through */
461         case FSFILT_OP_CREATE:
462         case FSFILT_OP_UNLINK:
463                 /* acquire/release file/block quota on owner of child
464                  * (or current owner) */
465                 rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 0, 0);
466                 rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0);
467                 /* acquire/release block quota on owner of parent
468                  * (or original owner) */
469                 rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 1, 0);
470                 break;
471         default:
472                 LBUG();
473                 break;
474         }
475
476         if (rc2)
477                 CDEBUG(rc2 == -EAGAIN ? D_QUOTA: D_ERROR,
478                        "mds adjust qunit failed! (opc:%d rc:%d)\n", opc, rc2);
479         RETURN(0);
480 }
481
482 int filter_quota_adjust(struct obd_device *obd, unsigned int qcids[],
483                         unsigned int qpids[], int rc, int opc)
484 {
485         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
486         int rc2 = 0;
487         ENTRY;
488
489         if (rc && rc != -EDQUOT)
490                 RETURN(0);
491
492         switch (opc) {
493         case FSFILT_OP_SETATTR:
494                 /* acquire/release block quota on original & current owner */
495                 rc = qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0);
496                 rc2 = qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 1, 0);
497                 break;
498         case FSFILT_OP_UNLINK:
499                 /* release block quota on this owner */
500         case FSFILT_OP_CREATE: /* XXX for write operation on obdfilter */
501                 /* acquire block quota on this owner */
502                 rc = qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0);
503                 break;
504         default:
505                 LBUG();
506                 break;
507         }
508
509         if (rc || rc2) {
510                 if (!rc)
511                         rc = rc2;
512                 CDEBUG(rc == -EAGAIN ? D_QUOTA: D_ERROR,
513                        "filter adjust qunit failed! (opc:%d rc%d)\n",
514                        opc, rc);
515         }
516
517         RETURN(0);
518 }
519
/* Path prefix prepended to every admin quota file name before it is
 * passed to filp_open(). */
static const char prefix[] = "OBJECTS/";
521
522 int mds_quota_get_version(struct obd_device *obd,
523                           lustre_quota_version_t *version)
524 {
525         struct mds_obd *mds = &obd->u.mds;
526         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
527
528         *version = qinfo->qi_version;
529
530         return 0;
531 }
532
533 int mds_quota_set_version(struct obd_device *obd, lustre_quota_version_t version)
534 {
535         struct mds_obd *mds = &obd->u.mds;
536         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
537         int rc = 0, i;
538
539         if (version != LUSTRE_QUOTA_V1 &&
540             version != LUSTRE_QUOTA_V2)
541                 return -EINVAL;
542
543         down(&mds->mds_qonoff_sem);
544
545         /* no need to change version? nothing to do then */
546         if (qinfo->qi_version == version)
547                 goto out;
548
549         for (i = 0; i < MAXQUOTAS; i++) {
550                 /* quota file has been opened ? */
551                 if (qinfo->qi_files[i]) {
552                         rc = -EBUSY;
553                         goto out;
554                 }
555         }
556
557         CDEBUG(D_INFO, "changing quota version %d -> %d\n", qinfo->qi_version,
558                version);
559
560         qinfo->qi_version = version;
561
562 out:
563         up(&mds->mds_qonoff_sem);
564
565         return rc;
566 }
567
568 int mds_quota_invalidate(struct obd_device *obd, struct obd_quotactl *oqctl)
569 {
570         struct mds_obd *mds = &obd->u.mds;
571         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
572         int rc = 0, i;
573         char *quotafiles_v1[] = LUSTRE_ADMIN_QUOTAFILES_V1;
574         char *quotafiles_v2[] = LUSTRE_ADMIN_QUOTAFILES_V2;
575         char name[64];
576         struct lvfs_run_ctxt saved;
577
578         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
579
580         down(&mds->mds_qonoff_sem);
581
582         for (i = 0; i < MAXQUOTAS; i++) {
583                 struct file *fp;
584                 char* quotafile = (qinfo->qi_version == LUSTRE_QUOTA_V1)?
585                                    quotafiles_v1[i]:quotafiles_v2[i];
586
587                 if (!Q_TYPESET(oqctl, i))
588                         continue;
589
590                 /* quota file has been opened ? */
591                 if (qinfo->qi_files[i]) {
592                         rc = -EBUSY;
593                         goto out;
594                 }
595
596                 LASSERT(strlen(quotafile) + sizeof(prefix) <= sizeof(name));
597                 sprintf(name, "%s%s", prefix, quotafile);
598
599                 fp = filp_open(name, O_CREAT | O_TRUNC, 0644);
600                 if (IS_ERR(fp)) {
601                         rc = PTR_ERR(fp);
602                         CERROR("error invalidating admin quotafile %s (rc:%d)\n",
603                                name, rc);
604                 }
605                 else
606                         filp_close(fp, 0);
607         }
608
609 out:
610         up(&mds->mds_qonoff_sem);
611
612         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
613
614         return rc;
615 }
616
617 int init_admin_quotafiles(struct obd_device *obd, struct obd_quotactl *oqctl)
618 {
619         struct mds_obd *mds = &obd->u.mds;
620         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
621         char *quotafiles_v1[] = LUSTRE_ADMIN_QUOTAFILES_V1;
622         char *quotafiles_v2[] = LUSTRE_ADMIN_QUOTAFILES_V2;
623         struct lvfs_run_ctxt saved;
624         char name[64];
625         int i, rc = 0;
626         ENTRY;
627
628         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
629
630         down(&mds->mds_qonoff_sem);
631
632         for (i = 0; i < MAXQUOTAS && !rc; i++) {
633                 struct file *fp;
634                 char* quotafile = (qinfo->qi_version == LUSTRE_QUOTA_V1)?
635                                         quotafiles_v1[i]:quotafiles_v2[i];
636
637                 if (!Q_TYPESET(oqctl, i))
638                         continue;
639
640                 /* quota file has been opened ? */
641                 if (qinfo->qi_files[i]) {
642                         CWARN("init %s admin quotafile while quota on.\n",
643                               i == USRQUOTA ? "user" : "group");
644                         continue;
645                 }
646
647                 LASSERT(strlen(quotafile) + sizeof(prefix) <= sizeof(name));
648                 sprintf(name, "%s%s", prefix, quotafile);
649
650                 /* check if quota file exists and is correct */
651                 fp = filp_open(name, O_RDONLY, 0);
652                 if (!IS_ERR(fp)) {
653                         /* irregular file is not the right place for quota */
654                         if (!S_ISREG(fp->f_dentry->d_inode->i_mode)) {
655                                 CERROR("admin quota file %s is not "
656                                        "regular!", quotafile);
657                                 filp_close(fp, 0);
658                                 rc = -EINVAL;
659                                 break;
660                         }
661                         qinfo->qi_files[i] = fp;
662                         rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_CHK);
663                         qinfo->qi_files[i] = 0;
664                         filp_close(fp, 0);
665                 }
666                 else
667                         rc = PTR_ERR(fp);
668
669                 if (!rc)
670                         continue;
671
672                 /* -EINVAL may be returned by quotainfo for bad quota file */
673                 if (rc != -ENOENT && rc != -EINVAL) {
674                         CERROR("error opening old quota file %s (%d)\n",
675                                name, rc);
676                         break;
677                 }
678
679                 CDEBUG(D_INFO, "%s new quota file %s\n", name,
680                        rc == -ENOENT ? "creating" : "overwriting");
681
682                 /* create quota file overwriting old if needed */
683                 fp = filp_open(name, O_CREAT | O_TRUNC, 0644);
684                 if (IS_ERR(fp)) {
685                         rc = PTR_ERR(fp);
686                         CERROR("error creating admin quotafile %s (rc:%d)\n",
687                                name, rc);
688                         break;
689                 }
690
691                 qinfo->qi_files[i] = fp;
692
693                 switch (qinfo->qi_version) {
694                 case LUSTRE_QUOTA_V1:
695                         rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_INIT_INFO);
696                         if (rc)
697                                 CERROR("error init %s admin quotafile! (rc:%d)\n",
698                                        i == USRQUOTA ? "user" : "group", rc);
699                         break;
700                 case LUSTRE_QUOTA_V2:
701                         rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_CONVERT);
702                         if (rc)
703                                 CERROR("error convert %s admin quotafile! (rc:%d)\n",
704                                        i == USRQUOTA ? "user" : "group", rc);
705                         break;
706                 default:
707                         LBUG();
708                 }
709
710                 filp_close(fp, 0);
711                 qinfo->qi_files[i] = NULL;
712         }
713         up(&mds->mds_qonoff_sem);
714
715         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
716         RETURN(rc);
717 }
718
719 static int close_quota_files(struct obd_quotactl *oqctl,
720                              struct lustre_quota_info *qinfo)
721 {
722         int i, rc = 0;
723         ENTRY;
724
725         for (i = 0; i < MAXQUOTAS; i++) {
726                 if (!Q_TYPESET(oqctl, i))
727                         continue;
728                 if (qinfo->qi_files[i] == NULL) {
729                         rc = -ESRCH;
730                         continue;
731                 }
732                 filp_close(qinfo->qi_files[i], 0);
733                 qinfo->qi_files[i] = NULL;
734         }
735         RETURN(rc);
736 }
737
738 int mds_admin_quota_on(struct obd_device *obd, struct obd_quotactl *oqctl)
739 {
740         struct mds_obd *mds = &obd->u.mds;
741         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
742         const char *quotafiles_v1[] = LUSTRE_ADMIN_QUOTAFILES_V1;
743         const char *quotafiles_v2[] = LUSTRE_ADMIN_QUOTAFILES_V2;
744         char name[64];
745         int i, rc = 0;
746         ENTRY;
747
748         /* open admin quota files and read quotafile info */
749         for (i = 0; i < MAXQUOTAS; i++) {
750                 struct file *fp;
751                 const char* quotafile = qinfo->qi_version == LUSTRE_QUOTA_V1?
752                                         quotafiles_v1[i] : quotafiles_v2[i];
753
754                 if (!Q_TYPESET(oqctl, i))
755                         continue;
756
757                 LASSERT(strlen(quotafile)
758                         + sizeof(prefix) <= sizeof(name));
759                 sprintf(name, "%s%s", prefix, quotafile);
760
761                 if (qinfo->qi_files[i] != NULL) {
762                         rc = -EBUSY;
763                         break;
764                 }
765
766                 fp = filp_open(name, O_RDWR, 0);
767                 /* handle transparent migration to 64 bit quota file */
768                 if (IS_ERR(fp) && PTR_ERR(fp) == -ENOENT &&
769                     qinfo->qi_version == LUSTRE_QUOTA_V2) {
770                         CDEBUG(D_INFO, "attempting to convert V1 quota file to"
771                                        " V2 format\n");
772                         fp = filp_open(name, O_CREAT | O_TRUNC, 0644);
773                         if (!IS_ERR(fp)) {
774                                 qinfo->qi_files[i] = fp;
775                                 rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_CONVERT);
776                                 if (rc) {
777                                         CERROR("error convert %s admin "
778                                                "quotafile! (rc:%d)\n",
779                                                i == USRQUOTA ? "user" : "group",
780                                                rc);
781                                         break;
782                                 }
783                         }
784                 }
785
786                 if (IS_ERR(fp) || !S_ISREG(fp->f_dentry->d_inode->i_mode)) {
787                         rc = IS_ERR(fp) ? PTR_ERR(fp) : -EINVAL;
788                         CERROR("error open/create %s! (rc:%d)\n", name, rc);
789                         break;
790                 }
791                 qinfo->qi_files[i] = fp;
792
793                 rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_CHK);
794                 if (rc) {
795                         CERROR("invalid quota file %s! (rc:%d)\n", name, rc);
796                         break;
797                 }
798
799                 rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_RD_INFO);
800                 if (rc) {
801                         CERROR("error read quotainfo of %s! (rc:%d)\n", name,
802                                rc);
803                         break;
804                 }
805         }
806
807         if (rc && rc != -EBUSY)
808                 close_quota_files(oqctl, qinfo);
809
810         RETURN(rc);
811 }
812
813 static int mds_admin_quota_off(struct obd_device *obd,
814                                struct obd_quotactl *oqctl)
815 {
816         struct mds_obd *mds = &obd->u.mds;
817         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
818         int rc;
819         ENTRY;
820
821         /* close admin quota files */
822         rc = close_quota_files(oqctl, qinfo);
823         RETURN(rc);
824 }
825
826 int mds_quota_on(struct obd_device *obd, struct obd_quotactl *oqctl)
827 {
828         struct mds_obd *mds = &obd->u.mds;
829         struct obd_device_target *obt = &obd->u.obt;
830         struct lvfs_run_ctxt saved;
831         int rc;
832         ENTRY;
833
834         if (!atomic_dec_and_test(&obt->obt_quotachecking)) {
835                 CDEBUG(D_INFO, "other people are doing quotacheck\n");
836                 atomic_inc(&obt->obt_quotachecking);
837                 RETURN(-EBUSY);
838         }
839
840         down(&mds->mds_qonoff_sem);
841         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
842         rc = mds_admin_quota_on(obd, oqctl);
843         if (rc)
844                 goto out;
845
846         rc = obd_quotactl(mds->mds_osc_exp, oqctl);
847         if (rc)
848                 goto out;
849
850         rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
851         if (!rc)
852                 obt->obt_qctxt.lqc_status = 1;
853 out:
854         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
855         up(&mds->mds_qonoff_sem);
856         atomic_inc(&obt->obt_quotachecking);
857         RETURN(rc);
858 }
859
860 int mds_quota_off(struct obd_device *obd, struct obd_quotactl *oqctl)
861 {
862         struct mds_obd *mds = &obd->u.mds;
863         struct obd_device_target *obt = &obd->u.obt;
864         struct lvfs_run_ctxt saved;
865         int rc, rc2;
866         ENTRY;
867
868         if (!atomic_dec_and_test(&obt->obt_quotachecking)) {
869                 CDEBUG(D_INFO, "other people are doing quotacheck\n");
870                 atomic_inc(&obt->obt_quotachecking);
871                 RETURN(-EBUSY);
872         }
873
874         down(&mds->mds_qonoff_sem);
875         /* close admin quota files */
876         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
877         mds_admin_quota_off(obd, oqctl);
878
879         rc = obd_quotactl(mds->mds_osc_exp, oqctl);
880         rc2 = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
881         if (!rc2)
882                 obt->obt_qctxt.lqc_status = 0;
883
884         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
885         up(&mds->mds_qonoff_sem);
886         atomic_inc(&obt->obt_quotachecking);
887
888         RETURN(rc ?: rc2);
889 }
890
891 int mds_set_dqinfo(struct obd_device *obd, struct obd_quotactl *oqctl)
892 {
893         struct mds_obd *mds = &obd->u.mds;
894         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
895         struct obd_dqinfo *dqinfo = &oqctl->qc_dqinfo;
896         int rc;
897         ENTRY;
898
899         down(&mds->mds_qonoff_sem);
900         if (qinfo->qi_files[oqctl->qc_type] == NULL) {
901                 rc = -ESRCH;
902                 goto out;
903         }
904
905         qinfo->qi_info[oqctl->qc_type].dqi_bgrace = dqinfo->dqi_bgrace;
906         qinfo->qi_info[oqctl->qc_type].dqi_igrace = dqinfo->dqi_igrace;
907         qinfo->qi_info[oqctl->qc_type].dqi_flags = dqinfo->dqi_flags;
908
909         rc = fsfilt_quotainfo(obd, qinfo, oqctl->qc_type, QFILE_WR_INFO);
910
911 out:
912         up(&mds->mds_qonoff_sem);
913         RETURN(rc);
914 }
915
916 int mds_get_dqinfo(struct obd_device *obd, struct obd_quotactl *oqctl)
917 {
918         struct mds_obd *mds = &obd->u.mds;
919         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
920         struct obd_dqinfo *dqinfo = &oqctl->qc_dqinfo;
921         int rc = 0;
922         ENTRY;
923
924         down(&mds->mds_qonoff_sem);
925         if (qinfo->qi_files[oqctl->qc_type] == NULL) {
926                 rc = -ESRCH;
927                 goto out;
928         }
929
930         dqinfo->dqi_bgrace = qinfo->qi_info[oqctl->qc_type].dqi_bgrace;
931         dqinfo->dqi_igrace = qinfo->qi_info[oqctl->qc_type].dqi_igrace;
932         dqinfo->dqi_flags = qinfo->qi_info[oqctl->qc_type].dqi_flags;
933
934 out:
935         up(&mds->mds_qonoff_sem);
936         RETURN(rc);
937 }
938
939 int dquot_create_oqaq(struct lustre_quota_ctxt *qctxt,
940                       struct lustre_dquot *dquot, __u32 ost_num, __u32 mdt_num,
941                       int type, struct quota_adjust_qunit *oqaq)
942 {
943         __u64 bunit_curr_o, iunit_curr_o;
944         unsigned long shrink_qunit_limit = qctxt->lqc_cqs_boundary_factor;
945         unsigned long cqs_factor = qctxt->lqc_cqs_qs_factor;
946         __u64 blimit = dquot->dq_dqb.dqb_bhardlimit ?
947                 dquot->dq_dqb.dqb_bhardlimit : dquot->dq_dqb.dqb_bsoftlimit;
948         __u64 ilimit = dquot->dq_dqb.dqb_ihardlimit ?
949                 dquot->dq_dqb.dqb_ihardlimit : dquot->dq_dqb.dqb_isoftlimit;
950         int rc = 0;
951         ENTRY;
952
953         if (!dquot || !oqaq)
954                 RETURN(-EINVAL);
955         LASSERT_SEM_LOCKED(&dquot->dq_sem);
956         LASSERT(oqaq->qaq_iunit_sz);
957         LASSERT(oqaq->qaq_bunit_sz);
958
959         /* don't change qunit size */
960         if (!qctxt->lqc_switch_qs)
961                 RETURN(rc);
962
963         bunit_curr_o = oqaq->qaq_bunit_sz;
964         iunit_curr_o = oqaq->qaq_iunit_sz;
965
966         if (dquot->dq_type == GRPQUOTA)
967                 QAQ_SET_GRP(oqaq);
968
969         if ((type & LQUOTA_FLAGS_ADJBLK) && blimit) {
970                 __u64 b_limitation =
971                         oqaq->qaq_bunit_sz * ost_num * shrink_qunit_limit;
972                 /* enlarge block qunit size */
973                 while (blimit >
974                        QUSG(dquot->dq_dqb.dqb_curspace + 2 * b_limitation, 1)) {
975                         oqaq->qaq_bunit_sz =
976                                 QUSG(oqaq->qaq_bunit_sz * cqs_factor, 1)
977                                 << QUOTABLOCK_BITS;
978                         b_limitation = oqaq->qaq_bunit_sz * ost_num *
979                                 shrink_qunit_limit;
980                 }
981
982                 if (oqaq->qaq_bunit_sz > qctxt->lqc_bunit_sz)
983                         oqaq->qaq_bunit_sz = qctxt->lqc_bunit_sz;
984
985                 /* shrink block qunit size */
986                 while (blimit <
987                        QUSG(dquot->dq_dqb.dqb_curspace + b_limitation, 1)) {
988                         do_div(oqaq->qaq_bunit_sz , cqs_factor);
989                         oqaq->qaq_bunit_sz = QUSG(oqaq->qaq_bunit_sz, 1) <<
990                                 QUOTABLOCK_BITS;
991                         b_limitation = oqaq->qaq_bunit_sz * ost_num *
992                                 shrink_qunit_limit;
993                         if (oqaq->qaq_bunit_sz <  qctxt->lqc_cqs_least_bunit)
994                                 break;
995                 }
996
997                 if (oqaq->qaq_bunit_sz < qctxt->lqc_cqs_least_bunit)
998                         oqaq->qaq_bunit_sz = qctxt->lqc_cqs_least_bunit;
999
1000                 if (bunit_curr_o != oqaq->qaq_bunit_sz)
1001                         QAQ_SET_ADJBLK(oqaq);
1002
1003         }
1004
1005         if ((type & LQUOTA_FLAGS_ADJINO) && ilimit) {
1006                 __u64 i_limitation =
1007                         oqaq->qaq_iunit_sz * mdt_num * shrink_qunit_limit;
1008                 /* enlarge file qunit size */
1009                 while (ilimit > dquot->dq_dqb.dqb_curinodes
1010                        + 2 * i_limitation) {
1011                         oqaq->qaq_iunit_sz = oqaq->qaq_iunit_sz * cqs_factor;
1012                         i_limitation = oqaq->qaq_iunit_sz * mdt_num *
1013                                 shrink_qunit_limit;
1014                 }
1015
1016                 if (oqaq->qaq_iunit_sz > qctxt->lqc_iunit_sz)
1017                         oqaq->qaq_iunit_sz = qctxt->lqc_iunit_sz;
1018
1019                 /* shrink file qunit size */
1020                 while (ilimit < dquot->dq_dqb.dqb_curinodes
1021                        + i_limitation) {
1022                         do_div(oqaq->qaq_iunit_sz, cqs_factor);
1023                         i_limitation = oqaq->qaq_iunit_sz * mdt_num *
1024                                        shrink_qunit_limit;
1025                         if (oqaq->qaq_iunit_sz < qctxt->lqc_cqs_least_iunit)
1026                                 break;
1027                 }
1028
1029                 if (oqaq->qaq_iunit_sz < qctxt->lqc_cqs_least_iunit)
1030                         oqaq->qaq_iunit_sz = qctxt->lqc_cqs_least_iunit;
1031
1032                 if (iunit_curr_o != oqaq->qaq_iunit_sz)
1033                         QAQ_SET_ADJINO(oqaq);
1034
1035         }
1036
1037         if (!dquot->dq_dqb.dqb_bhardlimit && !dquot->dq_dqb.dqb_bsoftlimit &&
1038             !dquot->dq_dqb.dqb_ihardlimit && !dquot->dq_dqb.dqb_isoftlimit) {
1039                 oqaq->qaq_bunit_sz = 0;
1040                 oqaq->qaq_iunit_sz = 0;
1041                 QAQ_SET_ADJBLK(oqaq);
1042                 QAQ_SET_ADJINO(oqaq);
1043         }
1044
1045         QAQ_DEBUG(oqaq, "the oqaq computed\n");
1046
1047         RETURN(rc);
1048 }
1049
1050 static int mds_init_slave_ilimits(struct obd_device *obd,
1051                                   struct obd_quotactl *oqctl, int set,
1052                                   struct quota_adjust_qunit *oqaq)
1053 {
1054         /* XXX: for file limits only adjust local now */
1055         struct obd_device_target *obt = &obd->u.obt;
1056         struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt;
1057         unsigned int uid = 0, gid = 0;
1058         struct obd_quotactl *ioqc = NULL;
1059         int flag;
1060         int rc;
1061         ENTRY;
1062
1063         /* if we are going to set zero limit, needn't init slaves */
1064         if (!oqctl->qc_dqblk.dqb_ihardlimit && !oqctl->qc_dqblk.dqb_isoftlimit &&
1065             set)
1066                 RETURN(0);
1067
1068         OBD_ALLOC_PTR(ioqc);
1069         if (!ioqc)
1070                 RETURN(-ENOMEM);
1071
1072         flag = oqctl->qc_dqblk.dqb_ihardlimit ||
1073                oqctl->qc_dqblk.dqb_isoftlimit || set;
1074         ioqc->qc_cmd = flag ? Q_INITQUOTA : Q_SETQUOTA;
1075         ioqc->qc_id = oqctl->qc_id;
1076         ioqc->qc_type = oqctl->qc_type;
1077         ioqc->qc_dqblk.dqb_valid = QIF_ILIMITS;
1078         ioqc->qc_dqblk.dqb_ihardlimit = flag ? MIN_QLIMIT : 0;
1079
1080         if (QAQ_IS_ADJINO(oqaq)) {
1081                 /* adjust the mds slave's inode qunit size */
1082                 rc = quota_adjust_slave_lqs(oqaq, qctxt);
1083                 if (rc < 0)
1084                         CDEBUG(D_ERROR, "adjust mds slave's inode qunit size \
1085                                failed! (rc:%d)\n", rc);
1086         }
1087
1088         /* set local limit to MIN_QLIMIT */
1089         rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, ioqc);
1090         if (rc)
1091                 GOTO(out, rc);
1092
1093         /* trigger local qunit pre-acquire */
1094         if (oqctl->qc_type == USRQUOTA)
1095                 uid = oqctl->qc_id;
1096         else
1097                 gid = oqctl->qc_id;
1098
1099         rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 0, 0);
1100         if (rc == -EDQUOT || rc == -EBUSY) {
1101                 CDEBUG(D_QUOTA, "rc: %d.\n", rc);
1102                 rc = 0;
1103         }
1104         if (rc) {
1105                 CDEBUG(D_QUOTA,"error mds adjust local file quota! (rc:%d)\n",
1106                        rc);
1107                 GOTO(out, rc);
1108         }
1109         /* FIXME initialize all slaves in CMD */
1110         EXIT;
1111 out:
1112         if (ioqc)
1113                 OBD_FREE_PTR(ioqc);
1114         return rc;
1115 }
1116
1117 static int mds_init_slave_blimits(struct obd_device *obd,
1118                                   struct obd_quotactl *oqctl, int set,
1119                                   struct quota_adjust_qunit *oqaq)
1120 {
1121         struct obd_device_target *obt = &obd->u.obt;
1122         struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt;
1123         struct mds_obd *mds = &obd->u.mds;
1124         struct obd_quotactl *ioqc;
1125         unsigned int uid = 0, gid = 0;
1126         int rc, rc1 = 0;
1127         int flag;
1128         ENTRY;
1129
1130         /* if we are going to set zero limit, needn't init slaves */
1131         if (!oqctl->qc_dqblk.dqb_bhardlimit && !oqctl->qc_dqblk.dqb_bsoftlimit &&
1132             set)
1133                 RETURN(0);
1134
1135         OBD_ALLOC_PTR(ioqc);
1136         if (!ioqc)
1137                 RETURN(-ENOMEM);
1138
1139         flag = oqctl->qc_dqblk.dqb_bhardlimit ||
1140                oqctl->qc_dqblk.dqb_bsoftlimit || set;
1141         ioqc->qc_cmd = flag ? Q_INITQUOTA : Q_SETQUOTA;
1142         ioqc->qc_id = oqctl->qc_id;
1143         ioqc->qc_type = oqctl->qc_type;
1144         ioqc->qc_dqblk.dqb_valid = QIF_BLIMITS;
1145         ioqc->qc_dqblk.dqb_bhardlimit = flag ? MIN_QLIMIT : 0;
1146         if (QAQ_IS_ADJBLK(oqaq)) {
1147                 /* adjust the mds slave's block qunit size */
1148                 rc1 = quota_adjust_slave_lqs(oqaq, qctxt);
1149                 if (rc1 < 0)
1150                         CERROR("adjust mds slave's block qunit size failed!"
1151                                "(rc:%d)\n", rc1);
1152         }
1153
1154         rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, ioqc);
1155         if (rc)
1156                 GOTO(out, rc);
1157
1158         /* trigger local qunit pre-acquire */
1159         if (oqctl->qc_type == USRQUOTA)
1160                 uid = oqctl->qc_id;
1161         else
1162                 gid = oqctl->qc_id;
1163
1164         /* initialize all slave's limit */
1165         rc = obd_quotactl(mds->mds_osc_exp, ioqc);
1166
1167         rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 1, 0);
1168         if (rc == -EDQUOT || rc == -EBUSY) {
1169                 CDEBUG(D_QUOTA, "rc: %d.\n", rc);
1170                 rc = 0;
1171         }
1172         if (rc) {
1173                 CERROR("error mds adjust local block quota! (rc:%d)\n", rc);
1174                 GOTO(out, rc);
1175         }
1176
1177         /* adjust all slave's qunit size when setting quota
1178          * this is will create a lqs for every ost, which will present
1179          * certain uid/gid is set quota or not */
1180         QAQ_SET_ADJBLK(oqaq);
1181         rc = obd_quota_adjust_qunit(mds->mds_osc_exp, oqaq);
1182
1183         EXIT;
1184 out:
1185         OBD_FREE_PTR(ioqc);
1186         return rc;
1187 }
1188
1189 int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl)
1190 {
1191         struct mds_obd *mds = &obd->u.mds;
1192         struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt;
1193         struct obd_device *lov_obd = class_exp2obd(mds->mds_osc_exp);
1194         struct lov_obd *lov = &lov_obd->u.lov;
1195         struct quota_adjust_qunit *oqaq = NULL;
1196         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
1197         __u64 ihardlimit, isoftlimit, bhardlimit, bsoftlimit;
1198         time_t btime, itime;
1199         struct lustre_dquot *dquot;
1200         struct obd_dqblk *dqblk = &oqctl->qc_dqblk;
1201         int set, rc, flag = 0;
1202         ENTRY;
1203
1204         OBD_ALLOC_PTR(oqaq);
1205         if (!oqaq)
1206                 RETURN(-ENOMEM);
1207         down(&mds->mds_qonoff_sem);
1208         init_oqaq(oqaq, qctxt, oqctl->qc_id, oqctl->qc_type);
1209
1210         if (qinfo->qi_files[oqctl->qc_type] == NULL)
1211                 GOTO(out_sem, rc = -ESRCH);
1212
1213         dquot = lustre_dqget(obd, qinfo, oqctl->qc_id, oqctl->qc_type);
1214         if (IS_ERR(dquot))
1215                 GOTO(out_sem, rc = PTR_ERR(dquot));
1216         DQUOT_DEBUG(dquot, "get dquot in mds_set_blk\n");
1217         QINFO_DEBUG(dquot->dq_info, "get dquot in mds_set_blk\n");
1218
1219         down(&dquot->dq_sem);
1220
1221         if (dquot->dq_status) {
1222                 up(&dquot->dq_sem);
1223                 lustre_dqput(dquot);
1224                 GOTO(out_sem, rc = -EBUSY);
1225         }
1226         dquot->dq_status |= DQ_STATUS_SET;
1227
1228         ihardlimit = dquot->dq_dqb.dqb_ihardlimit;
1229         isoftlimit = dquot->dq_dqb.dqb_isoftlimit;
1230         bhardlimit = dquot->dq_dqb.dqb_bhardlimit;
1231         bsoftlimit = dquot->dq_dqb.dqb_bsoftlimit;
1232         btime = dquot->dq_dqb.dqb_btime;
1233         itime = dquot->dq_dqb.dqb_itime;
1234
1235         if (dqblk->dqb_valid & QIF_BTIME)
1236                 dquot->dq_dqb.dqb_btime = dqblk->dqb_btime;
1237         if (dqblk->dqb_valid & QIF_ITIME)
1238                 dquot->dq_dqb.dqb_itime = dqblk->dqb_itime;
1239
1240         if (dqblk->dqb_valid & QIF_BLIMITS) {
1241                 dquot->dq_dqb.dqb_bhardlimit = dqblk->dqb_bhardlimit;
1242                 dquot->dq_dqb.dqb_bsoftlimit = dqblk->dqb_bsoftlimit;
1243                 /* clear usage (limit pool) */
1244                 if (!dquot->dq_dqb.dqb_bhardlimit &&
1245                     !dquot->dq_dqb.dqb_bsoftlimit)
1246                         dquot->dq_dqb.dqb_curspace = 0;
1247
1248                 /* clear grace time */
1249                 if (!dqblk->dqb_bsoftlimit ||
1250                     toqb(dquot->dq_dqb.dqb_curspace) <= dqblk->dqb_bsoftlimit)
1251                         dquot->dq_dqb.dqb_btime = 0;
1252                 /* set grace only if user hasn't provided his own */
1253                 else if (!(dqblk->dqb_valid & QIF_BTIME))
1254                         dquot->dq_dqb.dqb_btime = cfs_time_current_sec() +
1255                                 qinfo->qi_info[dquot->dq_type].dqi_bgrace;
1256
1257                 flag |= LQUOTA_FLAGS_ADJBLK;
1258         }
1259
1260         if (dqblk->dqb_valid & QIF_ILIMITS) {
1261                 dquot->dq_dqb.dqb_ihardlimit = dqblk->dqb_ihardlimit;
1262                 dquot->dq_dqb.dqb_isoftlimit = dqblk->dqb_isoftlimit;
1263                 /* clear usage (limit pool) */
1264                 if (!dquot->dq_dqb.dqb_ihardlimit &&
1265                     !dquot->dq_dqb.dqb_isoftlimit)
1266                         dquot->dq_dqb.dqb_curinodes = 0;
1267
1268                 if (!dqblk->dqb_isoftlimit ||
1269                     dquot->dq_dqb.dqb_curinodes <= dqblk->dqb_isoftlimit)
1270                         dquot->dq_dqb.dqb_itime = 0;
1271                 else if (!(dqblk->dqb_valid & QIF_ITIME))
1272                         dquot->dq_dqb.dqb_itime = cfs_time_current_sec() +
1273                                 qinfo->qi_info[dquot->dq_type].dqi_igrace;
1274
1275                 flag |= LQUOTA_FLAGS_ADJINO;
1276         }
1277         QAQ_DEBUG(oqaq, "before dquot_create_oqaq\n");
1278         rc = dquot_create_oqaq(qctxt, dquot, lov->desc.ld_tgt_count, 1,
1279                                flag, oqaq);
1280         QAQ_DEBUG(oqaq, "after dquot_create_oqaq\n");
1281         if (rc < 0)
1282                 CDEBUG(D_QUOTA, "adjust qunit size failed! (rc:%d)\n", rc);
1283
1284
1285         rc = fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT);
1286
1287         up(&dquot->dq_sem);
1288
1289         if (rc) {
1290                 CERROR("set limit failed! (rc:%d)\n", rc);
1291                 goto out;
1292         }
1293
1294         up(&mds->mds_qonoff_sem);
1295         if (dqblk->dqb_valid & QIF_ILIMITS) {
1296                 set = !(ihardlimit || isoftlimit);
1297                 down(&dquot->dq_sem);
1298                 dquot->dq_dqb.dqb_curinodes = 0;
1299                 up(&dquot->dq_sem);
1300                 rc = mds_init_slave_ilimits(obd, oqctl, set, oqaq);
1301                 if (rc) {
1302                         CERROR("init slave ilimits failed! (rc:%d)\n", rc);
1303                         goto revoke_out;
1304                 }
1305         }
1306
1307         if (dqblk->dqb_valid & QIF_BLIMITS) {
1308                 set = !(bhardlimit || bsoftlimit);
1309                 down(&dquot->dq_sem);
1310                 dquot->dq_dqb.dqb_curspace = 0;
1311                 up(&dquot->dq_sem);
1312                 rc = mds_init_slave_blimits(obd, oqctl, set, oqaq);
1313                 if (rc) {
1314                         CERROR("init slave blimits failed! (rc:%d)\n", rc);
1315                         goto revoke_out;
1316                 }
1317         }
1318         down(&mds->mds_qonoff_sem);
1319
1320 revoke_out:
1321         if (rc) {
1322                 /* cancel previous setting */
1323                 down(&dquot->dq_sem);
1324                 dquot->dq_dqb.dqb_ihardlimit = ihardlimit;
1325                 dquot->dq_dqb.dqb_isoftlimit = isoftlimit;
1326                 dquot->dq_dqb.dqb_bhardlimit = bhardlimit;
1327                 dquot->dq_dqb.dqb_bsoftlimit = bsoftlimit;
1328                 dquot->dq_dqb.dqb_btime = btime;
1329                 dquot->dq_dqb.dqb_itime = itime;
1330                 fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT);
1331                 up(&dquot->dq_sem);
1332         }
1333 out:
1334         down(&dquot->dq_sem);
1335         dquot->dq_status &= ~DQ_STATUS_SET;
1336         up(&dquot->dq_sem);
1337         lustre_dqput(dquot);
1338         EXIT;
1339 out_sem:
1340         up(&mds->mds_qonoff_sem);
1341
1342         if (oqaq)
1343                 OBD_FREE_PTR(oqaq);
1344
1345         return rc;
1346 }
1347
1348 static int mds_get_space(struct obd_device *obd, struct obd_quotactl *oqctl)
1349 {
1350         struct obd_quotactl *soqc;
1351         struct lvfs_run_ctxt saved;
1352         int rc, rc1;
1353         ENTRY;
1354
1355         OBD_ALLOC_PTR(soqc);
1356         if (!soqc)
1357                 RETURN(-ENOMEM);
1358
1359         soqc->qc_cmd = Q_GETOQUOTA;
1360         soqc->qc_id = oqctl->qc_id;
1361         soqc->qc_type = oqctl->qc_type;
1362
1363         rc = obd_quotactl(obd->u.mds.mds_osc_exp, soqc);
1364
1365         oqctl->qc_dqblk.dqb_curspace = soqc->qc_dqblk.dqb_curspace;
1366
1367         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1368         soqc->qc_dqblk.dqb_curspace = 0;
1369         rc1 = fsfilt_quotactl(obd, obd->u.obt.obt_sb, soqc);
1370         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1371
1372         oqctl->qc_dqblk.dqb_curinodes += soqc->qc_dqblk.dqb_curinodes;
1373         if (!rc1)
1374                 oqctl->qc_dqblk.dqb_valid |= QIF_INODES;
1375         oqctl->qc_dqblk.dqb_curspace += soqc->qc_dqblk.dqb_curspace;
1376         if (!rc && !rc1)
1377                 oqctl->qc_dqblk.dqb_valid |= QIF_USAGE;
1378
1379         OBD_FREE_PTR(soqc);
1380
1381         if (!rc)
1382                 rc = rc1;
1383         RETURN(rc);
1384 }
1385
1386 int mds_get_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl)
1387 {
1388         struct mds_obd *mds = &obd->u.mds;
1389         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
1390         struct lustre_dquot *dquot;
1391         struct obd_dqblk *dqblk = &oqctl->qc_dqblk;
1392         int rc;
1393         ENTRY;
1394
1395         down(&mds->mds_qonoff_sem);
1396         dqblk->dqb_valid = 0;
1397         if (qinfo->qi_files[oqctl->qc_type] == NULL)
1398                 GOTO(out, rc = -ESRCH);
1399
1400         dquot = lustre_dqget(obd, qinfo, oqctl->qc_id, oqctl->qc_type);
1401         if (IS_ERR(dquot))
1402                 GOTO(out, rc = PTR_ERR(dquot));
1403
1404         down(&dquot->dq_sem);
1405         dqblk->dqb_ihardlimit = dquot->dq_dqb.dqb_ihardlimit;
1406         dqblk->dqb_isoftlimit = dquot->dq_dqb.dqb_isoftlimit;
1407         dqblk->dqb_bhardlimit = dquot->dq_dqb.dqb_bhardlimit;
1408         dqblk->dqb_bsoftlimit = dquot->dq_dqb.dqb_bsoftlimit;
1409         dqblk->dqb_btime = dquot->dq_dqb.dqb_btime;
1410         dqblk->dqb_itime = dquot->dq_dqb.dqb_itime;
1411         dqblk->dqb_valid |= QIF_LIMITS | QIF_TIMES;
1412         up(&dquot->dq_sem);
1413
1414         lustre_dqput(dquot);
1415
1416         /* the usages in admin quota file is inaccurate */
1417         dqblk->dqb_curinodes = 0;
1418         dqblk->dqb_curspace = 0;
1419         rc = mds_get_space(obd, oqctl);
1420         EXIT;
1421 out:
1422         up(&mds->mds_qonoff_sem);
1423         return rc;
1424 }
1425
1426 int mds_get_obd_quota(struct obd_device *obd, struct obd_quotactl *oqctl)
1427 {
1428         struct lvfs_run_ctxt saved;
1429         int rc;
1430         ENTRY;
1431
1432         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1433         rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
1434         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1435
1436         RETURN(rc);
1437 }
1438
1439
1440 /* FIXME we only recovery block limit by now, need recovery inode
1441  * limits also after CMD involved in */
1442 static int 
1443 dquot_recovery(struct obd_device *obd, unsigned int id, unsigned short type)
1444 {
1445         struct mds_obd *mds = &obd->u.mds;
1446         struct lustre_quota_info *qinfo= &obd->u.mds.mds_quota_info;
1447         struct lustre_dquot *dquot;
1448         struct obd_quotactl *qctl;
1449         __u64 total_limits = 0;
1450         int rc;
1451         ENTRY;
1452
1453         OBD_ALLOC_PTR(qctl);
1454         if (qctl == NULL)
1455                 RETURN(-ENOMEM);
1456
1457         dquot = lustre_dqget(obd, qinfo, id, type);
1458         if (IS_ERR(dquot)) {
1459                 CERROR("Get dquot failed. (rc:%ld)\n", PTR_ERR(dquot));
1460                 OBD_FREE_PTR(qctl);
1461                 RETURN(PTR_ERR(dquot));
1462         }
1463
1464         down(&dquot->dq_sem);
1465
1466         /* don't recovery the dquot without limits or under setting */
1467         if (!(dquot->dq_dqb.dqb_bhardlimit || dquot->dq_dqb.dqb_bsoftlimit) ||
1468             dquot->dq_status)
1469                 GOTO(skip, rc = 0);
1470         dquot->dq_status |= DQ_STATUS_RECOVERY;
1471
1472         up(&dquot->dq_sem);
1473
1474         /* get real bhardlimit from all slaves. */
1475         qctl->qc_cmd = Q_GETOQUOTA;
1476         qctl->qc_type = type;
1477         qctl->qc_id = id;
1478         qctl->qc_stat = QUOTA_RECOVERING;
1479         rc = obd_quotactl(obd->u.mds.mds_osc_exp, qctl);
1480         if (rc)
1481                 GOTO(out, rc);
1482         total_limits = qctl->qc_dqblk.dqb_bhardlimit;
1483
1484         /* get real bhardlimit from master */
1485         rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, qctl);
1486         if (rc)
1487                 GOTO(out, rc);
1488         total_limits += qctl->qc_dqblk.dqb_bhardlimit;
1489
1490         /* amend the usage of the administrative quotafile */
1491         down(&mds->mds_qonoff_sem);
1492         down(&dquot->dq_sem);
1493
1494         dquot->dq_dqb.dqb_curspace = total_limits << QUOTABLOCK_BITS;
1495
1496         rc = fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT);
1497         if (rc)
1498                 CERROR("write dquot failed! (rc:%d)\n", rc);
1499
1500         up(&dquot->dq_sem);
1501         up(&mds->mds_qonoff_sem);
1502         EXIT;
1503 out:
1504         down(&dquot->dq_sem);
1505         dquot->dq_status &= ~DQ_STATUS_RECOVERY;
1506 skip:
1507         up(&dquot->dq_sem);
1508
1509         lustre_dqput(dquot);
1510         OBD_FREE_PTR(qctl);
1511         return rc;
1512 }
1513
1514 struct qmaster_recov_thread_data {
1515         struct obd_device *obd;
1516         struct completion comp;
1517 };
1518
1519 static int qmaster_recovery_main(void *arg)
1520 {
1521         struct qmaster_recov_thread_data *data = arg;
1522         struct obd_device *obd = data->obd;
1523         int rc = 0;
1524         unsigned short type;
1525         ENTRY;
1526
1527         ptlrpc_daemonize("qmaster_recovd");
1528
1529         complete(&data->comp);
1530
1531         for (type = USRQUOTA; type < MAXQUOTAS; type++) {
1532                 struct mds_obd *mds = &obd->u.mds;
1533                 struct lustre_quota_info *qinfo = &mds->mds_quota_info;
1534                 struct list_head id_list;
1535                 struct dquot_id *dqid, *tmp;
1536
1537                 down(&mds->mds_qonoff_sem);
1538                 if (qinfo->qi_files[type] == NULL) {
1539                         up(&mds->mds_qonoff_sem);
1540                         continue;
1541                 }
1542                 INIT_LIST_HEAD(&id_list);
1543                 rc = fsfilt_qids(obd, qinfo->qi_files[type], NULL, type,
1544                                  &id_list);
1545                 up(&mds->mds_qonoff_sem);
1546
1547                 if (rc)
1548                         CERROR("error get ids from admin quotafile.(%d)\n", rc);
1549
1550                 list_for_each_entry_safe(dqid, tmp, &id_list, di_link) {
1551                         list_del_init(&dqid->di_link);
1552                         if (rc)
1553                                 goto free;
1554
1555                         rc = dquot_recovery(obd, dqid->di_id, type);
1556                         if (rc)
1557                                 CERROR("qmaster recovery failed! (id:%d type:%d"
1558                                        " rc:%d)\n", dqid->di_id, type, rc);
1559 free:
1560                         kfree(dqid);
1561                 }
1562         }
1563         RETURN(rc);
1564 }
1565
1566 int mds_quota_recovery(struct obd_device *obd)
1567 {
1568         struct lov_obd *lov = &obd->u.mds.mds_osc_obd->u.lov;
1569         struct qmaster_recov_thread_data data;
1570         int rc = 0;
1571         ENTRY;
1572
1573         mutex_down(&lov->lov_lock);
1574         if (lov->desc.ld_tgt_count != lov->desc.ld_active_tgt_count) {
1575                 CWARN("Not all osts are active, abort quota recovery\n");
1576                 mutex_up(&lov->lov_lock);
1577                 RETURN(rc);
1578         }
1579         mutex_up(&lov->lov_lock);
1580
1581         data.obd = obd;
1582         init_completion(&data.comp);
1583
1584         rc = kernel_thread(qmaster_recovery_main, &data, CLONE_VM|CLONE_FILES);
1585         if (rc < 0)
1586                 CERROR("Cannot start quota recovery thread: rc %d\n", rc);
1587
1588         wait_for_completion(&data.comp);
1589         RETURN(rc);
1590 }