X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fquota%2Fquota_ctl.c;h=3650a6716d317599e8f96ca0bffbd1616532eb07;hb=e67c6e366752611ffd2baeb7cefa24c9f289eb78;hp=c056048c71b9379fd4682f6ff29e6c51eb6032fa;hpb=113303973ec9f8484eb2355a1a6ef3c4c7fd6a56;p=fs%2Flustre-release.git diff --git a/lustre/quota/quota_ctl.c b/lustre/quota/quota_ctl.c index c056048..3650a67 100644 --- a/lustre/quota/quota_ctl.c +++ b/lustre/quota/quota_ctl.c @@ -1,19 +1,42 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * lustre/quota/quota_ctl.c + * GPL HEADER START * - * Copyright (c) 2005 Cluster File Systems, Inc. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * This file is part of Lustre, http://www.lustre.org. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * No redistribution or use is permitted outside of Cluster File Systems, Inc. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. */ #ifndef EXPORT_SYMTAB # define EXPORT_SYMTAB #endif -#define DEBUG_SUBSYSTEM S_MDS +#define DEBUG_SUBSYSTEM S_LQUOTA #ifdef __KERNEL__ # include @@ -21,16 +44,11 @@ # include # include # include -# include # include -# if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) -# include -# include -# include -# include -# else -# include -# endif +# include +# include +# include +# include #else /* __KERNEL__ */ # include #endif @@ -44,19 +62,29 @@ #include #include "quota_internal.h" +#ifdef HAVE_QUOTA_SUPPORT #ifdef __KERNEL__ -int mds_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl) + +int mds_quota_ctl(struct obd_device *obd, struct obd_export *unused, + struct obd_quotactl *oqctl) { - struct obd_device *obd = exp->exp_obd; + struct obd_device_target *obt = &obd->u.obt; + struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt; + struct timeval work_start; + struct timeval work_end; + long timediff; int rc = 0; ENTRY; + cfs_gettimeofday(&work_start); switch (oqctl->qc_cmd) { case Q_QUOTAON: + oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */ rc = mds_quota_on(obd, oqctl); break; case Q_QUOTAOFF: - mds_quota_off(obd, oqctl); + oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */ + rc = mds_quota_off(obd, oqctl); break; case Q_SETINFO: rc = mds_set_dqinfo(obd, oqctl); @@ -74,6 +102,13 @@ int mds_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl) case Q_GETOQUOTA: rc = mds_get_obd_quota(obd, oqctl); break; + case LUSTRE_Q_INVALIDATE: + rc = mds_quota_invalidate(obd, oqctl); + break; + case LUSTRE_Q_FINVALIDATE: + oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */ + rc = mds_quota_finvalidate(obd, oqctl); + break; default: CERROR("%s: unsupported mds_quotactl command: %d\n", obd->obd_name, oqctl->qc_cmd); @@ -84,67 +119,125 @@ int mds_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl) CDEBUG(D_INFO, "mds_quotactl admin quota command %d, id %u, " "type %d, failed: rc = %d\n", oqctl->qc_cmd, oqctl->qc_id, oqctl->qc_type, rc); + cfs_gettimeofday(&work_end); + timediff = cfs_timeval_sub(&work_end, &work_start, NULL); + lprocfs_counter_add(qctxt->lqc_stats, LQUOTA_QUOTA_CTL, timediff); RETURN(rc); } -int filter_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl) +int filter_quota_ctl(struct obd_device *unused, struct obd_export *exp, + struct obd_quotactl *oqctl) { struct obd_device *obd = exp->exp_obd; struct obd_device_target *obt = &obd->u.obt; struct lvfs_run_ctxt saved; + struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt; + struct lustre_qunit_size *lqs; + void *handle = NULL; + struct timeval work_start; + struct timeval work_end; + long timediff; int rc = 0; ENTRY; + cfs_gettimeofday(&work_start); switch (oqctl->qc_cmd) { + case Q_FINVALIDATE: case Q_QUOTAON: case Q_QUOTAOFF: - if (!atomic_dec_and_test(&obt->obt_quotachecking)) { - CDEBUG(D_INFO, "other people are doing quotacheck\n"); - atomic_inc(&obt->obt_quotachecking); + cfs_down(&obt->obt_quotachecking); + if (oqctl->qc_cmd == Q_FINVALIDATE && + (obt->obt_qctxt.lqc_flags & UGQUOTA2LQC(oqctl->qc_type))) { + CWARN("quota[%u] is on yet\n", oqctl->qc_type); + cfs_up(&obt->obt_quotachecking); rc = -EBUSY; break; } + oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */ case Q_GETOINFO: case Q_GETOQUOTA: case Q_GETQUOTA: /* In recovery scenario, this pending dqacq/dqrel might have * been processed by master successfully before it's dquot - * on master enter recovery mode. We must wait for this + * on master enter recovery mode. We must wait for this * dqacq/dqrel done then return the correct limits to master */ if (oqctl->qc_stat == QUOTA_RECOVERING) - qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt, - oqctl->qc_id, oqctl->qc_type, - 1); + handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1); push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl); + rc = fsfilt_quotactl(obd, obt->obt_sb, oqctl); pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - if (oqctl->qc_cmd == Q_QUOTAON || oqctl->qc_cmd == Q_QUOTAOFF) { - if (!rc) - obt->obt_qctxt.lqc_status = - (oqctl->qc_cmd == Q_QUOTAON) ? 1 : 0; - atomic_inc(&obt->obt_quotachecking); + if (oqctl->qc_stat == QUOTA_RECOVERING) + quota_unbarrier(handle); + + if (oqctl->qc_cmd == Q_QUOTAON || oqctl->qc_cmd == Q_QUOTAOFF || + oqctl->qc_cmd == Q_FINVALIDATE) { + if (oqctl->qc_cmd == Q_QUOTAON) { + if (!rc) { + obt->obt_qctxt.lqc_flags |= + UGQUOTA2LQC(oqctl->qc_type); + /* when quotaon, create lqs for every + * quota uid/gid b=18574 */ + build_lqs(obd); + } else if (rc == -EBUSY && + quota_is_on(qctxt, oqctl)) { + rc = -EALREADY; + } + } else if (oqctl->qc_cmd == Q_QUOTAOFF) { + if (!rc) + obt->obt_qctxt.lqc_flags &= + ~UGQUOTA2LQC(oqctl->qc_type); + else if (quota_is_off(qctxt, oqctl)) + rc = -EALREADY; + } + cfs_up(&obt->obt_quotachecking); } + + break; + case Q_SETQUOTA: + /* currently, it is only used for nullifying the quota */ + handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1); + + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl); + + if (!rc) { + oqctl->qc_cmd = Q_SYNC; + fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl); + oqctl->qc_cmd = Q_SETQUOTA; + } + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + quota_unbarrier(handle); + + lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id), + qctxt, 0); + if (lqs == NULL || IS_ERR(lqs)){ + CERROR("fail to create lqs during setquota operation " + "for %sid %u\n", oqctl->qc_type ? "g" : "u", + oqctl->qc_id); + } else { + lqs->lqs_flags &= ~QB_SET; + lqs_putref(lqs); + } + break; case Q_INITQUOTA: { - unsigned int uid = 0, gid = 0; + unsigned int id[MAXQUOTAS] = { 0, 0 }; /* Initialize quota limit to MIN_QLIMIT */ LASSERT(oqctl->qc_dqblk.dqb_valid == QIF_BLIMITS); LASSERT(oqctl->qc_dqblk.dqb_bsoftlimit == 0); - /* There might be a pending dqacq/dqrel (which is going to - * clear stale limits on slave). we should wait for it's - * completion then initialize limits */ - qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt, - oqctl->qc_id, oqctl->qc_type, 1); - if (!oqctl->qc_dqblk.dqb_bhardlimit) goto adjust; - + + /* There might be a pending dqacq/dqrel (which is going to + * clear stale limits on slave). we should wait for it's + * completion then initialize limits */ + handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1); LASSERT(oqctl->qc_dqblk.dqb_bhardlimit == MIN_QLIMIT); push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl); @@ -157,18 +250,36 @@ int filter_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl) oqctl->qc_cmd = Q_INITQUOTA; } pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + quota_unbarrier(handle); if (rc) RETURN(rc); adjust: + lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id), + qctxt, 1); + if (lqs == NULL || IS_ERR(lqs)){ + CERROR("fail to create lqs during setquota operation " + "for %sid %u\n", oqctl->qc_type ? "g" : "u", + oqctl->qc_id); + break; + } else { + lqs->lqs_flags |= QB_SET; + lqs_putref(lqs); + } + /* Trigger qunit pre-acquire */ if (oqctl->qc_type == USRQUOTA) - uid = oqctl->qc_id; + id[USRQUOTA] = oqctl->qc_id; else - gid = oqctl->qc_id; + id[GRPQUOTA] = oqctl->qc_id; + + rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, + id, 1, 0, NULL); + if (rc == -EDQUOT || rc == -EBUSY) { + CDEBUG(D_QUOTA, "rc: %d.\n", rc); + rc = 0; + } - rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, - uid, gid, 1, 0); break; } default: @@ -176,86 +287,144 @@ adjust: obd->obd_name, oqctl->qc_cmd); RETURN(-EFAULT); } + cfs_gettimeofday(&work_end); + timediff = cfs_timeval_sub(&work_end, &work_start, NULL); + lprocfs_counter_add(qctxt->lqc_stats, LQUOTA_QUOTA_CTL, timediff); RETURN(rc); } #endif /* __KERNEL__ */ +#endif -int client_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl) +int client_quota_ctl(struct obd_device *unused, struct obd_export *exp, + struct obd_quotactl *oqctl) { - struct ptlrpc_request *req; - struct obd_quotactl *oqc; - int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oqctl) }; - int ver, opc, rc; + struct ptlrpc_request *req; + struct obd_quotactl *oqc; + const struct req_format *rf; + int ver, opc, rc, resends = 0; ENTRY; if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDC_NAME)) { + rf = &RQF_MDS_QUOTACTL; ver = LUSTRE_MDS_VERSION, opc = MDS_QUOTACTL; } else if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_OSC_NAME)) { + rf = &RQF_OST_QUOTACTL; ver = LUSTRE_OST_VERSION, opc = OST_QUOTACTL; } else { RETURN(-EINVAL); } - req = ptlrpc_prep_req(class_exp2cliimp(exp), ver, opc, 2, size, NULL); - if (!req) - GOTO(out, rc = -ENOMEM); +restart_request: + + req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), rf, ver, opc); + if (req == NULL) + RETURN(-ENOMEM); - oqc = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*oqctl)); + oqc = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL); *oqc = *oqctl; - ptlrpc_req_set_repsize(req, 2, size); + ptlrpc_request_set_replen(req); + ptlrpc_at_set_req_timeout(req); + req->rq_no_resend = 1; rc = ptlrpc_queue_wait(req); - if (!rc) { - oqc = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*oqc), - lustre_swab_obd_quotactl); - if (oqc == NULL) { - CERROR ("Can't unpack obd_quotactl\n"); - GOTO(out, rc = -EPROTO); - } + if (rc) { + CERROR("ptlrpc_queue_wait failed, rc: %d\n", rc); + GOTO(out, rc); + } - *oqctl = *oqc; + oqc = NULL; + if (req->rq_repmsg) + oqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL); + + if (oqc == NULL) { + CERROR ("Can't unpack obd_quotactl\n"); + GOTO(out, rc = -EPROTO); } + + *oqctl = *oqc; + EXIT; out: ptlrpc_req_finished(req); - RETURN (rc); + + if (client_quota_recoverable_error(rc)) { + resends++; + if (!client_quota_should_resend(resends, &exp->exp_obd->u.cli)) { + CERROR("too many resend retries, returning error\n"); + RETURN(-EIO); + } + + goto restart_request; + } + + return rc; +} + +/** + * For lmv, only need to send request to master MDT, and the master MDT will + * process with other slave MDTs. + */ +int lmv_quota_ctl(struct obd_device *unused, struct obd_export *exp, + struct obd_quotactl *oqctl) +{ + struct obd_device *obd = class_exp2obd(exp); + struct lmv_obd *lmv = &obd->u.lmv; + struct lmv_tgt_desc *tgt = &lmv->tgts[0]; + int rc; + ENTRY; + + if (!lmv->desc.ld_tgt_count || !tgt->ltd_active) { + CERROR("master lmv inactive\n"); + RETURN(-EIO); + } + + rc = obd_quotactl(tgt->ltd_exp, oqctl); + RETURN(rc); } -int lov_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl) +int lov_quota_ctl(struct obd_device *unused, struct obd_export *exp, + struct obd_quotactl *oqctl) { struct obd_device *obd = class_exp2obd(exp); struct lov_obd *lov = &obd->u.lov; + struct lov_tgt_desc *tgt; __u64 curspace = 0; - __u32 bhardlimit = 0; + __u64 bhardlimit = 0; int i, rc = 0; ENTRY; - if (oqctl->qc_cmd != Q_QUOTAON && oqctl->qc_cmd != Q_QUOTAOFF && - oqctl->qc_cmd != Q_GETOQUOTA && oqctl->qc_cmd != Q_INITQUOTA) { + if (oqctl->qc_cmd != LUSTRE_Q_QUOTAON && + oqctl->qc_cmd != LUSTRE_Q_QUOTAOFF && + oqctl->qc_cmd != Q_GETOQUOTA && + oqctl->qc_cmd != Q_INITQUOTA && + oqctl->qc_cmd != LUSTRE_Q_SETQUOTA && + oqctl->qc_cmd != Q_FINVALIDATE) { CERROR("bad quota opc %x for lov obd", oqctl->qc_cmd); RETURN(-EFAULT); } + /* for lov tgt */ + obd_getref(obd); for (i = 0; i < lov->desc.ld_tgt_count; i++) { int err; - if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) { + tgt = lov->lov_tgts[i]; + if (!tgt || !tgt->ltd_active || tgt->ltd_reap) { if (oqctl->qc_cmd == Q_GETOQUOTA) { + rc = -EREMOTEIO; CERROR("ost %d is inactive\n", i); - rc = -EIO; - break; } else { CDEBUG(D_HA, "ost %d is inactive\n", i); - continue; } + continue; } - err = obd_quotactl(lov->lov_tgts[i]->ltd_exp, oqctl); + err = obd_quotactl(tgt->ltd_exp, oqctl); if (err) { - if (lov->lov_tgts[i]->ltd_active && !rc) + if (tgt->ltd_active && !rc) rc = err; continue; } @@ -265,6 +434,7 @@ int lov_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl) bhardlimit += oqctl->qc_dqblk.dqb_bhardlimit; } } + obd_putref(obd); if (oqctl->qc_cmd == Q_GETOQUOTA) { oqctl->qc_dqblk.dqb_curspace = curspace; @@ -272,4 +442,3 @@ int lov_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl) } RETURN(rc); } -