Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / quota / quota_interface.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_LQUOTA
41
42 #ifdef __KERNEL__
43 # include <linux/version.h>
44 # include <linux/module.h>
45 # include <linux/init.h>
46 # include <linux/fs.h>
47 # include <linux/jbd.h>
48 # include <linux/smp_lock.h>
49 # include <linux/buffer_head.h>
50 # include <linux/workqueue.h>
51 # include <linux/mount.h>
52 #else /* __KERNEL__ */
53 # include <liblustre.h>
54 #endif
55
56 #include <obd_class.h>
57 #include <lustre_mds.h>
58 #include <lustre_dlm.h>
59 #include <lustre_cfg.h>
60 #include <obd_ost.h>
61 #include <lustre_fsfilt.h>
62 #include <lustre_quota.h>
63 #include <lprocfs_status.h>
64 #include "quota_internal.h"
65
66 #ifdef __KERNEL__
67
68 #ifdef HAVE_QUOTA_SUPPORT
69
/* Timestamp of the most recent "still haven't managed to acquire quota"
 * warning; quota_chk_acq_common() uses it to rate-limit that CWARN to at
 * most one every 30 seconds.  Protected by last_print_lock. */
static cfs_time_t last_print = 0;
static spinlock_t last_print_lock = SPIN_LOCK_UNLOCKED;
72
73 static int filter_quota_setup(struct obd_device *obd)
74 {
75         int rc = 0;
76         struct obd_device_target *obt = &obd->u.obt;
77         ENTRY;
78
79         init_rwsem(&obt->obt_rwsem);
80         obt->obt_qfmt = LUSTRE_QUOTA_V2;
81         atomic_set(&obt->obt_quotachecking, 1);
82         rc = qctxt_init(obd, NULL);
83         if (rc)
84                 CERROR("initialize quota context failed! (rc:%d)\n", rc);
85
86         RETURN(rc);
87 }
88
89 static int filter_quota_cleanup(struct obd_device *obd)
90 {
91         ENTRY;
92         qctxt_cleanup(&obd->u.obt.obt_qctxt, 0);
93         RETURN(0);
94 }
95
96 static int filter_quota_setinfo(struct obd_device *obd, void *data)
97 {
98         struct obd_export *exp = data;
99         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
100         struct obd_import *imp;
101         ENTRY;
102
103         /* setup the quota context import */
104         spin_lock(&qctxt->lqc_lock);
105         qctxt->lqc_import = exp->exp_imp_reverse;
106         spin_unlock(&qctxt->lqc_lock);
107         CDEBUG(D_QUOTA, "%s: lqc_import(%p) of obd(%p) is reactivated now, \n",
108                obd->obd_name,exp->exp_imp_reverse, obd);
109
110         /* make imp's connect flags equal relative exp's connect flags
111          * adding it to avoid the scan export list
112          */
113         imp = qctxt->lqc_import;
114         if (likely(imp))
115                 imp->imp_connect_data.ocd_connect_flags |=
116                         (exp->exp_connect_flags &
117                          (OBD_CONNECT_QUOTA64 | OBD_CONNECT_CHANGE_QS));
118
119         cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster);
120         /* start quota slave recovery thread. (release high limits) */
121         qslave_start_recovery(obd, qctxt);
122         RETURN(0);
123 }
124
125 static int filter_quota_clearinfo(struct obd_export *exp, struct obd_device *obd)
126 {
127         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
128         ENTRY;
129
130         /* lquota may be not set up before destroying export, b=14896 */
131         if (!obd->obd_set_up)
132                 RETURN(0);
133
134         /* when exp->exp_imp_reverse is destroyed, the corresponding lqc_import
135          * should be invalid b=12374 */
136         if (qctxt->lqc_import && qctxt->lqc_import == exp->exp_imp_reverse) {
137                 spin_lock(&qctxt->lqc_lock);
138                 qctxt->lqc_import = NULL;
139                 spin_unlock(&qctxt->lqc_lock);
140                 dqacq_interrupt(qctxt);
141                 CDEBUG(D_QUOTA, "%s: lqc_import of obd(%p) is invalid now.\n",
142                        obd->obd_name, obd);
143         }
144         RETURN(0);
145 }
146
147 static int filter_quota_enforce(struct obd_device *obd, unsigned int ignore)
148 {
149         ENTRY;
150
151         if (!sb_any_quota_enabled(obd->u.obt.obt_sb))
152                 RETURN(0);
153
154         if (ignore) {
155                 CDEBUG(D_QUOTA, "blocks will be written with ignoring quota.\n");
156                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
157         } else {
158                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
159         }
160
161         RETURN(0);
162 }
163
/**
 * Fill the per-uid/gid over-quota flags into \a oa so the client knows
 * whether its cached writes for this owner must be flushed synchronously.
 *
 * For each of user and group quota: if the cached qunit size has already
 * shrunk to the sync threshold, set the "no quota" flag directly;
 * otherwise query the filesystem (Q_GETQUOTA) and set the flag when usage
 * has reached the block hard limit.
 *
 * \param obd  the filter obd device
 * \param oa   obdo carrying o_uid/o_gid in and o_valid/o_flags out
 * \retval 0 on success; the first fsfilt_quotactl() error otherwise (the
 *         loop still processes the remaining quota type)
 */
