/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * lustre/quota/quota_interface.c
 *
 * Copyright (c) 2001-2005 Cluster File Systems, Inc.
 *
 * This file is part of Lustre, http://www.lustre.org.
 *
 * No redistribution or use is permitted outside of Cluster File Systems, Inc.
 *
 */
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_MDS

#ifdef __KERNEL__
# include <linux/version.h>
# include <linux/module.h>
# include <linux/init.h>
# include <linux/fs.h>
# include <linux/jbd.h>
# include <linux/ext3_fs.h>
# if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
#  include <linux/smp_lock.h>
#  include <linux/buffer_head.h>
#  include <linux/workqueue.h>
#  include <linux/mount.h>
# else
#  include <linux/locks.h>
# endif
#else /* __KERNEL__ */
# include <liblustre.h>
#endif

#include <obd_class.h>
#include <lustre_mds.h>
#include <lustre_dlm.h>
#include <lustre_cfg.h>
#include <obd_ost.h>
#include <lustre_fsfilt.h>
#include <lustre_quota.h>
#include <lprocfs_status.h>
#include "quota_internal.h"

#ifdef __KERNEL__

/* quota proc file handling functions */
#ifdef LPROCFS

#define USER_QUOTA      1
#define GROUP_QUOTA     2

#define MAX_STYPE_SIZE  5
int lprocfs_quota_rd_type(char *page, char **start, off_t off, int count,
                          int *eof, void *data)
{
        struct obd_device *obd = (struct obd_device *)data;
        char stype[MAX_STYPE_SIZE + 1] = "";
        int type;

        LASSERT(obd != NULL);
        type = obd->u.obt.obt_qctxt.lqc_atype;

        if (type == 0) {
                strcpy(stype, "off");
        } else {
                if (type & USER_QUOTA)
                        strcat(stype, "u");
                if (type & GROUP_QUOTA)
                        strcat(stype, "g");
        }

        /* append the quota version on the MDS */
        if (!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME)) {
                int rc;
                lustre_quota_version_t version;

                rc = mds_quota_get_version(obd, &version);
                if (rc)
                        return rc;

                switch (version) {
                        case LUSTRE_QUOTA_V1:
                                strcat(stype, "1");
                                break;
                        case LUSTRE_QUOTA_V2:
                                strcat(stype, "2");
                                break;
                        default:
                                return -ENOSYS;
                }
        }

        return snprintf(page, count, "%s\n", stype);
}
EXPORT_SYMBOL(lprocfs_quota_rd_type);

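/* Automatically enable quota when a quota type has been written through
 * lprocfs_quota_wr_type() below.  On the MDS (is_master != 0) this first
 * turns on the cluster-wide (admin) quota, then the local quota files;
 * on an OST only the local quota files are enabled. */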
static int auto_quota_on(struct obd_device *obd, int type,
                         struct super_block *sb, int is_master)
{
        struct obd_quotactl *oqctl;
        struct lvfs_run_ctxt saved;
        int rc;
        ENTRY;

        LASSERT(type == USRQUOTA || type == GRPQUOTA || type == UGQUOTA);

        /* quota already turned on */
        if (obd->u.obt.obt_qctxt.lqc_status)
                RETURN(0);

        OBD_ALLOC_PTR(oqctl);
        if (!oqctl)
                RETURN(-ENOMEM);

        oqctl->qc_type = type;
        oqctl->qc_cmd = Q_QUOTAON;
        oqctl->qc_id = QFMT_LDISKFS;

        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);

        if (!is_master)
                goto local_quota;

        /* turn on cluster wide quota */
        rc = mds_admin_quota_on(obd, oqctl);
        if (rc) {
                CDEBUG(rc == -ENOENT ? D_QUOTA : D_ERROR,
                       "auto-enable admin quota failed. rc=%d\n", rc);
                GOTO(out_pop, rc);
        }
local_quota:
        /* turn on local quota */
        rc = fsfilt_quotactl(obd, sb, oqctl);
        if (rc) {
                CDEBUG(rc == -ENOENT ? D_QUOTA : D_ERROR,
                       "auto-enable local quota failed. rc=%d\n", rc);
                if (is_master)
                        mds_quota_off(obd, oqctl);
        } else {
                obd->u.obt.obt_qctxt.lqc_status = 1;
        }
out_pop:
        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);

        OBD_FREE_PTR(oqctl);
        RETURN(rc);
}

int lprocfs_quota_wr_type(struct file *file, const char *buffer,
                          unsigned long count, void *data)
{
        struct obd_device *obd = (struct obd_device *)data;
        struct obd_device_target *obt;
        int type = 0;
        unsigned long i;
        char stype[MAX_STYPE_SIZE + 1] = "";

        LASSERT(obd != NULL);
        obt = &obd->u.obt;

        if (count > MAX_STYPE_SIZE)
                return -EINVAL;

        if (copy_from_user(stype, buffer, count))
                return -EFAULT;

        for (i = 0 ; i < count ; i++) {
                int rc;

                switch (stype[i]) {
                case 'u' :
                        type |= USER_QUOTA;
                        break;
                case 'g' :
                        type |= GROUP_QUOTA;
                        break;
                /* quota version specifiers */
                case '1' :
                        if (strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME))
                                break;

                        rc = mds_quota_set_version(obd, LUSTRE_QUOTA_V1);
                        if (rc) {
                                CDEBUG(D_QUOTA, "failed to set quota v1! %d\n", rc);
                                return rc;
                        }
                        break;
                case '2' :
                        if (strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME))
                                break;

                        rc = mds_quota_set_version(obd, LUSTRE_QUOTA_V2);
                        if (rc) {
                                CDEBUG(D_QUOTA, "could not set quota v2! %d\n", rc);
                                return rc;
                        }
                        break;
                default  : /* just skip stray symbols like \n */
                        break;
                }
        }

        obt->obt_qctxt.lqc_atype = type;

        if (type == 0)
                return count;

        /* the USER_QUOTA/GROUP_QUOTA bits map onto USRQUOTA/GRPQUOTA/UGQUOTA
         * by subtracting one */
        if (!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME))
                auto_quota_on(obd, type - 1, obt->obt_sb, 1);
        else if (!strcmp(obd->obd_type->typ_name, LUSTRE_OST_NAME))
                auto_quota_on(obd, type - 1, obt->obt_sb, 0);
        else
                return -EFAULT;

        return count;
}
EXPORT_SYMBOL(lprocfs_quota_wr_type);

#endif /* LPROCFS */

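/* Quota hooks for OST/filter devices, which act as quota slaves. */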
static int filter_quota_setup(struct obd_device *obd)
{
        int rc = 0;
        struct obd_device_target *obt = &obd->u.obt;
        ENTRY;

        atomic_set(&obt->obt_quotachecking, 1);
        rc = qctxt_init(&obt->obt_qctxt, obt->obt_sb, NULL);
        if (rc) {
                CERROR("failed to initialize quota context! (rc:%d)\n", rc);
                RETURN(rc);
        }
        RETURN(rc);
}

static int filter_quota_cleanup(struct obd_device *obd)
{
        qctxt_cleanup(&obd->u.obt.obt_qctxt, 0);
        return 0;
}

static int filter_quota_setinfo(struct obd_export *exp, struct obd_device *obd)
{
        struct obd_import *imp;

        /* setup the quota context import */
        spin_lock(&obd->u.obt.obt_qctxt.lqc_lock);
        obd->u.obt.obt_qctxt.lqc_import = exp->exp_imp_reverse;
        spin_unlock(&obd->u.obt.obt_qctxt.lqc_lock);

        /* copy the quota-related connect flags from the export to its
         * reverse import, so that later code can check them on the import
         * instead of scanning the export list */
        imp = exp->exp_imp_reverse;
        if (imp)
                imp->imp_connect_data.ocd_connect_flags |=
                        (exp->exp_connect_flags &
                         (OBD_CONNECT_QUOTA64 | OBD_CONNECT_CHANGE_QS));

        /* start the quota slave recovery thread (releases high limits) */
        qslave_start_recovery(obd, &obd->u.obt.obt_qctxt);
        return 0;
}

static int filter_quota_clearinfo(struct obd_export *exp, struct obd_device *obd)
{
        struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;

        /* lquota may not be set up before the export is destroyed, b=14896 */
        if (!obd->obd_set_up)
                return 0;

        /* when exp->exp_imp_reverse is destroyed, the corresponding lqc_import
         * must be invalidated, b=12374 */
        if (qctxt->lqc_import == exp->exp_imp_reverse) {
                spin_lock(&qctxt->lqc_lock);
                qctxt->lqc_import = NULL;
                spin_unlock(&qctxt->lqc_lock);
        }

        return 0;
}

static int filter_quota_enforce(struct obd_device *obd, unsigned int ignore)
{
        ENTRY;

        if (!sb_any_quota_enabled(obd->u.obt.obt_sb))
                RETURN(0);

        if (ignore)
                cap_raise(current->cap_effective, CAP_SYS_RESOURCE);
        else
                cap_lower(current->cap_effective, CAP_SYS_RESOURCE);

        RETURN(0);
}

static int filter_quota_getflag(struct obd_device *obd, struct obdo *oa)
{
        struct obd_device_target *obt = &obd->u.obt;
        int err, cnt, rc = 0;
        struct obd_quotactl *oqctl;
        ENTRY;

        if (!sb_any_quota_enabled(obt->obt_sb))
                RETURN(0);

        oa->o_flags &= ~(OBD_FL_NO_USRQUOTA | OBD_FL_NO_GRPQUOTA);

        OBD_ALLOC_PTR(oqctl);
        if (!oqctl) {
                CERROR("Not enough memory!\n");
                RETURN(-ENOMEM);
        }

        for (cnt = 0; cnt < MAXQUOTAS; cnt++) {
                memset(oqctl, 0, sizeof(*oqctl));

                oqctl->qc_cmd = Q_GETQUOTA;
                oqctl->qc_type = cnt;
                oqctl->qc_id = (cnt == USRQUOTA) ? oa->o_uid : oa->o_gid;
                err = fsfilt_quotactl(obd, obt->obt_sb, oqctl);
                if (err) {
                        if (!rc)
                                rc = err;
                        continue;
                }

                /* set the over-quota flags for this uid/gid */
                oa->o_valid |= (cnt == USRQUOTA) ?
                               OBD_MD_FLUSRQUOTA : OBD_MD_FLGRPQUOTA;
                if (oqctl->qc_dqblk.dqb_bhardlimit &&
                   (toqb(oqctl->qc_dqblk.dqb_curspace) >=
                    oqctl->qc_dqblk.dqb_bhardlimit))
                        oa->o_flags |= (cnt == USRQUOTA) ?
                                OBD_FL_NO_USRQUOTA : OBD_FL_NO_GRPQUOTA;
        }
        OBD_FREE_PTR(oqctl);
        RETURN(rc);
}

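/* Ask the quota master for another qunit of block quota for this uid/gid. */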
static int filter_quota_acquire(struct obd_device *obd, unsigned int uid,
                                unsigned int gid)
{
        struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
        int rc;
        ENTRY;

        rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, LQUOTA_FLAGS_BLK, 1);
        RETURN(rc);
}

/* Check whether the remaining quota of the given uid and gid can satisfy a
 * block_write or inode_create RPC.  Return QUOTA_RET_ACQUOTA when more quota
 * needs to be acquired from the master. */
static int quota_check_common(struct obd_device *obd, unsigned int uid,
                              unsigned int gid, int count, int cycle, int isblk)
{
        struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
        int i;
        __u32 id[MAXQUOTAS] = { uid, gid };
        struct qunit_data qdata[MAXQUOTAS];
        int rc = 0, rc2[2] = { 0, 0 };
        ENTRY;

        CLASSERT(MAXQUOTAS < 4);
        if (!sb_any_quota_enabled(qctxt->lqc_sb))
                RETURN(rc);

        for (i = 0; i < MAXQUOTAS; i++) {
                struct lustre_qunit_size *lqs = NULL;

                qdata[i].qd_id = id[i];
                qdata[i].qd_flags = i;
                if (isblk)
                        QDATA_SET_BLK(&qdata[i]);
                qdata[i].qd_count = 0;

                /* ignore the root user */
                if (qdata[i].qd_id == 0 && !QDATA_IS_GRP(&qdata[i]))
                        continue;

                quota_search_lqs(&qdata[i], NULL, qctxt, &lqs);
                if (!lqs)
                        continue;

                rc2[i] = compute_remquota(obd, qctxt, &qdata[i], isblk);
                spin_lock(&lqs->lqs_lock);
                if (!cycle) {
                        rc = QUOTA_RET_INC_PENDING;
                        if (isblk)
                                lqs->lqs_bwrite_pending += count;
                        else
                                lqs->lqs_iwrite_pending += count;
                }

                CDEBUG(D_QUOTA, "write pending: %lu, qd_count: "LPU64".\n",
                       isblk ? lqs->lqs_bwrite_pending : lqs->lqs_iwrite_pending,
                       qdata[i].qd_count);
                if (rc2[i] == QUOTA_RET_OK) {
                        if (isblk && qdata[i].qd_count <
                            lqs->lqs_bwrite_pending * CFS_PAGE_SIZE)
                                rc2[i] = QUOTA_RET_ACQUOTA;
                        if (!isblk && qdata[i].qd_count <
                            lqs->lqs_iwrite_pending)
                                rc2[i] = QUOTA_RET_ACQUOTA;
                }

                spin_unlock(&lqs->lqs_lock);

                /* When cycle is zero, lqs_*_pending has just been increased
                 * above.  Take a reference on the lqs here; it is dropped in
                 * quota_pending_commit(), b=14784 */
                if (!cycle)
                        lqs_getref(lqs);

                /* this reference balances quota_search_lqs() */
                lqs_putref(lqs);
        }

        if (rc2[0] == QUOTA_RET_ACQUOTA || rc2[1] == QUOTA_RET_ACQUOTA)
                RETURN(rc | QUOTA_RET_ACQUOTA);
        else
                RETURN(rc);
}

static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
                                unsigned int gid, int count, int *pending,
                                int isblk, quota_acquire acquire)
{
        int rc = 0, cycle = 0, count_err = 0;
        ENTRY;

        /* Unfortunately, if the quota master is too busy to handle the
         * pre-dqacq in time and the quota hash on the OST is used up, we
         * have to wait for the completion of in-flight dqacq/dqrel requests
         * in order to get enough quota for the write, b=12588 */
        while ((rc = quota_check_common(obd, uid, gid, count, cycle, isblk)) &
               QUOTA_RET_ACQUOTA) {

                if (rc & QUOTA_RET_INC_PENDING)
                        *pending = 1;

                cycle++;
                if (isblk)
                        OBD_FAIL_TIMEOUT(OBD_FAIL_OST_HOLD_WRITE_RPC, 90);
                /* after acquire(), quota_check_common is run again so that we
                 * can confirm there is enough quota to finish the write */
                rc = acquire(obd, uid, gid);

                /* see dqacq_completion for the origin of the codes below */
                /* a new request has finished, try again */
                if (rc == -EAGAIN) {
                        CDEBUG(D_QUOTA, "finished a quota req, try again\n");
                        continue;
                }

                /* already out of quota */
                if (rc == -EDQUOT) {
                        CDEBUG(D_QUOTA, "out of quota, return -EDQUOT\n");
                        break;
                }

                /* -EBUSY and others, retry up to 10 times */
                if (rc < 0 && count_err < 10) {
                        CDEBUG(D_QUOTA, "rc: %d, count_err: %d\n", rc, count_err++);
                        cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE, HZ);
                        continue;
                }

                if (count_err >= 10 || cycle >= 1000) {
                        CDEBUG(D_ERROR, "hit 10 errors or too many cycles "
                               "while acquiring quota, stop checking with "
                               "rc: %d, cycle: %d.\n", rc, cycle);
                        break;
                }

                CDEBUG(D_QUOTA, "recheck quota with rc: %d, cycle: %d\n", rc,
                       cycle);
        }

        if (!cycle && rc & QUOTA_RET_INC_PENDING)
                *pending = 1;

        RETURN(rc);
}

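/* OST entry point: check (and if necessary acquire) block quota before
 * servicing a bulk write of npage pages. */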
static int filter_quota_check(struct obd_device *obd, unsigned int uid,
                              unsigned int gid, int npage, int *flag,
                              quota_acquire acquire)
{
        return quota_chk_acq_common(obd, uid, gid, npage, flag, LQUOTA_FLAGS_BLK,
                                    acquire);
}

/* when a block_write or inode_create rpc has finished, adjust the record of
 * pending blocks and inodes */
static int quota_pending_commit(struct obd_device *obd, unsigned int uid,
                                unsigned int gid, int count, int isblk)
{
        struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
        int i;
        __u32 id[MAXQUOTAS] = { uid, gid };
        struct qunit_data qdata[MAXQUOTAS];
        ENTRY;

        CLASSERT(MAXQUOTAS < 4);
        if (!sb_any_quota_enabled(qctxt->lqc_sb))
                RETURN(0);

        for (i = 0; i < MAXQUOTAS; i++) {
                struct lustre_qunit_size *lqs = NULL;

                qdata[i].qd_id = id[i];
                qdata[i].qd_flags = i;
                if (isblk)
                        QDATA_SET_BLK(&qdata[i]);
                qdata[i].qd_count = 0;

                if (qdata[i].qd_id == 0 && !QDATA_IS_GRP(&qdata[i]))
                        continue;

                quota_search_lqs(&qdata[i], NULL, qctxt, &lqs);
                if (lqs) {
                        int flag = 0;
                        spin_lock(&lqs->lqs_lock);
                        CDEBUG(D_QUOTA, "pending: %lu, count: %d.\n",
                               isblk ? lqs->lqs_bwrite_pending :
                               lqs->lqs_iwrite_pending, count);

                        if (isblk) {
                                if (lqs->lqs_bwrite_pending >= count) {
                                        lqs->lqs_bwrite_pending -= count;
                                        flag = 1;
                                } else {
                                        CDEBUG(D_ERROR,
                                               "there are too many blocks!\n");
                                }
                        } else {
                                if (lqs->lqs_iwrite_pending >= count) {
                                        lqs->lqs_iwrite_pending -= count;
                                        flag = 1;
                                } else {
                                        CDEBUG(D_ERROR,
                                               "there are too many files!\n");
                                }
                        }

                        spin_unlock(&lqs->lqs_lock);
                        lqs_putref(lqs);
                        /* Now that lqs_*_pending has been decreased again,
                         * drop the reference taken in quota_check_common(),
                         * b=14784 */
                        if (flag)
                                lqs_putref(lqs);
                }
        }

        RETURN(0);
}

static int filter_quota_pending_commit(struct obd_device *obd, unsigned int uid,
                                       unsigned int gid, int npage)
{
        return quota_pending_commit(obd, uid, gid, npage, LQUOTA_FLAGS_BLK);
}

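/* MDS-side quota hooks.  The MDS acts as the quota master: it owns the
 * administrative quota files and answers dqacq/dqrel requests from the
 * OSTs, while also enforcing inode quota locally. */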
static int mds_quota_init(void)
{
        return lustre_dquot_init();
}

static int mds_quota_exit(void)
{
        lustre_dquot_exit();
        return 0;
}

static int mds_quota_setup(struct obd_device *obd)
{
        struct obd_device_target *obt = &obd->u.obt;
        struct mds_obd *mds = &obd->u.mds;
        int rc;
        ENTRY;

        mds->mds_quota_info.qi_version = LUSTRE_QUOTA_V2;
        atomic_set(&obt->obt_quotachecking, 1);
        /* initialize the quota master and the quota context */
        sema_init(&mds->mds_qonoff_sem, 1);
        rc = qctxt_init(&obt->obt_qctxt, obt->obt_sb, dqacq_handler);
        if (rc) {
                CERROR("failed to initialize quota context! (rc:%d)\n", rc);
                RETURN(rc);
        }
        RETURN(rc);
}

static int mds_quota_cleanup(struct obd_device *obd)
{
        qctxt_cleanup(&obd->u.obt.obt_qctxt, 0);
        RETURN(0);
}

static int mds_quota_fs_cleanup(struct obd_device *obd)
{
        struct mds_obd *mds = &obd->u.mds;
        struct obd_quotactl oqctl;
        ENTRY;

        memset(&oqctl, 0, sizeof(oqctl));
        oqctl.qc_type = UGQUOTA;

        down(&mds->mds_qonoff_sem);
        mds_admin_quota_off(obd, &oqctl);
        up(&mds->mds_qonoff_sem);
        RETURN(0);
}

static int mds_quota_check(struct obd_device *obd, unsigned int uid,
                           unsigned int gid, int inodes, int *flag,
                           quota_acquire acquire)
{
        return quota_chk_acq_common(obd, uid, gid, inodes, flag, 0, acquire);
}

static int mds_quota_acquire(struct obd_device *obd, unsigned int uid,
                             unsigned int gid)
{
        struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
        int rc;
        ENTRY;

        rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, 0, 1);
        RETURN(rc);
}

static int mds_quota_pending_commit(struct obd_device *obd, unsigned int uid,
                                    unsigned int gid, int inodes)
{
        return quota_pending_commit(obd, uid, gid, inodes, 0);
}
#endif /* __KERNEL__ */

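/* Client (OSC) side cache of per-ID "out of quota" state.  Entries are keyed
 * by (client_obd, uid/gid, quota type) and kept in a small global hash table
 * so that the client can refuse cached writes for IDs that are over quota. */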
struct osc_quota_info {
        struct list_head        oqi_hash;       /* hash list */
        struct client_obd      *oqi_cli;        /* osc obd */
        unsigned int            oqi_id;         /* uid/gid of a file */
        short                   oqi_type;       /* quota type */
};

spinlock_t qinfo_list_lock = SPIN_LOCK_UNLOCKED;

static struct list_head qinfo_hash[NR_DQHASH];
/* SLAB cache for client quota context */
cfs_mem_cache_t *qinfo_cachep = NULL;

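/* Hash a (client, id, type) triple into a qinfo_hash bucket. */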
static inline int hashfn(struct client_obd *cli, unsigned long id, int type)
                         __attribute__((__const__));

static inline int hashfn(struct client_obd *cli, unsigned long id, int type)
{
        unsigned long tmp = ((unsigned long)cli>>6) ^ id;
        tmp = (tmp * (MAXQUOTAS - type)) % NR_DQHASH;
        return tmp;
}

/* caller must hold qinfo_list_lock */
static inline void insert_qinfo_hash(struct osc_quota_info *oqi)
{
        struct list_head *head = qinfo_hash +
                hashfn(oqi->oqi_cli, oqi->oqi_id, oqi->oqi_type);

        LASSERT_SPIN_LOCKED(&qinfo_list_lock);
        list_add(&oqi->oqi_hash, head);
}

/* caller must hold qinfo_list_lock */
static inline void remove_qinfo_hash(struct osc_quota_info *oqi)
{
        LASSERT_SPIN_LOCKED(&qinfo_list_lock);
        list_del_init(&oqi->oqi_hash);
}

/* caller must hold qinfo_list_lock */
static inline struct osc_quota_info *find_qinfo(struct client_obd *cli,
                                                unsigned int id, int type)
{
        unsigned int hashent = hashfn(cli, id, type);
        struct osc_quota_info *oqi;

        LASSERT_SPIN_LOCKED(&qinfo_list_lock);
        list_for_each_entry(oqi, &qinfo_hash[hashent], oqi_hash) {
                if (oqi->oqi_cli == cli &&
                    oqi->oqi_id == id && oqi->oqi_type == type)
                        return oqi;
        }
        return NULL;
}

static struct osc_quota_info *alloc_qinfo(struct client_obd *cli,
                                          unsigned int id, int type)
{
        struct osc_quota_info *oqi;
        ENTRY;

        OBD_SLAB_ALLOC(oqi, qinfo_cachep, CFS_ALLOC_STD, sizeof(*oqi));
        if (!oqi)
                RETURN(NULL);

        CFS_INIT_LIST_HEAD(&oqi->oqi_hash);
        oqi->oqi_cli = cli;
        oqi->oqi_id = id;
        oqi->oqi_type = type;

        RETURN(oqi);
}

static void free_qinfo(struct osc_quota_info *oqi)
{
        OBD_SLAB_FREE(oqi, qinfo_cachep, sizeof(*oqi));
}

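/* Return NO_QUOTA if a cached entry marks this uid or gid as over quota on
 * the given client, QUOTA_OK otherwise. */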
int osc_quota_chkdq(struct client_obd *cli, unsigned int uid, unsigned int gid)
{
        unsigned int id;
        int cnt, rc = QUOTA_OK;
        ENTRY;

        spin_lock(&qinfo_list_lock);
        for (cnt = 0; cnt < MAXQUOTAS; cnt++) {
                struct osc_quota_info *oqi = NULL;

                id = (cnt == USRQUOTA) ? uid : gid;
                oqi = find_qinfo(cli, id, cnt);
                if (oqi) {
                        rc = NO_QUOTA;
                        break;
                }
        }
        spin_unlock(&qinfo_list_lock);

        RETURN(rc);
}

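/* Update the cached over-quota state from the OBD_FL_NO_USRQUOTA and
 * OBD_FL_NO_GRPQUOTA flags returned by the server: insert a cache entry when
 * a quota is exhausted and remove it once quota is available again. */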
int osc_quota_setdq(struct client_obd *cli, unsigned int uid, unsigned int gid,
                    obd_flag valid, obd_flag flags)
{
        unsigned int id;
        obd_flag noquota;
        int cnt, rc = 0;
        ENTRY;

        for (cnt = 0; cnt < MAXQUOTAS; cnt++) {
                struct osc_quota_info *oqi, *old;

                if (!(valid & ((cnt == USRQUOTA) ?
                    OBD_MD_FLUSRQUOTA : OBD_MD_FLGRPQUOTA)))
                        continue;

                id = (cnt == USRQUOTA) ? uid : gid;
                noquota = (cnt == USRQUOTA) ?
                    (flags & OBD_FL_NO_USRQUOTA) : (flags & OBD_FL_NO_GRPQUOTA);

                oqi = alloc_qinfo(cli, id, cnt);
                if (oqi) {
                        spin_lock(&qinfo_list_lock);

                        old = find_qinfo(cli, id, cnt);
                        if (old && !noquota)
                                remove_qinfo_hash(old);
                        else if (!old && noquota)
                                insert_qinfo_hash(oqi);

                        spin_unlock(&qinfo_list_lock);

                        if (old || !noquota)
                                free_qinfo(oqi);
                        if (old && !noquota)
                                free_qinfo(old);
                } else {
                        CERROR("not enough memory!\n");
                        rc = -ENOMEM;
                        break;
                }
        }

        RETURN(rc);
}

int osc_quota_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        struct osc_quota_info *oqi, *n;
        int i;
        ENTRY;

        spin_lock(&qinfo_list_lock);
        for (i = 0; i < NR_DQHASH; i++) {
                list_for_each_entry_safe(oqi, n, &qinfo_hash[i], oqi_hash) {
                        if (oqi->oqi_cli != cli)
                                continue;
                        remove_qinfo_hash(oqi);
                        free_qinfo(oqi);
                }
        }
        spin_unlock(&qinfo_list_lock);

        RETURN(0);
}

int osc_quota_init(void)
{
        int i;
        ENTRY;

        LASSERT(qinfo_cachep == NULL);
        qinfo_cachep = cfs_mem_cache_create("osc_quota_info",
                                            sizeof(struct osc_quota_info),
                                            0, 0);
        if (!qinfo_cachep)
                RETURN(-ENOMEM);

        for (i = 0; i < NR_DQHASH; i++)
                CFS_INIT_LIST_HEAD(qinfo_hash + i);

        RETURN(0);
}

int osc_quota_exit(void)
{
        struct osc_quota_info *oqi, *n;
        int i, rc;
        ENTRY;

        spin_lock(&qinfo_list_lock);
        for (i = 0; i < NR_DQHASH; i++) {
                list_for_each_entry_safe(oqi, n, &qinfo_hash[i], oqi_hash) {
                        remove_qinfo_hash(oqi);
                        free_qinfo(oqi);
                }
        }
        spin_unlock(&qinfo_list_lock);

        rc = cfs_mem_cache_destroy(qinfo_cachep);
        LASSERTF(rc == 0, "couldn't destroy qinfo_cachep slab\n");
        qinfo_cachep = NULL;

        RETURN(0);
}

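/* Method tables that plug the quota hooks above into each OBD device type. */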
#ifdef __KERNEL__
quota_interface_t mds_quota_interface = {
        .quota_init     = mds_quota_init,
        .quota_exit     = mds_quota_exit,
        .quota_setup    = mds_quota_setup,
        .quota_cleanup  = mds_quota_cleanup,
        .quota_check    = target_quota_check,
        .quota_ctl      = mds_quota_ctl,
        .quota_fs_cleanup = mds_quota_fs_cleanup,
        .quota_recovery = mds_quota_recovery,
        .quota_adjust   = mds_quota_adjust,
        .quota_chkquota = mds_quota_check,
        .quota_acquire  = mds_quota_acquire,
        .quota_pending_commit = mds_quota_pending_commit,
};

quota_interface_t filter_quota_interface = {
        .quota_setup    = filter_quota_setup,
        .quota_cleanup  = filter_quota_cleanup,
        .quota_check    = target_quota_check,
        .quota_ctl      = filter_quota_ctl,
        .quota_setinfo  = filter_quota_setinfo,
        .quota_clearinfo = filter_quota_clearinfo,
        .quota_enforce  = filter_quota_enforce,
        .quota_getflag  = filter_quota_getflag,
        .quota_acquire  = filter_quota_acquire,
        .quota_adjust   = filter_quota_adjust,
        .quota_chkquota = filter_quota_check,
        .quota_adjust_qunit   = filter_quota_adjust_qunit,
        .quota_pending_commit = filter_quota_pending_commit,
};
#endif /* __KERNEL__ */

quota_interface_t mdc_quota_interface = {
        .quota_ctl      = client_quota_ctl,
        .quota_check    = client_quota_check,
        .quota_poll_check = client_quota_poll_check,
};

quota_interface_t osc_quota_interface = {
        .quota_ctl      = client_quota_ctl,
        .quota_check    = client_quota_check,
        .quota_poll_check = client_quota_poll_check,
        .quota_init     = osc_quota_init,
        .quota_exit     = osc_quota_exit,
        .quota_chkdq    = osc_quota_chkdq,
        .quota_setdq    = osc_quota_setdq,
        .quota_cleanup  = osc_quota_cleanup,
        .quota_adjust_qunit = client_quota_adjust_qunit,
};

quota_interface_t lov_quota_interface = {
        .quota_check    = lov_quota_check,
        .quota_ctl      = lov_quota_ctl,
        .quota_adjust_qunit = lov_quota_adjust_qunit,
};

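/* Module init/exit: set up the qunit cache and register the quota interfaces
 * so the other Lustre modules can look them up at run time. */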
#ifdef __KERNEL__
static int __init init_lustre_quota(void)
{
        int rc = qunit_cache_init();
        if (rc)
                return rc;
        PORTAL_SYMBOL_REGISTER(filter_quota_interface);
        PORTAL_SYMBOL_REGISTER(mds_quota_interface);
        PORTAL_SYMBOL_REGISTER(mdc_quota_interface);
        PORTAL_SYMBOL_REGISTER(osc_quota_interface);
        PORTAL_SYMBOL_REGISTER(lov_quota_interface);
        return 0;
}

static void /*__exit*/ exit_lustre_quota(void)
{
        PORTAL_SYMBOL_UNREGISTER(filter_quota_interface);
        PORTAL_SYMBOL_UNREGISTER(mds_quota_interface);
        PORTAL_SYMBOL_UNREGISTER(mdc_quota_interface);
        PORTAL_SYMBOL_UNREGISTER(osc_quota_interface);
        PORTAL_SYMBOL_UNREGISTER(lov_quota_interface);

        qunit_cache_cleanup();
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Quota");
MODULE_LICENSE("GPL");

cfs_module(lquota, "1.0.0", init_lustre_quota, exit_lustre_quota);

EXPORT_SYMBOL(mds_quota_interface);
EXPORT_SYMBOL(filter_quota_interface);
EXPORT_SYMBOL(mdc_quota_interface);
EXPORT_SYMBOL(osc_quota_interface);
EXPORT_SYMBOL(lov_quota_interface);
#endif /* __KERNEL__ */