Whamcloud - gitweb
Branch b1_6
[fs/lustre-release.git] / lustre / quota / quota_master.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/quota/quota_master.c
5  *  Lustre Quota Master request handler
6  *
7  *  Copyright (c) 2001-2005 Cluster File Systems, Inc.
8  *   Author: Niu YaWei <niu@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   No redistribution or use is permitted outside of Cluster File Systems, Inc.
13  *
14  */
15 #ifndef EXPORT_SYMTAB
16 # define EXPORT_SYMTAB
17 #endif
18
19 #define DEBUG_SUBSYSTEM S_MDS
20
21 #include <linux/version.h>
22 #include <linux/fs.h>
23 #include <asm/unistd.h>
24 #include <linux/slab.h>
25 #include <linux/quotaops.h>
26 #include <linux/module.h>
27 #include <linux/init.h>
28 #include <linux/quota.h>
29
30 #include <obd_class.h>
31 #include <lustre_quota.h>
32 #include <lustre_fsfilt.h>
33 #include <lustre_mds.h>
34
35 #include "quota_internal.h"
36
/* lock ordering:
 * mds->mds_qonoff_sem > dquot->dq_sem */

/* Hash table of in-memory dquots; the bucket index comes from
 * dquot_hashfn(). */
static struct list_head lustre_dquot_hash[NR_DQHASH];
/* Taken around lookups, insertions, removals and dq_refcnt updates done by
 * lustre_dqget()/lustre_dqput(). */
static spinlock_t dquot_hash_lock = SPIN_LOCK_UNLOCKED;