static int filter_quota_getflag(struct obd_device *obd, struct obdo *oa)
{
        struct obd_device_target *obt = &obd->u.obt;
        struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt;
        int err, cnt, rc = 0;
        struct obd_quotactl *oqctl;
        ENTRY;

        if (!sb_any_quota_enabled(obt->obt_sb))
                RETURN(0);

        OBD_ALLOC_PTR(oqctl);
        if (!oqctl) {
                CERROR("Not enough memory!");
                RETURN(-ENOMEM);
        }

        /* set over quota flags for a uid/gid */
        oa->o_valid |= OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA;
        oa->o_flags &= ~(OBD_FL_NO_USRQUOTA | OBD_FL_NO_GRPQUOTA);

        for (cnt = 0; cnt < MAXQUOTAS; cnt++) {
                struct quota_adjust_qunit oqaq_tmp;
                struct lustre_qunit_size *lqs = NULL;

                oqaq_tmp.qaq_flags = cnt;
                oqaq_tmp.qaq_id = (cnt == USRQUOTA) ? oa->o_uid : oa->o_gid;

                quota_search_lqs(NULL, &oqaq_tmp, qctxt, &lqs);
                if (lqs) {
                        spin_lock(&lqs->lqs_lock);
                        /* qunit already at/below the sync threshold: mark
                         * this id over-quota without querying the fs */
                        if (lqs->lqs_bunit_sz <= qctxt->lqc_sync_blk) {
                                oa->o_flags |= (cnt == USRQUOTA) ?
                                        OBD_FL_NO_USRQUOTA : OBD_FL_NO_GRPQUOTA;
                                spin_unlock(&lqs->lqs_lock);
                                CDEBUG(D_QUOTA, "set sync flag: bunit(%lu), "
                                       "sync_blk(%d)\n", lqs->lqs_bunit_sz,
                                       qctxt->lqc_sync_blk);
                                /* this is for quota_search_lqs */
                                lqs_putref(lqs);
                                continue;
                        }
                        spin_unlock(&lqs->lqs_lock);
                        /* this is for quota_search_lqs */
                        lqs_putref(lqs);
                }

                memset(oqctl, 0, sizeof(*oqctl));

                oqctl->qc_cmd = Q_GETQUOTA;
                oqctl->qc_type = cnt;
                oqctl->qc_id = (cnt == USRQUOTA) ? oa->o_uid : oa->o_gid;
                err = fsfilt_quotactl(obd, obt->obt_sb, oqctl);
                if (err) {
                        /* remember only the first error; drop the validity
                         * bit for this quota type so the client ignores it */
                        if (!rc)
                                rc = err;
                        oa->o_valid &= ~((cnt == USRQUOTA) ? OBD_MD_FLUSRQUOTA :
                                                             OBD_MD_FLGRPQUOTA);
                        continue;
                }

                /* usage (converted to quota blocks) has hit the hard limit */
                if (oqctl->qc_dqblk.dqb_bhardlimit &&
                   (toqb(oqctl->qc_dqblk.dqb_curspace) >=
                    oqctl->qc_dqblk.dqb_bhardlimit))
                        oa->o_flags |= (cnt == USRQUOTA) ?
                                OBD_FL_NO_USRQUOTA : OBD_FL_NO_GRPQUOTA;
        }
        OBD_FREE_PTR(oqctl);
        RETURN(rc);
}
234
/**
 * check whether the left quota of certain uid and gid can satisfy a block_write
 * or inode_create rpc. When need to acquire quota, return QUOTA_RET_ACQUOTA
 *
 * \param obd      obd device whose quota context is consulted
 * \param uid,gid  owners the operation is charged to (root uid is skipped)
 * \param count    pages to write (block case) or inodes to create
 * \param cycle    retry counter from quota_chk_acq_common(); the pending
 *                 counters and the extra lqs reference are only taken on
 *                 the first pass (cycle == 0)
 * \param isblk    non-zero for block quota, zero for inode quota
 * \param inode    inode being written; used with \a frags to estimate the
 *                 extra metadata blocks the write needs (b=16542)
 * \param pending  out: amount charged to lqs_{b,i}write_pending on cycle 0;
 *                 quota_pending_commit() later subtracts it again
 */
static int quota_check_common(struct obd_device *obd, unsigned int uid,
                              unsigned int gid, int count, int cycle, int isblk,
                              struct inode *inode, int frags, int *pending)
{
        struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
        int i;
        __u32 id[MAXQUOTAS] = { uid, gid };
        struct qunit_data qdata[MAXQUOTAS];
        int mb = 0;
        int rc = 0, rc2[2] = { 0, 0 };
        ENTRY;

        CLASSERT(MAXQUOTAS < 4);
        if (!sb_any_quota_enabled(qctxt->lqc_sb))
                RETURN(rc);

        /* bail out if the quota context has been invalidated */
        spin_lock(&qctxt->lqc_lock);
        if (!qctxt->lqc_valid){
                spin_unlock(&qctxt->lqc_lock);
                RETURN(rc);
        }
        spin_unlock(&qctxt->lqc_lock);

