Whamcloud - gitweb
Branch b1_8
[fs/lustre-release.git] / lustre / quota / quota_ctl.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36 #ifndef EXPORT_SYMTAB
37 # define EXPORT_SYMTAB
38 #endif
39 #define DEBUG_SUBSYSTEM S_LQUOTA
40
41 #ifdef __KERNEL__
42 # include <linux/version.h>
43 # include <linux/module.h>
44 # include <linux/init.h>
45 # include <linux/fs.h>
46 # include <linux/jbd.h>
47 # include <linux/quota.h>
48 #  include <linux/smp_lock.h>
49 #  include <linux/buffer_head.h>
50 #  include <linux/workqueue.h>
51 #  include <linux/mount.h>
52 #else /* __KERNEL__ */
53 # include <liblustre.h>
54 #endif
55
56 #include <obd_class.h>
57 #include <lustre_mds.h>
58 #include <lustre_dlm.h>
59 #include <lustre_cfg.h>
60 #include <obd_ost.h>
61 #include <lustre_fsfilt.h>
62 #include <lustre_quota.h>
63 #include "quota_internal.h"
64
65 #ifdef HAVE_QUOTA_SUPPORT
66 #ifdef __KERNEL__
67
68 /* When quotaon, build a lqs for every uid/gid who has been set limitation
69  * for quota. After quota_search_lqs, it will hold one ref for the lqs.
70  * It will be released when qctxt_cleanup() is executed b=18574 */
71 void build_lqs(struct obd_device *obd)
72 {
73         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
74         struct list_head id_list;
75         int i, rc;
76
77         INIT_LIST_HEAD(&id_list);
78         for (i = 0; i < MAXQUOTAS; i++) {
79                 struct dquot_id *dqid, *tmp;
80
81                 if (sb_dqopt(qctxt->lqc_sb)->files[i] == NULL)
82                         continue;
83
84 #ifndef KERNEL_SUPPORTS_QUOTA_READ
85                 rc = fsfilt_qids(obd, sb_dqopt(qctxt->lqc_sb)->files[i], NULL,
86                                  i, &id_list);
87 #else
88                 rc = fsfilt_qids(obd, NULL, sb_dqopt(qctxt->lqc_sb)->files[i],
89                                  i, &id_list);
90 #endif
91                 if (rc) {
92                         CDEBUG(D_ERROR, "fail to get %s qids!\n",
93                                i ? "group" : "user");
94                         continue;
95                 }
96
97                 list_for_each_entry_safe(dqid, tmp, &id_list,
98                                          di_link) {
99                         struct lustre_qunit_size *lqs;
100
101                         list_del_init(&dqid->di_link);
102                         lqs = quota_search_lqs(LQS_KEY(i, dqid->di_id),
103                                                qctxt, 1);
104                         if (lqs && !IS_ERR(lqs)) {
105                                 lqs->lqs_flags |= dqid->di_flag;
106                                 lqs_putref(lqs);
107                         } else {
108                                 CDEBUG(D_ERROR, "fail to create a lqs"
109                                        "(%s id: %u)!\n", i ? "group" : "user",
110                                        dqid->di_id);
111                         }
112
113                         OBD_FREE_PTR(dqid);
114                 }
115         }
116 }
117
118 int mds_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl)
119 {
120         struct obd_device *obd = exp->exp_obd;
121         struct obd_device_target *obt = &obd->u.obt;
122         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
123         struct timeval work_start;
124         struct timeval work_end;
125         long timediff;
126         int rc = 0;
127         ENTRY;
128
129         do_gettimeofday(&work_start);
130         switch (oqctl->qc_cmd) {
131         case Q_QUOTAON:
132                 oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */
133                 rc = mds_quota_on(obd, oqctl);
134                 /* when quotaon, create lqs for every quota uid/gid b=18574 */
135                 build_lqs(obd);
136                 break;
137         case Q_QUOTAOFF:
138                 oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */
139                 mds_quota_off(obd, oqctl);
140                 break;
141         case Q_SETINFO:
142                 rc = mds_set_dqinfo(obd, oqctl);
143                 break;
144         case Q_GETINFO:
145                 rc = mds_get_dqinfo(obd, oqctl);
146                 break;
147         case Q_SETQUOTA:
148                 rc = mds_set_dqblk(obd, oqctl);
149                 break;
150         case Q_GETQUOTA:
151                 rc = mds_get_dqblk(obd, oqctl);
152                 break;
153         case Q_GETOINFO:
154         case Q_GETOQUOTA:
155                 rc = mds_get_obd_quota(obd, oqctl);
156                 break;
157         case LUSTRE_Q_INVALIDATE:
158                 rc = mds_quota_invalidate(obd, oqctl);
159                 break;
160         case LUSTRE_Q_FINVALIDATE:
161                 oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */
162                 rc = mds_quota_finvalidate(obd, oqctl);
163                 break;
164         default:
165                 CERROR("%s: unsupported mds_quotactl command: %d\n",
166                        obd->obd_name, oqctl->qc_cmd);
167                 RETURN(-EFAULT);
168         }
169
170         if (rc)
171                 CDEBUG(D_INFO, "mds_quotactl admin quota command %d, id %u, "
172                                "type %d, failed: rc = %d\n",
173                        oqctl->qc_cmd, oqctl->qc_id, oqctl->qc_type, rc);
174         do_gettimeofday(&work_end);
175         timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
176         lprocfs_counter_add(qctxt->lqc_stats, LQUOTA_QUOTA_CTL, timediff);
177
178         RETURN(rc);
179 }
180
181 int filter_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl)
182 {
183         struct obd_device *obd = exp->exp_obd;
184         struct obd_device_target *obt = &obd->u.obt;
185         struct lvfs_run_ctxt saved;
186         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
187         struct lustre_qunit_size *lqs;
188         struct timeval work_start;
189         struct timeval work_end;
190         long timediff;
191         int rc = 0;
192         ENTRY;
193
194         do_gettimeofday(&work_start);
195         switch (oqctl->qc_cmd) {
196         case Q_FINVALIDATE:
197         case Q_QUOTAON:
198         case Q_QUOTAOFF:
199                 if (!atomic_dec_and_test(&obt->obt_quotachecking)) {
200                         CDEBUG(D_INFO, "other people are doing quotacheck\n");
201                         atomic_inc(&obt->obt_quotachecking);
202                         rc = -EBUSY;
203                         break;
204                 }
205                 if (oqctl->qc_cmd == Q_FINVALIDATE &&
206                     (obt->obt_qctxt.lqc_flags & UGQUOTA2LQC(oqctl->qc_type))) {
207                         atomic_inc(&obt->obt_quotachecking);
208                         rc = -EBUSY;
209                         break;
210                 }
211                 oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */
212         case Q_GETOINFO:
213         case Q_GETOQUOTA:
214         case Q_GETQUOTA:
215                 /* In recovery scenario, this pending dqacq/dqrel might have
216                  * been processed by master successfully before it's dquot
217                  * on master enter recovery mode. We must wait for this 
218                  * dqacq/dqrel done then return the correct limits to master */
219                 if (oqctl->qc_stat == QUOTA_RECOVERING)
220                         qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt,
221                                                  oqctl->qc_id, oqctl->qc_type, 
222                                                  1);
223
224                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
225                 rc = fsfilt_quotactl(obd, obt->obt_sb, oqctl);
226                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
227
228                 if (oqctl->qc_cmd == Q_QUOTAON || oqctl->qc_cmd == Q_QUOTAOFF ||
229                     oqctl->qc_cmd == Q_FINVALIDATE) {
230                         if (!rc && oqctl->qc_cmd == Q_QUOTAON)
231                                 obt->obt_qctxt.lqc_flags |= UGQUOTA2LQC(oqctl->qc_type);
232                         if (!rc && oqctl->qc_cmd == Q_QUOTAOFF)
233                                 obt->obt_qctxt.lqc_flags &= ~UGQUOTA2LQC(oqctl->qc_type);
234                         atomic_inc(&obt->obt_quotachecking);
235                 }
236
237                 /* when quotaon, create lqs for every quota uid/gid b=18574 */
238                 if (oqctl->qc_cmd == Q_QUOTAON)
239                         build_lqs(obd);
240                 break;
241         case Q_SETQUOTA:
242                 /* currently, it is only used for nullifying the quota */
243                 qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt,
244                                          oqctl->qc_id, oqctl->qc_type, 1);
245
246                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
247                 rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
248
249                 if (!rc) {
250                         oqctl->qc_cmd = Q_SYNC;
251                         fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
252                         oqctl->qc_cmd = Q_SETQUOTA;
253                 }
254                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
255
256                 lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id),
257                                        qctxt, 0);
258                 if (lqs == NULL || IS_ERR(lqs)){
259                         CDEBUG(D_ERROR, "fail to create lqs when setquota\n");
260                 } else {
261                         lqs->lqs_flags &= ~QB_SET;
262                         lqs_putref(lqs);
263                 }
264
265                 break;
266         case Q_INITQUOTA:
267                 {
268                 unsigned int uid = 0, gid = 0;
269
270                 /* Initialize quota limit to MIN_QLIMIT */
271                 LASSERT(oqctl->qc_dqblk.dqb_valid == QIF_BLIMITS);
272                 LASSERT(oqctl->qc_dqblk.dqb_bsoftlimit == 0);
273
274                 /* There might be a pending dqacq/dqrel (which is going to
275                  * clear stale limits on slave). we should wait for it's
276                  * completion then initialize limits */
277                 qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt,
278                                          oqctl->qc_id, oqctl->qc_type, 1);
279
280                 if (!oqctl->qc_dqblk.dqb_bhardlimit)
281                         goto adjust;
282
283                 LASSERT(oqctl->qc_dqblk.dqb_bhardlimit == MIN_QLIMIT);
284                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
285                 rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
286
287                 /* Update on-disk quota, in case of lose the changed limits
288                  * (MIN_QLIMIT) on crash, which cannot be recovered.*/
289                 if (!rc) {
290                         oqctl->qc_cmd = Q_SYNC;
291                         fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
292                         oqctl->qc_cmd = Q_INITQUOTA;
293                 }
294                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
295
296                 if (rc)
297                         RETURN(rc);
298 adjust:
299                 lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id),
300                                        qctxt, 1);
301                 if (lqs == NULL || IS_ERR(lqs)){
302                         CDEBUG(D_ERROR, "fail to create lqs when setquota\n");
303                         break;
304                 } else {
305                         lqs->lqs_flags |= QB_SET;
306                         if (OBD_FAIL_CHECK(OBD_FAIL_QUOTA_WITHOUT_CHANGE_QS)) {
307                                 lqs->lqs_bunit_sz = qctxt->lqc_bunit_sz;
308                                 lqs->lqs_btune_sz = qctxt->lqc_btune_sz;
309                                 lqs->lqs_iunit_sz = qctxt->lqc_iunit_sz;
310                                 lqs->lqs_itune_sz = qctxt->lqc_itune_sz;
311                         }
312                         lqs_putref(lqs);
313                 }
314
315                 /* Trigger qunit pre-acquire */
316                 if (oqctl->qc_type == USRQUOTA)
317                         uid = oqctl->qc_id;
318                 else
319                         gid = oqctl->qc_id;
320
321                 rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt,
322                                         uid, gid, 1, 0, NULL);
323                 if (rc == -EDQUOT || rc == -EBUSY) {
324                         CDEBUG(D_QUOTA, "rc: %d.\n", rc);
325                         rc = 0;
326                 }
327
328                 break;
329                 }
330         default:
331                 CERROR("%s: unsupported filter_quotactl command: %d\n",
332                        obd->obd_name, oqctl->qc_cmd);
333                 RETURN(-EFAULT);
334         }
335         do_gettimeofday(&work_end);
336         timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
337         lprocfs_counter_add(qctxt->lqc_stats, LQUOTA_QUOTA_CTL, timediff);
338
339         RETURN(rc);
340 }
341 #endif /* __KERNEL__ */
342 #endif
343
344 int client_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl)
345 {
346         struct ptlrpc_request *req;
347         struct obd_quotactl *oqc;
348         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*oqctl) };
349         int ver, opc, rc;
350         ENTRY;
351
352         if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDC_NAME)) {
353                 ver = LUSTRE_MDS_VERSION,
354                 opc = MDS_QUOTACTL;
355         } else if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_OSC_NAME)) {
356                 ver = LUSTRE_OST_VERSION,
357                 opc = OST_QUOTACTL;
358         } else {
359                 RETURN(-EINVAL);
360         }
361
362         req = ptlrpc_prep_req(class_exp2cliimp(exp), ver, opc, 2, size, NULL);
363         if (!req)
364                 GOTO(out, rc = -ENOMEM);
365
366         oqc = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*oqctl));
367         *oqc = *oqctl;
368
369         ptlrpc_req_set_repsize(req, 2, size);
370
371         rc = ptlrpc_queue_wait(req);
372         if (rc) {
373                 CERROR("ptlrpc_queue_wait failed, rc: %d\n", rc);
374                 GOTO(out, rc);
375         }
376
377         oqc = NULL;
378         if (req->rq_repmsg)
379                 oqc = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*oqc),
380                                          lustre_swab_obd_quotactl);
381         if (oqc == NULL) {
382                 CERROR ("Can't unpack obd_quotactl\n");
383                 GOTO(out, rc = -EPROTO);
384         }
385
386         *oqctl = *oqc;
387         EXIT;
388 out:
389         ptlrpc_req_finished(req);
390         return rc;
391 }
392
393 int lov_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl)
394 {
395         struct obd_device *obd = class_exp2obd(exp);
396         struct lov_obd *lov = &obd->u.lov;
397         __u64 curspace = 0;
398         __u64 bhardlimit = 0;
399         int i, rc = 0;
400         ENTRY;
401
402         if (oqctl->qc_cmd != LUSTRE_Q_QUOTAON &&
403             oqctl->qc_cmd != LUSTRE_Q_QUOTAOFF &&
404             oqctl->qc_cmd != Q_GETOQUOTA &&
405             oqctl->qc_cmd != Q_INITQUOTA &&
406             oqctl->qc_cmd != LUSTRE_Q_SETQUOTA &&
407             oqctl->qc_cmd != Q_FINVALIDATE) {
408                 CERROR("bad quota opc %x for lov obd", oqctl->qc_cmd);
409                 RETURN(-EFAULT);
410         }
411
412         obd_getref(obd);
413         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
414                 int err;
415
416                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
417                         if (oqctl->qc_cmd == Q_GETOQUOTA) {
418                                 CERROR("ost %d is inactive\n", i);
419                                 rc = -EIO;
420                         } else {
421                                 CDEBUG(D_HA, "ost %d is inactive\n", i);
422                         }
423                         continue;
424                 }
425
426                 err = obd_quotactl(lov->lov_tgts[i]->ltd_exp, oqctl);
427                 if (err) {
428                         if (lov->lov_tgts[i]->ltd_active && !rc)
429                                 rc = err;
430                         continue;
431                 }
432
433                 if (oqctl->qc_cmd == Q_GETOQUOTA) {
434                         curspace += oqctl->qc_dqblk.dqb_curspace;
435                         bhardlimit += oqctl->qc_dqblk.dqb_bhardlimit;
436                 }
437         }
438         obd_putref(obd);
439
440         if (oqctl->qc_cmd == Q_GETOQUOTA) {
441                 oqctl->qc_dqblk.dqb_curspace = curspace;
442                 oqctl->qc_dqblk.dqb_bhardlimit = bhardlimit;
443         }
444         RETURN(rc);
445 }