Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lustre / quota / quota_master.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/quota/quota_master.c
37  *
38  * Lustre Quota Master request handler
39  *
40  * Author: Niu YaWei <niu@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <linux/version.h>
50 #include <linux/fs.h>
51 #include <asm/unistd.h>
52 #include <linux/slab.h>
53 #include <linux/quotaops.h>
54 #include <linux/module.h>
55 #include <linux/init.h>
56 #include <linux/quota.h>
57
58 #include <obd_class.h>
59 #include <lustre_quota.h>
60 #include <lustre_fsfilt.h>
61 #include <lustre_mds.h>
62
63 #include "quota_internal.h"
64
65 /* lock ordering: 
66  * mds->mds_qonoff_sem > dquot->dq_sem */
67 static struct list_head lustre_dquot_hash[NR_DQHASH];
68 static spinlock_t dquot_hash_lock = SPIN_LOCK_UNLOCKED;
69
70 cfs_mem_cache_t *lustre_dquot_cachep;
71
72 int lustre_dquot_init(void)
73 {
74         int i;
75         ENTRY;
76
77         LASSERT(lustre_dquot_cachep == NULL);
78         lustre_dquot_cachep = cfs_mem_cache_create("lustre_dquot_cache",
79                                                    sizeof(struct lustre_dquot),
80                                                    0, 0);
81         if (!lustre_dquot_cachep)
82                 return (-ENOMEM);
83
84         for (i = 0; i < NR_DQHASH; i++) {
85                 CFS_INIT_LIST_HEAD(lustre_dquot_hash + i);
86         }
87         RETURN(0);
88 }
89
90 void lustre_dquot_exit(void)
91 {
92         int i;
93         ENTRY;
94         /* FIXME cleanup work ?? */
95
96         for (i = 0; i < NR_DQHASH; i++) {
97                 LASSERT(list_empty(lustre_dquot_hash + i));
98         }
99         if (lustre_dquot_cachep) {
100                 int rc;
101                 rc = cfs_mem_cache_destroy(lustre_dquot_cachep);
102                 LASSERTF(rc == 0,"couldn't destroy lustre_dquot_cachep slab\n");
103                 lustre_dquot_cachep = NULL;
104         }
105         EXIT;
106 }
107
108 static inline int
109 dquot_hashfn(struct lustre_quota_info *info, unsigned int id, int type)
110              __attribute__((__const__));
111
112 static inline int
113 dquot_hashfn(struct lustre_quota_info *info, unsigned int id, int type)
114 {
115         unsigned long tmp = ((unsigned long)info >> L1_CACHE_SHIFT) ^ id;
116         tmp = (tmp * (MAXQUOTAS - type)) % NR_DQHASH;
117         return tmp;
118 }
119
120 /* caller must hold dquot_hash_lock */
121 static struct lustre_dquot *find_dquot(int hashent,
122                                        struct lustre_quota_info *lqi, qid_t id,
123                                        int type)
124 {
125         struct lustre_dquot *dquot;
126         ENTRY;
127
128         LASSERT_SPIN_LOCKED(&dquot_hash_lock);
129         list_for_each_entry(dquot, &lustre_dquot_hash[hashent], dq_hash) {
130                 if (dquot->dq_info == lqi &&
131                     dquot->dq_id == id && dquot->dq_type == type)
132                         RETURN(dquot);
133         }
134         RETURN(NULL);
135 }
136
137 static struct lustre_dquot *alloc_dquot(struct lustre_quota_info *lqi,
138                                         qid_t id, int type)
139 {
140         struct lustre_dquot *dquot = NULL;
141         ENTRY;
142
143         OBD_SLAB_ALLOC(dquot, lustre_dquot_cachep, CFS_ALLOC_IO, sizeof(*dquot));
144         if (dquot == NULL)
145                 RETURN(NULL);
146
147         CFS_INIT_LIST_HEAD(&dquot->dq_hash);
148         init_mutex_locked(&dquot->dq_sem);
149         dquot->dq_refcnt = 1;
150         dquot->dq_info = lqi;
151         dquot->dq_id = id;
152         dquot->dq_type = type;
153         dquot->dq_status = DQ_STATUS_AVAIL;
154
155         RETURN(dquot);
156 }
157
158 static void free_dquot(struct lustre_dquot *dquot)
159 {
160         OBD_SLAB_FREE(dquot, lustre_dquot_cachep, sizeof(*dquot));
161 }
162
163 static void insert_dquot_nolock(struct lustre_dquot *dquot)
164 {
165         struct list_head *head = lustre_dquot_hash +
166             dquot_hashfn(dquot->dq_info, dquot->dq_id, dquot->dq_type);
167         LASSERT(list_empty(&dquot->dq_hash));
168         list_add(&dquot->dq_hash, head);
169 }
170
171 static void remove_dquot_nolock(struct lustre_dquot *dquot)
172 {
173         LASSERT(!list_empty(&dquot->dq_hash));
174         list_del_init(&dquot->dq_hash);
175 }
176
177 static void lustre_dqput(struct lustre_dquot *dquot)
178 {
179         ENTRY;
180         spin_lock(&dquot_hash_lock);
181         LASSERT(dquot->dq_refcnt);
182         dquot->dq_refcnt--;
183         if (!dquot->dq_refcnt) {
184                 remove_dquot_nolock(dquot);
185                 free_dquot(dquot);
186         }
187         spin_unlock(&dquot_hash_lock);
188         EXIT;
189 }
190
191 static struct lustre_dquot *lustre_dqget(struct obd_device *obd,
192                                          struct lustre_quota_info *lqi,
193                                          qid_t id, int type)
194 {
195         unsigned int hashent = dquot_hashfn(lqi, id, type);
196         struct lustre_dquot *dquot, *empty;
197         ENTRY;
198
199         if ((empty = alloc_dquot(lqi, id, type)) == NULL)
200                 RETURN(ERR_PTR(-ENOMEM));
201         
202         spin_lock(&dquot_hash_lock);
203         if ((dquot = find_dquot(hashent, lqi, id, type)) != NULL) {
204                 dquot->dq_refcnt++;
205                 spin_unlock(&dquot_hash_lock);
206                 free_dquot(empty);
207         } else {
208                 int rc;
209
210                 dquot = empty;
211                 insert_dquot_nolock(dquot);
212                 spin_unlock(&dquot_hash_lock);
213
214                 rc = fsfilt_dquot(obd, dquot, QFILE_RD_DQUOT);
215                 up(&dquot->dq_sem);
216                 if (rc) {
217                         CERROR("can't read dquot from admin quotafile! "
218                                "(rc:%d)\n", rc);
219                         lustre_dqput(dquot);
220                         RETURN(ERR_PTR(rc));
221                 }
222
223         }
224
225         LASSERT(dquot);
226         RETURN(dquot);
227 }
228
229 int dqacq_handler(struct obd_device *obd, struct qunit_data *qdata, int opc)
230 {
231         struct mds_obd *mds = &obd->u.mds;
232         struct lustre_quota_info *info = &mds->mds_quota_info;
233         struct lustre_dquot *dquot = NULL;
234         __u64 *usage = NULL;
235         __u32 hlimit = 0, slimit = 0;
236         __u32 qdata_type = qdata->qd_flags & QUOTA_IS_GRP;
237         __u32 is_blk = (qdata->qd_flags & QUOTA_IS_BLOCK) >> 1;
238         time_t *time = NULL;
239         unsigned int grace = 0;
240         int rc = 0;
241         ENTRY;
242
243         if (OBD_FAIL_CHECK(OBD_FAIL_OBD_DQACQ))
244                 RETURN(-EIO);
245
246         dquot = lustre_dqget(obd, info, qdata->qd_id, qdata_type);
247         if (IS_ERR(dquot))
248                 RETURN(PTR_ERR(dquot));
249
250         DQUOT_DEBUG(dquot, "get dquot in dqacq_handler\n");
251         QINFO_DEBUG(dquot->dq_info, "get dquot in dqadq_handler\n");
252
253         down(&mds->mds_qonoff_sem);
254         down(&dquot->dq_sem);
255
256         if (dquot->dq_status & DQ_STATUS_RECOVERY) {
257                 DQUOT_DEBUG(dquot, "this dquot is under recovering.\n");
258                 GOTO(out, rc = -EBUSY);
259         }
260
261         if (is_blk) {
262                 grace = info->qi_info[qdata_type].dqi_bgrace;
263                 usage = &dquot->dq_dqb.dqb_curspace;
264                 hlimit = dquot->dq_dqb.dqb_bhardlimit;
265                 slimit = dquot->dq_dqb.dqb_bsoftlimit;
266                 time = &dquot->dq_dqb.dqb_btime;
267         } else {
268                 grace = info->qi_info[qdata_type].dqi_igrace;
269                 usage = (__u64 *) & dquot->dq_dqb.dqb_curinodes;
270                 hlimit = dquot->dq_dqb.dqb_ihardlimit;
271                 slimit = dquot->dq_dqb.dqb_isoftlimit;
272                 time = &dquot->dq_dqb.dqb_itime;
273         }
274
275         /* if the quota limit in admin quotafile is zero, we just inform
276          * slave to clear quota limit with zero qd_count */
277         if (hlimit == 0 && slimit == 0) {
278                 qdata->qd_count = 0;
279                 GOTO(out, rc);
280         }
281
282         switch (opc) {
283         case QUOTA_DQACQ:
284                 if (hlimit && 
285                     QUSG(*usage + qdata->qd_count, is_blk) > hlimit)
286                         GOTO(out, rc = -EDQUOT);
287
288                 if (slimit &&
289                     QUSG(*usage + qdata->qd_count, is_blk) > slimit) {
290                         if (*time && cfs_time_current_sec() >= *time)
291                                 GOTO(out, rc = -EDQUOT);
292                         else if (!*time)
293                                 *time = cfs_time_current_sec() + grace;
294                 }
295
296                 *usage += qdata->qd_count;
297                 break;
298         case QUOTA_DQREL:
299                 /* The usage in administrative file might be incorrect before
300                  * recovery done */
301                 if (*usage - qdata->qd_count < 0)
302                         *usage = 0;
303                 else
304                         *usage -= qdata->qd_count;
305
306                 /* (usage <= soft limit) but not (usage < soft limit) */
307                 if (!slimit || QUSG(*usage, is_blk) <= slimit)
308                         *time = 0;
309                 break;
310         default:
311                 LBUG();
312         }
313
314         rc = fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT);
315         EXIT;
316 out:
317         up(&dquot->dq_sem);
318         up(&mds->mds_qonoff_sem);
319         lustre_dqput(dquot);
320         return rc;
321 }
322
323 int mds_quota_adjust(struct obd_device *obd, unsigned int qcids[],
324                      unsigned int qpids[], int rc, int opc)
325 {
326         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
327         int rc2 = 0;
328         ENTRY;
329
330         if (rc && rc != -EDQUOT)
331                 RETURN(0);
332
333         switch (opc) {
334         case FSFILT_OP_RENAME:
335                 /* acquire/release block quota on owner of original parent */
336                 rc2 = qctxt_adjust_qunit(obd, qctxt, qpids[2], qpids[3], 1, 0);
337                 /* fall-through */
338         case FSFILT_OP_SETATTR:
339                 /* acquire/release file quota on original owner */
340                 rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 0, 0);
341                 /* fall-through */
342         case FSFILT_OP_CREATE:
343         case FSFILT_OP_UNLINK:
344                 /* acquire/release file/block quota on owner of child (or current owner) */
345                 rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 0, 0);
346                 rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0);
347                 /* acquire/release block quota on owner of parent (or original owner) */
348                 rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 1, 0);
349                 break;
350         default:
351                 LBUG();
352                 break;
353         }
354
355         if (rc2)
356                 CERROR("mds adjust qunit failed! (opc:%d rc:%d)\n", opc, rc2);
357         RETURN(0);
358 }
359
360 int filter_quota_adjust(struct obd_device *obd, unsigned int qcids[],
361                         unsigned int qpids[], int rc, int opc)
362 {
363         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
364         int rc2 = 0;
365         ENTRY;
366
367         if (rc && rc != -EDQUOT)
368                 RETURN(0);
369
370         switch (opc) {
371         case FSFILT_OP_SETATTR:
372                 /* acquire/release block quota on original & current owner */
373                 rc = qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0);
374                 rc2 = qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 1, 0);
375                 break;
376         case FSFILT_OP_UNLINK:
377                 /* release block quota on this owner */
378         case FSFILT_OP_CREATE: /* XXX for write operation on obdfilter */
379                 /* acquire block quota on this owner */
380                 rc = qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0);
381                 break;
382         default:
383                 LBUG();
384                 break;
385         }
386
387         if (rc || rc2)
388                 CERROR("filter adjust qunit failed! (opc:%d rc%d)\n",
389                        opc, rc ?: rc2);
390         RETURN(0);
391 }
392
/* Initialiser listing the administrative quota file names; the users in this
 * file index the resulting array by quota type. */
