Whamcloud - gitweb
b=22688 Make sure we don't use statfs data from cache.
[fs/lustre-release.git] / lustre / quota / quota_ctl.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36 #ifndef EXPORT_SYMTAB
37 # define EXPORT_SYMTAB
38 #endif
39 #define DEBUG_SUBSYSTEM S_LQUOTA
40
41 #ifdef __KERNEL__
42 # include <linux/version.h>
43 # include <linux/module.h>
44 # include <linux/init.h>
45 # include <linux/fs.h>
46 # include <linux/quota.h>
47 #  include <linux/smp_lock.h>
48 #  include <linux/buffer_head.h>
49 #  include <linux/workqueue.h>
50 #  include <linux/mount.h>
51 #else /* __KERNEL__ */
52 # include <liblustre.h>
53 #endif
54
55 #include <obd_class.h>
56 #include <lustre_mds.h>
57 #include <lustre_dlm.h>
58 #include <lustre_cfg.h>
59 #include <obd_ost.h>
60 #include <lustre_fsfilt.h>
61 #include <lustre_quota.h>
62 #include "quota_internal.h"
63
64 #ifdef HAVE_QUOTA_SUPPORT
65 #ifdef __KERNEL__
66
67 /* When quotaon, build a lqs for every uid/gid who has been set limitation
68  * for quota. After quota_search_lqs, it will hold one ref for the lqs.
69  * It will be released when qctxt_cleanup() is executed b=18574 */
70 void build_lqs(struct obd_device *obd)
71 {
72         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
73         struct list_head id_list;
74         int i, rc;
75
76         INIT_LIST_HEAD(&id_list);
77         for (i = 0; i < MAXQUOTAS; i++) {
78                 struct dquot_id *dqid, *tmp;
79
80                 if (sb_dqopt(qctxt->lqc_sb)->files[i] == NULL)
81                         continue;
82
83 #ifndef KERNEL_SUPPORTS_QUOTA_READ
84                 rc = fsfilt_qids(obd, sb_dqopt(qctxt->lqc_sb)->files[i], NULL,
85                                  i, &id_list);
86 #else
87                 rc = fsfilt_qids(obd, NULL, sb_dqopt(qctxt->lqc_sb)->files[i],
88                                  i, &id_list);
89 #endif
90                 if (rc) {
91                         CERROR("%s: failed to get %s qids\n", obd->obd_name,
92                                i ? "group" : "user");
93                         continue;
94                 }
95
96                 list_for_each_entry_safe(dqid, tmp, &id_list,
97                                          di_link) {
98                         struct lustre_qunit_size *lqs;
99
100                         list_del_init(&dqid->di_link);
101                         lqs = quota_search_lqs(LQS_KEY(i, dqid->di_id),
102                                                qctxt, 1);
103                         if (lqs && !IS_ERR(lqs)) {
104                                 lqs->lqs_flags |= dqid->di_flag;
105                                 lqs_putref(lqs);
106                         } else {
107                                 CERROR("%s: failed to create a lqs for %sid %u"
108                                        "\n", obd->obd_name, i ? "g" : "u",
109                                        dqid->di_id);
110                         }
111
112                         OBD_FREE_PTR(dqid);
113                 }
114         }
115 }
116
117 int mds_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl)
118 {
119         struct obd_device *obd = exp->exp_obd;
120         struct obd_device_target *obt = &obd->u.obt;
121         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
122         struct timeval work_start;
123         struct timeval work_end;
124         long timediff;
125         int rc = 0;
126         ENTRY;
127
128         do_gettimeofday(&work_start);
129         switch (oqctl->qc_cmd) {
130         case Q_QUOTAON:
131                 rc = mds_quota_on(obd, oqctl);
132                 break;
133         case Q_QUOTAOFF:
134                 oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */
135                 mds_quota_off(obd, oqctl);
136                 break;
137         case Q_SETINFO:
138                 rc = mds_set_dqinfo(obd, oqctl);
139                 break;
140         case Q_GETINFO:
141                 rc = mds_get_dqinfo(obd, oqctl);
142                 break;
143         case Q_SETQUOTA:
144                 rc = mds_set_dqblk(obd, oqctl);
145                 break;
146         case Q_GETQUOTA:
147                 rc = mds_get_dqblk(obd, oqctl);
148                 break;
149         case Q_GETOINFO:
150         case Q_GETOQUOTA:
151                 rc = mds_get_obd_quota(obd, oqctl);
152                 break;
153         case LUSTRE_Q_INVALIDATE:
154                 rc = mds_quota_invalidate(obd, oqctl);
155                 break;
156         case LUSTRE_Q_FINVALIDATE:
157                 oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */
158                 rc = mds_quota_finvalidate(obd, oqctl);
159                 break;
160         default:
161                 CERROR("%s: unsupported mds_quotactl command: %d\n",
162                        obd->obd_name, oqctl->qc_cmd);
163                 RETURN(-EFAULT);
164         }
165
166         if (rc)
167                 CDEBUG(D_INFO, "mds_quotactl admin quota command %d, id %u, "
168                                "type %d, failed: rc = %d\n",
169                        oqctl->qc_cmd, oqctl->qc_id, oqctl->qc_type, rc);
170         do_gettimeofday(&work_end);
171         timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
172         lprocfs_counter_add(qctxt->lqc_stats, LQUOTA_QUOTA_CTL, timediff);
173
174         RETURN(rc);
175 }
176
177 int filter_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl)
178 {
179         struct obd_device *obd = exp->exp_obd;
180         struct obd_device_target *obt = &obd->u.obt;
181         struct lvfs_run_ctxt saved;
182         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
183         struct lustre_qunit_size *lqs;
184         void *handle = NULL;
185         struct timeval work_start;
186         struct timeval work_end;
187         long timediff;
188         int rc = 0;
189         ENTRY;
190
191         do_gettimeofday(&work_start);
192         switch (oqctl->qc_cmd) {
193         case Q_QUOTAON:
194                 oqctl->qc_id = obt->obt_qfmt;
195                 rc = generic_quota_on(obd, oqctl, 0);
196                 break;
197         case Q_FINVALIDATE:
198         case Q_QUOTAOFF:
199                 if (!atomic_dec_and_test(&obt->obt_quotachecking)) {
200                         CDEBUG(D_INFO, "other people are doing quotacheck\n");
201                         atomic_inc(&obt->obt_quotachecking);
202                         rc = -EBUSY;
203                         break;
204                 }
205                 if (oqctl->qc_cmd == Q_FINVALIDATE &&
206                     (obt->obt_qctxt.lqc_flags & UGQUOTA2LQC(oqctl->qc_type))) {
207                         atomic_inc(&obt->obt_quotachecking);
208                         rc = -EBUSY;
209                         break;
210                 }
211                 oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */
212         case Q_GETOINFO:
213         case Q_GETOQUOTA:
214         case Q_GETQUOTA:
215                 /* In recovery scenario, this pending dqacq/dqrel might have
216                  * been processed by master successfully before it's dquot
217                  * on master enter recovery mode. We must wait for this 
218                  * dqacq/dqrel done then return the correct limits to master */
219                 if (oqctl->qc_stat == QUOTA_RECOVERING)
220                         handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1);
221
222                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
223                 rc = fsfilt_quotactl(obd, obt->obt_sb, oqctl);
224                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
225
226                 if (oqctl->qc_stat == QUOTA_RECOVERING)
227                         quota_unbarrier(handle);
228
229                 if (oqctl->qc_cmd == Q_QUOTAOFF ||
230                     oqctl->qc_cmd == Q_FINVALIDATE) {
231                         if (!rc && oqctl->qc_cmd == Q_QUOTAOFF)
232                                 obt->obt_qctxt.lqc_flags &= ~UGQUOTA2LQC(oqctl->qc_type);
233                         atomic_inc(&obt->obt_quotachecking);
234                 }
235                 break;
236         case Q_SETQUOTA:
237                 /* currently, it is only used for nullifying the quota */
238                 handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1);
239
240                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
241                 rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
242
243                 if (!rc) {
244                         oqctl->qc_cmd = Q_SYNC;
245                         fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
246                         oqctl->qc_cmd = Q_SETQUOTA;
247                 }
248                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
249                 quota_unbarrier(handle);
250
251                 lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id),
252                                        qctxt, 0);
253                 if (lqs == NULL || IS_ERR(lqs)){
254                         CERROR("fail to create lqs during setquota operation "
255                                "for %sid %u\n", oqctl->qc_type ? "g" : "u",
256                                oqctl->qc_id);
257                 } else {
258                         lqs->lqs_flags &= ~QB_SET;
259                         lqs_putref(lqs);
260                 }
261
262                 break;
263         case Q_INITQUOTA:
264                 {
265                 unsigned int uid = 0, gid = 0;
266
267                 /* Initialize quota limit to MIN_QLIMIT */
268                 LASSERT(oqctl->qc_dqblk.dqb_valid == QIF_BLIMITS);
269                 LASSERT(oqctl->qc_dqblk.dqb_bsoftlimit == 0);
270
271                 if (!oqctl->qc_dqblk.dqb_bhardlimit)
272                         goto adjust;
273
274                /* There might be a pending dqacq/dqrel (which is going to
275                  * clear stale limits on slave). we should wait for it's
276                  * completion then initialize limits */
277                 handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1);
278                 LASSERT(oqctl->qc_dqblk.dqb_bhardlimit == MIN_QLIMIT);
279                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
280                 rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
281
282                 /* Update on-disk quota, in case of lose the changed limits
283                  * (MIN_QLIMIT) on crash, which cannot be recovered.*/
284                 if (!rc) {
285                         oqctl->qc_cmd = Q_SYNC;
286                         fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
287                         oqctl->qc_cmd = Q_INITQUOTA;
288                 }
289                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
290                 quota_unbarrier(handle);
291
292                 if (rc)
293                         RETURN(rc);
294 adjust:
295                 lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id),
296                                        qctxt, 1);
297                 if (lqs == NULL || IS_ERR(lqs)){
298                         CERROR("fail to create lqs during setquota operation "
299                                "for %sid %u\n", oqctl->qc_type ? "g" : "u",
300                                oqctl->qc_id);
301                         break;
302                 } else {
303                         lqs->lqs_flags |= QB_SET;
304                         if (OBD_FAIL_CHECK(OBD_FAIL_QUOTA_WITHOUT_CHANGE_QS)) {
305                                 lqs->lqs_bunit_sz = qctxt->lqc_bunit_sz;
306                                 lqs->lqs_btune_sz = qctxt->lqc_btune_sz;
307                                 lqs->lqs_iunit_sz = qctxt->lqc_iunit_sz;
308                                 lqs->lqs_itune_sz = qctxt->lqc_itune_sz;
309                         }
310                         lqs_putref(lqs);
311                 }
312
313                 /* Trigger qunit pre-acquire */
314                 if (oqctl->qc_type == USRQUOTA)
315                         uid = oqctl->qc_id;
316                 else
317                         gid = oqctl->qc_id;
318
319                 rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt,
320                                         uid, gid, 1, 0, NULL);
321                 if (rc == -EDQUOT || rc == -EBUSY) {
322                         CDEBUG(D_QUOTA, "rc: %d.\n", rc);
323                         rc = 0;
324                 }
325
326                 break;
327                 }
328         default:
329                 CERROR("%s: unsupported filter_quotactl command: %d\n",
330                        obd->obd_name, oqctl->qc_cmd);
331                 RETURN(-EFAULT);
332         }
333         do_gettimeofday(&work_end);
334         timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
335         lprocfs_counter_add(qctxt->lqc_stats, LQUOTA_QUOTA_CTL, timediff);
336
337         RETURN(rc);
338 }
339 #endif /* __KERNEL__ */
340 #endif
341
342 int client_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl)
343 {
344         struct ptlrpc_request *req;
345         struct obd_quotactl *oqc;
346         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*oqctl) };
347         int ver, opc, rc;
348         ENTRY;
349
350         if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDC_NAME)) {
351                 ver = LUSTRE_MDS_VERSION,
352                 opc = MDS_QUOTACTL;
353         } else if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_OSC_NAME)) {
354                 ver = LUSTRE_OST_VERSION,
355                 opc = OST_QUOTACTL;
356         } else {
357                 RETURN(-EINVAL);
358         }
359
360         req = ptlrpc_prep_req(class_exp2cliimp(exp), ver, opc, 2, size, NULL);
361         if (!req)
362                 GOTO(out, rc = -ENOMEM);
363
364         oqc = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*oqctl));
365         *oqc = *oqctl;
366
367         ptlrpc_req_set_repsize(req, 2, size);
368         ptlrpc_at_set_req_timeout(req);
369         req->rq_no_resend = 1;
370
371         rc = ptlrpc_queue_wait(req);
372         if (rc) {
373                 CERROR("ptlrpc_queue_wait failed, rc: %d\n", rc);
374                 GOTO(out, rc);
375         }
376
377         if (req->rq_repmsg) {
378                 oqc = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*oqc),
379                                          lustre_swab_obd_quotactl);
380                 if (oqc != NULL) {
381                         *oqctl = *oqc;
382                 } else {
383                         CERROR ("Can't unpack obd_quotactl\n");
384                         rc = -EPROTO;
385                 }
386         }
387         EXIT;
388 out:
389         ptlrpc_req_finished(req);
390         return rc;
391 }
392
393 struct lov_getquota_set_arg {
394         __u64 curspace;
395         __u64 bhardlimit;
396 };
397
398 static int lov_getquota_interpret(struct ptlrpc_request_set *rqset, void *data, int rc)
399 {
400         struct lov_getquota_set_arg *set_arg = data;
401         struct ptlrpc_request *req;
402         struct list_head *pos;
403         struct obd_quotactl *oqc;
404
405         list_for_each(pos, &rqset->set_requests) {
406                 req = list_entry(pos, struct ptlrpc_request, rq_set_chain);
407
408                 if (req->rq_status)
409                         continue;
410
411                 oqc = NULL;
412                 if (req->rq_repmsg)
413                         oqc = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*oqc),
414                                 lustre_swab_obd_quotactl);
415
416                 if (oqc == NULL) {
417                         CERROR("Can't unpack obd_quotactl\n");
418                         rc = -EPROTO;
419                         continue;
420                 }
421
422                 set_arg->curspace += oqc->qc_dqblk.dqb_curspace;
423                 set_arg->bhardlimit += oqc->qc_dqblk.dqb_bhardlimit;
424         }
425
426         return rc;
427 }
428
429 int lov_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl)
430 {
431         struct obd_device *obd = class_exp2obd(exp);
432         struct lov_obd *lov = &obd->u.lov;
433         int i, rc = 0, rc1;
434         struct lov_getquota_set_arg set_arg = { 0 };
435         struct obd_export *ltd_exp;
436         struct ptlrpc_request_set *rqset;
437         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*oqctl) };
438
439         ENTRY;
440
441         if (oqctl->qc_cmd != LUSTRE_Q_QUOTAON &&
442             oqctl->qc_cmd != LUSTRE_Q_QUOTAOFF &&
443             oqctl->qc_cmd != Q_GETOQUOTA &&
444             oqctl->qc_cmd != Q_INITQUOTA &&
445             oqctl->qc_cmd != LUSTRE_Q_SETQUOTA &&
446             oqctl->qc_cmd != Q_FINVALIDATE) {
447                 CERROR("bad quota opc %x for lov obd", oqctl->qc_cmd);
448                 RETURN(-EINVAL);
449         }
450
451         rqset = ptlrpc_prep_set();
452         if (rqset == NULL)
453                 RETURN(-ENOMEM);
454
455         obd_getref(obd);
456
457         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
458                 struct ptlrpc_request *req;
459                 struct obd_quotactl *oqc;
460
461                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
462                         if (oqctl->qc_cmd == Q_GETOQUOTA) {
463                                 CERROR("ost %d is inactive\n", i);
464                                 rc = -EIO;
465                         } else {
466                                 CDEBUG(D_HA, "ost %d is inactive\n", i);
467                         }
468                         continue;
469                 }
470
471                 ltd_exp = lov->lov_tgts[i]->ltd_exp;
472
473                 req = ptlrpc_prep_req(class_exp2cliimp(ltd_exp),
474                                       LUSTRE_OST_VERSION,
475                                       OST_QUOTACTL, 2, size, NULL);
476                 if (!req) {
477                         obd_putref(obd);
478                         GOTO(out, rc = -ENOMEM);
479                 }
480
481                 oqc = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*oqctl));
482                 *oqc = *oqctl;
483
484                 ptlrpc_req_set_repsize(req, 2, size);
485                 ptlrpc_at_set_req_timeout(req);
486                 req->rq_no_resend = 1;
487                 req->rq_no_delay = 1;
488
489                 ptlrpc_set_add_req(rqset, req);
490         }
491
492         obd_putref(obd);
493
494         if (oqctl->qc_cmd == Q_GETOQUOTA) {
495                 rqset->set_interpret = lov_getquota_interpret;
496                 rqset->set_arg = &set_arg;
497         }
498         rc1 = ptlrpc_set_wait(rqset);
499         rc = rc1 ? rc1 : rc;
500
501 out:
502         ptlrpc_set_destroy(rqset);
503         oqctl->qc_dqblk.dqb_curspace = set_arg.curspace;
504         oqctl->qc_dqblk.dqb_bhardlimit = set_arg.bhardlimit;
505
506         RETURN(rc);
507 }