Whamcloud - gitweb
Branch b1_6
[fs/lustre-release.git] / lustre / quota / quota_interface.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * lustre/quota/quota_interface.c
5  *
6  * Copyright (c) 2001-2005 Cluster File Systems, Inc.
7  *
8  * This file is part of Lustre, http://www.lustre.org.
9  *
10  * No redistribution or use is permitted outside of Cluster File Systems, Inc.
11  *
12  */
13 #ifndef EXPORT_SYMTAB
14 # define EXPORT_SYMTAB
15 #endif
16 #define DEBUG_SUBSYSTEM S_MDS
17
18 #ifdef __KERNEL__
19 # include <linux/version.h>
20 # include <linux/module.h>
21 # include <linux/init.h>
22 # include <linux/fs.h>
23 # include <linux/jbd.h>
24 # include <linux/ext3_fs.h>
25 # if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
26 #  include <linux/smp_lock.h>
27 #  include <linux/buffer_head.h>
28 #  include <linux/workqueue.h>
29 #  include <linux/mount.h>
30 # else
31 #  include <linux/locks.h>
32 # endif
33 #else /* __KERNEL__ */
34 # include <liblustre.h>
35 #endif
36
37 #include <obd_class.h>
38 #include <lustre_mds.h>
39 #include <lustre_dlm.h>
40 #include <lustre_cfg.h>
41 #include <obd_ost.h>
42 #include <lustre_fsfilt.h>
43 #include <lustre_quota.h>
44 #include <lprocfs_status.h>
45 #include "quota_internal.h"
46
47 #ifdef __KERNEL__
48
49 /* quota proc file handling functions */
50 #ifdef LPROCFS
51
/* Bit flags combined into the "type" value handled by the proc handlers
 * below: bit 0 selects user quota, bit 1 selects group quota. */
#define USER_QUOTA      (1 << 0)
#define GROUP_QUOTA     (1 << 1)

/* Longest type string handled by the proc handlers, not counting the
 * terminating NUL. */