#define LUSTRE_ADMIN_QUOTAFILES {                                       \
        "admin_quotafile.usr",  /* user admin quotafile */              \
        "admin_quotafile.grp"   /* group admin quotafile */             \
}

/* Path component put in front of a quota file name before filp_open(). */
static const char prefix[] = "OBJECTS/";
398
399 int init_admin_quotafiles(struct obd_device *obd, struct obd_quotactl *oqctl)
400 {
401         struct mds_obd *mds = &obd->u.mds;
402         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
403         const char *quotafiles[] = LUSTRE_ADMIN_QUOTAFILES;
404         struct lvfs_run_ctxt saved;
405         char name[64];
406         int i, rc = 0;
407         struct dentry *dparent = mds->mds_objects_dir;
408         struct inode *iparent = dparent->d_inode;
409         ENTRY;
410
411         LASSERT(iparent);
412         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
413
414         down(&mds->mds_qonoff_sem);
415         for (i = 0; i < MAXQUOTAS; i++) {
416                 struct dentry *de;
417                 struct file *fp;
418
419                 if (!Q_TYPESET(oqctl, i))
420                         continue;
421
422                 /* quota file has been opened ? */
423                 if (qinfo->qi_files[i]) {
424                         CWARN("init %s admin quotafile while quota on.\n",
425                               i == USRQUOTA ? "user" : "group");
426                         continue;
427                 }
428
429                 /* lookup quota file */
430                 rc = 0;
431                 LOCK_INODE_MUTEX(iparent);
432                 de = lookup_one_len(quotafiles[i], dparent,
433                                     strlen(quotafiles[i]));
434                 UNLOCK_INODE_MUTEX(iparent);
435                 if (IS_ERR(de) || de->d_inode == NULL || 
436                     !S_ISREG(de->d_inode->i_mode))
437                         rc = IS_ERR(de) ? PTR_ERR(de) : -ENOENT;
438                 if (!IS_ERR(de))
439                         dput(de);
440
441                 if (rc && rc != -ENOENT) {
442                         CERROR("error lookup quotafile %s! (rc:%d)\n",
443                                name, rc);
444                         break;
445                 } else if (!rc) {
446                         continue;
447                 }
448
449                 LASSERT(strlen(quotafiles[i]) + sizeof(prefix) <= sizeof(name));
450                 sprintf(name, "%s%s", prefix, quotafiles[i]);
451
452                 LASSERT(rc == -ENOENT);
453                 /* create quota file */
454                 fp = filp_open(name, O_CREAT | O_EXCL, 0644);
455                 if (IS_ERR(fp) || !S_ISREG(fp->f_dentry->d_inode->i_mode)) {
456                         rc = PTR_ERR(fp);
457                         CERROR("error creating admin quotafile %s (rc:%d)\n",
458                                name, rc);
459                         break;
460                 }
461
462                 qinfo->qi_files[i] = fp;
463                 rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_INIT_INFO);
464                 filp_close(fp, 0);
465                 qinfo->qi_files[i] = NULL;
466
467                 if (rc) {
468                         CERROR("error init %s admin quotafile! (rc:%d)\n",
469                                i == USRQUOTA ? "user" : "group", rc);
470                         break;
471                 }
472         }
473         up(&mds->mds_qonoff_sem);
474
475         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
476         RETURN(rc);
477 }
478
479 static int close_quota_files(struct obd_quotactl *oqctl, 
480                              struct lustre_quota_info *qinfo)
481 {
482         int i, rc = 0;
483         ENTRY;
484
485         for (i = 0; i < MAXQUOTAS; i++) {
486                 if (!Q_TYPESET(oqctl, i))
487                         continue;
488                 if (qinfo->qi_files[i] == NULL) {
489                         rc = -ESRCH;
490                         continue;
491                 }
492                 filp_close(qinfo->qi_files[i], 0);
493                 qinfo->qi_files[i] = NULL;
494         }
495         RETURN(rc);
496 }
497
498 int mds_admin_quota_on(struct obd_device *obd, struct obd_quotactl *oqctl)
499 {
500         struct mds_obd *mds = &obd->u.mds;
501         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
502         const char *quotafiles[] = LUSTRE_ADMIN_QUOTAFILES;
503         char name[64];
504         int i, rc = 0;
505         struct inode *iparent = mds->mds_objects_dir->d_inode;
506         ENTRY;
507
508         LASSERT(iparent);
509
510         /* open admin quota files and read quotafile info */
511         for (i = 0; i < MAXQUOTAS; i++) {
512                 struct file *fp;
513
514                 if (!Q_TYPESET(oqctl, i))
515                         continue;
516
517                 LASSERT(strlen(quotafiles[i]) + sizeof(prefix) <= sizeof(name));
518                 sprintf(name, "%s%s", prefix, quotafiles[i]);
519
520                 if (qinfo->qi_files[i] != NULL) {
521                         rc = -EBUSY;
522                         break;
523                 }
524
525                 fp = filp_open(name, O_RDWR | O_EXCL, 0644);
526                 if (IS_ERR(fp) || !S_ISREG(fp->f_dentry->d_inode->i_mode)) {
527                         rc = PTR_ERR(fp);
528                         CDEBUG(rc == -ENOENT ? D_QUOTA : D_ERROR,
529                                "open %s failed! (rc:%d)\n", name, rc);
530                         break;
531                 }
532                 qinfo->qi_files[i] = fp;
533
534                 rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_RD_INFO);
535                 if (rc) {
536                         CERROR("error read quotainfo of %s! (rc:%d)\n",
537                                name, rc);
538                         break;
539                 }
540         }
541
542         if (rc && rc != -EBUSY)
543                 close_quota_files(oqctl, qinfo);
544
545         RETURN(rc);
546 }
547
548 static int mds_admin_quota_off(struct obd_device *obd, 
549                                struct obd_quotactl *oqctl)
550 {
551         struct mds_obd *mds = &obd->u.mds;
552         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
553         int rc;
554         ENTRY;
555
556         /* close admin quota files */
557         rc = close_quota_files(oqctl, qinfo);
558         RETURN(rc);
559 }
560
561 int mds_quota_on(struct obd_device *obd, struct obd_quotactl *oqctl)
562 {
563         struct mds_obd *mds = &obd->u.mds;
564         struct obd_device_target *obt = &obd->u.obt;
565         struct lvfs_run_ctxt saved;
566         int rc;
567         ENTRY;
568
569         if (!atomic_dec_and_test(&obt->obt_quotachecking)) {
570                 CDEBUG(D_INFO, "other people are doing quotacheck\n");
571                 atomic_inc(&obt->obt_quotachecking);
572                 RETURN(-EBUSY);
573         }
574
575         down(&mds->mds_qonoff_sem);
576         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
577         rc = mds_admin_quota_on(obd, oqctl);
578         if (rc)
579                 goto out;
580
581         rc = obd_quotactl(mds->mds_osc_exp, oqctl);
582         if (rc)
583                 goto out;
584
585         rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
586         if (!rc)
587                 obt->obt_qctxt.lqc_status = 1;
588 out:
589         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
590         up(&mds->mds_qonoff_sem);
591         atomic_inc(&obt->obt_quotachecking);
592         RETURN(rc);
593 }
594
595 int mds_quota_off(struct obd_device *obd, struct obd_quotactl *oqctl)
596 {
597         struct mds_obd *mds = &obd->u.mds;
598         struct obd_device_target *obt = &obd->u.obt;
599         struct lvfs_run_ctxt saved;
600         int rc, rc2;
601         ENTRY;
602
603         if (!atomic_dec_and_test(&obt->obt_quotachecking)) {
604                 CDEBUG(D_INFO, "other people are doing quotacheck\n");
605                 atomic_inc(&obt->obt_quotachecking);
606                 RETURN(-EBUSY);
607         }
608
609         down(&mds->mds_qonoff_sem);
610         /* close admin quota files */
611         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
612         mds_admin_quota_off(obd, oqctl);
613
614         rc = obd_quotactl(mds->mds_osc_exp, oqctl);
615         rc2 = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
616         if (!rc2)
617                 obt->obt_qctxt.lqc_status = 0;
618
619         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
620         up(&mds->mds_qonoff_sem);
621         atomic_inc(&obt->obt_quotachecking);
622
623         RETURN(rc ?: rc2);
624 }
625
626 int mds_set_dqinfo(struct obd_device *obd, struct obd_quotactl *oqctl)
627 {
628         struct mds_obd *mds = &obd->u.mds;
629         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
630         struct obd_dqinfo *dqinfo = &oqctl->qc_dqinfo;
631         int rc;
632         ENTRY;
633
634         down(&mds->mds_qonoff_sem);
635         if (qinfo->qi_files[oqctl->qc_type] == NULL) {
636                 rc = -ESRCH;
637                 goto out;
638         }
639
640         qinfo->qi_info[oqctl->qc_type].dqi_bgrace = dqinfo->dqi_bgrace;
641         qinfo->qi_info[oqctl->qc_type].dqi_igrace = dqinfo->dqi_igrace;
642         qinfo->qi_info[oqctl->qc_type].dqi_flags = dqinfo->dqi_flags;
643
644         rc = fsfilt_quotainfo(obd, qinfo, oqctl->qc_type, QFILE_WR_INFO);
645
646 out:
647         up(&mds->mds_qonoff_sem);
648         RETURN(rc);
649 }
650
651 int mds_get_dqinfo(struct obd_device *obd, struct obd_quotactl *oqctl)
652 {
653         struct mds_obd *mds = &obd->u.mds;
654         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
655         struct obd_dqinfo *dqinfo = &oqctl->qc_dqinfo;
656         int rc = 0;
657         ENTRY;
658
659         down(&mds->mds_qonoff_sem);
660         if (qinfo->qi_files[oqctl->qc_type] == NULL) {
661                 rc = -ESRCH;
662                 goto out;
663         }
664
665         dqinfo->dqi_bgrace = qinfo->qi_info[oqctl->qc_type].dqi_bgrace;
666         dqinfo->dqi_igrace = qinfo->qi_info[oqctl->qc_type].dqi_igrace;
667         dqinfo->dqi_flags = qinfo->qi_info[oqctl->qc_type].dqi_flags;
668
669 out:
670         up(&mds->mds_qonoff_sem);
671         RETURN(rc);
672 }
673
674 static int mds_init_slave_ilimits(struct obd_device *obd,
675                                   struct obd_quotactl *oqctl, int set)
676 {
677         /* XXX: for file limits only adjust local now */
678         unsigned int uid = 0, gid = 0;
679         struct obd_quotactl *ioqc = NULL;
680         int flag;
681         int rc;
682         ENTRY;
683
684         /* if we are going to set zero limit, needn't init slaves */
685         if (!oqctl->qc_dqblk.dqb_ihardlimit && !oqctl->qc_dqblk.dqb_isoftlimit &&
686             set)
687                 RETURN(0);
688
689         OBD_ALLOC_PTR(ioqc);
690         if (!ioqc)
691                 RETURN(-ENOMEM);
692         
693         flag = oqctl->qc_dqblk.dqb_ihardlimit || 
694                oqctl->qc_dqblk.dqb_isoftlimit || set;
695         ioqc->qc_cmd = flag ? Q_INITQUOTA : Q_SETQUOTA;
696         ioqc->qc_id = oqctl->qc_id;
697         ioqc->qc_type = oqctl->qc_type;
698         ioqc->qc_dqblk.dqb_valid = QIF_ILIMITS;
699         ioqc->qc_dqblk.dqb_ihardlimit = flag ? MIN_QLIMIT : 0;
700
701         /* set local limit to MIN_QLIMIT */
702         rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, ioqc);
703         if (rc)
704                 GOTO(out, rc);
705
706         /* trigger local qunit pre-acquire */
707         if (oqctl->qc_type == USRQUOTA)
708                 uid = oqctl->qc_id;
709         else
710                 gid = oqctl->qc_id;
711
712         rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 0, 0);
713         if (rc) {
714                 CERROR("error mds adjust local file quota! (rc:%d)\n", rc);
715                 GOTO(out, rc);
716         }
717         /* FIXME initialize all slaves in CMD */
718         EXIT;
719 out:
720         if (ioqc)
721                 OBD_FREE_PTR(ioqc);
722         return rc;
723 }
724
725 static int mds_init_slave_blimits(struct obd_device *obd,
726                                   struct obd_quotactl *oqctl, int set)
727 {
728         struct mds_obd *mds = &obd->u.mds;
729         struct obd_quotactl *ioqc;
730         unsigned int uid = 0, gid = 0;
731         int flag;
732         int rc;
733         ENTRY;
734
735         /* if we are going to set zero limit, needn't init slaves */
736         if (!oqctl->qc_dqblk.dqb_bhardlimit && !oqctl->qc_dqblk.dqb_bsoftlimit &&
737             set)
738                 RETURN(0);
739
740         OBD_ALLOC_PTR(ioqc);
741         if (!ioqc)
742                 RETURN(-ENOMEM);
743
744         flag = oqctl->qc_dqblk.dqb_bhardlimit || 
745                oqctl->qc_dqblk.dqb_bsoftlimit || set;
746         ioqc->qc_cmd = flag ? Q_INITQUOTA : Q_SETQUOTA;
747         ioqc->qc_id = oqctl->qc_id;
748         ioqc->qc_type = oqctl->qc_type;
749         ioqc->qc_dqblk.dqb_valid = QIF_BLIMITS;
750         ioqc->qc_dqblk.dqb_bhardlimit = flag ? MIN_QLIMIT : 0;
751
752         rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, ioqc);
753         if (rc)
754                 GOTO(out, rc);
755
756         /* trigger local qunit pre-acquire */
757         if (oqctl->qc_type == USRQUOTA)
758                 uid = oqctl->qc_id;
759         else
760                 gid = oqctl->qc_id;
761
762         rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 1, 0);
763         if (rc) {
764                 CERROR("error mds adjust local block quota! (rc:%d)\n", rc);
765                 GOTO(out, rc);
766         }
767
768         /* initialize all slave's limit */
769         rc = obd_quotactl(mds->mds_osc_exp, ioqc);
770         EXIT;
771 out:
772         OBD_FREE_PTR(ioqc);
773         return rc;
774 }
775
776 int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl)
777 {
778         struct mds_obd *mds = &obd->u.mds;
779         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
780         __u32 ihardlimit, isoftlimit, bhardlimit, bsoftlimit;
781         time_t btime, itime;
782         struct lustre_dquot *dquot;
783         struct obd_dqblk *dqblk = &oqctl->qc_dqblk;
784         int set, rc;
785         ENTRY;
786
787         down(&mds->mds_qonoff_sem);
788         if (qinfo->qi_files[oqctl->qc_type] == NULL)
789                 GOTO(out_sem, rc = -ESRCH);
790
791         dquot = lustre_dqget(obd, qinfo, oqctl->qc_id, oqctl->qc_type);
792         if (IS_ERR(dquot))
793                 GOTO(out_sem, rc = PTR_ERR(dquot));
794         DQUOT_DEBUG(dquot, "get dquot in mds_set_blk\n");
795         QINFO_DEBUG(dquot->dq_info, "get dquot in mds_set_blk\n");
796
797         down(&dquot->dq_sem);
798
799         if (dquot->dq_status) {
800                 up(&dquot->dq_sem);
801                 lustre_dqput(dquot);
802                 GOTO(out_sem, rc = -EBUSY);
803         }
804         dquot->dq_status |= DQ_STATUS_SET;
805
806         ihardlimit = dquot->dq_dqb.dqb_ihardlimit;
807         isoftlimit = dquot->dq_dqb.dqb_isoftlimit;
808         bhardlimit = dquot->dq_dqb.dqb_bhardlimit;
809         bsoftlimit = dquot->dq_dqb.dqb_bsoftlimit;
810         btime = dquot->dq_dqb.dqb_btime;
811         itime = dquot->dq_dqb.dqb_itime;
812
813         if (dqblk->dqb_valid & QIF_BTIME)
814                 dquot->dq_dqb.dqb_btime = dqblk->dqb_btime;
815         if (dqblk->dqb_valid & QIF_ITIME)
816                 dquot->dq_dqb.dqb_itime = dqblk->dqb_itime;
817
818         if (dqblk->dqb_valid & QIF_BLIMITS) {
819                 dquot->dq_dqb.dqb_bhardlimit = dqblk->dqb_bhardlimit;
820                 dquot->dq_dqb.dqb_bsoftlimit = dqblk->dqb_bsoftlimit;
821                 /* clear usage (limit pool) */
822                 if (!dquot->dq_dqb.dqb_bhardlimit && 
823                     !dquot->dq_dqb.dqb_bsoftlimit)
824                         dquot->dq_dqb.dqb_curspace = 0;
825
826                 /* clear grace time */
827                 if (!dqblk->dqb_bsoftlimit || 
828                     toqb(dquot->dq_dqb.dqb_curspace) <= dqblk->dqb_bsoftlimit)
829                         dquot->dq_dqb.dqb_btime = 0;
830                 /* set grace only if user hasn't provided his own */
831                 else if (!(dqblk->dqb_valid & QIF_BTIME))
832                         dquot->dq_dqb.dqb_btime = cfs_time_current_sec() + 
833                                 qinfo->qi_info[dquot->dq_type].dqi_bgrace;
834         }
835
836         if (dqblk->dqb_valid & QIF_ILIMITS) {
837                 dquot->dq_dqb.dqb_ihardlimit = dqblk->dqb_ihardlimit;
838                 dquot->dq_dqb.dqb_isoftlimit = dqblk->dqb_isoftlimit;
839                 /* clear usage (limit pool) */
840                 if (!dquot->dq_dqb.dqb_ihardlimit &&
841                     !dquot->dq_dqb.dqb_isoftlimit)
842                         dquot->dq_dqb.dqb_curinodes = 0;
843
844                 if (!dqblk->dqb_isoftlimit ||
845                     dquot->dq_dqb.dqb_curinodes <= dqblk->dqb_isoftlimit)
846                         dquot->dq_dqb.dqb_itime = 0;
847                 else if (!(dqblk->dqb_valid & QIF_ITIME))
848                         dquot->dq_dqb.dqb_itime = cfs_time_current_sec() +
849                                 qinfo->qi_info[dquot->dq_type].dqi_igrace;
850         }
851
852         rc = fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT);
853
854         up(&dquot->dq_sem);
855
856         if (rc) {
857                 CERROR("set limit failed! (rc:%d)\n", rc);
858                 goto out;
859         }
860
861         up(&mds->mds_qonoff_sem);
862         if (dqblk->dqb_valid & QIF_ILIMITS) {
863                 set = !(ihardlimit || isoftlimit);
864                 rc = mds_init_slave_ilimits(obd, oqctl, set);
865                 if (rc) {
866                         CERROR("init slave ilimits failed! (rc:%d)\n", rc);
867                         goto revoke_out;
868                 }
869         }
870
871         if (dqblk->dqb_valid & QIF_BLIMITS) {
872                 set = !(bhardlimit || bsoftlimit);
873                 rc = mds_init_slave_blimits(obd, oqctl, set);
874                 if (rc) {
875                         CERROR("init slave blimits failed! (rc:%d)\n", rc);
876                         goto revoke_out;
877                 }
878         }
879         down(&mds->mds_qonoff_sem);
880
881 revoke_out:
882         if (rc) {
883                 /* cancel previous setting */
884                 down(&dquot->dq_sem);
885                 dquot->dq_dqb.dqb_ihardlimit = ihardlimit;
886                 dquot->dq_dqb.dqb_isoftlimit = isoftlimit;
887                 dquot->dq_dqb.dqb_bhardlimit = bhardlimit;
888                 dquot->dq_dqb.dqb_bsoftlimit = bsoftlimit;
889                 dquot->dq_dqb.dqb_btime = btime;
890                 dquot->dq_dqb.dqb_itime = itime;
891                 fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT);
892                 up(&dquot->dq_sem);
893         }
894 out:
895         down(&dquot->dq_sem);
896         dquot->dq_status &= ~DQ_STATUS_SET;
897         up(&dquot->dq_sem);
898         lustre_dqput(dquot);
899         EXIT;
900 out_sem:
901         up(&mds->mds_qonoff_sem);
902         return rc;
903 }
904
905 static int mds_get_space(struct obd_device *obd, struct obd_quotactl *oqctl)
906 {
907         struct obd_quotactl *soqc;
908         struct lvfs_run_ctxt saved;
909         int rc;
910         ENTRY;
911
912         OBD_ALLOC_PTR(soqc);
913         if (!soqc)
914                 RETURN(-ENOMEM);
915
916         soqc->qc_cmd = Q_GETOQUOTA;
917         soqc->qc_id = oqctl->qc_id;
918         soqc->qc_type = oqctl->qc_type;
919
920         rc = obd_quotactl(obd->u.mds.mds_osc_exp, soqc);
921         if (rc)
922                GOTO(out, rc);
923
924         oqctl->qc_dqblk.dqb_curspace = soqc->qc_dqblk.dqb_curspace;
925
926         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
927         soqc->qc_dqblk.dqb_curspace = 0;
928         rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, soqc);
929         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
930
931         if (rc)
932                 GOTO(out, rc);
933
934         oqctl->qc_dqblk.dqb_curinodes += soqc->qc_dqblk.dqb_curinodes;
935         oqctl->qc_dqblk.dqb_curspace += soqc->qc_dqblk.dqb_curspace;
936         EXIT;
937 out:
938         OBD_FREE_PTR(soqc);
939         return rc;
940 }
941
942 int mds_get_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl)
943 {
944         struct mds_obd *mds = &obd->u.mds;
945         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
946         struct lustre_dquot *dquot;
947         struct obd_dqblk *dqblk = &oqctl->qc_dqblk;
948         int rc;
949         ENTRY;
950
951         down(&mds->mds_qonoff_sem);
952         if (qinfo->qi_files[oqctl->qc_type] == NULL)
953                 GOTO(out, rc = -ESRCH);
954
955         dquot = lustre_dqget(obd, qinfo, oqctl->qc_id, oqctl->qc_type);
956         if (IS_ERR(dquot))
957                 GOTO(out, rc = PTR_ERR(dquot));
958
959         down(&dquot->dq_sem);
960         dqblk->dqb_ihardlimit = dquot->dq_dqb.dqb_ihardlimit;
961         dqblk->dqb_isoftlimit = dquot->dq_dqb.dqb_isoftlimit;
962         dqblk->dqb_bhardlimit = dquot->dq_dqb.dqb_bhardlimit;
963         dqblk->dqb_bsoftlimit = dquot->dq_dqb.dqb_bsoftlimit;
964         dqblk->dqb_btime = dquot->dq_dqb.dqb_btime;
965         dqblk->dqb_itime = dquot->dq_dqb.dqb_itime;
966         up(&dquot->dq_sem);
967
968         lustre_dqput(dquot);
969
970         /* the usages in admin quota file is inaccurate */
971         dqblk->dqb_curinodes = 0;
972         dqblk->dqb_curspace = 0;
973         rc = mds_get_space(obd, oqctl);
974         EXIT;
975 out:
976         up(&mds->mds_qonoff_sem);
977         return rc;
978 }
979
980 int mds_get_obd_quota(struct obd_device *obd, struct obd_quotactl *oqctl)
981 {
982         struct lvfs_run_ctxt saved;
983         int rc;
984         ENTRY;
985
986         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
987         rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
988         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
989
990         RETURN(rc);
991 }
992
993
994 /* FIXME we only recovery block limit by now, need recovery inode
995  * limits also after CMD involved in */
996 static int 
997 dquot_recovery(struct obd_device *obd, unsigned int id, unsigned short type)
998 {
999         struct mds_obd *mds = &obd->u.mds;
1000         struct lustre_quota_info *qinfo= &obd->u.mds.mds_quota_info;
1001         struct lustre_dquot *dquot;
1002         struct obd_quotactl *qctl;
1003         __u64 total_limits = 0;
1004         int rc;
1005         ENTRY;
1006
1007         OBD_ALLOC_PTR(qctl);
1008         if (qctl == NULL)
1009                 RETURN(-ENOMEM);
1010
1011         dquot = lustre_dqget(obd, qinfo, id, type);
1012         if (IS_ERR(dquot)) {
1013                 CERROR("Get dquot failed. (rc:%ld)\n", PTR_ERR(dquot));
1014                 OBD_FREE_PTR(qctl);
1015                 RETURN(PTR_ERR(dquot));
1016         }
1017
1018         down(&dquot->dq_sem);
1019
1020         /* don't recovery the dquot without limits or under setting */
1021         if (!(dquot->dq_dqb.dqb_bhardlimit || dquot->dq_dqb.dqb_bsoftlimit) ||
1022             dquot->dq_status)
1023                 GOTO(skip, rc = 0);
1024         dquot->dq_status |= DQ_STATUS_RECOVERY;
1025
1026         up(&dquot->dq_sem);
1027
1028         /* get real bhardlimit from all slaves. */
1029         qctl->qc_cmd = Q_GETOQUOTA;
1030         qctl->qc_type = type;
1031         qctl->qc_id = id;
1032         qctl->qc_stat = QUOTA_RECOVERING;
1033         rc = obd_quotactl(obd->u.mds.mds_osc_exp, qctl);
1034         if (rc)
1035                 GOTO(out, rc);
1036         total_limits = qctl->qc_dqblk.dqb_bhardlimit;
1037
1038         /* get real bhardlimit from master */
1039         rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, qctl);
1040         if (rc)
1041                 GOTO(out, rc);
1042         total_limits += qctl->qc_dqblk.dqb_bhardlimit;
1043
1044         /* amend the usage of the administrative quotafile */
1045         down(&mds->mds_qonoff_sem);
1046         down(&dquot->dq_sem);
1047
1048         dquot->dq_dqb.dqb_curspace = total_limits << QUOTABLOCK_BITS;
1049
1050         rc = fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT);
1051         if (rc)
1052                 CERROR("write dquot failed! (rc:%d)\n", rc);
1053
1054         up(&dquot->dq_sem);
1055         up(&mds->mds_qonoff_sem);
1056         EXIT;
1057 out:
1058         down(&dquot->dq_sem);
1059         dquot->dq_status &= ~DQ_STATUS_RECOVERY;
1060 skip:
1061         up(&dquot->dq_sem);
1062
1063         lustre_dqput(dquot);
1064         OBD_FREE_PTR(qctl);
1065         return rc;
1066 }
1067
1068 struct qmaster_recov_thread_data {
1069         struct obd_device *obd;
1070         struct completion comp;
1071 };
1072
1073 static int qmaster_recovery_main(void *arg)
1074 {
1075         struct qmaster_recov_thread_data *data = arg;
1076         struct obd_device *obd = data->obd;
1077         int rc = 0;
1078         unsigned short type;
1079         ENTRY;
1080
1081         ptlrpc_daemonize("qmaster_recovd");
1082
1083         complete(&data->comp);
1084
1085         for (type = USRQUOTA; type < MAXQUOTAS; type++) {
1086                 struct mds_obd *mds = &obd->u.mds;
1087                 struct lustre_quota_info *qinfo = &mds->mds_quota_info;
1088                 struct list_head id_list;
1089                 struct dquot_id *dqid, *tmp;
1090
1091                 down(&mds->mds_qonoff_sem);
1092                 if (qinfo->qi_files[type] == NULL) {
1093                         up(&mds->mds_qonoff_sem);
1094                         continue;
1095                 }
1096                 CFS_INIT_LIST_HEAD(&id_list);
1097                 rc = fsfilt_qids(obd, qinfo->qi_files[type], NULL, type, 
1098                                  &id_list);
1099                 up(&mds->mds_qonoff_sem);
1100
1101                 if (rc)
1102                         CERROR("error get ids from admin quotafile.(%d)\n", rc);
1103
1104                 list_for_each_entry_safe(dqid, tmp, &id_list, di_link) {
1105                         list_del_init(&dqid->di_link);
1106                         if (rc)
1107                                 goto free;
1108
1109                         rc = dquot_recovery(obd, dqid->di_id, type);
1110                         if (rc)
1111                                 CERROR("qmaster recovery failed! (id:%d type:%d"
1112                                        " rc:%d)\n", dqid->di_id, type, rc);
1113 free:
1114                         kfree(dqid);
1115                 }
1116         }
1117         RETURN(rc);
1118 }
1119
1120 int mds_quota_recovery(struct obd_device *obd)
1121 {
1122         struct lov_obd *lov = &obd->u.mds.mds_osc_obd->u.lov;
1123         struct qmaster_recov_thread_data data;
1124         int rc = 0;
1125         ENTRY;
1126
1127         mutex_down(&lov->lov_lock);
1128         if (lov->desc.ld_tgt_count != lov->desc.ld_active_tgt_count) {
1129                 CWARN("Not all osts are active, abort quota recovery\n");
1130                 mutex_up(&lov->lov_lock);
1131                 RETURN(rc);
1132         }
1133         mutex_up(&lov->lov_lock);
1134
1135         data.obd = obd;
1136         init_completion(&data.comp);
1137
1138         rc = kernel_thread(qmaster_recovery_main, &data, CLONE_VM|CLONE_FILES);
1139         if (rc < 0)
1140                 CERROR("Cannot start quota recovery thread: rc %d\n", rc);
1141
1142         wait_for_completion(&data.comp);
1143         RETURN(rc);
1144 }