        for (i = 0; i < MAXQUOTAS; i++) {
                struct lustre_qunit_size *lqs = NULL;

                qdata[i].qd_id = id[i];
                qdata[i].qd_flags = i;
                if (isblk)
                        QDATA_SET_BLK(&qdata[i]);
                qdata[i].qd_count = 0;

                /* ignore root user */
                if (qdata[i].qd_id == 0 && !QDATA_IS_GRP(&qdata[i]))
                        continue;

                quota_search_lqs(&qdata[i], NULL, qctxt, &lqs);
                if (!lqs)
                        continue;

                /* fills qdata[i].qd_count with the locally remaining quota */
                rc2[i] = compute_remquota(obd, qctxt, &qdata[i], isblk);
                spin_lock(&lqs->lqs_lock);
                if (!cycle) {
                        if (isblk) {
                                *pending = count * CFS_PAGE_SIZE;
                                /* in order to complete this write, we need extra
                                 * meta blocks. This function can get it through
                                 * data needed to be written b=16542 */
                                if (inode) {
                                        mb = *pending;
                                        rc = fsfilt_get_mblk(obd, qctxt->lqc_sb,
                                                             &mb, inode,frags);
                                        if (rc)
                                                CDEBUG(D_ERROR,
                                                       "can't get extra "
                                                       "meta blocks.\n");
                                        else
                                                *pending += mb;
                                }
                                lqs->lqs_bwrite_pending += *pending;
                        } else {
                                *pending = count;
                                lqs->lqs_iwrite_pending += *pending;
                        }
                }

                /* if xx_rec < 0, that means quota are releasing,
                 * and it may return before we use quota. So if
                 * we find this situation, we assuming it has
                 * returned b=18491 */
                if (isblk && lqs->lqs_blk_rec < 0) {
                        if (qdata[i].qd_count < -lqs->lqs_blk_rec)
                                qdata[i].qd_count = 0;
                        else
                                qdata[i].qd_count += lqs->lqs_blk_rec;
                }
                if (!isblk && lqs->lqs_ino_rec < 0) {
                        if (qdata[i].qd_count < -lqs->lqs_ino_rec)
                                qdata[i].qd_count = 0;
                        else
                                qdata[i].qd_count += lqs->lqs_ino_rec;
                }


                CDEBUG(D_QUOTA, "count: %d, lqs pending: %lu, qd_count: "LPU64
                       ", metablocks: %d, isblk: %d, pending: %d.\n", count,
                       isblk ? lqs->lqs_bwrite_pending : lqs->lqs_iwrite_pending,
                       qdata[i].qd_count, mb, isblk, *pending);
                /* remaining quota can't cover what is already pending:
                 * more must be acquired from the master */
                if (rc2[i] == QUOTA_RET_OK) {
                        if (isblk && qdata[i].qd_count < lqs->lqs_bwrite_pending)
                                rc2[i] = QUOTA_RET_ACQUOTA;
                        if (!isblk && qdata[i].qd_count <
                            lqs->lqs_iwrite_pending)
                                rc2[i] = QUOTA_RET_ACQUOTA;
                }

                spin_unlock(&lqs->lqs_lock);

                /* fault-injection hook only; reads lqs fields unlocked */
                if (lqs->lqs_blk_rec  < 0 &&
                    qdata[i].qd_count <
                    lqs->lqs_bwrite_pending - lqs->lqs_blk_rec - mb)
                        OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_REL, 5);

                /* When cycle is zero, lqs_*_pending will be changed. We will
                 * get reference of the lqs here and put reference of lqs in
                 * quota_pending_commit b=14784 */
                if (!cycle)
                        lqs_getref(lqs);

                /* this is for quota_search_lqs */
                lqs_putref(lqs);
        }

        /* acquiring for either id forces the caller into the dqacq path */
        if (rc2[0] == QUOTA_RET_ACQUOTA || rc2[1] == QUOTA_RET_ACQUOTA)
                RETURN(QUOTA_RET_ACQUOTA);
        else
                RETURN(rc);
}
357
/**
 * Check quota for a write / create and, while quota_check_common() keeps
 * reporting QUOTA_RET_ACQUOTA, acquire more from the master via \a acquire,
 * sleeping and retrying on transient errors.  Elapsed time is folded into
 * the LQUOTA_WAIT_FOR_CHK_{BLK,INO} lprocfs counters.
 *
 * \param pending  out: set by quota_check_common() on the first cycle
 * \param acquire  callback performing the actual dqacq RPC
 * \param oti      transaction info; its service thread watchdog is paused
 *                 while waiting for the quota master to come back
 * \retval 0 or QUOTA_RET_* on success paths, -EDQUOT when out of quota,
 *         or the last error from \a acquire
 */
static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
                                unsigned int gid, int count, int *pending,
                                quota_acquire acquire,
                                struct obd_trans_info *oti, int isblk,
                                struct inode *inode, int frags)
{
        struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
        struct timeval work_start;
        struct timeval work_end;
        long timediff;
        struct l_wait_info lwi = { 0 };
        int rc = 0, cycle = 0, count_err = 1;
        ENTRY;