/* Slab cache that struct lustre_dquot objects are allocated from. */
cfs_mem_cache_t *lustre_dquot_cachep;
43
44 int lustre_dquot_init(void)
45 {
46         int i;
47         ENTRY;
48
49         LASSERT(lustre_dquot_cachep == NULL);
50         lustre_dquot_cachep = cfs_mem_cache_create("lustre_dquot_cache",
51                                                    sizeof(struct lustre_dquot),
52                                                    0, 0);
53         if (!lustre_dquot_cachep)
54                 return (-ENOMEM);
55
56         for (i = 0; i < NR_DQHASH; i++) {
57                 INIT_LIST_HEAD(lustre_dquot_hash + i);
58         }
59         RETURN(0);
60 }
61
62 void lustre_dquot_exit(void)
63 {
64         int i;
65         ENTRY;
66         /* FIXME cleanup work ?? */
67
68         for (i = 0; i < NR_DQHASH; i++) {
69                 LASSERT(list_empty(lustre_dquot_hash + i));
70         }
71         if (lustre_dquot_cachep) {
72                 int rc;
73                 rc = cfs_mem_cache_destroy(lustre_dquot_cachep);
74                 LASSERTF(rc == 0,"couldn't destroy lustre_dquot_cachep slab\n");
75                 lustre_dquot_cachep = NULL;
76         }
77         EXIT;
78 }
79
80 static inline int
81 dquot_hashfn(struct lustre_quota_info *info, unsigned int id, int type)
82              __attribute__((__const__));
83
84 static inline int
85 dquot_hashfn(struct lustre_quota_info *info, unsigned int id, int type)
86 {
87         unsigned long tmp = ((unsigned long)info >> L1_CACHE_SHIFT) ^ id;
88         tmp = (tmp * (MAXQUOTAS - type)) % NR_DQHASH;
89         return tmp;
90 }
91
92 /* caller must hold dquot_hash_lock */
93 static struct lustre_dquot *find_dquot(int hashent,
94                                        struct lustre_quota_info *lqi, qid_t id,
95                                        int type)
96 {
97         struct lustre_dquot *dquot;
98         ENTRY;
99
100         LASSERT_SPIN_LOCKED(&dquot_hash_lock);
101         list_for_each_entry(dquot, &lustre_dquot_hash[hashent], dq_hash) {
102                 if (dquot->dq_info == lqi &&
103                     dquot->dq_id == id && dquot->dq_type == type)
104                         RETURN(dquot);
105         }
106         RETURN(NULL);
107 }
108
109 static struct lustre_dquot *alloc_dquot(struct lustre_quota_info *lqi,
110                                         qid_t id, int type)
111 {
112         struct lustre_dquot *dquot = NULL;
113         ENTRY;
114
115         OBD_SLAB_ALLOC(dquot, lustre_dquot_cachep, CFS_ALLOC_IO, sizeof(*dquot));
116         if (dquot == NULL)
117                 RETURN(NULL);
118
119         INIT_LIST_HEAD(&dquot->dq_hash);
120         init_mutex_locked(&dquot->dq_sem);
121         dquot->dq_refcnt = 1;
122         dquot->dq_info = lqi;
123         dquot->dq_id = id;
124         dquot->dq_type = type;
125         dquot->dq_status = DQ_STATUS_AVAIL;
126
127         RETURN(dquot);
128 }
129
130 static void free_dquot(struct lustre_dquot *dquot)
131 {
132         OBD_SLAB_FREE(dquot, lustre_dquot_cachep, sizeof(*dquot));
133 }
134
135 static void insert_dquot_nolock(struct lustre_dquot *dquot)
136 {
137         struct list_head *head = lustre_dquot_hash +
138             dquot_hashfn(dquot->dq_info, dquot->dq_id, dquot->dq_type);
139         LASSERT(list_empty(&dquot->dq_hash));
140         list_add(&dquot->dq_hash, head);
141 }
142
143 static void remove_dquot_nolock(struct lustre_dquot *dquot)
144 {
145         LASSERT(!list_empty(&dquot->dq_hash));
146         list_del_init(&dquot->dq_hash);
147 }
148
149 static void lustre_dqput(struct lustre_dquot *dquot)
150 {
151         ENTRY;
152         spin_lock(&dquot_hash_lock);
153         LASSERT(dquot->dq_refcnt);
154         dquot->dq_refcnt--;
155         if (!dquot->dq_refcnt) {
156                 remove_dquot_nolock(dquot);
157                 free_dquot(dquot);
158         }
159         spin_unlock(&dquot_hash_lock);
160         EXIT;
161 }
162
163 static struct lustre_dquot *lustre_dqget(struct obd_device *obd,
164                                          struct lustre_quota_info *lqi,
165                                          qid_t id, int type)
166 {
167         unsigned int hashent = dquot_hashfn(lqi, id, type);
168         struct lustre_dquot *dquot, *empty;
169         ENTRY;
170
171         if ((empty = alloc_dquot(lqi, id, type)) == NULL)
172                 RETURN(ERR_PTR(-ENOMEM));
173         
174         spin_lock(&dquot_hash_lock);
175         if ((dquot = find_dquot(hashent, lqi, id, type)) != NULL) {
176                 dquot->dq_refcnt++;
177                 spin_unlock(&dquot_hash_lock);
178                 free_dquot(empty);
179         } else {
180                 int rc;
181
182                 dquot = empty;
183                 insert_dquot_nolock(dquot);
184                 spin_unlock(&dquot_hash_lock);
185
186                 rc = fsfilt_dquot(obd, dquot, QFILE_RD_DQUOT);
187                 up(&dquot->dq_sem);
188                 if (rc) {
189                         CERROR("can't read dquot from admin quotafile! "
190                                "(rc:%d)\n", rc);
191                         lustre_dqput(dquot);
192                         RETURN(ERR_PTR(rc));
193                 }
194
195         }
196
197         LASSERT(dquot);
198         RETURN(dquot);
199 }
200
201 static void init_oqaq(struct quota_adjust_qunit *oqaq,
202                       struct lustre_quota_ctxt *qctxt,
203                       qid_t id, int type)
204 {
205         struct lustre_qunit_size *lqs = NULL;
206
207         oqaq->qaq_id = id;
208         oqaq->qaq_flags = type;
209         quota_search_lqs(NULL, oqaq, qctxt, &lqs);
210         if (lqs) {
211                 spin_lock(&lqs->lqs_lock);
212                 oqaq->qaq_bunit_sz = lqs->lqs_bunit_sz;
213                 oqaq->qaq_iunit_sz = lqs->lqs_iunit_sz;
214                 oqaq->qaq_flags    = lqs->lqs_flags;
215                 spin_unlock(&lqs->lqs_lock);
216                 lqs_putref(lqs);
217         } else {
218                 CDEBUG(D_QUOTA, "Can't find the lustre qunit size!\n");
219                 oqaq->qaq_bunit_sz = qctxt->lqc_bunit_sz;
220                 oqaq->qaq_iunit_sz = qctxt->lqc_iunit_sz;
221         }
222 }
223
224 int dqacq_adjust_qunit_sz(struct obd_device *obd, qid_t id, int type,
225                           __u32 is_blk)
226 {
227         struct mds_obd *mds = &obd->u.mds;
228         struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt;
229         struct obd_device *lov_mds_obd = class_exp2obd(mds->mds_osc_exp);
230         struct lov_obd *lov = &lov_mds_obd->u.lov;
231         __u32 ost_num = lov->desc.ld_tgt_count, mdt_num = 1;
232         struct quota_adjust_qunit *oqaq = NULL;
233         unsigned int uid = 0, gid = 0;
234         struct lustre_quota_info *info = &mds->mds_quota_info;
235         struct lustre_dquot *dquot = NULL;
236         int adjust_res = 0;
237         int rc = 0;
238         ENTRY;
239
240         LASSERT(mds);
241         dquot = lustre_dqget(obd, info, id, type);
242         if (IS_ERR(dquot))
243                 RETURN(PTR_ERR(dquot));
244
245         OBD_ALLOC_PTR(oqaq);
246         if (!oqaq)
247                 GOTO(out, rc = -ENOMEM);
248
249         down(&dquot->dq_sem);
250         init_oqaq(oqaq, qctxt, id, type);
251
252         rc = dquot_create_oqaq(qctxt, dquot, ost_num, mdt_num,
253                                is_blk ? LQUOTA_FLAGS_ADJBLK :
254                                LQUOTA_FLAGS_ADJINO, oqaq);
255
256         if (rc < 0) {
257                 CDEBUG(D_ERROR, "create oqaq failed! (rc:%d)\n", rc);
258                 GOTO(out_sem, rc);
259         }
260         QAQ_DEBUG(oqaq, "show oqaq.\n")
261
262         if (!QAQ_IS_ADJBLK(oqaq) && !QAQ_IS_ADJINO(oqaq))
263                 GOTO(out_sem, rc);
264
265         /* adjust the mds slave qunit size */
266         adjust_res = quota_adjust_slave_lqs(oqaq, qctxt);
267         if (adjust_res <= 0) {
268                 if (adjust_res < 0) {
269                         rc = adjust_res;
270                         CDEBUG(D_ERROR, "adjust mds slave's qunit size failed! \
271                                (rc:%d)\n", rc);
272                 } else {
273                         CDEBUG(D_QUOTA, "qunit doesn't need to be adjusted.\n");
274                 }
275                 GOTO(out_sem, rc);
276         }
277
278         if (type)
279                 gid = dquot->dq_id;
280         else
281                 uid = dquot->dq_id;
282
283         up(&dquot->dq_sem);
284
285         rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, is_blk, 0);
286         if (rc) {
287                 CDEBUG(D_ERROR, "mds fail to adjust file quota! \
288                                (rc:%d)\n", rc);
289                 GOTO(out, rc);
290         }
291
292         /* only when block qunit is reduced, boardcast to osts */
293         if ((adjust_res & LQS_BLK_DECREASE) && QAQ_IS_ADJBLK(oqaq))
294                 rc = obd_quota_adjust_qunit(mds->mds_osc_exp, oqaq);
295
296 out:
297         lustre_dqput(dquot);
298         if (oqaq)
299                 OBD_FREE_PTR(oqaq);
300
301         RETURN(rc);
302 out_sem:
303         up(&dquot->dq_sem);
304         goto out;
305 }
306
307 int dqacq_handler(struct obd_device *obd, struct qunit_data *qdata, int opc)
308 {
309         struct mds_obd *mds = &obd->u.mds;
310         struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt;
311         struct lustre_quota_info *info = &mds->mds_quota_info;
312         struct lustre_dquot *dquot = NULL;
313         __u64 *usage = NULL;
314         __u32 hlimit = 0, slimit = 0;
315         time_t *time = NULL;
316         unsigned int grace = 0;
317         struct lustre_qunit_size *lqs = NULL;
318         int rc = 0;
319         ENTRY;
320
321         OBD_FAIL_RETURN(OBD_FAIL_OBD_DQACQ, -EIO);
322
323         dquot = lustre_dqget(obd, info, qdata->qd_id, QDATA_IS_GRP(qdata));
324         if (IS_ERR(dquot))
325                 RETURN(PTR_ERR(dquot));
326
327         DQUOT_DEBUG(dquot, "get dquot in dqacq_handler\n");
328         QINFO_DEBUG(dquot->dq_info, "get dquot in dqadq_handler\n");
329
330         down(&mds->mds_qonoff_sem);
331         down(&dquot->dq_sem);
332
333         if (dquot->dq_status & DQ_STATUS_RECOVERY) {
334                 DQUOT_DEBUG(dquot, "this dquot is under recovering.\n");
335                 GOTO(out, rc = -EBUSY);
336         }
337
338         if (QDATA_IS_BLK(qdata)) {
339                 grace = info->qi_info[QDATA_IS_GRP(qdata)].dqi_bgrace;
340                 usage = &dquot->dq_dqb.dqb_curspace;
341                 hlimit = dquot->dq_dqb.dqb_bhardlimit;
342                 slimit = dquot->dq_dqb.dqb_bsoftlimit;
343                 time = &dquot->dq_dqb.dqb_btime;
344         } else {
345                 grace = info->qi_info[QDATA_IS_GRP(qdata)].dqi_igrace;
346                 usage = (__u64 *) & dquot->dq_dqb.dqb_curinodes;
347                 hlimit = dquot->dq_dqb.dqb_ihardlimit;
348                 slimit = dquot->dq_dqb.dqb_isoftlimit;
349                 time = &dquot->dq_dqb.dqb_itime;
350         }
351
352         /* if the quota limit in admin quotafile is zero, we just inform
353          * slave to clear quota limit with zero qd_count */
354         if (hlimit == 0 && slimit == 0) {
355                 qdata->qd_count = 0;
356                 GOTO(out, rc);
357         }
358
359         switch (opc) {
360         case QUOTA_DQACQ:
361                 if (hlimit &&
362                     QUSG(*usage + qdata->qd_count, QDATA_IS_BLK(qdata)) > hlimit)
363                 {
364                         if (QDATA_IS_CHANGE_QS(qdata) &&
365                             QUSG(*usage, QDATA_IS_BLK(qdata)) < hlimit)
366                                 qdata->qd_count = (hlimit -
367                                         QUSG(*usage, QDATA_IS_BLK(qdata)))
368                                         * QUOTABLOCK_SIZE;
369                         else
370                                 GOTO(out, rc = -EDQUOT);
371                 }
372
373                 if (slimit &&
374                     QUSG(*usage + qdata->qd_count, QDATA_IS_BLK(qdata)) > slimit) {
375                         if (*time && cfs_time_current_sec() >= *time)
376                                 GOTO(out, rc = -EDQUOT);
377                         else if (!*time)
378                                 *time = cfs_time_current_sec() + grace;
379                 }
380
381                 *usage += qdata->qd_count;
382                 break;
383         case QUOTA_DQREL:
384                 /* The usage in administrative file might be incorrect before
385                  * recovery done */
386                 if (*usage - qdata->qd_count < 0)
387                         *usage = 0;
388                 else
389                         *usage -= qdata->qd_count;
390
391                 /* (usage <= soft limit) but not (usage < soft limit) */
392                 if (!slimit || QUSG(*usage, QDATA_IS_BLK(qdata)) <= slimit)
393                         *time = 0;
394                 break;
395         default:
396                 LBUG();
397         }
398
399         rc = fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT);
400         EXIT;
401 out:
402         up(&dquot->dq_sem);
403         up(&mds->mds_qonoff_sem);
404         lustre_dqput(dquot);
405         if (rc != -EDQUOT)
406                 dqacq_adjust_qunit_sz(obd, qdata->qd_id, QDATA_IS_GRP(qdata), QDATA_IS_BLK(qdata));
407
408         quota_search_lqs(qdata, NULL, qctxt, &lqs);
409         if (QDATA_IS_BLK(qdata)) {
410                 if (!lqs) {
411                         CDEBUG(D_INFO, "Can't find the lustre qunit size!\n");
412                         qdata->qd_qunit  = qctxt->lqc_bunit_sz;
413                 } else {
414                         spin_lock(&lqs->lqs_lock);
415                         qdata->qd_qunit  = lqs->lqs_bunit_sz;
416                         spin_unlock(&lqs->lqs_lock);
417                 }
418                 QDATA_SET_ADJBLK(qdata);
419         } else {
420                 if (!lqs) {
421                         CDEBUG(D_INFO, "Can't find the lustre qunit size!\n");
422                         qdata->qd_qunit  = qctxt->lqc_iunit_sz;
423                 } else {
424                         spin_lock(&lqs->lqs_lock);
425                         qdata->qd_qunit  = lqs->lqs_iunit_sz;
426                         spin_unlock(&lqs->lqs_lock);
427                 }
428                 QDATA_SET_ADJINO(qdata);
429         }
430
431         QDATA_DEBUG(qdata, "alloc/release qunit in dqacq_handler\n");
432         if (lqs)
433                 lqs_putref(lqs);
434
435         return rc;
436 }
437
438 int mds_quota_adjust(struct obd_device *obd, unsigned int qcids[],
439                      unsigned int qpids[], int rc, int opc)
440 {
441         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
442         int rc2 = 0;
443         ENTRY;
444
445         if (rc && rc != -EDQUOT && rc != ENOLCK)
446                 RETURN(0);
447
448         switch (opc) {
449         case FSFILT_OP_RENAME:
450                 /* acquire/release block quota on owner of original parent */
451                 rc2 = qctxt_adjust_qunit(obd, qctxt, qpids[2], qpids[3], 1, 0);
452                 /* fall-through */
453         case FSFILT_OP_SETATTR:
454                 /* acquire/release file quota on original owner */
455                 rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 0, 0);
456                 /* fall-through */
457         case FSFILT_OP_CREATE:
458         case FSFILT_OP_UNLINK:
459                 /* acquire/release file/block quota on owner of child (or current owner) */
460                 rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 0, 0);
461                 rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0);
462                 /* acquire/release block quota on owner of parent (or original owner) */
463                 rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 1, 0);
464                 break;
465         default:
466                 LBUG();
467                 break;
468         }
469
470         if (rc2)
471                 CDEBUG(rc2 == -EAGAIN ? D_QUOTA: D_ERROR,
472                        "mds adjust qunit failed! (opc:%d rc:%d)\n", opc, rc2);
473         RETURN(0);
474 }
475
476 int filter_quota_adjust(struct obd_device *obd, unsigned int qcids[],
477                         unsigned int qpids[], int rc, int opc)
478 {
479         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
480         int rc2 = 0;
481         ENTRY;
482
483         if (rc && rc != -EDQUOT)
484                 RETURN(0);
485
486         switch (opc) {
487         case FSFILT_OP_SETATTR:
488                 /* acquire/release block quota on original & current owner */
489                 rc = qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0);
490                 rc2 = qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 1, 0);
491                 break;
492         case FSFILT_OP_UNLINK:
493                 /* release block quota on this owner */
494         case FSFILT_OP_CREATE: /* XXX for write operation on obdfilter */
495                 /* acquire block quota on this owner */
496                 rc = qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0);
497                 break;
498         default:
499                 LBUG();
500                 break;
501         }
502
503         if (rc || rc2) {
504                 if (!rc)
505                         rc = rc2;
506                 CDEBUG(rc == -EAGAIN ? D_QUOTA: D_ERROR,
507                        "filter adjust qunit failed! (opc:%d rc%d)\n",
508                        opc, rc);
509         }
510
511         RETURN(0);
512 }
513
/* Directory prefix prepended to the admin quota file names when the
 * functions below build the path passed to filp_open(). */