#define MAX_STYPE_SIZE  5
56 int lprocfs_quota_rd_type(char *page, char **start, off_t off, int count,
57                           int *eof, void *data)
58 {
59         struct obd_device *obd = (struct obd_device *)data;
60         char stype[MAX_STYPE_SIZE + 1] = "";
61         int type = obd->u.obt.obt_qctxt.lqc_atype;
62         LASSERT(obd != NULL);
63
64         if (type == 0) {
65                 strcpy(stype, "off");
66         } else {
67                 if (type & USER_QUOTA)
68                         strcat(stype, "u");
69                 if (type & GROUP_QUOTA)
70                         strcat(stype, "g");
71         }
72
73         /* append with quota version on MDS */
74         if (!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME)) {
75                 int rc;
76                 lustre_quota_version_t version;
77
78                 rc = mds_quota_get_version(obd, &version);
79                 if (rc)
80                         return rc;
81
82                 switch (version) {
83                         case LUSTRE_QUOTA_V1:
84                                 strcat(stype, "1");
85                                 break;
86                         case LUSTRE_QUOTA_V2:
87                                 strcat(stype, "2");
88                                 break;
89                         default:
90                                 return -ENOSYS;
91                 }
92         }
93
94         return snprintf(page, count, "%s\n", stype);
95 }
96 EXPORT_SYMBOL(lprocfs_quota_rd_type);
97
98 static int auto_quota_on(struct obd_device *obd, int type,
99                          struct super_block *sb, int is_master)
100 {
101         struct obd_quotactl *oqctl;
102         struct lvfs_run_ctxt saved;
103         int rc;
104         ENTRY;
105
106         LASSERT(type == USRQUOTA || type == GRPQUOTA || type == UGQUOTA);
107
108         /* quota already turned on */
109         if (obd->u.obt.obt_qctxt.lqc_status)
110                 RETURN(0);
111
112         OBD_ALLOC_PTR(oqctl);
113         if (!oqctl)
114                 RETURN(-ENOMEM);
115
116         oqctl->qc_type = type;
117         oqctl->qc_cmd = Q_QUOTAON;
118         oqctl->qc_id = QFMT_LDISKFS;
119
120         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
121
122         if (!is_master)
123                 goto local_quota;
124
125         /* turn on cluster wide quota */
126         rc = mds_admin_quota_on(obd, oqctl);
127         if (rc) {
128                 CDEBUG(rc == -ENOENT ? D_QUOTA : D_ERROR,
129                        "auto-enable admin quota failed. rc=%d\n", rc);
130                 GOTO(out_pop, rc);
131         }
132 local_quota:
133         /* turn on local quota */
134         rc = fsfilt_quotactl(obd, sb, oqctl);
135         if (rc) {
136                 CDEBUG(rc == -ENOENT ? D_QUOTA : D_ERROR,
137                        "auto-enable local quota failed. rc=%d\n", rc);
138                 if (is_master)
139                         mds_quota_off(obd, oqctl);
140         } else {
141                 obd->u.obt.obt_qctxt.lqc_status = 1;
142         }
143 out_pop:
144         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
145
146         OBD_FREE_PTR(oqctl);
147         RETURN(rc);
148 }
149
150 int lprocfs_quota_wr_type(struct file *file, const char *buffer,
151                           unsigned long count, void *data)
152 {
153         struct obd_device *obd = (struct obd_device *)data;
154         struct obd_device_target *obt = &obd->u.obt;
155         int type = 0;
156         unsigned long i;
157         char stype[MAX_STYPE_SIZE + 1] = "";
158         LASSERT(obd != NULL);
159
160         if (count > MAX_STYPE_SIZE)
161                 return -EINVAL;
162
163         if (copy_from_user(stype, buffer, count))
164                 return -EFAULT;
165
166         for (i = 0 ; i < count ; i++) {
167                 int rc;
168
169                 switch (stype[i]) {
170                 case 'u' :
171                         type |= USER_QUOTA;
172                         break;
173                 case 'g' :
174                         type |= GROUP_QUOTA;
175                         break;
176                 /* quota version specifiers */
177                 case '1' :
178                         if (strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME))
179                                 break;
180
181                         rc = mds_quota_set_version(obd, LUSTRE_QUOTA_V1);
182                         if (rc) {
183                                 CDEBUG(D_QUOTA, "failed to set quota v1! %d\n", rc);
184                                 return rc;
185                         }
186                         break;
187                 case '2' :
188                         if (strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME))
189                                 break;
190
191                         rc = mds_quota_set_version(obd, LUSTRE_QUOTA_V2);
192                         if (rc) {
193                                 CDEBUG(D_QUOTA, "could not set quota v2! %d\n", rc);
194                                 return rc;
195                         }
196                         break;
197                 default  : /* just skip stray symbols like \n */
198                         break;
199                 }
200         }
201
202         obt->obt_qctxt.lqc_atype = type;
203
204         if (type == 0)
205                 return count;
206
207         if (!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME))
208                 auto_quota_on(obd, type - 1, obt->obt_sb, 1);
209         else if (!strcmp(obd->obd_type->typ_name, LUSTRE_OST_NAME))
210                 auto_quota_on(obd, type - 1, obt->obt_sb, 0);
211         else
212                 return -EFAULT;
213
214         return count;
215 }
216 EXPORT_SYMBOL(lprocfs_quota_wr_type);
217
218 #endif /* LPROCFS */
219
220 static int filter_quota_setup(struct obd_device *obd)
221 {
222         int rc = 0;
223         struct obd_device_target *obt = &obd->u.obt;
224         ENTRY;
225
226         atomic_set(&obt->obt_quotachecking, 1);
227         rc = qctxt_init(&obt->obt_qctxt, obt->obt_sb, NULL);
228         if (rc) {
229                 CERROR("initialize quota context failed! (rc:%d)\n", rc);
230                 RETURN(rc);
231         }
232         RETURN(rc);
233 }
234
235 static int filter_quota_cleanup(struct obd_device *obd)
236 {
237         qctxt_cleanup(&obd->u.obt.obt_qctxt, 0);
238         return 0;
239 }
240
241 static int filter_quota_setinfo(struct obd_export *exp, struct obd_device *obd)
242 {
243         struct obd_import *imp;
244
245         /* setup the quota context import */
246         spin_lock(&obd->u.obt.obt_qctxt.lqc_lock);
247         obd->u.obt.obt_qctxt.lqc_import = exp->exp_imp_reverse;
248         spin_unlock(&obd->u.obt.obt_qctxt.lqc_lock);
249
250         /* make imp's connect flags equal relative exp's connect flags
251          * adding it to avoid the scan export list
252          */
253         imp = exp->exp_imp_reverse;
254         if (imp)
255                 imp->imp_connect_data.ocd_connect_flags |=
256                         (exp->exp_connect_flags &
257                          (OBD_CONNECT_QUOTA64 | OBD_CONNECT_CHANGE_QS));
258
259         /* start quota slave recovery thread. (release high limits) */
260         qslave_start_recovery(obd, &obd->u.obt.obt_qctxt);
261         return 0;
262 }
263
264 static int filter_quota_clearinfo(struct obd_export *exp, struct obd_device *obd)
265 {
266         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
267
268         /* lquota may be not set up before destroying export, b=14896 */
269         if (!obd->obd_set_up)
270                 return 0;
271
272         /* when exp->exp_imp_reverse is destroyed, the corresponding lqc_import
273          * should be invalid b=12374 */
274         if (qctxt->lqc_import == exp->exp_imp_reverse) {
275                 spin_lock(&qctxt->lqc_lock);
276                 qctxt->lqc_import = NULL;
277                 spin_unlock(&qctxt->lqc_lock);
278         }
279
280         return 0;
281 }
282
283 static int filter_quota_enforce(struct obd_device *obd, unsigned int ignore)
284 {
285         ENTRY;
286
287         if (!sb_any_quota_enabled(obd->u.obt.obt_sb))
288                 RETURN(0);
289
290         if (ignore)
291                 cap_raise(current->cap_effective, CAP_SYS_RESOURCE);
292         else
293                 cap_lower(current->cap_effective, CAP_SYS_RESOURCE);
294
295         RETURN(0);
296 }
297
298 static int filter_quota_getflag(struct obd_device *obd, struct obdo *oa)
299 {
300         struct obd_device_target *obt = &obd->u.obt;
301         int err, cnt, rc = 0;
302         struct obd_quotactl *oqctl;
303         ENTRY;
304
305         if (!sb_any_quota_enabled(obt->obt_sb))
306                 RETURN(0);
307
308         oa->o_flags &= ~(OBD_FL_NO_USRQUOTA | OBD_FL_NO_GRPQUOTA);
309
310         OBD_ALLOC_PTR(oqctl);
311         if (!oqctl) {
312                 CERROR("Not enough memory!");
313                 RETURN(-ENOMEM);
314         }
315
316         for (cnt = 0; cnt < MAXQUOTAS; cnt++) {
317                 memset(oqctl, 0, sizeof(*oqctl));
318
319                 oqctl->qc_cmd = Q_GETQUOTA;
320                 oqctl->qc_type = cnt;
321                 oqctl->qc_id = (cnt == USRQUOTA) ? oa->o_uid : oa->o_gid;
322                 err = fsfilt_quotactl(obd, obt->obt_sb, oqctl);
323                 if (err) {
324                         if (!rc)
325                                 rc = err;
326                         continue;
327                 }
328
329                 /* set over quota flags for a uid/gid */
330                 oa->o_valid |= (cnt == USRQUOTA) ?
331                                OBD_MD_FLUSRQUOTA : OBD_MD_FLGRPQUOTA;
332                 if (oqctl->qc_dqblk.dqb_bhardlimit &&
333                    (toqb(oqctl->qc_dqblk.dqb_curspace) >=
334                     oqctl->qc_dqblk.dqb_bhardlimit))
335                         oa->o_flags |= (cnt == USRQUOTA) ?
336                                 OBD_FL_NO_USRQUOTA : OBD_FL_NO_GRPQUOTA;
337         }
338         OBD_FREE_PTR(oqctl);
339         RETURN(rc);
340 }
341
342 static int filter_quota_acquire(struct obd_device *obd, unsigned int uid,
343                                 unsigned int gid)
344 {
345         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
346         int rc;
347         ENTRY;
348
349         rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, LQUOTA_FLAGS_BLK, 1);
350         RETURN(rc);
351 }
352
353 /* check whether the left quota of certain uid and gid can satisfy a block_write
354  * or inode_create rpc. When need to acquire quota, return QUOTA_RET_ACQUOTA */
355 static int quota_check_common(struct obd_device *obd, unsigned int uid,
356                               unsigned int gid, int count, int cycle, int isblk)
357 {
358         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
359         int i;
360         __u32 id[MAXQUOTAS] = { uid, gid };
361         struct qunit_data qdata[MAXQUOTAS];
362         int rc = 0, rc2[2] = { 0, 0 };
363         ENTRY;
364
365         CLASSERT(MAXQUOTAS < 4);
366         if (!sb_any_quota_enabled(qctxt->lqc_sb))
367                 RETURN(rc);
368
369         for (i = 0; i < MAXQUOTAS; i++) {
370                 struct lustre_qunit_size *lqs = NULL;
371
372                 qdata[i].qd_id = id[i];
373                 qdata[i].qd_flags = i;
374                 if (isblk)
375                         QDATA_SET_BLK(&qdata[i]);
376                 qdata[i].qd_count = 0;
377
378                 /* ignore root user */
379                 if (qdata[i].qd_id == 0 && !QDATA_IS_GRP(&qdata[i]))
380                         continue;
381
382                 quota_search_lqs(&qdata[i], NULL, qctxt, &lqs);
383                 if (!lqs)
384                         continue;
385
386                 rc2[i] = compute_remquota(obd, qctxt, &qdata[i], isblk);
387                 spin_lock(&lqs->lqs_lock);
388                 if (!cycle) {
389                         rc = QUOTA_RET_INC_PENDING;
390                         if (isblk)
391                                 lqs->lqs_bwrite_pending += count;
392                         else
393                                 lqs->lqs_iwrite_pending += count;
394                 }
395
396                 CDEBUG(D_QUOTA, "write pending: %lu, qd_count: "LPU64".\n",
397                        isblk ? lqs->lqs_bwrite_pending : lqs->lqs_iwrite_pending,
398                        qdata[i].qd_count);
399                 if (rc2[i] == QUOTA_RET_OK) {
400                         if (isblk && qdata[i].qd_count <
401                             lqs->lqs_bwrite_pending * CFS_PAGE_SIZE)
402                                 rc2[i] = QUOTA_RET_ACQUOTA;
403                         if (!isblk && qdata[i].qd_count <
404                             lqs->lqs_iwrite_pending)
405                                 rc2[i] = QUOTA_RET_ACQUOTA;
406                 }
407
408                 spin_unlock(&lqs->lqs_lock);
409
410                 /* When cycle is zero, lqs_*_pending will be changed. We will
411                  * get reference of the lqs here and put reference of lqs in
412                  * quota_pending_commit b=14784 */
413                 if (!cycle)
414                         lqs_getref(lqs);
415
416                 /* this is for quota_search_lqs */
417                 lqs_putref(lqs);
418         }
419
420         if (rc2[0] == QUOTA_RET_ACQUOTA || rc2[1] == QUOTA_RET_ACQUOTA)
421                 RETURN(rc | QUOTA_RET_ACQUOTA);
422         else
423                 RETURN(rc);
424 }
425
426 static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
427                                 unsigned int gid, int count, int *pending,
428                                 int isblk, quota_acquire acquire)
429 {
430         int rc = 0, cycle = 0, count_err = 0;
431         ENTRY;
432
433         /* Unfortunately, if quota master is too busy to handle the
434          * pre-dqacq in time and quota hash on ost is used up, we
435          * have to wait for the completion of in flight dqacq/dqrel,
436          * in order to get enough quota for write b=12588 */
437         while ((rc = quota_check_common(obd, uid, gid, count, cycle, isblk)) &
438                QUOTA_RET_ACQUOTA) {
439
440                 if (rc & QUOTA_RET_INC_PENDING)
441                         *pending = 1;
442
443                 cycle++;
444                 if (isblk)
445                         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_HOLD_WRITE_RPC, 90);
446                 /* after acquire(), we should run quota_check_common again
447                  * so that we confirm there are enough quota to finish write */
448                 rc = acquire(obd, uid, gid);
449
450                 /* please reference to dqacq_completion for the below */
451                 /* a new request is finished, try again */
452                 if (rc == -EAGAIN) {
453                         CDEBUG(D_QUOTA, "finish a quota req, try again\n");
454                         continue;
455                 }
456
457                 /* it is out of quota already */
458                 if (rc == -EDQUOT) {
459                         CDEBUG(D_QUOTA, "out of quota,  return -EDQUOT\n");
460                         break;
461                 }
462
463                 /* -EBUSY and others, try 10 times */
464                 if (rc < 0 && count_err < 10) {
465                         CDEBUG(D_QUOTA, "rc: %d, count_err: %d\n", rc, count_err++);
466                         cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE, HZ);
467                         continue;
468                 }
469
470                 if (count_err >= 10 || cycle >= 1000) {
471                         CDEBUG(D_ERROR, "we meet 10 errors or run too many"
472                                " cycles when acquiring quota, quit checking with"
473                                " rc: %d, cycle: %d.\n", rc, cycle);
474                         break;
475                 }
476
477                 CDEBUG(D_QUOTA, "recheck quota with rc: %d, cycle: %d\n", rc,
478                        cycle);
479         }
480
481         if (!cycle && rc & QUOTA_RET_INC_PENDING)
482                 *pending = 1;
483
484         RETURN(rc);
485 }
486
487
488 static int filter_quota_check(struct obd_device *obd, unsigned int uid,
489                               unsigned int gid, int npage, int *flag,
490                               quota_acquire acquire)
491 {
492         return quota_chk_acq_common(obd, uid, gid, npage, flag, LQUOTA_FLAGS_BLK,
493                                     acquire);
494 }
495
496 /* when a block_write or inode_create rpc is finished, adjust the record for
497  * pending blocks and inodes*/
498 static int quota_pending_commit(struct obd_device *obd, unsigned int uid,
499                                 unsigned int gid, int count, int isblk)
500 {
501         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
502         int i;
503         __u32 id[MAXQUOTAS] = { uid, gid };
504         struct qunit_data qdata[MAXQUOTAS];
505         ENTRY;
506
507         CLASSERT(MAXQUOTAS < 4);
508         if (!sb_any_quota_enabled(qctxt->lqc_sb))
509                 RETURN(0);
510
511         for (i = 0; i < MAXQUOTAS; i++) {
512                 struct lustre_qunit_size *lqs = NULL;
513
514                 qdata[i].qd_id = id[i];
515                 qdata[i].qd_flags = i;
516                 if (isblk)
517                         QDATA_SET_BLK(&qdata[i]);
518                 qdata[i].qd_count = 0;
519
520                 if (qdata[i].qd_id == 0 && !QDATA_IS_GRP(&qdata[i]))
521                         continue;
522
523                 quota_search_lqs(&qdata[i], NULL, qctxt, &lqs);
524                 if (lqs) {
525                         int flag = 0;
526                         spin_lock(&lqs->lqs_lock);
527                         CDEBUG(D_QUOTA, "pending: %lu, count: %d.\n",
528                                isblk ? lqs->lqs_bwrite_pending :
529                                lqs->lqs_iwrite_pending, count);
530
531                         if (isblk) {
532                                 if (lqs->lqs_bwrite_pending >= count) {
533                                         lqs->lqs_bwrite_pending -= count;
534                                         flag = 1;
535                                 } else {
536                                         CDEBUG(D_ERROR,
537                                                "there are too many blocks!\n");
538                                 }
539                         } else {
540                                 if (lqs->lqs_iwrite_pending >= count) {
541                                         lqs->lqs_iwrite_pending -= count;
542                                         flag = 1;
543                                 } else {
544                                         CDEBUG(D_ERROR,
545                                                "there are too many files!\n");
546                                 }
547                         }
548
549                         spin_unlock(&lqs->lqs_lock);
550                         lqs_putref(lqs);
551                         /* When lqs_*_pening is changed back, we'll putref lqs
552                          * here b=14784 */
553                         if (flag)
554                                 lqs_putref(lqs);
555                 }
556         }
557
558         RETURN(0);
559 }
560
561 static int filter_quota_pending_commit(struct obd_device *obd, unsigned int uid,
562                                        unsigned int gid, int npage)
563 {
564         return quota_pending_commit(obd, uid, gid, npage, LQUOTA_FLAGS_BLK);
565 }
566
/* quota_init hook of the MDS interface: returns the lustre_dquot_init()
 * result. */