        CDEBUG(D_QUOTA, "check quota for %s\n", obd->obd_name);
        *pending = 0;
        /* Unfortunately, if quota master is too busy to handle the
         * pre-dqacq in time and quota hash on ost is used up, we
         * have to wait for the completion of in flight dqacq/dqrel,
         * in order to get enough quota for write b=12588 */
        do_gettimeofday(&work_start);
        while ((rc = quota_check_common(obd, uid, gid, count, cycle, isblk,
                                        inode, frags, pending)) &
               QUOTA_RET_ACQUOTA) {

                /* no import to the master: block until it reconnects
                 * (filter_quota_setinfo() signals lqc_wait_for_qmaster) */
                spin_lock(&qctxt->lqc_lock);
                if (!qctxt->lqc_import && oti) {
                        spin_unlock(&qctxt->lqc_lock);

                        LASSERT(oti && oti->oti_thread &&
                                oti->oti_thread->t_watchdog);

                        lc_watchdog_disable(oti->oti_thread->t_watchdog);
                        CDEBUG(D_QUOTA, "sleep for quota master\n");
                        l_wait_event(qctxt->lqc_wait_for_qmaster, check_qm(qctxt),
                                     &lwi);
                        CDEBUG(D_QUOTA, "wake up when quota master is back\n");
                        lc_watchdog_touch(oti->oti_thread->t_watchdog);
                } else {
                        spin_unlock(&qctxt->lqc_lock);
                }

                cycle++;
                if (isblk)
                        OBD_FAIL_TIMEOUT(OBD_FAIL_OST_HOLD_WRITE_RPC, 90);
                /* after acquire(), we should run quota_check_common again
                 * so that we confirm there are enough quota to finish write */
                rc = acquire(obd, uid, gid, oti, isblk);

                /* please reference to dqacq_completion for the below */
                /* a new request is finished, try again */
                if (rc == QUOTA_REQ_RETURNED) {
                        CDEBUG(D_QUOTA, "finish a quota req, try again\n");
                        continue;
                }

                /* it is out of quota already */
                if (rc == -EDQUOT) {
                        CDEBUG(D_QUOTA, "out of quota,  return -EDQUOT\n");
                        break;
                }

                /* -EBUSY and others, wait a second and try again */
                if (rc < 0) {
                        cfs_waitq_t        waitq;
                        struct l_wait_info lwi;

                        if (oti && oti->oti_thread && oti->oti_thread->t_watchdog)
                                lc_watchdog_touch(oti->oti_thread->t_watchdog);
                        CDEBUG(D_QUOTA, "rc: %d, count_err: %d\n", rc,
                               count_err++);

                        /* back off: sleep min(cycle, 10) seconds on a
                         * private wait queue nobody signals */
                        init_waitqueue_head(&waitq);
                        lwi = LWI_TIMEOUT(cfs_time_seconds(min(cycle, 10)), NULL,
                                          NULL);
                        l_wait_event(waitq, 0, &lwi);
                }

                /* warn at most once per 30 seconds (see last_print above) */
                if (rc < 0 || cycle % 10 == 2) {
                        spin_lock(&last_print_lock);
                        if (last_print == 0 ||
                            cfs_time_before((last_print + cfs_time_seconds(30)),
                                            cfs_time_current())) {
                                last_print = cfs_time_current();
                                spin_unlock(&last_print_lock);
                                CWARN("still haven't managed to acquire quota "
                                      "space from the quota master after %d "
                                      "retries (err=%d, rc=%d)\n",
                                      cycle, count_err - 1, rc);
                        } else {
                                spin_unlock(&last_print_lock);
                        }
                }

                CDEBUG(D_QUOTA, "recheck quota with rc: %d, cycle: %d\n", rc,
                       cycle);
        }
        do_gettimeofday(&work_end);
        timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
        lprocfs_counter_add(qctxt->lqc_stats,
                            isblk ? LQUOTA_WAIT_FOR_CHK_BLK :
                                    LQUOTA_WAIT_FOR_CHK_INO,
                            timediff);

        RETURN(rc);
}
464
/**
 * when a block_write or inode_create rpc is finished, adjust the record for
 * pending blocks and inodes
 *
 * Undoes the lqs_{b,i}write_pending charge made by quota_check_common()
 * (cycle 0) and drops the extra lqs reference it took (b=14784).
 *
 * \param pending  the amount quota_check_common() reported via *pending
 * \param isblk    non-zero for block quota, zero for inode quota
 * \retval 0 always
 */