static const char prefix[] = "OBJECTS/";
515
516 int mds_quota_get_version(struct obd_device *obd, 
517                           lustre_quota_version_t *version)
518 {
519         struct mds_obd *mds = &obd->u.mds;
520         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
521
522         *version = qinfo->qi_version;
523
524         return 0;
525 }
526
527 int mds_quota_set_version(struct obd_device *obd, lustre_quota_version_t version)
528 {
529         struct mds_obd *mds = &obd->u.mds;
530         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
531         int rc = 0, i;
532
533         if (version != LUSTRE_QUOTA_V1 &&
534             version != LUSTRE_QUOTA_V2)
535                 return -EINVAL;
536
537         down(&mds->mds_qonoff_sem);
538
539         /* no need to change version? nothing to do then */
540         if (qinfo->qi_version == version)
541                 goto out;
542
543         for (i = 0; i < MAXQUOTAS; i++) {
544                 /* quota file has been opened ? */
545                 if (qinfo->qi_files[i]) {
546                         rc = -EBUSY;
547                         goto out;
548                 }
549         }
550
551         CDEBUG(D_INFO, "changing quota version %d -> %d\n", qinfo->qi_version,
552                version);
553
554         qinfo->qi_version = version;
555
556 out:
557         up(&mds->mds_qonoff_sem);
558
559         return rc;
560 }
561
562 int mds_quota_invalidate(struct obd_device *obd, struct obd_quotactl *oqctl)
563 {
564         struct mds_obd *mds = &obd->u.mds;
565         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
566         int rc = 0, i;
567         char *quotafiles_v1[] = LUSTRE_ADMIN_QUOTAFILES_V1;
568         char *quotafiles_v2[] = LUSTRE_ADMIN_QUOTAFILES_V2;
569         char name[64];
570         struct lvfs_run_ctxt saved;
571
572         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
573
574         down(&mds->mds_qonoff_sem);
575
576         for (i = 0; i < MAXQUOTAS; i++) {
577                 struct file *fp;
578                 char* quotafile = (qinfo->qi_version == LUSTRE_QUOTA_V1)?
579                                    quotafiles_v1[i]:quotafiles_v2[i];
580
581                 if (!Q_TYPESET(oqctl, i))
582                         continue;
583
584                 /* quota file has been opened ? */
585                 if (qinfo->qi_files[i]) {
586                         rc = -EBUSY;
587                         goto out;
588                 }
589
590                 LASSERT(strlen(quotafile) + sizeof(prefix) <= sizeof(name));
591                 sprintf(name, "%s%s", prefix, quotafile);
592
593                 fp = filp_open(name, O_CREAT | O_TRUNC, 0644);
594                 if (IS_ERR(fp)) {
595                         rc = PTR_ERR(fp);
596                         CERROR("error invalidating admin quotafile %s (rc:%d)\n",
597                                name, rc);
598                 }
599                 else
600                         filp_close(fp, 0);
601         }
602
603 out:
604         up(&mds->mds_qonoff_sem);
605
606         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
607
608         return rc;
609 }
610
611 int init_admin_quotafiles(struct obd_device *obd, struct obd_quotactl *oqctl)
612 {
613         struct mds_obd *mds = &obd->u.mds;
614         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
615         char *quotafiles_v1[] = LUSTRE_ADMIN_QUOTAFILES_V1;
616         char *quotafiles_v2[] = LUSTRE_ADMIN_QUOTAFILES_V2;
617         struct lvfs_run_ctxt saved;
618         char name[64];
619         int i, rc = 0;
620         ENTRY;
621
622         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
623
624         down(&mds->mds_qonoff_sem);
625
626         for (i = 0; i < MAXQUOTAS && !rc; i++) {
627                 struct file *fp;
628                 char* quotafile = (qinfo->qi_version == LUSTRE_QUOTA_V1)?
629                                         quotafiles_v1[i]:quotafiles_v2[i];
630
631                 if (!Q_TYPESET(oqctl, i))
632                         continue;
633
634                 /* quota file has been opened ? */
635                 if (qinfo->qi_files[i]) {
636                         CWARN("init %s admin quotafile while quota on.\n",
637                               i == USRQUOTA ? "user" : "group");
638                         continue;
639                 }
640
641                 LASSERT(strlen(quotafile) + sizeof(prefix) <= sizeof(name));
642                 sprintf(name, "%s%s", prefix, quotafile);
643
644                 /* check if quota file exists and is correct */
645                 fp = filp_open(name, O_RDONLY, 0);
646                 if (!IS_ERR(fp)) {
647                         /* irregular file is not the right place for quota */
648                         if (!S_ISREG(fp->f_dentry->d_inode->i_mode)) {
649                                 CERROR("admin quota file %s is not "
650                                        "regular!", quotafile);
651                                 filp_close(fp, 0);
652                                 rc = -EINVAL;
653                                 break;
654                         }
655                         qinfo->qi_files[i] = fp;
656                         rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_CHK);
657                         qinfo->qi_files[i] = 0;
658                         filp_close(fp, 0);
659                 }
660                 else
661                         rc = PTR_ERR(fp);
662
663                 if (!rc)
664                         continue;
665
666                 /* -EINVAL may be returned by quotainfo for bad quota file */
667                 if (rc != -ENOENT && rc != -EINVAL) {
668                         CERROR("error opening old quota file %s (%d)\n", 
669                                name, rc);
670                         break;
671                 }
672
673                 CDEBUG(D_INFO, "%s new quota file %s\n", name, 
674                        rc == -ENOENT ? "creating" : "overwriting");
675
676                 /* create quota file overwriting old if needed */
677                 fp = filp_open(name, O_CREAT | O_TRUNC, 0644);
678                 if (IS_ERR(fp)) {
679                         rc = PTR_ERR(fp);
680                         CERROR("error creating admin quotafile %s (rc:%d)\n",
681                                name, rc);
682                         break;
683                 }
684
685                 qinfo->qi_files[i] = fp;
686
687                 switch (qinfo->qi_version) {
688                 case LUSTRE_QUOTA_V1:
689                         rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_INIT_INFO);
690                         if (rc)
691                                 CERROR("error init %s admin quotafile! (rc:%d)\n",
692                                        i == USRQUOTA ? "user" : "group", rc);
693                         break;
694                 case LUSTRE_QUOTA_V2:
695                         rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_CONVERT);
696                         if (rc)
697                                 CERROR("error convert %s admin quotafile! (rc:%d)\n",
698                                        i == USRQUOTA ? "user" : "group", rc);
699                         break;
700                 default:
701                         LBUG();
702                 }
703
704                 filp_close(fp, 0);
705                 qinfo->qi_files[i] = NULL;
706         }
707         up(&mds->mds_qonoff_sem);
708
709         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
710         RETURN(rc);
711 }
712
713 static int close_quota_files(struct obd_quotactl *oqctl, 
714                              struct lustre_quota_info *qinfo)
715 {
716         int i, rc = 0;
717         ENTRY;
718
719         for (i = 0; i < MAXQUOTAS; i++) {
720                 if (!Q_TYPESET(oqctl, i))
721                         continue;
722                 if (qinfo->qi_files[i] == NULL) {
723                         rc = -ESRCH;
724                         continue;
725                 }
726                 filp_close(qinfo->qi_files[i], 0);
727                 qinfo->qi_files[i] = NULL;
728         }
729         RETURN(rc);
730 }
731
732 int mds_admin_quota_on(struct obd_device *obd, struct obd_quotactl *oqctl)
733 {
734         struct mds_obd *mds = &obd->u.mds;
735         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
736         const char *quotafiles_v1[] = LUSTRE_ADMIN_QUOTAFILES_V1;
737         const char *quotafiles_v2[] = LUSTRE_ADMIN_QUOTAFILES_V2;
738         char name[64];
739         int i, rc = 0;
740         ENTRY;
741
742         /* open admin quota files and read quotafile info */
743         for (i = 0; i < MAXQUOTAS; i++) {
744                 struct file *fp;
745                 const char* quotafile = qinfo->qi_version == LUSTRE_QUOTA_V1?
746                                         quotafiles_v1[i] : quotafiles_v2[i];
747
748                 if (!Q_TYPESET(oqctl, i))
749                         continue;
750
751                 LASSERT(strlen(quotafile) 
752                         + sizeof(prefix) <= sizeof(name));
753                 sprintf(name, "%s%s", prefix, quotafile);
754
755                 if (qinfo->qi_files[i] != NULL) {
756                         rc = -EBUSY;
757                         break;
758                 }
759
760                 fp = filp_open(name, O_RDWR, 0);
761                 /* handle transparent migration to 64 bit quota file */
762                 if (IS_ERR(fp) && PTR_ERR(fp) == -ENOENT && 
763                     qinfo->qi_version == LUSTRE_QUOTA_V2) {
764                         CDEBUG(D_INFO, "attempting to convert V1 quota file to V2 format.\n");
765                         fp = filp_open(name, O_CREAT | O_TRUNC, 0644);
766                         if (!IS_ERR(fp)) {
767                                 qinfo->qi_files[i] = fp;
768                                 rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_CONVERT);
769                                 if (rc) {
770                                         CERROR("error convert %s admin quotafile! (rc:%d)\n",
771                                                i == USRQUOTA ? "user" : "group", rc);
772                                         break;
773                                 }
774                         }
775                 }
776
777                 if (IS_ERR(fp) || !S_ISREG(fp->f_dentry->d_inode->i_mode)) {
778                         rc = IS_ERR(fp) ? PTR_ERR(fp) : -EINVAL;
779                         CERROR("error open/create %s! (rc:%d)\n", name, rc);
780                         break;
781                 }
782                 qinfo->qi_files[i] = fp;
783
784                 rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_CHK);
785                 if (rc) {
786                         CERROR("invalid quota file %s! (rc:%d)\n",
787                                name, rc);
788                         break;
789                 }
790
791                 rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_RD_INFO);
792                 if (rc) {
793                         CERROR("error read quotainfo of %s! (rc:%d)\n",
794                                name, rc);
795                         break;
796                 }
797         }
798
799         if (rc && rc != -EBUSY)
800                 close_quota_files(oqctl, qinfo);
801
802         RETURN(rc);
803 }
804
805 static int mds_admin_quota_off(struct obd_device *obd, 
806                                struct obd_quotactl *oqctl)
807 {
808         struct mds_obd *mds = &obd->u.mds;
809         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
810         int rc;
811         ENTRY;
812
813         /* close admin quota files */
814         rc = close_quota_files(oqctl, qinfo);
815         RETURN(rc);
816 }
817
818 int mds_quota_on(struct obd_device *obd, struct obd_quotactl *oqctl)
819 {
820         struct mds_obd *mds = &obd->u.mds;
821         struct obd_device_target *obt = &obd->u.obt;
822         struct lvfs_run_ctxt saved;
823         int rc;
824         ENTRY;
825
826         if (!atomic_dec_and_test(&obt->obt_quotachecking)) {
827                 CDEBUG(D_INFO, "other people are doing quotacheck\n");
828                 atomic_inc(&obt->obt_quotachecking);
829                 RETURN(-EBUSY);
830         }
831
832         down(&mds->mds_qonoff_sem);
833         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
834         rc = mds_admin_quota_on(obd, oqctl);
835         if (rc)
836                 goto out;
837
838         rc = obd_quotactl(mds->mds_osc_exp, oqctl);
839         if (rc)
840                 goto out;
841
842         rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
843         if (!rc)
844                 obt->obt_qctxt.lqc_status = 1;
845 out:
846         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
847         up(&mds->mds_qonoff_sem);
848         atomic_inc(&obt->obt_quotachecking);
849         RETURN(rc);
850 }
851
852 int mds_quota_off(struct obd_device *obd, struct obd_quotactl *oqctl)
853 {
854         struct mds_obd *mds = &obd->u.mds;
855         struct obd_device_target *obt = &obd->u.obt;
856         struct lvfs_run_ctxt saved;
857         int rc, rc2;
858         ENTRY;
859
860         if (!atomic_dec_and_test(&obt->obt_quotachecking)) {
861                 CDEBUG(D_INFO, "other people are doing quotacheck\n");
862                 atomic_inc(&obt->obt_quotachecking);
863                 RETURN(-EBUSY);
864         }
865
866         down(&mds->mds_qonoff_sem);
867         /* close admin quota files */
868         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
869         mds_admin_quota_off(obd, oqctl);
870
871         rc = obd_quotactl(mds->mds_osc_exp, oqctl);
872         rc2 = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
873         if (!rc2)
874                 obt->obt_qctxt.lqc_status = 0;
875
876         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
877         up(&mds->mds_qonoff_sem);
878         atomic_inc(&obt->obt_quotachecking);
879
880         RETURN(rc ?: rc2);
881 }
882
883 int mds_set_dqinfo(struct obd_device *obd, struct obd_quotactl *oqctl)
884 {
885         struct mds_obd *mds = &obd->u.mds;
886         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
887         struct obd_dqinfo *dqinfo = &oqctl->qc_dqinfo;
888         int rc;
889         ENTRY;
890
891         down(&mds->mds_qonoff_sem);
892         if (qinfo->qi_files[oqctl->qc_type] == NULL) {
893                 rc = -ESRCH;
894                 goto out;
895         }
896
897         qinfo->qi_info[oqctl->qc_type].dqi_bgrace = dqinfo->dqi_bgrace;
898         qinfo->qi_info[oqctl->qc_type].dqi_igrace = dqinfo->dqi_igrace;
899         qinfo->qi_info[oqctl->qc_type].dqi_flags = dqinfo->dqi_flags;
900
901         rc = fsfilt_quotainfo(obd, qinfo, oqctl->qc_type, QFILE_WR_INFO);
902
903 out:
904         up(&mds->mds_qonoff_sem);
905         RETURN(rc);
906 }
907
908 int mds_get_dqinfo(struct obd_device *obd, struct obd_quotactl *oqctl)
909 {
910         struct mds_obd *mds = &obd->u.mds;
911         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
912         struct obd_dqinfo *dqinfo = &oqctl->qc_dqinfo;
913         int rc = 0;
914         ENTRY;
915
916         down(&mds->mds_qonoff_sem);
917         if (qinfo->qi_files[oqctl->qc_type] == NULL) {
918                 rc = -ESRCH;
919                 goto out;
920         }
921
922         dqinfo->dqi_bgrace = qinfo->qi_info[oqctl->qc_type].dqi_bgrace;
923         dqinfo->dqi_igrace = qinfo->qi_info[oqctl->qc_type].dqi_igrace;
924         dqinfo->dqi_flags = qinfo->qi_info[oqctl->qc_type].dqi_flags;
925
926 out:
927         up(&mds->mds_qonoff_sem);
928         RETURN(rc);
929 }
930
931 int dquot_create_oqaq(struct lustre_quota_ctxt *qctxt,
932                             struct lustre_dquot *dquot, __u32 ost_num,
933                             __u32 mdt_num, int type,
934                             struct quota_adjust_qunit *oqaq)
935 {
936         __u64 bunit_curr_o, iunit_curr_o;
937         unsigned long shrink_qunit_limit = qctxt->lqc_cqs_boundary_factor;
938         unsigned long cqs_factor = qctxt->lqc_cqs_qs_factor;
939         __u64 blimit = dquot->dq_dqb.dqb_bhardlimit ?
940                 dquot->dq_dqb.dqb_bhardlimit : dquot->dq_dqb.dqb_bsoftlimit;
941         __u64 ilimit = dquot->dq_dqb.dqb_ihardlimit ?
942                 dquot->dq_dqb.dqb_ihardlimit : dquot->dq_dqb.dqb_isoftlimit;
943         int rc = 0;
944         ENTRY;
945
946         if (!dquot || !oqaq)
947                 RETURN(-EINVAL);
948         LASSERT_SEM_LOCKED(&dquot->dq_sem);
949         LASSERT(oqaq->qaq_iunit_sz);
950         LASSERT(oqaq->qaq_bunit_sz);
951
952         /* don't change qunit size */
953         if (!qctxt->lqc_switch_qs)
954                 RETURN(rc);
955
956         bunit_curr_o = oqaq->qaq_bunit_sz;
957         iunit_curr_o = oqaq->qaq_iunit_sz;
958
959         if (dquot->dq_type == GRPQUOTA)
960                 QAQ_SET_GRP(oqaq);
961
962         if ((type & LQUOTA_FLAGS_ADJBLK) && blimit) {
963                 __u64 b_limitation =
964                         oqaq->qaq_bunit_sz * ost_num * shrink_qunit_limit;
965                 /* enlarge block qunit size */
966                 while (blimit >
967                        QUSG(dquot->dq_dqb.dqb_curspace + 2 * b_limitation, 1)) {
968                         oqaq->qaq_bunit_sz =
969                                 QUSG(oqaq->qaq_bunit_sz * cqs_factor, 1)
970                                 << QUOTABLOCK_BITS;
971                         b_limitation = oqaq->qaq_bunit_sz * ost_num *
972                                 shrink_qunit_limit;
973                 }
974
975                 if (oqaq->qaq_bunit_sz > qctxt->lqc_bunit_sz)
976                         oqaq->qaq_bunit_sz = qctxt->lqc_bunit_sz;
977
978                 /* shrink block qunit size */
979                 while (blimit <
980                        QUSG(dquot->dq_dqb.dqb_curspace + b_limitation, 1)) {
981                         do_div(oqaq->qaq_bunit_sz , cqs_factor);
982                         oqaq->qaq_bunit_sz = QUSG(oqaq->qaq_bunit_sz, 1) <<
983                                 QUOTABLOCK_BITS;
984                         b_limitation = oqaq->qaq_bunit_sz * ost_num *
985                                 shrink_qunit_limit;
986                         if (oqaq->qaq_bunit_sz <  qctxt->lqc_cqs_least_bunit)
987                                 break;
988                 }
989
990                 if (oqaq->qaq_bunit_sz < qctxt->lqc_cqs_least_bunit)
991                         oqaq->qaq_bunit_sz = qctxt->lqc_cqs_least_bunit;
992
993                 if (bunit_curr_o != oqaq->qaq_bunit_sz)
994                         QAQ_SET_ADJBLK(oqaq);
995
996         }
997
998         if ((type & LQUOTA_FLAGS_ADJINO) && ilimit) {
999                 __u64 i_limitation =
1000                         oqaq->qaq_iunit_sz * mdt_num * shrink_qunit_limit;
1001                 /* enlarge file qunit size */
1002                 while (ilimit > dquot->dq_dqb.dqb_curinodes
1003                        + 2 * i_limitation) {
1004                         oqaq->qaq_iunit_sz = oqaq->qaq_iunit_sz * cqs_factor;
1005                         i_limitation = oqaq->qaq_iunit_sz * mdt_num *
1006                                 shrink_qunit_limit;
1007                 }
1008
1009                 if (oqaq->qaq_iunit_sz > qctxt->lqc_iunit_sz)
1010                         oqaq->qaq_iunit_sz = qctxt->lqc_iunit_sz;
1011
1012                 /* shrink file qunit size */
1013                 while (ilimit < dquot->dq_dqb.dqb_curinodes
1014                        + i_limitation) {
1015                         do_div(oqaq->qaq_iunit_sz, cqs_factor);
1016                         i_limitation = oqaq->qaq_iunit_sz * mdt_num *
1017                                        shrink_qunit_limit;
1018                         if (oqaq->qaq_iunit_sz < qctxt->lqc_cqs_least_iunit)
1019                                 break;
1020                 }
1021
1022                 if (oqaq->qaq_iunit_sz < qctxt->lqc_cqs_least_iunit)
1023                         oqaq->qaq_iunit_sz = qctxt->lqc_cqs_least_iunit;
1024
1025                 if (iunit_curr_o != oqaq->qaq_iunit_sz)
1026                         QAQ_SET_ADJINO(oqaq);
1027
1028         }
1029
1030         if (!dquot->dq_dqb.dqb_bhardlimit && !dquot->dq_dqb.dqb_bsoftlimit &&
1031             !dquot->dq_dqb.dqb_ihardlimit && !dquot->dq_dqb.dqb_isoftlimit) {
1032                 oqaq->qaq_bunit_sz = 0;
1033                 oqaq->qaq_iunit_sz = 0;
1034                 QAQ_SET_ADJBLK(oqaq);
1035                 QAQ_SET_ADJINO(oqaq);
1036         }
1037
1038         QAQ_DEBUG(oqaq, "the oqaq computed\n");
1039
1040         RETURN(rc);
1041 }
1042
1043 static int mds_init_slave_ilimits(struct obd_device *obd,
1044                                   struct obd_quotactl *oqctl, int set,
1045                                   struct quota_adjust_qunit *oqaq)
1046 {
1047         /* XXX: for file limits only adjust local now */
1048         struct obd_device_target *obt = &obd->u.obt;
1049         struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt;
1050         unsigned int uid = 0, gid = 0;
1051         struct obd_quotactl *ioqc = NULL;
1052         int flag;
1053         int rc;
1054         ENTRY;
1055
1056         /* if we are going to set zero limit, needn't init slaves */
1057         if (!oqctl->qc_dqblk.dqb_ihardlimit && !oqctl->qc_dqblk.dqb_isoftlimit &&
1058             set)
1059                 RETURN(0);
1060
1061         OBD_ALLOC_PTR(ioqc);
1062         if (!ioqc)
1063                 RETURN(-ENOMEM);
1064
1065         flag = oqctl->qc_dqblk.dqb_ihardlimit ||
1066                oqctl->qc_dqblk.dqb_isoftlimit || set;
1067         ioqc->qc_cmd = flag ? Q_INITQUOTA : Q_SETQUOTA;
1068         ioqc->qc_id = oqctl->qc_id;
1069         ioqc->qc_type = oqctl->qc_type;
1070         ioqc->qc_dqblk.dqb_valid = QIF_ILIMITS;
1071         ioqc->qc_dqblk.dqb_ihardlimit = flag ? MIN_QLIMIT : 0;
1072
1073         if (QAQ_IS_ADJINO(oqaq)) {
1074                 /* adjust the mds slave's inode qunit size */
1075                 rc = quota_adjust_slave_lqs(oqaq, qctxt);
1076                 if (rc < 0)
1077                         CDEBUG(D_ERROR, "adjust mds slave's inode qunit size \
1078                                failed! (rc:%d)\n", rc);
1079         }
1080
1081         /* set local limit to MIN_QLIMIT */
1082         rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, ioqc);
1083         if (rc)
1084                 GOTO(out, rc);
1085
1086         /* trigger local qunit pre-acquire */
1087         if (oqctl->qc_type == USRQUOTA)
1088                 uid = oqctl->qc_id;
1089         else
1090                 gid = oqctl->qc_id;
1091
1092         rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 0, 0);
1093         if (rc) {
1094                 CDEBUG(D_QUOTA,"error mds adjust local file quota! (rc:%d)\n",
1095                        rc);
1096                 GOTO(out, rc);
1097         }
1098         /* FIXME initialize all slaves in CMD */
1099         EXIT;
1100 out:
1101         if (ioqc)
1102                 OBD_FREE_PTR(ioqc);
1103         return rc;
1104 }
1105
1106 static int mds_init_slave_blimits(struct obd_device *obd,
1107                                   struct obd_quotactl *oqctl, int set,
1108                                   struct quota_adjust_qunit *oqaq)
1109 {
1110         struct obd_device_target *obt = &obd->u.obt;
1111         struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt;
1112         struct mds_obd *mds = &obd->u.mds;
1113         struct obd_quotactl *ioqc;
1114         unsigned int uid = 0, gid = 0;
1115         int rc, rc1 = 0;
1116         int flag;
1117         ENTRY;
1118
1119         /* if we are going to set zero limit, needn't init slaves */
1120         if (!oqctl->qc_dqblk.dqb_bhardlimit && !oqctl->qc_dqblk.dqb_bsoftlimit &&
1121             set)
1122                 RETURN(0);
1123
1124         OBD_ALLOC_PTR(ioqc);
1125         if (!ioqc)
1126                 RETURN(-ENOMEM);
1127
1128         flag = oqctl->qc_dqblk.dqb_bhardlimit || 
1129                oqctl->qc_dqblk.dqb_bsoftlimit || set;
1130         ioqc->qc_cmd = flag ? Q_INITQUOTA : Q_SETQUOTA;
1131         ioqc->qc_id = oqctl->qc_id;
1132         ioqc->qc_type = oqctl->qc_type;
1133         ioqc->qc_dqblk.dqb_valid = QIF_BLIMITS;
1134         ioqc->qc_dqblk.dqb_bhardlimit = flag ? MIN_QLIMIT : 0;
1135         if (QAQ_IS_ADJBLK(oqaq)) {
1136                 /* adjust the mds slave's block qunit size */
1137                 rc1 = quota_adjust_slave_lqs(oqaq, qctxt);
1138                 if (rc1 < 0)
1139                         CERROR("adjust mds slave's block qunit size failed!"
1140                                "(rc:%d)\n", rc1);
1141         }
1142
1143         rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, ioqc);
1144         if (rc)
1145                 GOTO(out, rc);
1146
1147         /* trigger local qunit pre-acquire */
1148         if (oqctl->qc_type == USRQUOTA)
1149                 uid = oqctl->qc_id;
1150         else
1151                 gid = oqctl->qc_id;
1152
1153         /* initialize all slave's limit */
1154         rc = obd_quotactl(mds->mds_osc_exp, ioqc);
1155
1156         rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 1, 0);
1157         if (rc) {
1158                 CERROR("error mds adjust local block quota! (rc:%d)\n", rc);
1159                 GOTO(out, rc);
1160         }
1161
1162         /* adjust all slave's qunit size when setting quota
1163          * this is will create a lqs for every ost, which will present
1164          * certain uid/gid is set quota or not */
1165         QAQ_SET_ADJBLK(oqaq);
1166         rc = obd_quota_adjust_qunit(mds->mds_osc_exp, oqaq);
1167
1168         EXIT;
1169 out:
1170         OBD_FREE_PTR(ioqc);
1171         return rc;
1172 }
1173
1174 int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl)
1175 {
1176         struct mds_obd *mds = &obd->u.mds;
1177         struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt;
1178         struct obd_device *lov_obd = class_exp2obd(mds->mds_osc_exp);
1179         struct lov_obd *lov = &lov_obd->u.lov;
1180         struct quota_adjust_qunit *oqaq = NULL;
1181         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
1182         __u64 ihardlimit, isoftlimit, bhardlimit, bsoftlimit;
1183         time_t btime, itime;
1184         struct lustre_dquot *dquot;
1185         struct obd_dqblk *dqblk = &oqctl->qc_dqblk;
1186         int set, rc, flag = 0;
1187         ENTRY;
1188
1189         OBD_ALLOC_PTR(oqaq);
1190         if (!oqaq)
1191                 RETURN(-ENOMEM);
1192         down(&mds->mds_qonoff_sem);
1193         init_oqaq(oqaq, qctxt, oqctl->qc_id, oqctl->qc_type);
1194
1195         if (qinfo->qi_files[oqctl->qc_type] == NULL)
1196                 GOTO(out_sem, rc = -ESRCH);
1197
1198         dquot = lustre_dqget(obd, qinfo, oqctl->qc_id, oqctl->qc_type);
1199         if (IS_ERR(dquot))
1200                 GOTO(out_sem, rc = PTR_ERR(dquot));
1201         DQUOT_DEBUG(dquot, "get dquot in mds_set_blk\n");
1202         QINFO_DEBUG(dquot->dq_info, "get dquot in mds_set_blk\n");
1203
1204         down(&dquot->dq_sem);
1205
1206         if (dquot->dq_status) {
1207                 up(&dquot->dq_sem);
1208                 lustre_dqput(dquot);
1209                 GOTO(out_sem, rc = -EBUSY);
1210         }
1211         dquot->dq_status |= DQ_STATUS_SET;
1212
1213         ihardlimit = dquot->dq_dqb.dqb_ihardlimit;
1214         isoftlimit = dquot->dq_dqb.dqb_isoftlimit;
1215         bhardlimit = dquot->dq_dqb.dqb_bhardlimit;
1216         bsoftlimit = dquot->dq_dqb.dqb_bsoftlimit;
1217         btime = dquot->dq_dqb.dqb_btime;
1218         itime = dquot->dq_dqb.dqb_itime;
1219
1220         if (dqblk->dqb_valid & QIF_BTIME)
1221                 dquot->dq_dqb.dqb_btime = dqblk->dqb_btime;
1222         if (dqblk->dqb_valid & QIF_ITIME)
1223                 dquot->dq_dqb.dqb_itime = dqblk->dqb_itime;
1224
1225         if (dqblk->dqb_valid & QIF_BLIMITS) {
1226                 dquot->dq_dqb.dqb_bhardlimit = dqblk->dqb_bhardlimit;
1227                 dquot->dq_dqb.dqb_bsoftlimit = dqblk->dqb_bsoftlimit;
1228                 /* clear usage (limit pool) */
1229                 if (!dquot->dq_dqb.dqb_bhardlimit &&
1230                     !dquot->dq_dqb.dqb_bsoftlimit)
1231                         dquot->dq_dqb.dqb_curspace = 0;
1232
1233                 /* clear grace time */
1234                 if (!dqblk->dqb_bsoftlimit ||
1235                     toqb(dquot->dq_dqb.dqb_curspace) <= dqblk->dqb_bsoftlimit)
1236                         dquot->dq_dqb.dqb_btime = 0;
1237                 /* set grace only if user hasn't provided his own */
1238                 else if (!(dqblk->dqb_valid & QIF_BTIME))
1239                         dquot->dq_dqb.dqb_btime = cfs_time_current_sec() +
1240                                 qinfo->qi_info[dquot->dq_type].dqi_bgrace;
1241
1242                 flag |= LQUOTA_FLAGS_ADJBLK;
1243         }
1244
1245         if (dqblk->dqb_valid & QIF_ILIMITS) {
1246                 dquot->dq_dqb.dqb_ihardlimit = dqblk->dqb_ihardlimit;
1247                 dquot->dq_dqb.dqb_isoftlimit = dqblk->dqb_isoftlimit;
1248                 /* clear usage (limit pool) */
1249                 if (!dquot->dq_dqb.dqb_ihardlimit &&
1250                     !dquot->dq_dqb.dqb_isoftlimit)
1251                         dquot->dq_dqb.dqb_curinodes = 0;
1252
1253                 if (!dqblk->dqb_isoftlimit ||
1254                     dquot->dq_dqb.dqb_curinodes <= dqblk->dqb_isoftlimit)
1255                         dquot->dq_dqb.dqb_itime = 0;
1256                 else if (!(dqblk->dqb_valid & QIF_ITIME))
1257                         dquot->dq_dqb.dqb_itime = cfs_time_current_sec() +
1258                                 qinfo->qi_info[dquot->dq_type].dqi_igrace;
1259
1260                 flag |= LQUOTA_FLAGS_ADJINO;
1261         }
1262         QAQ_DEBUG(oqaq, "before dquot_create_oqaq\n");
1263         rc = dquot_create_oqaq(qctxt, dquot, lov->desc.ld_tgt_count, 1,
1264                                flag, oqaq);
1265         QAQ_DEBUG(oqaq, "after dquot_create_oqaq\n");
1266         if (rc < 0)
1267                 CDEBUG(D_QUOTA, "adjust qunit size failed! (rc:%d)\n", rc);
1268
1269
1270         rc = fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT);
1271
1272         up(&dquot->dq_sem);
1273
1274         if (rc) {
1275                 CERROR("set limit failed! (rc:%d)\n", rc);
1276                 goto out;
1277         }
1278
1279         up(&mds->mds_qonoff_sem);
1280         if (dqblk->dqb_valid & QIF_ILIMITS) {
1281                 set = !(ihardlimit || isoftlimit);
1282                 down(&dquot->dq_sem);
1283                 dquot->dq_dqb.dqb_curinodes = 0;
1284                 up(&dquot->dq_sem);
1285                 rc = mds_init_slave_ilimits(obd, oqctl, set, oqaq);
1286                 if (rc) {
1287                         CERROR("init slave ilimits failed! (rc:%d)\n", rc);
1288                         goto revoke_out;
1289                 }
1290         }
1291
1292         if (dqblk->dqb_valid & QIF_BLIMITS) {
1293                 set = !(bhardlimit || bsoftlimit);
1294                 down(&dquot->dq_sem);
1295                 dquot->dq_dqb.dqb_curspace = 0;
1296                 up(&dquot->dq_sem);
1297                 rc = mds_init_slave_blimits(obd, oqctl, set, oqaq);
1298                 if (rc) {
1299                         CERROR("init slave blimits failed! (rc:%d)\n", rc);
1300                         goto revoke_out;
1301                 }
1302         }
1303         down(&mds->mds_qonoff_sem);
1304
1305 revoke_out:
1306         if (rc) {
1307                 /* cancel previous setting */
1308                 down(&dquot->dq_sem);
1309                 dquot->dq_dqb.dqb_ihardlimit = ihardlimit;
1310                 dquot->dq_dqb.dqb_isoftlimit = isoftlimit;
1311                 dquot->dq_dqb.dqb_bhardlimit = bhardlimit;
1312                 dquot->dq_dqb.dqb_bsoftlimit = bsoftlimit;
1313                 dquot->dq_dqb.dqb_btime = btime;
1314                 dquot->dq_dqb.dqb_itime = itime;
1315                 fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT);
1316                 up(&dquot->dq_sem);
1317         }
1318 out:
1319         down(&dquot->dq_sem);
1320         dquot->dq_status &= ~DQ_STATUS_SET;
1321         up(&dquot->dq_sem);
1322         lustre_dqput(dquot);
1323         EXIT;
1324 out_sem:
1325         up(&mds->mds_qonoff_sem);
1326
1327         if (oqaq)
1328                 OBD_FREE_PTR(oqaq);
1329
1330         return rc;
1331 }
1332
1333 static int mds_get_space(struct obd_device *obd, struct obd_quotactl *oqctl)
1334 {
1335         struct obd_quotactl *soqc;
1336         struct lvfs_run_ctxt saved;
1337         int rc, rc1;
1338         ENTRY;
1339
1340         OBD_ALLOC_PTR(soqc);
1341         if (!soqc)
1342                 RETURN(-ENOMEM);
1343
1344         soqc->qc_cmd = Q_GETOQUOTA;
1345         soqc->qc_id = oqctl->qc_id;
1346         soqc->qc_type = oqctl->qc_type;
1347
1348         rc = obd_quotactl(obd->u.mds.mds_osc_exp, soqc);
1349
1350         oqctl->qc_dqblk.dqb_curspace = soqc->qc_dqblk.dqb_curspace;
1351
1352         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1353         soqc->qc_dqblk.dqb_curspace = 0;
1354         rc1 = fsfilt_quotactl(obd, obd->u.obt.obt_sb, soqc);
1355         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1356
1357         oqctl->qc_dqblk.dqb_curinodes += soqc->qc_dqblk.dqb_curinodes;
1358         if (!rc1)
1359                 oqctl->qc_dqblk.dqb_valid |= QIF_INODES;
1360         oqctl->qc_dqblk.dqb_curspace += soqc->qc_dqblk.dqb_curspace;
1361         if (!rc && !rc1)
1362                 oqctl->qc_dqblk.dqb_valid |= QIF_USAGE;
1363
1364         OBD_FREE_PTR(soqc);
1365
1366         if (!rc)
1367                 rc = rc1;
1368         RETURN(rc);
1369 }
1370
1371 int mds_get_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl)
1372 {
1373         struct mds_obd *mds = &obd->u.mds;
1374         struct lustre_quota_info *qinfo = &mds->mds_quota_info;
1375         struct lustre_dquot *dquot;
1376         struct obd_dqblk *dqblk = &oqctl->qc_dqblk;
1377         int rc;
1378         ENTRY;
1379
1380         down(&mds->mds_qonoff_sem);
1381         dqblk->dqb_valid = 0;
1382         if (qinfo->qi_files[oqctl->qc_type] == NULL)
1383                 GOTO(out, rc = -ESRCH);
1384
1385         dquot = lustre_dqget(obd, qinfo, oqctl->qc_id, oqctl->qc_type);
1386         if (IS_ERR(dquot))
1387                 GOTO(out, rc = PTR_ERR(dquot));
1388
1389         down(&dquot->dq_sem);
1390         dqblk->dqb_ihardlimit = dquot->dq_dqb.dqb_ihardlimit;
1391         dqblk->dqb_isoftlimit = dquot->dq_dqb.dqb_isoftlimit;
1392         dqblk->dqb_bhardlimit = dquot->dq_dqb.dqb_bhardlimit;
1393         dqblk->dqb_bsoftlimit = dquot->dq_dqb.dqb_bsoftlimit;
1394         dqblk->dqb_btime = dquot->dq_dqb.dqb_btime;
1395         dqblk->dqb_itime = dquot->dq_dqb.dqb_itime;
1396         dqblk->dqb_valid |= QIF_LIMITS | QIF_TIMES;
1397         up(&dquot->dq_sem);
1398
1399         lustre_dqput(dquot);
1400
1401         /* the usages in admin quota file is inaccurate */
1402         dqblk->dqb_curinodes = 0;
1403         dqblk->dqb_curspace = 0;
1404         rc = mds_get_space(obd, oqctl);
1405         EXIT;
1406 out:
1407         up(&mds->mds_qonoff_sem);
1408         return rc;
1409 }
1410
1411 int mds_get_obd_quota(struct obd_device *obd, struct obd_quotactl *oqctl)
1412 {
1413         struct lvfs_run_ctxt saved;
1414         int rc;
1415         ENTRY;
1416
1417         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1418         rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
1419         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1420
1421         RETURN(rc);
1422 }
1423
1424
1425 /* FIXME we only recovery block limit by now, need recovery inode
1426  * limits also after CMD involved in */
1427 static int 
1428 dquot_recovery(struct obd_device *obd, unsigned int id, unsigned short type)
1429 {
1430         struct mds_obd *mds = &obd->u.mds;
1431         struct lustre_quota_info *qinfo= &obd->u.mds.mds_quota_info;
1432         struct lustre_dquot *dquot;
1433         struct obd_quotactl *qctl;
1434         __u64 total_limits = 0;
1435         int rc;
1436         ENTRY;
1437
1438         OBD_ALLOC_PTR(qctl);
1439         if (qctl == NULL)
1440                 RETURN(-ENOMEM);
1441
1442         dquot = lustre_dqget(obd, qinfo, id, type);
1443         if (IS_ERR(dquot)) {
1444                 CERROR("Get dquot failed. (rc:%ld)\n", PTR_ERR(dquot));
1445                 OBD_FREE_PTR(qctl);
1446                 RETURN(PTR_ERR(dquot));
1447         }
1448
1449         down(&dquot->dq_sem);
1450
1451         /* don't recovery the dquot without limits or under setting */
1452         if (!(dquot->dq_dqb.dqb_bhardlimit || dquot->dq_dqb.dqb_bsoftlimit) ||
1453             dquot->dq_status)
1454                 GOTO(skip, rc = 0);
1455         dquot->dq_status |= DQ_STATUS_RECOVERY;
1456
1457         up(&dquot->dq_sem);
1458
1459         /* get real bhardlimit from all slaves. */
1460         qctl->qc_cmd = Q_GETOQUOTA;
1461         qctl->qc_type = type;
1462         qctl->qc_id = id;
1463         qctl->qc_stat = QUOTA_RECOVERING;
1464         rc = obd_quotactl(obd->u.mds.mds_osc_exp, qctl);
1465         if (rc)
1466                 GOTO(out, rc);
1467         total_limits = qctl->qc_dqblk.dqb_bhardlimit;
1468
1469         /* get real bhardlimit from master */
1470         rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, qctl);
1471         if (rc)
1472                 GOTO(out, rc);
1473         total_limits += qctl->qc_dqblk.dqb_bhardlimit;
1474
1475         /* amend the usage of the administrative quotafile */
1476         down(&mds->mds_qonoff_sem);
1477         down(&dquot->dq_sem);
1478
1479         dquot->dq_dqb.dqb_curspace = total_limits << QUOTABLOCK_BITS;
1480
1481         rc = fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT);
1482         if (rc)
1483                 CERROR("write dquot failed! (rc:%d)\n", rc);
1484
1485         up(&dquot->dq_sem);
1486         up(&mds->mds_qonoff_sem);
1487         EXIT;
1488 out:
1489         down(&dquot->dq_sem);
1490         dquot->dq_status &= ~DQ_STATUS_RECOVERY;
1491 skip:
1492         up(&dquot->dq_sem);
1493
1494         lustre_dqput(dquot);
1495         OBD_FREE_PTR(qctl);
1496         return rc;
1497 }
1498
/* Argument block passed to qmaster_recovery_main() when the quota
 * recovery thread is started. */
