Whamcloud - gitweb
Branch b1_6
[fs/lustre-release.git] / lustre / quota / quota_interface.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/quota/quota_interface.c
5  *
6  *  Copyright (c) 2001-2005 Cluster File Systems, Inc.
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   No redistribution or use is permitted outside of Cluster File Systems, Inc.
11  *
12  */
13 #ifndef EXPORT_SYMTAB
14 # define EXPORT_SYMTAB
15 #endif
16 #define DEBUG_SUBSYSTEM S_MDS
17
18 #ifdef __KERNEL__
19 # include <linux/version.h>
20 # include <linux/module.h>
21 # include <linux/init.h>
22 # include <linux/fs.h>
23 # include <linux/jbd.h>
24 # include <linux/ext3_fs.h>
25 # if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
26 #  include <linux/smp_lock.h>
27 #  include <linux/buffer_head.h>
28 #  include <linux/workqueue.h>
29 #  include <linux/mount.h>
30 # else
31 #  include <linux/locks.h>
32 # endif
33 #else /* __KERNEL__ */
34 # include <liblustre.h>
35 #endif
36
37 #include <obd_class.h>
38 #include <lustre_mds.h>
39 #include <lustre_dlm.h>
40 #include <lustre_cfg.h>
41 #include <obd_ost.h>
42 #include <lustre_fsfilt.h>
43 #include <lustre_quota.h>
44 #include <lprocfs_status.h>
45 #include "quota_internal.h"
46
47
48 #ifdef __KERNEL__
49
50 /* quota proc file handling functions */
51 #ifdef LPROCFS
52
53 #define USER_QUOTA      1
54 #define GROUP_QUOTA     2
55
56 #define MAX_STYPE_SIZE  5
57 int lprocfs_quota_rd_type(char *page, char **start, off_t off, int count,
58                           int *eof, void *data)
59 {
60         struct obd_device *obd = (struct obd_device *)data;
61         char stype[MAX_STYPE_SIZE + 1] = "";
62         int type = obd->u.obt.obt_qctxt.lqc_atype;
63         LASSERT(obd != NULL);
64
65         if (type == 0) {
66                 strcpy(stype, "off");
67         } else {
68                 if (type & USER_QUOTA)
69                         strcat(stype, "u");
70                 if (type & GROUP_QUOTA)
71                         strcat(stype, "g");
72         }
73
74         /* append with quota version on MDS */
75         if (!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME)) {
76                 int rc;
77                 lustre_quota_version_t version;
78
79                 rc = mds_quota_get_version(obd, &version);
80                 if (rc)
81                         return rc;
82
83                 switch (version) {
84                         case LUSTRE_QUOTA_V1:
85                                 strcat(stype, "1");
86                                 break;
87                         case LUSTRE_QUOTA_V2:
88                                 strcat(stype, "2");
89                                 break;
90                         default:
91                                 return -ENOSYS;
92                 }
93         }
94
95         return snprintf(page, count, "%s\n", stype);
96 }
97 EXPORT_SYMBOL(lprocfs_quota_rd_type);
98
99 static int auto_quota_on(struct obd_device *obd, int type,
100                          struct super_block *sb, int is_master)
101 {
102         struct obd_quotactl *oqctl;
103         struct lvfs_run_ctxt saved;
104         int rc;
105         ENTRY;
106
107         LASSERT(type == USRQUOTA || type == GRPQUOTA || type == UGQUOTA);
108
109         /* quota already turned on */
110         if (obd->u.obt.obt_qctxt.lqc_status)
111                 RETURN(0);
112
113         OBD_ALLOC_PTR(oqctl);
114         if (!oqctl)
115                 RETURN(-ENOMEM);
116
117         oqctl->qc_type = type;
118         oqctl->qc_cmd = Q_QUOTAON;
119         oqctl->qc_id = QFMT_LDISKFS;
120
121         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
122
123         if (!is_master)
124                 goto local_quota;
125
126         /* turn on cluster wide quota */
127         rc = mds_admin_quota_on(obd, oqctl);
128         if (rc) {
129                 CDEBUG(rc == -ENOENT ? D_QUOTA : D_ERROR,
130                        "auto-enable admin quota failed. rc=%d\n", rc);
131                 GOTO(out_pop, rc);
132         }
133 local_quota:
134         /* turn on local quota */
135         rc = fsfilt_quotactl(obd, sb, oqctl);
136         if (rc) {
137                 CDEBUG(rc == -ENOENT ? D_QUOTA : D_ERROR,
138                        "auto-enable local quota failed. rc=%d\n", rc);
139                 if (is_master)
140                         mds_quota_off(obd, oqctl);
141         } else {
142                 obd->u.obt.obt_qctxt.lqc_status = 1;
143         }
144 out_pop:
145         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
146
147         OBD_FREE_PTR(oqctl);
148         RETURN(rc);
149 }
150
151 int lprocfs_quota_wr_type(struct file *file, const char *buffer,
152                           unsigned long count, void *data)
153 {
154         struct obd_device *obd = (struct obd_device *)data;
155         struct obd_device_target *obt = &obd->u.obt;
156         int type = 0;
157         unsigned long i;
158         char stype[MAX_STYPE_SIZE + 1] = "";
159         LASSERT(obd != NULL);
160
161         if (count > MAX_STYPE_SIZE)
162                 return -EINVAL;
163
164         if (copy_from_user(stype, buffer, count))
165                 return -EFAULT;
166
167         for (i = 0 ; i < count ; i++) {
168                 int rc;
169
170                 switch (stype[i]) {
171                 case 'u' :
172                         type |= USER_QUOTA;
173                         break;
174                 case 'g' :
175                         type |= GROUP_QUOTA;
176                         break;
177                 /* quota version specifiers */
178                 case '1' :
179                         if (strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME))
180                                 break;
181
182                         rc = mds_quota_set_version(obd, LUSTRE_QUOTA_V1);
183                         if (rc) {
184                                 CDEBUG(D_QUOTA, "failed to set quota v1! %d\n", rc);
185                                 return rc;
186                         }
187                         break;
188                 case '2' :
189                         if (strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME))
190                                 break;
191
192                         rc = mds_quota_set_version(obd, LUSTRE_QUOTA_V2);
193                         if (rc) {
194                                 CDEBUG(D_QUOTA, "could not set quota v2! %d\n", rc);
195                                 return rc;
196                         }
197                         break;
198                 default  : /* just skip stray symbols like \n */
199                         break;
200                 }
201         }
202
203         obt->obt_qctxt.lqc_atype = type;
204
205         if (type == 0)
206                 return count;
207
208         if (!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME))
209                 auto_quota_on(obd, type - 1, obt->obt_sb, 1);
210         else if (!strcmp(obd->obd_type->typ_name, LUSTRE_OST_NAME))
211                 auto_quota_on(obd, type - 1, obt->obt_sb, 0);
212         else
213                 return -EFAULT;
214
215         return count;
216 }
217 EXPORT_SYMBOL(lprocfs_quota_wr_type);
218
219
220 #endif /* LPROCFS */
221
222 static int filter_quota_setup(struct obd_device *obd)
223 {
224         int rc = 0;
225         struct obd_device_target *obt = &obd->u.obt;
226         ENTRY;
227
228         atomic_set(&obt->obt_quotachecking, 1);
229         rc = qctxt_init(&obt->obt_qctxt, obt->obt_sb, NULL);
230         if (rc) {
231                 CERROR("initialize quota context failed! (rc:%d)\n", rc);
232                 RETURN(rc);
233         }
234         RETURN(rc);
235 }
236
237 static int filter_quota_cleanup(struct obd_device *obd)
238 {
239         qctxt_cleanup(&obd->u.obt.obt_qctxt, 0);
240         return 0;
241 }
242
243 static int filter_quota_setinfo(struct obd_export *exp, struct obd_device *obd)
244 {
245         struct obd_import *imp;
246
247         /* setup the quota context import */
248         spin_lock(&obd->u.obt.obt_qctxt.lqc_lock);
249         obd->u.obt.obt_qctxt.lqc_import = exp->exp_imp_reverse;
250         spin_unlock(&obd->u.obt.obt_qctxt.lqc_lock);
251
252         /* make imp's connect flags equal relative exp's connect flags 
253          * adding it to avoid the scan export list
254          */
255         imp = exp->exp_imp_reverse;
256         if (imp)
257                 imp->imp_connect_data.ocd_connect_flags |= 
258                         (exp->exp_connect_flags &
259                          (OBD_CONNECT_QUOTA64 | OBD_CONNECT_CHANGE_QS));
260
261         /* start quota slave recovery thread. (release high limits) */
262         qslave_start_recovery(obd, &obd->u.obt.obt_qctxt);
263         return 0;
264 }
265
266 static int filter_quota_clearinfo(struct obd_export *exp, struct obd_device *obd)
267 {
268         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
269
270         /* when exp->exp_imp_reverse is destroyed, the corresponding lqc_import
271          * should be invalid b=12374 */
272         if (qctxt->lqc_import == exp->exp_imp_reverse) {
273                 spin_lock(&qctxt->lqc_lock);
274                 qctxt->lqc_import = NULL;
275                 spin_unlock(&qctxt->lqc_lock);
276         }
277
278         return 0;
279 }
280
281 static int filter_quota_enforce(struct obd_device *obd, unsigned int ignore)
282 {
283         ENTRY;
284
285         if (!sb_any_quota_enabled(obd->u.obt.obt_sb))
286                 RETURN(0);
287
288         if (ignore)
289                 cap_raise(current->cap_effective, CAP_SYS_RESOURCE);
290         else
291                 cap_lower(current->cap_effective, CAP_SYS_RESOURCE);
292
293         RETURN(0);
294 }
295
296 static int filter_quota_getflag(struct obd_device *obd, struct obdo *oa)
297 {
298         struct obd_device_target *obt = &obd->u.obt;
299         int err, cnt, rc = 0;
300         struct obd_quotactl *oqctl;
301         ENTRY;
302
303         if (!sb_any_quota_enabled(obt->obt_sb))
304                 RETURN(0);
305
306         oa->o_flags &= ~(OBD_FL_NO_USRQUOTA | OBD_FL_NO_GRPQUOTA);
307
308         OBD_ALLOC_PTR(oqctl);
309         if (!oqctl) {
310                 CERROR("Not enough memory!");
311                 RETURN(-ENOMEM);
312         }
313
314         for (cnt = 0; cnt < MAXQUOTAS; cnt++) {
315                 memset(oqctl, 0, sizeof(*oqctl));
316
317                 oqctl->qc_cmd = Q_GETQUOTA;
318                 oqctl->qc_type = cnt;
319                 oqctl->qc_id = (cnt == USRQUOTA) ? oa->o_uid : oa->o_gid;
320                 err = fsfilt_quotactl(obd, obt->obt_sb, oqctl);
321                 if (err) {
322                         if (!rc)
323                                 rc = err;
324                         continue;
325                 }
326
327                 /* set over quota flags for a uid/gid */
328                 oa->o_valid |= (cnt == USRQUOTA) ?
329                                OBD_MD_FLUSRQUOTA : OBD_MD_FLGRPQUOTA;
330                 if (oqctl->qc_dqblk.dqb_bhardlimit &&
331                    (toqb(oqctl->qc_dqblk.dqb_curspace) > 
332                     oqctl->qc_dqblk.dqb_bhardlimit))
333                         oa->o_flags |= (cnt == USRQUOTA) ? 
334                                 OBD_FL_NO_USRQUOTA : OBD_FL_NO_GRPQUOTA;
335         }
336         OBD_FREE_PTR(oqctl);
337         RETURN(rc);
338 }
339
340 static int filter_quota_acquire(struct obd_device *obd, unsigned int uid,
341                                 unsigned int gid)
342 {
343         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
344         int rc;
345         ENTRY;
346
347         rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, LQUOTA_FLAGS_BLK, 1);
348         RETURN(rc);
349 }
350
351 /* check whether the left quota of certain uid and gid can satisfy a block_write
352  * or inode_create rpc. When need to acquire quota, return QUOTA_RET_ACQUOTA */
353 static int quota_check_common(struct obd_device *obd, unsigned int uid,
354                               unsigned int gid, int count, int cycle, int isblk)
355 {
356         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
357         int i;
358         __u32 id[MAXQUOTAS] = { uid, gid };
359         struct qunit_data qdata[MAXQUOTAS];
360         int rc = 0, rc2[2] = { 0, 0 };
361         ENTRY;
362
363         CLASSERT(MAXQUOTAS < 4);
364         if (!sb_any_quota_enabled(qctxt->lqc_sb))
365                 RETURN(rc);
366
367         for (i = 0; i < MAXQUOTAS; i++) {
368                 struct lustre_qunit_size *lqs = NULL;
369
370                 qdata[i].qd_id = id[i];
371                 qdata[i].qd_flags = i;
372                 if (isblk)
373                         QDATA_SET_BLK(&qdata[i]);
374                 qdata[i].qd_count = 0;
375
376                 /* ignore root user */
377                 if (qdata[i].qd_id == 0 && !QDATA_IS_GRP(&qdata[i]))
378                         continue;
379
380                 quota_search_lqs(&qdata[i], NULL, qctxt, &lqs);
381                 if (!lqs)
382                         continue;
383
384                 rc2[i] = compute_remquota(obd, qctxt, &qdata[i], isblk);
385                 spin_lock(&lqs->lqs_lock);
386                 if (!cycle) {
387                         rc = QUOTA_RET_INC_PENDING;
388                         if (isblk)
389                                 lqs->lqs_bwrite_pending += count;
390                         else
391                                 lqs->lqs_iwrite_pending += count;
392                 }
393
394                 CDEBUG(D_QUOTA, "write pending: %lu, qd_count: "LPU64".\n",
395                        isblk ? lqs->lqs_bwrite_pending : lqs->lqs_iwrite_pending,
396                        qdata[i].qd_count);
397                 if (rc2[i] == QUOTA_RET_OK) {
398                         if (isblk && qdata[i].qd_count <
399                             lqs->lqs_bwrite_pending * CFS_PAGE_SIZE)
400                                 rc2[i] = QUOTA_RET_ACQUOTA;
401                         if (!isblk && qdata[i].qd_count <
402                             lqs->lqs_iwrite_pending)
403                                 rc2[i] = QUOTA_RET_ACQUOTA;
404                 }
405
406                 spin_unlock(&lqs->lqs_lock);
407                 lqs_putref(lqs);
408         }
409
410         if (rc2[0] == QUOTA_RET_ACQUOTA || rc2[1] == QUOTA_RET_ACQUOTA)
411                 RETURN(rc | QUOTA_RET_ACQUOTA);
412         else
413                 RETURN(rc);
414 }
415
416 static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
417                                 unsigned int gid, int count, int *pending,
418                                 int isblk, quota_acquire acquire)
419 {
420         int rc = 0, cycle = 0;
421         ENTRY;
422
423         while ((rc = quota_check_common(obd, uid, gid, count, cycle, isblk)) &
424                QUOTA_RET_ACQUOTA) {
425
426                 if (rc & QUOTA_RET_INC_PENDING)
427                         *pending = 1;
428
429                 cycle++;
430                 if (isblk)
431                         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_HOLD_WRITE_RPC, 90);
432                 rc = acquire(obd, uid, gid);
433
434                 /* please reference to dqacq_completion for the below */
435                 /* a new request is finished, try again */
436                 if (rc == -EAGAIN) {
437                         CDEBUG(D_QUOTA, "finish a quota req, try again\n");
438                         continue;
439                 }
440
441                 /* it is out of quota already */
442                 if (rc == -EDQUOT) {
443                         CDEBUG(D_QUOTA, "out of quota,  return -EDQUOT\n");
444                         break;
445                 }
446
447                 /* -EBUSY and others, try 10 times */
448                 if (rc < 0 && cycle < 10) {
449                         CDEBUG(D_QUOTA, "rc: %d, cycle: %d\n", rc, cycle);
450                         cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE, HZ);
451                         continue;
452                 }
453
454                 CDEBUG(D_QUOTA, "exit with rc: %d\n", rc);
455                 break;
456         }
457
458         if (!cycle && rc & QUOTA_RET_INC_PENDING)
459                 *pending = 1;
460
461         RETURN(rc);
462 }
463
464
465 static int filter_quota_check(struct obd_device *obd, unsigned int uid,
466                               unsigned int gid, int npage, int *flag,
467                               quota_acquire acquire)
468 {
469         return quota_chk_acq_common(obd, uid, gid, npage, flag, LQUOTA_FLAGS_BLK,
470                                     acquire);
471 }
472
473 /* when a block_write or inode_create rpc is finished, adjust the record for
474  * pending blocks and inodes*/
475 static int quota_pending_commit(struct obd_device *obd, unsigned int uid,
476                                 unsigned int gid, int count, int isblk)
477 {
478         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
479         int i;
480         __u32 id[MAXQUOTAS] = { uid, gid };
481         struct qunit_data qdata[MAXQUOTAS];
482         ENTRY;
483
484         CLASSERT(MAXQUOTAS < 4);
485         if (!sb_any_quota_enabled(qctxt->lqc_sb))
486                 RETURN(0);
487
488         for (i = 0; i < MAXQUOTAS; i++) {
489                 struct lustre_qunit_size *lqs = NULL;
490
491                 qdata[i].qd_id = id[i];
492                 qdata[i].qd_flags = i;
493                 if (isblk)
494                         QDATA_SET_BLK(&qdata[i]);
495                 qdata[i].qd_count = 0;
496
497                 if (qdata[i].qd_id == 0 && !QDATA_IS_GRP(&qdata[i]))
498                         continue;
499
500                 quota_search_lqs(&qdata[i], NULL, qctxt, &lqs);
501                 if (lqs) {
502                         spin_lock(&lqs->lqs_lock);
503                         CDEBUG(D_QUOTA, "pending: %lu, count: %d.\n",
504                                isblk ? lqs->lqs_bwrite_pending :
505                                lqs->lqs_iwrite_pending, count);
506
507                         if (isblk) {
508                                 if (lqs->lqs_bwrite_pending >= count)
509                                         lqs->lqs_bwrite_pending -= count;
510                                 else
511                                         CDEBUG(D_ERROR,
512                                                "there are too many blocks!\n");
513                         } else {
514                                 if (lqs->lqs_iwrite_pending >= count)
515                                         lqs->lqs_iwrite_pending -= count;
516                                 else
517                                         CDEBUG(D_ERROR,
518                                                "there are too many files!\n");
519                         }
520
521                         spin_unlock(&lqs->lqs_lock);
522                         lqs_putref(lqs);
523                 }
524         }
525
526         RETURN(0);
527 }
528
529 static int filter_quota_pending_commit(struct obd_device *obd, unsigned int uid,
530                                        unsigned int gid, int npage)
531 {
532         return quota_pending_commit(obd, uid, gid, npage, LQUOTA_FLAGS_BLK);
533 }
534
535 static int mds_quota_init(void)
536 {
537         return lustre_dquot_init();
538 }
539
540 static int mds_quota_exit(void)
541 {
542         lustre_dquot_exit();
543         return 0;
544 }
545
546 static int mds_quota_setup(struct obd_device *obd)
547 {
548         struct obd_device_target *obt = &obd->u.obt;
549         struct mds_obd *mds = &obd->u.mds;
550         int rc;
551         ENTRY;
552
553         mds->mds_quota_info.qi_version = LUSTRE_QUOTA_V2;
554         atomic_set(&obt->obt_quotachecking, 1);
555         /* initialize quota master and quota context */
556         sema_init(&mds->mds_qonoff_sem, 1);
557         rc = qctxt_init(&obt->obt_qctxt, obt->obt_sb, dqacq_handler);
558         if (rc) {
559                 CERROR("initialize quota context failed! (rc:%d)\n", rc);
560                 RETURN(rc);
561         }
562         RETURN(rc);
563 }
564
565 static int mds_quota_cleanup(struct obd_device *obd)
566 {
567         qctxt_cleanup(&obd->u.obt.obt_qctxt, 0);
568         RETURN(0);
569 }
570
571 static int mds_quota_fs_cleanup(struct obd_device *obd)
572 {
573         struct mds_obd *mds = &obd->u.mds;
574         int i;
575         ENTRY;
576
577         /* close admin quota files */
578         down(&mds->mds_qonoff_sem);
579         for (i = 0; i < MAXQUOTAS; i++) {
580                 if (mds->mds_quota_info.qi_files[i]) {
581                         filp_close(mds->mds_quota_info.qi_files[i], 0);
582                         mds->mds_quota_info.qi_files[i] = NULL;
583                 }
584         }
585         up(&mds->mds_qonoff_sem);
586         RETURN(0);
587 }
588
589 static int mds_quota_check(struct obd_device *obd, unsigned int uid,
590                            unsigned int gid, int inodes, int *flag,
591                            quota_acquire acquire)
592 {
593         return quota_chk_acq_common(obd, uid, gid, inodes, flag, 0, acquire);
594 }
595
596 static int mds_quota_acquire(struct obd_device *obd, unsigned int uid,
597                              unsigned int gid)
598 {
599         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
600         int rc;
601         ENTRY;
602
603         rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, 0, 1);
604         RETURN(rc);
605 }
606
607 static int mds_quota_pending_commit(struct obd_device *obd, unsigned int uid,
608                                     unsigned int gid, int inodes)
609 {
610         return quota_pending_commit(obd, uid, gid, inodes, 0);
611 }
612 #endif /* __KERNEL__ */
613
614 struct osc_quota_info {
615         struct list_head        oqi_hash;       /* hash list */
616         struct client_obd      *oqi_cli;        /* osc obd */
617         unsigned int            oqi_id;         /* uid/gid of a file */
618         short                   oqi_type;       /* quota type */
619 };
620
621 spinlock_t qinfo_list_lock = SPIN_LOCK_UNLOCKED;
622
623 static struct list_head qinfo_hash[NR_DQHASH];
624 /* SLAB cache for client quota context */
625 cfs_mem_cache_t *qinfo_cachep = NULL;
626
627 static inline int hashfn(struct client_obd *cli, unsigned long id, int type)
628                          __attribute__((__const__));
629
630 static inline int hashfn(struct client_obd *cli, unsigned long id, int type)
631 {
632         unsigned long tmp = ((unsigned long)cli>>6) ^ id;
633         tmp = (tmp * (MAXQUOTAS - type)) % NR_DQHASH;
634         return tmp;
635 }
636
637 /* caller must hold qinfo_list_lock */
638 static inline void insert_qinfo_hash(struct osc_quota_info *oqi)
639 {
640         struct list_head *head = qinfo_hash + 
641                 hashfn(oqi->oqi_cli, oqi->oqi_id, oqi->oqi_type);
642
643         LASSERT_SPIN_LOCKED(&qinfo_list_lock);
644         list_add(&oqi->oqi_hash, head);
645 }
646
647 /* caller must hold qinfo_list_lock */
648 static inline void remove_qinfo_hash(struct osc_quota_info *oqi)
649 {
650         LASSERT_SPIN_LOCKED(&qinfo_list_lock);
651         list_del_init(&oqi->oqi_hash);
652 }
653
654 /* caller must hold qinfo_list_lock */
655 static inline struct osc_quota_info *find_qinfo(struct client_obd *cli,
656                                                 unsigned int id, int type)
657 {
658         unsigned int hashent = hashfn(cli, id, type);
659         struct osc_quota_info *oqi;
660
661         LASSERT_SPIN_LOCKED(&qinfo_list_lock);
662         list_for_each_entry(oqi, &qinfo_hash[hashent], oqi_hash) {
663                 if (oqi->oqi_cli == cli &&
664                     oqi->oqi_id == id && oqi->oqi_type == type)
665                         return oqi;
666         }
667         return NULL;
668 }
669
670 static struct osc_quota_info *alloc_qinfo(struct client_obd *cli,
671                                           unsigned int id, int type)
672 {
673         struct osc_quota_info *oqi;
674         ENTRY;
675
676         OBD_SLAB_ALLOC(oqi, qinfo_cachep, CFS_ALLOC_STD, sizeof(*oqi));
677         if(!oqi)
678                 RETURN(NULL);
679
680         INIT_LIST_HEAD(&oqi->oqi_hash);
681         oqi->oqi_cli = cli;
682         oqi->oqi_id = id;
683         oqi->oqi_type = type;
684
685         RETURN(oqi);
686 }
687
688 static void free_qinfo(struct osc_quota_info *oqi)
689 {
690         OBD_SLAB_FREE(oqi, qinfo_cachep, sizeof(*oqi));
691 }
692
693 int osc_quota_chkdq(struct client_obd *cli, 
694                     unsigned int uid, unsigned int gid)
695 {
696         unsigned int id;
697         int cnt, rc = QUOTA_OK;
698         ENTRY;
699
700         spin_lock(&qinfo_list_lock);
701         for (cnt = 0; cnt < MAXQUOTAS; cnt++) {
702                 struct osc_quota_info *oqi = NULL;
703
704                 id = (cnt == USRQUOTA) ? uid : gid;
705                 oqi = find_qinfo(cli, id, cnt);
706                 if (oqi) {
707                         rc = NO_QUOTA;
708                         break;
709                 }
710         }
711         spin_unlock(&qinfo_list_lock);
712
713         RETURN(rc);
714 }
715
716 int osc_quota_setdq(struct client_obd *cli, 
717                     unsigned int uid, unsigned int gid,
718                     obd_flag valid, obd_flag flags)
719 {
720         unsigned int id;
721         obd_flag noquota;
722         int cnt, rc = 0;
723         ENTRY;
724
725
726         for (cnt = 0; cnt < MAXQUOTAS; cnt++) {
727                 struct osc_quota_info *oqi, *old;
728
729                 if (!(valid & ((cnt == USRQUOTA) ? 
730                     OBD_MD_FLUSRQUOTA : OBD_MD_FLGRPQUOTA)))
731                         continue;
732
733                 id = (cnt == USRQUOTA) ? uid : gid;
734                 noquota = (cnt == USRQUOTA) ? 
735                     (flags & OBD_FL_NO_USRQUOTA) : (flags & OBD_FL_NO_GRPQUOTA);
736
737                 oqi = alloc_qinfo(cli, id, cnt);
738                 if (oqi) {
739                         spin_lock(&qinfo_list_lock);
740
741                         old = find_qinfo(cli, id, cnt);
742                         if (old && !noquota)
743                                 remove_qinfo_hash(old);
744                         else if (!old && noquota)
745                                 insert_qinfo_hash(oqi);
746
747                         spin_unlock(&qinfo_list_lock);
748
749                         if (old || !noquota)
750                                 free_qinfo(oqi);
751                         if (old && !noquota)
752                                 free_qinfo(old);
753                 } else {
754                         CERROR("not enough mem!\n");
755                         rc = -ENOMEM;
756                         break;
757                 }
758         }
759
760         RETURN(rc);
761 }
762
763 int osc_quota_cleanup(struct obd_device *obd)
764 {
765         struct client_obd *cli = &obd->u.cli;
766         struct osc_quota_info *oqi, *n;
767         int i;
768         ENTRY;
769
770         spin_lock(&qinfo_list_lock);
771         for (i = 0; i < NR_DQHASH; i++) {
772                 list_for_each_entry_safe(oqi, n, &qinfo_hash[i], oqi_hash) {
773                         if (oqi->oqi_cli != cli)
774                                 continue;
775                         remove_qinfo_hash(oqi);
776                         free_qinfo(oqi);
777                 }
778         }
779         spin_unlock(&qinfo_list_lock);
780
781         RETURN(0);
782 }
783
784 int osc_quota_init(void)
785 {
786         int i;
787         ENTRY;
788
789         LASSERT(qinfo_cachep == NULL);
790         qinfo_cachep = cfs_mem_cache_create("osc_quota_info",
791                                             sizeof(struct osc_quota_info),
792                                             0, 0);
793         if (!qinfo_cachep)
794                 RETURN(-ENOMEM);
795
796         for (i = 0; i < NR_DQHASH; i++)
797                 INIT_LIST_HEAD(qinfo_hash + i);
798
799         RETURN(0);
800 }
801
802 int osc_quota_exit(void)
803 {
804         struct osc_quota_info *oqi, *n;
805         int i, rc;
806         ENTRY;
807
808         spin_lock(&qinfo_list_lock);
809         for (i = 0; i < NR_DQHASH; i++) {
810                 list_for_each_entry_safe(oqi, n, &qinfo_hash[i], oqi_hash) {
811                         remove_qinfo_hash(oqi);
812                         free_qinfo(oqi);
813                 }
814         }
815         spin_unlock(&qinfo_list_lock);
816
817         rc = cfs_mem_cache_destroy(qinfo_cachep);
818         LASSERTF(rc == 0, "couldn't destory qinfo_cachep slab\n");
819         qinfo_cachep = NULL;
820
821         RETURN(0);
822 }
823
824 #ifdef __KERNEL__
825 quota_interface_t mds_quota_interface = {
826         .quota_init     = mds_quota_init,
827         .quota_exit     = mds_quota_exit,
828         .quota_setup    = mds_quota_setup,
829         .quota_cleanup  = mds_quota_cleanup,
830         .quota_check    = target_quota_check,
831         .quota_ctl      = mds_quota_ctl,
832         .quota_fs_cleanup       =mds_quota_fs_cleanup,
833         .quota_recovery = mds_quota_recovery,
834         .quota_adjust   = mds_quota_adjust,
835         .quota_chkquota = mds_quota_check,
836         .quota_acquire  = mds_quota_acquire,
837         .quota_pending_commit = mds_quota_pending_commit,
838 };
839
840 quota_interface_t filter_quota_interface = {
841         .quota_setup    = filter_quota_setup,
842         .quota_cleanup  = filter_quota_cleanup,
843         .quota_check    = target_quota_check,
844         .quota_ctl      = filter_quota_ctl,
845         .quota_setinfo  = filter_quota_setinfo,
846         .quota_clearinfo = filter_quota_clearinfo,
847         .quota_enforce  = filter_quota_enforce,
848         .quota_getflag  = filter_quota_getflag,
849         .quota_acquire  = filter_quota_acquire,
850         .quota_adjust   = filter_quota_adjust,
851         .quota_chkquota = filter_quota_check,
852         .quota_adjust_qunit   = filter_quota_adjust_qunit,
853         .quota_pending_commit = filter_quota_pending_commit,
854 };
855 #endif /* __KERNEL__ */
856
857 quota_interface_t mdc_quota_interface = {
858         .quota_ctl      = client_quota_ctl,
859         .quota_check    = client_quota_check,
860         .quota_poll_check = client_quota_poll_check,
861 };
862
863 quota_interface_t osc_quota_interface = {
864         .quota_ctl      = client_quota_ctl,
865         .quota_check    = client_quota_check,
866         .quota_poll_check = client_quota_poll_check,
867         .quota_init     = osc_quota_init,
868         .quota_exit     = osc_quota_exit,
869         .quota_chkdq    = osc_quota_chkdq,
870         .quota_setdq    = osc_quota_setdq,
871         .quota_cleanup  = osc_quota_cleanup,
872         .quota_adjust_qunit = client_quota_adjust_qunit,
873 };
874
875 quota_interface_t lov_quota_interface = {
876         .quota_check    = lov_quota_check,
877         .quota_ctl      = lov_quota_ctl,
878         .quota_adjust_qunit = lov_quota_adjust_qunit,
879 };
880
881 #ifdef __KERNEL__
882 static int __init init_lustre_quota(void)
883 {
884         int rc = qunit_cache_init();
885         if (rc)
886                 return rc;
887         PORTAL_SYMBOL_REGISTER(filter_quota_interface);
888         PORTAL_SYMBOL_REGISTER(mds_quota_interface);
889         PORTAL_SYMBOL_REGISTER(mdc_quota_interface);
890         PORTAL_SYMBOL_REGISTER(osc_quota_interface);
891         PORTAL_SYMBOL_REGISTER(lov_quota_interface);
892         return 0;
893 }
894
895 static void /*__exit*/ exit_lustre_quota(void)
896 {
897         PORTAL_SYMBOL_UNREGISTER(filter_quota_interface);
898         PORTAL_SYMBOL_UNREGISTER(mds_quota_interface);
899         PORTAL_SYMBOL_UNREGISTER(mdc_quota_interface);
900         PORTAL_SYMBOL_UNREGISTER(osc_quota_interface);
901         PORTAL_SYMBOL_UNREGISTER(lov_quota_interface);
902
903         qunit_cache_cleanup();
904 }
905
906 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
907 MODULE_DESCRIPTION("Lustre Quota");
908 MODULE_LICENSE("GPL");
909
910 cfs_module(lquota, "1.0.0", init_lustre_quota, exit_lustre_quota);
911
912 EXPORT_SYMBOL(mds_quota_interface);
913 EXPORT_SYMBOL(filter_quota_interface);
914 EXPORT_SYMBOL(mdc_quota_interface);
915 EXPORT_SYMBOL(osc_quota_interface);
916 EXPORT_SYMBOL(lov_quota_interface);
917 #endif /* __KERNEL */