static int quota_pending_commit(struct obd_device *obd, unsigned int uid,
                                unsigned int gid, int pending, int isblk)
{
        struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
        struct timeval work_start;
        struct timeval work_end;
        long timediff;
        int i;
        __u32 id[MAXQUOTAS] = { uid, gid };
        struct qunit_data qdata[MAXQUOTAS];
        ENTRY;

        CDEBUG(D_QUOTA, "commit pending quota for  %s\n", obd->obd_name);
        CLASSERT(MAXQUOTAS < 4);
        if (!sb_any_quota_enabled(qctxt->lqc_sb))
                RETURN(0);

        do_gettimeofday(&work_start);
        for (i = 0; i < MAXQUOTAS; i++) {
                struct lustre_qunit_size *lqs = NULL;

                qdata[i].qd_id = id[i];
                qdata[i].qd_flags = i;
                if (isblk)
                        QDATA_SET_BLK(&qdata[i]);
                qdata[i].qd_count = 0;

                /* root user was never charged in quota_check_common() */
                if (qdata[i].qd_id == 0 && !QDATA_IS_GRP(&qdata[i]))
                        continue;

                quota_search_lqs(&qdata[i], NULL, qctxt, &lqs);
                if (lqs) {
                        int flag = 0;
                        spin_lock(&lqs->lqs_lock);
                        if (isblk) {
                                /* flag records that a pending charge was
                                 * really undone, so the matching extra
                                 * reference must be dropped below */
                                if (lqs->lqs_bwrite_pending >= pending) {
                                        lqs->lqs_bwrite_pending -= pending;
                                        spin_unlock(&lqs->lqs_lock);
                                        flag = 1;
                                } else {
                                        spin_unlock(&lqs->lqs_lock);
                                        CDEBUG(D_ERROR,
                                               "there are too many blocks!\n");
                                }
                        } else {
                                if (lqs->lqs_iwrite_pending >= pending) {
                                        lqs->lqs_iwrite_pending -= pending;
                                        spin_unlock(&lqs->lqs_lock);
                                        flag = 1;
                                } else {
                                        spin_unlock(&lqs->lqs_lock);
                                        CDEBUG(D_ERROR,
                                               "there are too many files!\n");
                                }
                        }
                        CDEBUG(D_QUOTA, "lqs pending: %lu, pending: %d, "
                               "isblk: %d.\n",
                               isblk ? lqs->lqs_bwrite_pending :
                               lqs->lqs_iwrite_pending, pending, isblk);

                        /* this putref pairs with quota_search_lqs above */
                        lqs_putref(lqs);
                        /* When lqs_*_pening is changed back, we'll putref lqs
                         * here b=14784 */
                        if (flag)
                                lqs_putref(lqs);
                }
        }
        do_gettimeofday(&work_end);
        timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
        lprocfs_counter_add(qctxt->lqc_stats,
                            isblk ? LQUOTA_WAIT_FOR_COMMIT_BLK :
                                    LQUOTA_WAIT_FOR_COMMIT_INO,
                            timediff);

        RETURN(0);
}
545
/* Module-init hook for the MDS quota master: set up the dquot machinery.
 * Returns the result of lustre_dquot_init(). */
static int mds_quota_init(void)
{
        return lustre_dquot_init();
}
550
/* Module-exit hook for the MDS quota master: tear down the dquot
 * machinery.  Always returns 0. */
static int mds_quota_exit(void)
{
        lustre_dquot_exit();
        return 0;
}
556
557 static int mds_quota_setup(struct obd_device *obd)
558 {
559         struct obd_device_target *obt = &obd->u.obt;
560         struct mds_obd *mds = &obd->u.mds;
561         int rc;
562         ENTRY;
563
564         if (unlikely(mds->mds_quota)) {
565                 CWARN("try to reinitialize quota context!\n");
566                 RETURN(0);
567         }
568
569         init_rwsem(&obt->obt_rwsem);
570         obt->obt_qfmt = LUSTRE_QUOTA_V2;
571         mds->mds_quota_info.qi_version = LUSTRE_QUOTA_V2;
572         atomic_set(&obt->obt_quotachecking, 1);
573         /* initialize quota master and quota context */
574         sema_init(&mds->mds_qonoff_sem, 1);
575         rc = qctxt_init(obd, dqacq_handler);
576         if (rc) {
577                 CERROR("initialize quota context failed! (rc:%d)\n", rc);
578                 RETURN(rc);
579         }
580         mds->mds_quota = 1;
581         RETURN(rc);
582 }
583
584 static int mds_quota_cleanup(struct obd_device *obd)
585 {
586         ENTRY;
587         if (unlikely(!obd->u.mds.mds_quota))
588                 RETURN(0);
589
590         qctxt_cleanup(&obd->u.obt.obt_qctxt, 0);
591         RETURN(0);
592 }
593
594 static int mds_quota_setinfo(struct obd_device *obd, void *data)
595 {
596         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
597         ENTRY;
598
599         if (unlikely(!obd->u.mds.mds_quota))
600                 RETURN(0);
601
602         if (data != NULL)
603                 QUOTA_MASTER_READY(qctxt);
604         else
605                 QUOTA_MASTER_UNREADY(qctxt);
606         RETURN(0);
607 }
608
609 static int mds_quota_fs_cleanup(struct obd_device *obd)
610 {
611         struct mds_obd *mds = &obd->u.mds;
612         struct obd_quotactl oqctl;
613         ENTRY;
614
615         if (unlikely(!mds->mds_quota))
616                 RETURN(0);
617
618         mds->mds_quota = 0;
619         memset(&oqctl, 0, sizeof(oqctl));
620         oqctl.qc_type = UGQUOTA;
621
622         down(&mds->mds_qonoff_sem);
623         mds_admin_quota_off(obd, &oqctl);
624         up(&mds->mds_qonoff_sem);
625         RETURN(0);
626 }
627
628 static int quota_acquire_common(struct obd_device *obd, unsigned int uid,
629                                 unsigned int gid, struct obd_trans_info *oti,
630                                 int isblk)
631 {
632         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
633         int rc;
634         ENTRY;
635
636         rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, isblk, 1, oti);
637         RETURN(rc);
638 }
639
640 #endif /* HAVE_QUOTA_SUPPORT */
641 #endif /* __KERNEL__ */
642
/* Per-(client, id, type) over-quota marker: its presence in qinfo_hash
 * means that uid/gid is flagged over quota on that OSC, so
 * osc_quota_chkdq() returns NO_QUOTA for it. */