struct qmaster_recov_thread_data {
        struct obd_device *obd;         /* device the thread works on */
        struct completion comp;         /* completed by the thread once it
                                         * has read obd and daemonized */
};
1503
1504 static int qmaster_recovery_main(void *arg)
1505 {
1506         struct qmaster_recov_thread_data *data = arg;
1507         struct obd_device *obd = data->obd;
1508         int rc = 0;
1509         unsigned short type;
1510         ENTRY;
1511
1512         ptlrpc_daemonize("qmaster_recovd");
1513
1514         complete(&data->comp);
1515
1516         for (type = USRQUOTA; type < MAXQUOTAS; type++) {
1517                 struct mds_obd *mds = &obd->u.mds;
1518                 struct lustre_quota_info *qinfo = &mds->mds_quota_info;
1519                 struct list_head id_list;
1520                 struct dquot_id *dqid, *tmp;
1521
1522                 down(&mds->mds_qonoff_sem);
1523                 if (qinfo->qi_files[type] == NULL) {
1524                         up(&mds->mds_qonoff_sem);
1525                         continue;
1526                 }
1527                 INIT_LIST_HEAD(&id_list);
1528                 rc = fsfilt_qids(obd, qinfo->qi_files[type], NULL, type, 
1529                                  &id_list);
1530                 up(&mds->mds_qonoff_sem);
1531
1532                 if (rc)
1533                         CERROR("error get ids from admin quotafile.(%d)\n", rc);
1534
1535                 list_for_each_entry_safe(dqid, tmp, &id_list, di_link) {
1536                         list_del_init(&dqid->di_link);
1537                         if (rc)
1538                                 goto free;
1539
1540                         rc = dquot_recovery(obd, dqid->di_id, type);
1541                         if (rc)
1542                                 CERROR("qmaster recovery failed! (id:%d type:%d"
1543                                        " rc:%d)\n", dqid->di_id, type, rc);
1544 free:
1545                         kfree(dqid);
1546                 }
1547         }
1548         RETURN(rc);
1549 }
1550
1551 int mds_quota_recovery(struct obd_device *obd)
1552 {
1553         struct lov_obd *lov = &obd->u.mds.mds_osc_obd->u.lov;
1554         struct qmaster_recov_thread_data data;
1555         int rc = 0;
1556         ENTRY;
1557
1558         mutex_down(&lov->lov_lock);
1559         if (lov->desc.ld_tgt_count != lov->desc.ld_active_tgt_count) {
1560                 CWARN("Not all osts are active, abort quota recovery\n");
1561                 mutex_up(&lov->lov_lock);
1562                 RETURN(rc);
1563         }
1564         mutex_up(&lov->lov_lock);
1565
1566         data.obd = obd;
1567         init_completion(&data.comp);
1568
1569         rc = kernel_thread(qmaster_recovery_main, &data, CLONE_VM|CLONE_FILES);
1570         if (rc < 0)
1571                 CERROR("Cannot start quota recovery thread: rc %d\n", rc);
1572
1573         wait_for_completion(&data.comp);
1574         RETURN(rc);
1575 }