static int mds_quota_init(void)
{
        int rc = lustre_dquot_init();

        return rc;
}
571
/* quota_exit hook of the MDS interface: calls lustre_dquot_exit() and
 * always reports success. */
static int mds_quota_exit(void)
{
        lustre_dquot_exit();

        return 0;
}
577
578 static int mds_quota_setup(struct obd_device *obd)
579 {
580         struct obd_device_target *obt = &obd->u.obt;
581         struct mds_obd *mds = &obd->u.mds;
582         int rc;
583         ENTRY;
584
585         mds->mds_quota_info.qi_version = LUSTRE_QUOTA_V2;
586         atomic_set(&obt->obt_quotachecking, 1);
587         /* initialize quota master and quota context */
588         sema_init(&mds->mds_qonoff_sem, 1);
589         rc = qctxt_init(&obt->obt_qctxt, obt->obt_sb, dqacq_handler);
590         if (rc) {
591                 CERROR("initialize quota context failed! (rc:%d)\n", rc);
592                 RETURN(rc);
593         }
594         RETURN(rc);
595 }
596
597 static int mds_quota_cleanup(struct obd_device *obd)
598 {
599         qctxt_cleanup(&obd->u.obt.obt_qctxt, 0);
600         RETURN(0);
601 }
602
603 static int mds_quota_fs_cleanup(struct obd_device *obd)
604 {
605         struct mds_obd *mds = &obd->u.mds;
606         int i;
607         ENTRY;
608
609         /* close admin quota files */
610         down(&mds->mds_qonoff_sem);
611         for (i = 0; i < MAXQUOTAS; i++) {
612                 if (mds->mds_quota_info.qi_files[i]) {
613                         filp_close(mds->mds_quota_info.qi_files[i], 0);
614                         mds->mds_quota_info.qi_files[i] = NULL;
615                 }
616         }
617         up(&mds->mds_qonoff_sem);
618         RETURN(0);
619 }
620
621 static int mds_quota_check(struct obd_device *obd, unsigned int uid,
622                            unsigned int gid, int inodes, int *flag,
623                            quota_acquire acquire)
624 {
625         return quota_chk_acq_common(obd, uid, gid, inodes, flag, 0, acquire);
626 }
627
628 static int mds_quota_acquire(struct obd_device *obd, unsigned int uid,
629                              unsigned int gid)
630 {
631         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
632         int rc;
633         ENTRY;
634
635         rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, 0, 1);
636         RETURN(rc);
637 }
638
/* Inode flavour of quota_pending_commit(): isblk is passed as 0. */
static int mds_quota_pending_commit(struct obd_device *obd, unsigned int uid,
                                    unsigned int gid, int inodes)
{
        int rc;

        rc = quota_pending_commit(obd, uid, gid, inodes, 0);
        return rc;
}
644 #endif /* __KERNEL__ */
645
646 struct osc_quota_info {
647         struct list_head        oqi_hash;       /* hash list */
648         struct client_obd      *oqi_cli;        /* osc obd */
649         unsigned int            oqi_id;         /* uid/gid of a file */
650         short                   oqi_type;       /* quota type */
651 };
652
653 spinlock_t qinfo_list_lock = SPIN_LOCK_UNLOCKED;
654
655 static struct list_head qinfo_hash[NR_DQHASH];
656 /* SLAB cache for client quota context */
657 cfs_mem_cache_t *qinfo_cachep = NULL;
658
659 static inline int hashfn(struct client_obd *cli, unsigned long id, int type)
660                          __attribute__((__const__));
661
662 static inline int hashfn(struct client_obd *cli, unsigned long id, int type)
663 {
664         unsigned long tmp = ((unsigned long)cli>>6) ^ id;
665         tmp = (tmp * (MAXQUOTAS - type)) % NR_DQHASH;
666         return tmp;
667 }
668
669 /* caller must hold qinfo_list_lock */
670 static inline void insert_qinfo_hash(struct osc_quota_info *oqi)
671 {
672         struct list_head *head = qinfo_hash +
673                 hashfn(oqi->oqi_cli, oqi->oqi_id, oqi->oqi_type);
674
675         LASSERT_SPIN_LOCKED(&qinfo_list_lock);
676         list_add(&oqi->oqi_hash, head);
677 }
678
679 /* caller must hold qinfo_list_lock */
680 static inline void remove_qinfo_hash(struct osc_quota_info *oqi)
681 {
682         LASSERT_SPIN_LOCKED(&qinfo_list_lock);
683         list_del_init(&oqi->oqi_hash);
684 }
685
686 /* caller must hold qinfo_list_lock */
687 static inline struct osc_quota_info *find_qinfo(struct client_obd *cli,
688                                                 unsigned int id, int type)
689 {
690         unsigned int hashent = hashfn(cli, id, type);
691         struct osc_quota_info *oqi;
692
693         LASSERT_SPIN_LOCKED(&qinfo_list_lock);
694         list_for_each_entry(oqi, &qinfo_hash[hashent], oqi_hash) {
695                 if (oqi->oqi_cli == cli &&
696                     oqi->oqi_id == id && oqi->oqi_type == type)
697                         return oqi;
698         }
699         return NULL;
700 }
701
702 static struct osc_quota_info *alloc_qinfo(struct client_obd *cli,
703                                           unsigned int id, int type)
704 {
705         struct osc_quota_info *oqi;
706         ENTRY;
707
708         OBD_SLAB_ALLOC(oqi, qinfo_cachep, CFS_ALLOC_STD, sizeof(*oqi));
709         if(!oqi)
710                 RETURN(NULL);
711
712         CFS_INIT_LIST_HEAD(&oqi->oqi_hash);
713         oqi->oqi_cli = cli;
714         oqi->oqi_id = id;
715         oqi->oqi_type = type;
716
717         RETURN(oqi);
718 }
719
720 static void free_qinfo(struct osc_quota_info *oqi)
721 {
722         OBD_SLAB_FREE(oqi, qinfo_cachep, sizeof(*oqi));
723 }
724
725 int osc_quota_chkdq(struct client_obd *cli, unsigned int uid, unsigned int gid)
726 {
727         unsigned int id;
728         int cnt, rc = QUOTA_OK;
729         ENTRY;
730
731         spin_lock(&qinfo_list_lock);
732         for (cnt = 0; cnt < MAXQUOTAS; cnt++) {
733                 struct osc_quota_info *oqi = NULL;
734
735                 id = (cnt == USRQUOTA) ? uid : gid;
736                 oqi = find_qinfo(cli, id, cnt);
737                 if (oqi) {
738                         rc = NO_QUOTA;
739                         break;
740                 }
741         }
742         spin_unlock(&qinfo_list_lock);
743
744         RETURN(rc);
745 }
746
747 int osc_quota_setdq(struct client_obd *cli, unsigned int uid, unsigned int gid,
748                     obd_flag valid, obd_flag flags)
749 {
750         unsigned int id;
751         obd_flag noquota;
752         int cnt, rc = 0;
753         ENTRY;
754
755
756         for (cnt = 0; cnt < MAXQUOTAS; cnt++) {
757                 struct osc_quota_info *oqi, *old;
758
759                 if (!(valid & ((cnt == USRQUOTA) ?
760                     OBD_MD_FLUSRQUOTA : OBD_MD_FLGRPQUOTA)))
761                         continue;
762
763                 id = (cnt == USRQUOTA) ? uid : gid;
764                 noquota = (cnt == USRQUOTA) ?
765                     (flags & OBD_FL_NO_USRQUOTA) : (flags & OBD_FL_NO_GRPQUOTA);
766
767                 oqi = alloc_qinfo(cli, id, cnt);
768                 if (oqi) {
769                         spin_lock(&qinfo_list_lock);
770
771                         old = find_qinfo(cli, id, cnt);
772                         if (old && !noquota)
773                                 remove_qinfo_hash(old);
774                         else if (!old && noquota)
775                                 insert_qinfo_hash(oqi);
776
777                         spin_unlock(&qinfo_list_lock);
778
779                         if (old || !noquota)
780                                 free_qinfo(oqi);
781                         if (old && !noquota)
782                                 free_qinfo(old);
783                 } else {
784                         CERROR("not enough mem!\n");
785                         rc = -ENOMEM;
786                         break;
787                 }
788         }
789
790         RETURN(rc);
791 }
792
793 int osc_quota_cleanup(struct obd_device *obd)
794 {
795         struct client_obd *cli = &obd->u.cli;
796         struct osc_quota_info *oqi, *n;
797         int i;
798         ENTRY;
799
800         spin_lock(&qinfo_list_lock);
801         for (i = 0; i < NR_DQHASH; i++) {
802                 list_for_each_entry_safe(oqi, n, &qinfo_hash[i], oqi_hash) {
803                         if (oqi->oqi_cli != cli)
804                                 continue;
805                         remove_qinfo_hash(oqi);
806                         free_qinfo(oqi);
807                 }
808         }
809         spin_unlock(&qinfo_list_lock);
810
811         RETURN(0);
812 }
813
814 int osc_quota_init(void)
815 {
816         int i;
817         ENTRY;
818
819         LASSERT(qinfo_cachep == NULL);
820         qinfo_cachep = cfs_mem_cache_create("osc_quota_info",
821                                             sizeof(struct osc_quota_info),
822                                             0, 0);
823         if (!qinfo_cachep)
824                 RETURN(-ENOMEM);
825
826         for (i = 0; i < NR_DQHASH; i++)
827                 CFS_INIT_LIST_HEAD(qinfo_hash + i);
828
829         RETURN(0);
830 }
831
832 int osc_quota_exit(void)
833 {
834         struct osc_quota_info *oqi, *n;
835         int i, rc;
836         ENTRY;
837
838         spin_lock(&qinfo_list_lock);
839         for (i = 0; i < NR_DQHASH; i++) {
840                 list_for_each_entry_safe(oqi, n, &qinfo_hash[i], oqi_hash) {
841                         remove_qinfo_hash(oqi);
842                         free_qinfo(oqi);
843                 }
844         }
845         spin_unlock(&qinfo_list_lock);
846
847         rc = cfs_mem_cache_destroy(qinfo_cachep);
848         LASSERTF(rc == 0, "couldn't destory qinfo_cachep slab\n");
849         qinfo_cachep = NULL;
850
851         RETURN(0);
852 }
853
854 #ifdef __KERNEL__
855 quota_interface_t mds_quota_interface = {
856         .quota_init     = mds_quota_init,
857         .quota_exit     = mds_quota_exit,
858         .quota_setup    = mds_quota_setup,
859         .quota_cleanup  = mds_quota_cleanup,
860         .quota_check    = target_quota_check,
861         .quota_ctl      = mds_quota_ctl,
862         .quota_fs_cleanup       =mds_quota_fs_cleanup,
863         .quota_recovery = mds_quota_recovery,
864         .quota_adjust   = mds_quota_adjust,
865         .quota_chkquota = mds_quota_check,
866         .quota_acquire  = mds_quota_acquire,
867         .quota_pending_commit = mds_quota_pending_commit,
868 };
869
870 quota_interface_t filter_quota_interface = {
871         .quota_setup    = filter_quota_setup,
872         .quota_cleanup  = filter_quota_cleanup,
873         .quota_check    = target_quota_check,
874         .quota_ctl      = filter_quota_ctl,
875         .quota_setinfo  = filter_quota_setinfo,
876         .quota_clearinfo = filter_quota_clearinfo,
877         .quota_enforce  = filter_quota_enforce,
878         .quota_getflag  = filter_quota_getflag,
879         .quota_acquire  = filter_quota_acquire,
880         .quota_adjust   = filter_quota_adjust,
881         .quota_chkquota = filter_quota_check,
882         .quota_adjust_qunit   = filter_quota_adjust_qunit,
883         .quota_pending_commit = filter_quota_pending_commit,
884 };
885 #endif /* __KERNEL__ */
886
887 quota_interface_t mdc_quota_interface = {
888         .quota_ctl      = client_quota_ctl,
889         .quota_check    = client_quota_check,
890         .quota_poll_check = client_quota_poll_check,
891 };
892
893 quota_interface_t osc_quota_interface = {
894         .quota_ctl      = client_quota_ctl,
895         .quota_check    = client_quota_check,
896         .quota_poll_check = client_quota_poll_check,
897         .quota_init     = osc_quota_init,
898         .quota_exit     = osc_quota_exit,
899         .quota_chkdq    = osc_quota_chkdq,
900         .quota_setdq    = osc_quota_setdq,
901         .quota_cleanup  = osc_quota_cleanup,
902         .quota_adjust_qunit = client_quota_adjust_qunit,
903 };
904
905 quota_interface_t lov_quota_interface = {
906         .quota_check    = lov_quota_check,
907         .quota_ctl      = lov_quota_ctl,
908         .quota_adjust_qunit = lov_quota_adjust_qunit,
909 };
910
911 #ifdef __KERNEL__
912 static int __init init_lustre_quota(void)
913 {
914         int rc = qunit_cache_init();
915         if (rc)
916                 return rc;
917         PORTAL_SYMBOL_REGISTER(filter_quota_interface);
918         PORTAL_SYMBOL_REGISTER(mds_quota_interface);
919         PORTAL_SYMBOL_REGISTER(mdc_quota_interface);
920         PORTAL_SYMBOL_REGISTER(osc_quota_interface);
921         PORTAL_SYMBOL_REGISTER(lov_quota_interface);
922         return 0;
923 }
924
925 static void /*__exit*/ exit_lustre_quota(void)
926 {
927         PORTAL_SYMBOL_UNREGISTER(filter_quota_interface);
928         PORTAL_SYMBOL_UNREGISTER(mds_quota_interface);
929         PORTAL_SYMBOL_UNREGISTER(mdc_quota_interface);
930         PORTAL_SYMBOL_UNREGISTER(osc_quota_interface);
931         PORTAL_SYMBOL_UNREGISTER(lov_quota_interface);
932
933         qunit_cache_cleanup();
934 }
935
936 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
937 MODULE_DESCRIPTION("Lustre Quota");
938 MODULE_LICENSE("GPL");
939
940 cfs_module(lquota, "1.0.0", init_lustre_quota, exit_lustre_quota);
941
942 EXPORT_SYMBOL(mds_quota_interface);
943 EXPORT_SYMBOL(filter_quota_interface);
944 EXPORT_SYMBOL(mdc_quota_interface);
945 EXPORT_SYMBOL(osc_quota_interface);
946 EXPORT_SYMBOL(lov_quota_interface);
947 #endif /* __KERNEL */