struct osc_quota_info {
        struct list_head        oqi_hash;       /* hash list */
        struct client_obd      *oqi_cli;        /* osc obd */
        unsigned int            oqi_id;         /* uid/gid of a file */
        short                   oqi_type;       /* quota type */
};
649
/* protects qinfo_hash and every osc_quota_info linked into it */
spinlock_t qinfo_list_lock = SPIN_LOCK_UNLOCKED;

/* over-quota markers, bucketed by hashfn(cli, id, type) */
static struct list_head qinfo_hash[NR_DQHASH];
/* SLAB cache for client quota context */
cfs_mem_cache_t *qinfo_cachep = NULL;
655
656 static inline int hashfn(struct client_obd *cli, unsigned long id, int type)
657                          __attribute__((__const__));
658
659 static inline int hashfn(struct client_obd *cli, unsigned long id, int type)
660 {
661         unsigned long tmp = ((unsigned long)cli>>6) ^ id;
662         tmp = (tmp * (MAXQUOTAS - type)) % NR_DQHASH;
663         return tmp;
664 }
665
666 /* caller must hold qinfo_list_lock */
667 static inline void insert_qinfo_hash(struct osc_quota_info *oqi)
668 {
669         struct list_head *head = qinfo_hash +
670                 hashfn(oqi->oqi_cli, oqi->oqi_id, oqi->oqi_type);
671
672         LASSERT_SPIN_LOCKED(&qinfo_list_lock);
673         list_add(&oqi->oqi_hash, head);
674 }
675
/* Unlink \a oqi from its hash bucket (the list head is re-initialized so
 * a repeat del is harmless); caller must hold qinfo_list_lock. */
static inline void remove_qinfo_hash(struct osc_quota_info *oqi)
{
        LASSERT_SPIN_LOCKED(&qinfo_list_lock);
        list_del_init(&oqi->oqi_hash);
}
682
683 /* caller must hold qinfo_list_lock */
684 static inline struct osc_quota_info *find_qinfo(struct client_obd *cli,
685                                                 unsigned int id, int type)
686 {
687         unsigned int hashent = hashfn(cli, id, type);
688         struct osc_quota_info *oqi;
689         ENTRY;
690
691         LASSERT_SPIN_LOCKED(&qinfo_list_lock);
692         list_for_each_entry(oqi, &qinfo_hash[hashent], oqi_hash) {
693                 if (oqi->oqi_cli == cli &&
694                     oqi->oqi_id == id && oqi->oqi_type == type)
695                         return oqi;
696         }
697         RETURN(NULL);
698 }
699
700 static struct osc_quota_info *alloc_qinfo(struct client_obd *cli,
701                                           unsigned int id, int type)
702 {
703         struct osc_quota_info *oqi;
704         ENTRY;
705
706         OBD_SLAB_ALLOC(oqi, qinfo_cachep, CFS_ALLOC_STD, sizeof(*oqi));
707         if(!oqi)
708                 RETURN(NULL);
709
710         CFS_INIT_LIST_HEAD(&oqi->oqi_hash);
711         oqi->oqi_cli = cli;
712         oqi->oqi_id = id;
713         oqi->oqi_type = type;
714
715         RETURN(oqi);
716 }
717
/* Return an over-quota marker to the slab cache; oqi must already be
 * unhashed (see remove_qinfo_hash). */
static void free_qinfo(struct osc_quota_info *oqi)
{
        OBD_SLAB_FREE(oqi, qinfo_cachep, sizeof(*oqi));
}
722
723 int osc_quota_chkdq(struct client_obd *cli, unsigned int uid, unsigned int gid)
724 {
725         unsigned int id;
726         int cnt, rc = QUOTA_OK;
727         ENTRY;
728
729         spin_lock(&qinfo_list_lock);
730         for (cnt = 0; cnt < MAXQUOTAS; cnt++) {
731                 struct osc_quota_info *oqi = NULL;
732
733                 id = (cnt == USRQUOTA) ? uid : gid;
734                 oqi = find_qinfo(cli, id, cnt);
735                 if (oqi) {
736                         rc = NO_QUOTA;
737                         break;
738                 }
739         }
740         spin_unlock(&qinfo_list_lock);
741
742         RETURN(rc);
743 }
744
/**
 * Update this client's over-quota markers for (uid, gid) from the flags
 * an OST returned in an obdo: insert a marker when the id became
 * over-quota, remove it when the id recovered.
 *
 * \param valid  OBD_MD_FL{USR,GRP}QUOTA bits saying which types to update
 * \param flags  OBD_FL_NO_{USR,GRP}QUOTA bits giving the new state
 * \retval 0 on success, -ENOMEM if a marker could not be allocated
 */
int osc_quota_setdq(struct client_obd *cli, unsigned int uid, unsigned int gid,
                    obd_flag valid, obd_flag flags)
{
        unsigned int id;
        obd_flag noquota;
        int cnt, rc = 0;
        ENTRY;

        for (cnt = 0; cnt < MAXQUOTAS; cnt++) {
                struct osc_quota_info *oqi, *old;

                /* skip quota types the OST did not report on */
                if (!(valid & ((cnt == USRQUOTA) ?
                    OBD_MD_FLUSRQUOTA : OBD_MD_FLGRPQUOTA)))
                        continue;

                id = (cnt == USRQUOTA) ? uid : gid;
                noquota = (cnt == USRQUOTA) ?
                    (flags & OBD_FL_NO_USRQUOTA) : (flags & OBD_FL_NO_GRPQUOTA);

                /* allocate speculatively before taking the spinlock (the
                 * slab alloc may not run under it); the unused marker is
                 * freed again below */
                oqi = alloc_qinfo(cli, id, cnt);
                if (oqi) {
                        spin_lock(&qinfo_list_lock);

                        old = find_qinfo(cli, id, cnt);
                        if (old && !noquota)
                                /* id recovered: drop its stale marker */
                                remove_qinfo_hash(old);
                        else if (!old && noquota)
                                /* id newly over-quota: publish the marker */
                                insert_qinfo_hash(oqi);

                        spin_unlock(&qinfo_list_lock);

                        /* free whichever marker(s) are no longer hashed */
                        if (old || !noquota)
                                free_qinfo(oqi);
                        if (old && !noquota)
                                free_qinfo(old);
                } else {
                        CERROR("not enough mem!\n");
                        rc = -ENOMEM;
                        break;
                }
        }

        RETURN(rc);
}
790
791 int osc_quota_cleanup(struct obd_device *obd)
792 {
793         struct client_obd *cli = &obd->u.cli;
794         struct osc_quota_info *oqi, *n;
795         int i;
796         ENTRY;
797
798         spin_lock(&qinfo_list_lock);
799         for (i = 0; i < NR_DQHASH; i++) {
800                 list_for_each_entry_safe(oqi, n, &qinfo_hash[i], oqi_hash) {
801                         if (oqi->oqi_cli != cli)
802                                 continue;
803                         remove_qinfo_hash(oqi);
804                         free_qinfo(oqi);
805                 }
806         }
807         spin_unlock(&qinfo_list_lock);
808
809         RETURN(0);
810 }
811
812 int osc_quota_init(void)
813 {
814         int i;
815         ENTRY;
816
817         LASSERT(qinfo_cachep == NULL);
818         qinfo_cachep = cfs_mem_cache_create("osc_quota_info",
819                                             sizeof(struct osc_quota_info),
820                                             0, 0);
821         if (!qinfo_cachep)
822                 RETURN(-ENOMEM);
823
824         for (i = 0; i < NR_DQHASH; i++)
825                 CFS_INIT_LIST_HEAD(qinfo_hash + i);
826
827         RETURN(0);
828 }
829
830 int osc_quota_exit(void)
831 {
832         struct osc_quota_info *oqi, *n;
833         int i, rc;
834         ENTRY;
835
836         spin_lock(&qinfo_list_lock);
837         for (i = 0; i < NR_DQHASH; i++) {
838                 list_for_each_entry_safe(oqi, n, &qinfo_hash[i], oqi_hash) {
839                         remove_qinfo_hash(oqi);
840                         free_qinfo(oqi);
841                 }
842         }
843         spin_unlock(&qinfo_list_lock);
844
845         rc = cfs_mem_cache_destroy(qinfo_cachep);
846         LASSERTF(rc == 0, "couldn't destory qinfo_cachep slab\n");
847         qinfo_cachep = NULL;
848
849         RETURN(0);
850 }
851
852 #ifdef __KERNEL__
853 #ifdef HAVE_QUOTA_SUPPORT
/* Quota operation table for the MDS (metadata server) role. */
quota_interface_t mds_quota_interface = {
        .quota_init     = mds_quota_init,
        .quota_exit     = mds_quota_exit,
        .quota_setup    = mds_quota_setup,
        .quota_cleanup  = mds_quota_cleanup,
        .quota_check    = target_quota_check,
        .quota_ctl      = mds_quota_ctl,
        .quota_setinfo  = mds_quota_setinfo,
        .quota_fs_cleanup = mds_quota_fs_cleanup,
        .quota_recovery = mds_quota_recovery,
        .quota_adjust   = mds_quota_adjust,
        .quota_chkquota = quota_chk_acq_common,
        .quota_acquire  = quota_acquire_common,
        .quota_pending_commit = quota_pending_commit,
};
869
/* Quota operation table for the OST filter (object storage) role. */
quota_interface_t filter_quota_interface = {
        .quota_setup    = filter_quota_setup,
        .quota_cleanup  = filter_quota_cleanup,
        .quota_check    = target_quota_check,
        .quota_ctl      = filter_quota_ctl,
        .quota_setinfo  = filter_quota_setinfo,
        .quota_clearinfo = filter_quota_clearinfo,
        .quota_enforce  = filter_quota_enforce,
        .quota_getflag  = filter_quota_getflag,
        .quota_acquire  = quota_acquire_common,
        .quota_adjust   = filter_quota_adjust,
        .quota_chkquota = quota_chk_acq_common,
        .quota_adjust_qunit   = filter_quota_adjust_qunit,
        .quota_pending_commit = quota_pending_commit,
};
885 #endif
886 #endif /* __KERNEL__ */
887
/* Quota operation table for the MDC (metadata client) role. */
quota_interface_t mdc_quota_interface = {
        .quota_ctl      = client_quota_ctl,
        .quota_check    = client_quota_check,
        .quota_poll_check = client_quota_poll_check,
};
893
/* Quota operation table for the LMV (logical metadata volume) layer. */
quota_interface_t lmv_quota_interface = {
        .quota_ctl      = lmv_quota_ctl,
        .quota_check    = lmv_quota_check,
};
898
/* Quota operation table for the OSC (object storage client) role;
 * includes the local qinfo-cache lifecycle hooks defined above. */
quota_interface_t osc_quota_interface = {
        .quota_ctl      = client_quota_ctl,
        .quota_check    = client_quota_check,
        .quota_poll_check = client_quota_poll_check,
        .quota_init     = osc_quota_init,
        .quota_exit     = osc_quota_exit,
        .quota_chkdq    = osc_quota_chkdq,
        .quota_setdq    = osc_quota_setdq,
        .quota_cleanup  = osc_quota_cleanup,
        .quota_adjust_qunit = client_quota_adjust_qunit,
};
910
/* Quota operation table for the LOV (logical object volume) layer. */
quota_interface_t lov_quota_interface = {
        .quota_ctl      = lov_quota_ctl,
        .quota_check    = lov_quota_check,
        .quota_adjust_qunit = lov_quota_adjust_qunit,
};
916
917 #ifdef __KERNEL__
918
919 cfs_proc_dir_entry_t *lquota_type_proc_dir = NULL;
920
921 static int __init init_lustre_quota(void)
922 {
923 #ifdef HAVE_QUOTA_SUPPORT
924         int rc = 0;
925
926         lquota_type_proc_dir = lprocfs_register(OBD_LQUOTA_DEVICENAME,
927                                                 proc_lustre_root,
928                                                 NULL, NULL);
929         if (IS_ERR(lquota_type_proc_dir)) {
930                 CERROR("LProcFS failed in lquota-init\n");
931                 rc = PTR_ERR(lquota_type_proc_dir);
932                 return rc;
933         }
934
935         rc = qunit_cache_init();
936         if (rc)
937                 return rc;
938
939         PORTAL_SYMBOL_REGISTER(filter_quota_interface);
940         PORTAL_SYMBOL_REGISTER(mds_quota_interface);
941 #endif
942         PORTAL_SYMBOL_REGISTER(mdc_quota_interface);
943         PORTAL_SYMBOL_REGISTER(lmv_quota_interface);
944         PORTAL_SYMBOL_REGISTER(osc_quota_interface);
945         PORTAL_SYMBOL_REGISTER(lov_quota_interface);
946         return 0;
947 }
948
/* Module exit: mirror of init_lustre_quota() — unregister the exported
 * interface symbols, then (server side) tear down the qunit cache and
 * remove the lquota procfs entry. */
static void /*__exit*/ exit_lustre_quota(void)
{
        PORTAL_SYMBOL_UNREGISTER(mdc_quota_interface);
        PORTAL_SYMBOL_UNREGISTER(lmv_quota_interface);
        PORTAL_SYMBOL_UNREGISTER(osc_quota_interface);
        PORTAL_SYMBOL_UNREGISTER(lov_quota_interface);
#ifdef HAVE_QUOTA_SUPPORT
        PORTAL_SYMBOL_UNREGISTER(filter_quota_interface);
        PORTAL_SYMBOL_UNREGISTER(mds_quota_interface);

        qunit_cache_cleanup();

        /* Registration may have failed or never happened; only remove
         * a live entry. */
        if (lquota_type_proc_dir)
                lprocfs_remove(&lquota_type_proc_dir);
#endif
}
965
/* Kernel module metadata and entry/exit point registration. */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Quota");
MODULE_LICENSE("GPL");

cfs_module(lquota, "1.0.0", init_lustre_quota, exit_lustre_quota);
971
/* Export the quota interface tables so the obd layers can bind to
 * them; server-side tables only exist with HAVE_QUOTA_SUPPORT. */
#ifdef HAVE_QUOTA_SUPPORT
EXPORT_SYMBOL(mds_quota_interface);
EXPORT_SYMBOL(filter_quota_interface);
#endif
EXPORT_SYMBOL(mdc_quota_interface);
EXPORT_SYMBOL(lmv_quota_interface);
EXPORT_SYMBOL(osc_quota_interface);
EXPORT_SYMBOL(lov_quota_interface);
980 #endif /* __KERNEL */