X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fquota%2Fquota_master.c;h=7753bfffdf4bb809e68b32629993d10f3fdf7a18;hb=222305972a5d38b0fe111966da775e98a0ec8269;hp=d47f5f93bfdca5b2dcd0eab85d606d0d42e5ecb4;hpb=71e20cf1b01ea7da372858dd114d47005324153b;p=fs%2Flustre-release.git diff --git a/lustre/quota/quota_master.c b/lustre/quota/quota_master.c index d47f5f9..7753bff 100644 --- a/lustre/quota/quota_master.c +++ b/lustre/quota/quota_master.c @@ -1,22 +1,50 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * lustre/quota/quota_master.c - * Lustre Quota Master request handler + * GPL HEADER START * - * Copyright (c) 2001-2005 Cluster File Systems, Inc. - * Author: Niu YaWei + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * This file is part of Lustre, http://www.lustre.org. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * No redistribution or use is permitted outside of Cluster File Systems, Inc. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Use is subject to license terms. */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * lustre/quota/quota_master.c + * + * Lustre Quota Master request handler + * + * Author: Niu YaWei + */ + #ifndef EXPORT_SYMTAB # define EXPORT_SYMTAB #endif -#define DEBUG_SUBSYSTEM S_MDS +#define DEBUG_SUBSYSTEM S_LQUOTA #include #include @@ -34,8 +62,9 @@ #include "quota_internal.h" -/* lock ordering: - * mds->mds_qonoff_sem > dquot->dq_sem */ +#ifdef HAVE_QUOTA_SUPPORT + +/* lock ordering: mds->mds_qonoff_sem > dquot->dq_sem */ static struct list_head lustre_dquot_hash[NR_DQHASH]; static spinlock_t dquot_hash_lock = SPIN_LOCK_UNLOCKED; @@ -54,7 +83,7 @@ int lustre_dquot_init(void) return (-ENOMEM); for (i = 0; i < NR_DQHASH; i++) { - INIT_LIST_HEAD(lustre_dquot_hash + i); + CFS_INIT_LIST_HEAD(lustre_dquot_hash + i); } RETURN(0); } @@ -112,11 +141,11 @@ static struct lustre_dquot *alloc_dquot(struct lustre_quota_info *lqi, struct lustre_dquot *dquot = NULL; ENTRY; - OBD_SLAB_ALLOC(dquot, lustre_dquot_cachep, CFS_ALLOC_IO, sizeof(*dquot)); + OBD_SLAB_ALLOC_PTR_GFP(dquot, lustre_dquot_cachep, CFS_ALLOC_IO); if (dquot == NULL) RETURN(NULL); - INIT_LIST_HEAD(&dquot->dq_hash); + CFS_INIT_LIST_HEAD(&dquot->dq_hash); init_mutex_locked(&dquot->dq_sem); dquot->dq_refcnt = 1; dquot->dq_info = lqi; @@ -170,7 +199,7 @@ static struct lustre_dquot *lustre_dqget(struct obd_device *obd, if ((empty = alloc_dquot(lqi, id, type)) == NULL) RETURN(ERR_PTR(-ENOMEM)); - + spin_lock(&dquot_hash_lock); if ((dquot = find_dquot(hashent, lqi, id, type)) != NULL) { dquot->dq_refcnt++; @@ -198,24 +227,134 @@ static struct lustre_dquot *lustre_dqget(struct obd_device *obd, RETURN(dquot); } +static void init_oqaq(struct quota_adjust_qunit *oqaq, + struct lustre_quota_ctxt *qctxt, + qid_t id, int type) +{ + struct lustre_qunit_size *lqs = NULL; + + oqaq->qaq_id = id; + oqaq->qaq_flags = type; + lqs = quota_search_lqs(LQS_KEY(type, id), qctxt, 0); + if (lqs && !IS_ERR(lqs)) { + spin_lock(&lqs->lqs_lock); + oqaq->qaq_bunit_sz = lqs->lqs_bunit_sz; + oqaq->qaq_iunit_sz = lqs->lqs_iunit_sz; + oqaq->qaq_flags = lqs->lqs_flags; + spin_unlock(&lqs->lqs_lock); + lqs_putref(lqs); + } else { + CDEBUG(D_QUOTA, "Can't find the lustre qunit size!\n"); + oqaq->qaq_bunit_sz = qctxt->lqc_bunit_sz; + oqaq->qaq_iunit_sz = qctxt->lqc_iunit_sz; + } +} + +int dqacq_adjust_qunit_sz(struct obd_device *obd, qid_t id, int type, + __u32 is_blk) +{ + struct mds_obd *mds = &obd->u.mds; + struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt; + struct obd_device *lov_mds_obd = class_exp2obd(mds->mds_osc_exp); + struct lov_obd *lov = &lov_mds_obd->u.lov; + __u32 ost_num = lov->desc.ld_tgt_count, mdt_num = 1; + struct quota_adjust_qunit *oqaq = NULL; + unsigned int qid[MAXQUOTAS] = { 0, 0 }; + struct lustre_quota_info *info = &mds->mds_quota_info; + struct lustre_dquot *dquot = NULL; + int adjust_res = 0; + int rc = 0; + ENTRY; + + LASSERT(mds); + dquot = lustre_dqget(obd, info, id, type); + if (IS_ERR(dquot)) + RETURN(PTR_ERR(dquot)); + + OBD_ALLOC_PTR(oqaq); + if (!oqaq) + GOTO(out, rc = -ENOMEM); + + down(&dquot->dq_sem); + init_oqaq(oqaq, qctxt, id, type); + + rc = dquot_create_oqaq(qctxt, dquot, ost_num, mdt_num, + is_blk ? LQUOTA_FLAGS_ADJBLK : + LQUOTA_FLAGS_ADJINO, oqaq); + + if (rc < 0) { + CDEBUG(D_ERROR, "create oqaq failed! (rc:%d)\n", rc); + GOTO(out_sem, rc); + } + QAQ_DEBUG(oqaq, "show oqaq.\n") + + if (!QAQ_IS_ADJBLK(oqaq) && !QAQ_IS_ADJINO(oqaq)) + GOTO(out_sem, rc); + + /* adjust the mds slave qunit size */ + adjust_res = quota_adjust_slave_lqs(oqaq, qctxt); + if (adjust_res <= 0) { + if (adjust_res < 0) { + rc = adjust_res; + CDEBUG(D_ERROR, "adjust mds slave's qunit size failed! \ + (rc:%d)\n", rc); + } else { + CDEBUG(D_QUOTA, "qunit doesn't need to be adjusted.\n"); + } + GOTO(out_sem, rc); + } + + if (type) + qid[GRPQUOTA] = dquot->dq_id; + else + qid[USRQUOTA] = dquot->dq_id; + + up(&dquot->dq_sem); + + rc = qctxt_adjust_qunit(obd, qctxt, qid, is_blk, 0, NULL); + if (rc == -EDQUOT || rc == -EBUSY) { + CDEBUG(D_QUOTA, "rc: %d.\n", rc); + rc = 0; + } + if (rc) { + CDEBUG(D_ERROR, "mds fail to adjust file quota! \ + (rc:%d)\n", rc); + GOTO(out, rc); + } + + /* only when block qunit is reduced, boardcast to osts */ + if ((adjust_res & LQS_BLK_DECREASE) && QAQ_IS_ADJBLK(oqaq)) + rc = obd_quota_adjust_qunit(mds->mds_osc_exp, oqaq, qctxt); + +out: + lustre_dqput(dquot); + if (oqaq) + OBD_FREE_PTR(oqaq); + + RETURN(rc); +out_sem: + up(&dquot->dq_sem); + goto out; +} + int dqacq_handler(struct obd_device *obd, struct qunit_data *qdata, int opc) { struct mds_obd *mds = &obd->u.mds; + struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt; struct lustre_quota_info *info = &mds->mds_quota_info; struct lustre_dquot *dquot = NULL; __u64 *usage = NULL; - __u32 hlimit = 0, slimit = 0; - __u32 qdata_type = qdata->qd_flags & QUOTA_IS_GRP; - __u32 is_blk = (qdata->qd_flags & QUOTA_IS_BLOCK) >> 1; + __u64 hlimit = 0, slimit = 0; time_t *time = NULL; unsigned int grace = 0; + struct lustre_qunit_size *lqs = NULL; int rc = 0; ENTRY; if (OBD_FAIL_CHECK(OBD_FAIL_OBD_DQACQ)) RETURN(-EIO); - dquot = lustre_dqget(obd, info, qdata->qd_id, qdata_type); + dquot = lustre_dqget(obd, info, qdata->qd_id, QDATA_IS_GRP(qdata)); if (IS_ERR(dquot)) RETURN(PTR_ERR(dquot)); @@ -230,14 +369,14 @@ int dqacq_handler(struct obd_device *obd, struct qunit_data *qdata, int opc) GOTO(out, rc = -EBUSY); } - if (is_blk) { - grace = info->qi_info[qdata_type].dqi_bgrace; + if (QDATA_IS_BLK(qdata)) { + grace = info->qi_info[QDATA_IS_GRP(qdata)].dqi_bgrace; usage = &dquot->dq_dqb.dqb_curspace; hlimit = dquot->dq_dqb.dqb_bhardlimit; slimit = dquot->dq_dqb.dqb_bsoftlimit; time = &dquot->dq_dqb.dqb_btime; } else { - grace = info->qi_info[qdata_type].dqi_igrace; + grace = info->qi_info[QDATA_IS_GRP(qdata)].dqi_igrace; usage = (__u64 *) & dquot->dq_dqb.dqb_curinodes; hlimit = dquot->dq_dqb.dqb_ihardlimit; slimit = dquot->dq_dqb.dqb_isoftlimit; @@ -253,12 +392,21 @@ int dqacq_handler(struct obd_device *obd, struct qunit_data *qdata, int opc) switch (opc) { case QUOTA_DQACQ: - if (hlimit && - QUSG(*usage + qdata->qd_count, is_blk) > hlimit) - GOTO(out, rc = -EDQUOT); + if (hlimit && + QUSG(*usage + qdata->qd_count, QDATA_IS_BLK(qdata)) > hlimit) + { + if (QDATA_IS_CHANGE_QS(qdata) && + QUSG(*usage, QDATA_IS_BLK(qdata)) < hlimit) + qdata->qd_count = (hlimit - + QUSG(*usage, QDATA_IS_BLK(qdata))) + * (QDATA_IS_BLK(qdata) ? + QUOTABLOCK_SIZE : 1); + else + GOTO(out, rc = -EDQUOT); + } if (slimit && - QUSG(*usage + qdata->qd_count, is_blk) > slimit) { + QUSG(*usage + qdata->qd_count, QDATA_IS_BLK(qdata)) > slimit) { if (*time && cfs_time_current_sec() >= *time) GOTO(out, rc = -EDQUOT); else if (!*time) @@ -276,7 +424,7 @@ int dqacq_handler(struct obd_device *obd, struct qunit_data *qdata, int opc) *usage -= qdata->qd_count; /* (usage <= soft limit) but not (usage < soft limit) */ - if (!slimit || QUSG(*usage, is_blk) <= slimit) + if (!slimit || QUSG(*usage, QDATA_IS_BLK(qdata)) <= slimit) *time = 0; break; default: @@ -289,35 +437,95 @@ out: up(&dquot->dq_sem); up(&mds->mds_qonoff_sem); lustre_dqput(dquot); + if (rc != -EDQUOT) + dqacq_adjust_qunit_sz(obd, qdata->qd_id, QDATA_IS_GRP(qdata), + QDATA_IS_BLK(qdata)); + + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), + qctxt, 0); + if (lqs == NULL || IS_ERR(lqs)) { + CDEBUG(D_INFO, "Can't find the lustre qunit size!\n"); + qdata->qd_qunit = QDATA_IS_BLK(qdata) ? qctxt->lqc_bunit_sz : + qctxt->lqc_iunit_sz; + } else { + spin_lock(&lqs->lqs_lock); + qdata->qd_qunit = QDATA_IS_BLK(qdata) ? lqs->lqs_bunit_sz : + lqs->lqs_iunit_sz; + spin_unlock(&lqs->lqs_lock); + } + + if (QDATA_IS_BLK(qdata)) + QDATA_SET_ADJBLK(qdata); + else + QDATA_SET_ADJINO(qdata); + + QDATA_DEBUG(qdata, "alloc/release qunit in dqacq_handler\n"); + if (lqs) + lqs_putref(lqs); + return rc; } -int mds_quota_adjust(struct obd_device *obd, unsigned int qcids[], - unsigned int qpids[], int rc, int opc) +int mds_quota_adjust(struct obd_device *obd, const unsigned int qcids[], + const unsigned int qpids[], int rc, int opc) { struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt; int rc2 = 0; ENTRY; - if (rc && rc != -EDQUOT) + if (rc && rc != -EDQUOT && rc != ENOLCK) RETURN(0); switch (opc) { - case FSFILT_OP_RENAME: - /* acquire/release block quota on owner of original parent */ - rc2 = qctxt_adjust_qunit(obd, qctxt, qpids[2], qpids[3], 1, 0); - /* fall-through */ case FSFILT_OP_SETATTR: - /* acquire/release file quota on original owner */ - rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 0, 0); - /* fall-through */ - case FSFILT_OP_CREATE: + /* release file quota on original owner */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids, 0, 0, NULL); + /* release block quota on original owner */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids, 1, 0, NULL); + /* acquire file quota on current owner */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids, 0, 0, NULL); + /* acquire block quota on current owner */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids, 1, 0, NULL); + break; + case FSFILT_OP_UNLINK_PARTIAL_CHILD: + /* release file quota on child */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids, 0, 0, NULL); + /* rlease block quota on child */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids, 1, 0, NULL); + break; + case FSFILT_OP_CREATE_PARTIAL_CHILD: + /* acquire file quota on child */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids, 0, 0, NULL); + /* acquire block quota on child */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids, 1, 0, NULL); + break; + case FSFILT_OP_LINK: + /* acquire block quota on parent */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids, 1, 0, NULL); + break; case FSFILT_OP_UNLINK: - /* acquire/release file/block quota on owner of child (or current owner) */ - rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 0, 0); - rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0); - /* acquire/release block quota on owner of parent (or original owner) */ - rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 1, 0); + /* release block quota on parent */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids, 1, 0, NULL); + /* release file quota on child */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids, 0, 0, NULL); + if (qpids[0] != qcids[0] || qpids[1] != qcids[1]) + /* release block quota on child */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids, 1, 0, + NULL); + break; + case FSFILT_OP_UNLINK_PARTIAL_PARENT: + /* release block quota on parent */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids, 1, 0, NULL); + break; + case FSFILT_OP_CREATE: + /* acquire block quota on parent */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids, 1, 0, NULL); + /* acquire file quota on child */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids, 0, 0, NULL); + if (qpids[0] != qcids[0] || qpids[1] != qcids[1]) + /* acquire block quota on child */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids, 1, 0, + NULL); break; default: LBUG(); @@ -325,12 +533,14 @@ int mds_quota_adjust(struct obd_device *obd, unsigned int qcids[], } if (rc2) - CERROR("mds adjust qunit failed! (opc:%d rc:%d)\n", opc, rc2); + CDEBUG(rc2 == QUOTA_REQ_RETURNED ? D_QUOTA: D_ERROR, + "mds adjust qunit %ssuccessfully! (opc:%d rc:%d)\n", + rc2 == QUOTA_REQ_RETURNED ? "" : "un", opc, rc2); RETURN(0); } -int filter_quota_adjust(struct obd_device *obd, unsigned int qcids[], - unsigned int qpids[], int rc, int opc) +int filter_quota_adjust(struct obd_device *obd, const unsigned int qcids[], + const unsigned int qpids[], int rc, int opc) { struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt; int rc2 = 0; @@ -342,50 +552,128 @@ int filter_quota_adjust(struct obd_device *obd, unsigned int qcids[], switch (opc) { case FSFILT_OP_SETATTR: /* acquire/release block quota on original & current owner */ - rc = qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0); - rc2 = qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 1, 0); + rc = qctxt_adjust_qunit(obd, qctxt, qcids, 1, 0, NULL); + rc2 = qctxt_adjust_qunit(obd, qctxt, qpids, 1, 0, NULL); break; case FSFILT_OP_UNLINK: /* release block quota on this owner */ case FSFILT_OP_CREATE: /* XXX for write operation on obdfilter */ /* acquire block quota on this owner */ - rc = qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0); + rc = qctxt_adjust_qunit(obd, qctxt, qcids, 1, 0, NULL); break; default: LBUG(); break; } - if (rc || rc2) - CERROR("filter adjust qunit failed! (opc:%d rc%d)\n", - opc, rc ?: rc2); + if (rc || rc2) { + if (!rc) + rc = rc2; + CDEBUG(rc == QUOTA_REQ_RETURNED ? D_QUOTA: D_ERROR, + "filter adjust qunit %ssuccessfully! (opc:%d rc%d)\n", + QUOTA_REQ_RETURNED ? "" : "un", opc, rc); + } + RETURN(0); } -#define LUSTRE_ADMIN_QUOTAFILES {\ - "admin_quotafile.usr", /* user admin quotafile */\ - "admin_quotafile.grp" /* group admin quotafile */\ -} static const char prefix[] = "OBJECTS/"; +int mds_quota_invalidate(struct obd_device *obd, struct obd_quotactl *oqctl) +{ + struct mds_obd *mds = &obd->u.mds; + struct lustre_quota_info *qinfo = &mds->mds_quota_info; + int rc = 0, i; + char *quotafile[] = LUSTRE_ADMIN_QUOTAFILES_V2; + char name[64]; + struct lvfs_run_ctxt saved; + + LASSERT(qinfo->qi_version == LUSTRE_QUOTA_V2); + + if (oqctl->qc_type != USRQUOTA && + oqctl->qc_type != GRPQUOTA && + oqctl->qc_type != UGQUOTA) + return -EINVAL; + + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + + down(&mds->mds_qonoff_sem); + + for (i = 0; i < MAXQUOTAS; i++) { + struct file *fp; + + if (!Q_TYPESET(oqctl, i)) + continue; + + /* quota file has been opened ? */ + if (qinfo->qi_files[i]) { + rc = -EBUSY; + goto out; + } + + LASSERT(strlen(quotafile[i]) + sizeof(prefix) <= sizeof(name)); + sprintf(name, "%s%s", prefix, quotafile[i]); + + fp = filp_open(name, O_CREAT | O_TRUNC | O_RDWR, 0644); + if (IS_ERR(fp)) { + rc = PTR_ERR(fp); + CERROR("error invalidating admin quotafile %s (rc:%d)\n", + name, rc); + } + else + filp_close(fp, 0); + } + +out: + up(&mds->mds_qonoff_sem); + + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + + return rc; +} + +int mds_quota_finvalidate(struct obd_device *obd, struct obd_quotactl *oqctl) +{ + struct mds_obd *mds = &obd->u.mds; + int rc; + struct lvfs_run_ctxt saved; + + if (oqctl->qc_type != USRQUOTA && + oqctl->qc_type != GRPQUOTA && + oqctl->qc_type != UGQUOTA) + RETURN(-EINVAL); + + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + down(&mds->mds_qonoff_sem); + + oqctl->qc_cmd = Q_FINVALIDATE; + rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl); + if (!rc) + rc = obd_quotactl(mds->mds_osc_exp, oqctl); + + up(&mds->mds_qonoff_sem); + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + + return rc; +} + int init_admin_quotafiles(struct obd_device *obd, struct obd_quotactl *oqctl) { struct mds_obd *mds = &obd->u.mds; struct lustre_quota_info *qinfo = &mds->mds_quota_info; - const char *quotafiles[] = LUSTRE_ADMIN_QUOTAFILES; + const char *quotafile[] = LUSTRE_ADMIN_QUOTAFILES_V2; struct lvfs_run_ctxt saved; char name[64]; int i, rc = 0; - struct dentry *dparent = mds->mds_objects_dir; - struct inode *iparent = dparent->d_inode; ENTRY; - LASSERT(iparent); + LASSERT(qinfo->qi_version == LUSTRE_QUOTA_V2); + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); down(&mds->mds_qonoff_sem); - for (i = 0; i < MAXQUOTAS; i++) { - struct dentry *de; + + for (i = 0; i < MAXQUOTAS && !rc; i++) { struct file *fp; if (!Q_TYPESET(oqctl, i)) @@ -398,33 +686,44 @@ int init_admin_quotafiles(struct obd_device *obd, struct obd_quotactl *oqctl) continue; } - /* lookup quota file */ - rc = 0; - LOCK_INODE_MUTEX(iparent); - de = lookup_one_len(quotafiles[i], dparent, - strlen(quotafiles[i])); - UNLOCK_INODE_MUTEX(iparent); - if (IS_ERR(de) || de->d_inode == NULL || - !S_ISREG(de->d_inode->i_mode)) - rc = IS_ERR(de) ? PTR_ERR(de) : -ENOENT; - if (!IS_ERR(de)) - dput(de); - - if (rc && rc != -ENOENT) { - CERROR("error lookup quotafile %s! (rc:%d)\n", + LASSERT(strlen(quotafile[i]) + sizeof(prefix) <= sizeof(name)); + sprintf(name, "%s%s", prefix, quotafile[i]); + + /* check if quota file exists and is correct */ + fp = filp_open(name, O_RDONLY, 0); + if (!IS_ERR(fp)) { + /* irregular file is not the right place for quota */ + if (!S_ISREG(fp->f_dentry->d_inode->i_mode)) { + CERROR("admin quota file %s is not " + "regular!", name); + filp_close(fp, 0); + rc = -EINVAL; + break; + } + qinfo->qi_files[i] = fp; + rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_CHK); + qinfo->qi_files[i] = 0; + filp_close(fp, 0); + } + else + rc = PTR_ERR(fp); + + if (!rc) + continue; + + /* -EINVAL may be returned by quotainfo for bad quota file */ + if (rc != -ENOENT && rc != -EINVAL) { + CERROR("error opening old quota file %s (%d)\n", name, rc); break; - } else if (!rc) { - continue; } - LASSERT(strlen(quotafiles[i]) + sizeof(prefix) <= sizeof(name)); - sprintf(name, "%s%s", prefix, quotafiles[i]); + CDEBUG(D_INFO, "%s new quota file %s\n", name, + rc == -ENOENT ? "creating" : "overwriting"); - LASSERT(rc == -ENOENT); - /* create quota file */ - fp = filp_open(name, O_CREAT | O_EXCL, 0644); - if (IS_ERR(fp) || !S_ISREG(fp->f_dentry->d_inode->i_mode)) { + /* create quota file overwriting old if needed */ + fp = filp_open(name, O_CREAT | O_TRUNC | O_RDWR, 0644); + if (IS_ERR(fp)) { rc = PTR_ERR(fp); CERROR("error creating admin quotafile %s (rc:%d)\n", name, rc); @@ -432,15 +731,14 @@ int init_admin_quotafiles(struct obd_device *obd, struct obd_quotactl *oqctl) } qinfo->qi_files[i] = fp; - rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_INIT_INFO); - filp_close(fp, 0); - qinfo->qi_files[i] = NULL; - if (rc) { + rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_INIT_INFO); + if (rc) CERROR("error init %s admin quotafile! (rc:%d)\n", i == USRQUOTA ? "user" : "group", rc); - break; - } + + filp_close(fp, 0); + qinfo->qi_files[i] = NULL; } up(&mds->mds_qonoff_sem); @@ -448,7 +746,7 @@ int init_admin_quotafiles(struct obd_device *obd, struct obd_quotactl *oqctl) RETURN(rc); } -static int close_quota_files(struct obd_quotactl *oqctl, +static int close_quota_files(struct obd_quotactl *oqctl, struct lustre_quota_info *qinfo) { int i, rc = 0; @@ -471,13 +769,12 @@ int mds_admin_quota_on(struct obd_device *obd, struct obd_quotactl *oqctl) { struct mds_obd *mds = &obd->u.mds; struct lustre_quota_info *qinfo = &mds->mds_quota_info; - const char *quotafiles[] = LUSTRE_ADMIN_QUOTAFILES; + const char *quotafile[] = LUSTRE_ADMIN_QUOTAFILES_V2; char name[64]; int i, rc = 0; - struct inode *iparent = mds->mds_objects_dir->d_inode; ENTRY; - LASSERT(iparent); + LASSERT(qinfo->qi_version == LUSTRE_QUOTA_V2); /* open admin quota files and read quotafile info */ for (i = 0; i < MAXQUOTAS; i++) { @@ -486,27 +783,33 @@ int mds_admin_quota_on(struct obd_device *obd, struct obd_quotactl *oqctl) if (!Q_TYPESET(oqctl, i)) continue; - LASSERT(strlen(quotafiles[i]) + sizeof(prefix) <= sizeof(name)); - sprintf(name, "%s%s", prefix, quotafiles[i]); + LASSERT(strlen(quotafile[i]) + + sizeof(prefix) <= sizeof(name)); + sprintf(name, "%s%s", prefix, quotafile[i]); if (qinfo->qi_files[i] != NULL) { rc = -EBUSY; break; } - fp = filp_open(name, O_RDWR | O_EXCL, 0644); + fp = filp_open(name, O_RDWR, 0); if (IS_ERR(fp) || !S_ISREG(fp->f_dentry->d_inode->i_mode)) { - rc = PTR_ERR(fp); - CDEBUG(rc == -ENOENT ? D_QUOTA : D_ERROR, - "open %s failed! (rc:%d)\n", name, rc); + rc = IS_ERR(fp) ? PTR_ERR(fp) : -EINVAL; + CERROR("error open/create %s! (rc:%d)\n", name, rc); break; } qinfo->qi_files[i] = fp; + rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_CHK); + if (rc) { + CERROR("invalid quota file %s! (rc:%d)\n", name, rc); + break; + } + rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_RD_INFO); if (rc) { - CERROR("error read quotainfo of %s! (rc:%d)\n", - name, rc); + CERROR("error read quotainfo of %s! (rc:%d)\n", name, + rc); break; } } @@ -517,8 +820,8 @@ int mds_admin_quota_on(struct obd_device *obd, struct obd_quotactl *oqctl) RETURN(rc); } -static int mds_admin_quota_off(struct obd_device *obd, - struct obd_quotactl *oqctl) +int mds_admin_quota_off(struct obd_device *obd, + struct obd_quotactl *oqctl) { struct mds_obd *mds = &obd->u.mds; struct lustre_quota_info *qinfo = &mds->mds_quota_info; @@ -538,25 +841,32 @@ int mds_quota_on(struct obd_device *obd, struct obd_quotactl *oqctl) int rc; ENTRY; + if (oqctl->qc_type != USRQUOTA && + oqctl->qc_type != GRPQUOTA && + oqctl->qc_type != UGQUOTA) + RETURN(-EINVAL); + if (!atomic_dec_and_test(&obt->obt_quotachecking)) { CDEBUG(D_INFO, "other people are doing quotacheck\n"); atomic_inc(&obt->obt_quotachecking); RETURN(-EBUSY); } + LASSERT(!obt->obt_qctxt.lqc_immutable); down(&mds->mds_qonoff_sem); push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); rc = mds_admin_quota_on(obd, oqctl); if (rc) - goto out; - - rc = obd_quotactl(mds->mds_osc_exp, oqctl); - if (rc) - goto out; + GOTO(out, rc); rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl); if (!rc) - obt->obt_qctxt.lqc_status = 1; + obt->obt_qctxt.lqc_flags |= UGQUOTA2LQC(oqctl->qc_type); + else + GOTO(out, rc); + + rc = obd_quotactl(mds->mds_osc_exp, oqctl); + out: pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); up(&mds->mds_qonoff_sem); @@ -569,9 +879,17 @@ int mds_quota_off(struct obd_device *obd, struct obd_quotactl *oqctl) struct mds_obd *mds = &obd->u.mds; struct obd_device_target *obt = &obd->u.obt; struct lvfs_run_ctxt saved; - int rc, rc2; + int rc, rc2, imm; ENTRY; + imm = oqctl->qc_type & IMMQUOTA; + oqctl->qc_type &= ~IMMQUOTA; + + if (oqctl->qc_type != USRQUOTA && + oqctl->qc_type != GRPQUOTA && + oqctl->qc_type != UGQUOTA) + RETURN(-EINVAL); + if (!atomic_dec_and_test(&obt->obt_quotachecking)) { CDEBUG(D_INFO, "other people are doing quotacheck\n"); atomic_inc(&obt->obt_quotachecking); @@ -585,9 +903,11 @@ int mds_quota_off(struct obd_device *obd, struct obd_quotactl *oqctl) rc = obd_quotactl(mds->mds_osc_exp, oqctl); rc2 = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl); - if (!rc2) - obt->obt_qctxt.lqc_status = 0; - + if (!rc2) { + if (imm) + obt->obt_qctxt.lqc_immutable = 1; + obt->obt_qctxt.lqc_flags &= ~UGQUOTA2LQC(oqctl->qc_type); + } pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); up(&mds->mds_qonoff_sem); atomic_inc(&obt->obt_quotachecking); @@ -603,6 +923,10 @@ int mds_set_dqinfo(struct obd_device *obd, struct obd_quotactl *oqctl) int rc; ENTRY; + if (oqctl->qc_type != USRQUOTA && + oqctl->qc_type != GRPQUOTA) + RETURN(-EINVAL); + down(&mds->mds_qonoff_sem); if (qinfo->qi_files[oqctl->qc_type] == NULL) { rc = -ESRCH; @@ -628,6 +952,10 @@ int mds_get_dqinfo(struct obd_device *obd, struct obd_quotactl *oqctl) int rc = 0; ENTRY; + if (oqctl->qc_type != USRQUOTA && + oqctl->qc_type != GRPQUOTA) + RETURN(-EINVAL); + down(&mds->mds_qonoff_sem); if (qinfo->qi_files[oqctl->qc_type] == NULL) { rc = -ESRCH; @@ -643,11 +971,125 @@ out: RETURN(rc); } +int dquot_create_oqaq(struct lustre_quota_ctxt *qctxt, + struct lustre_dquot *dquot, __u32 ost_num, __u32 mdt_num, + int type, struct quota_adjust_qunit *oqaq) +{ + __u64 bunit_curr_o, iunit_curr_o; + unsigned long shrink_qunit_limit = qctxt->lqc_cqs_boundary_factor; + unsigned long cqs_factor = qctxt->lqc_cqs_qs_factor; + __u64 blimit = dquot->dq_dqb.dqb_bhardlimit ? + dquot->dq_dqb.dqb_bhardlimit : dquot->dq_dqb.dqb_bsoftlimit; + __u64 ilimit = dquot->dq_dqb.dqb_ihardlimit ? + dquot->dq_dqb.dqb_ihardlimit : dquot->dq_dqb.dqb_isoftlimit; + int rc = 0; + ENTRY; + + if (!dquot || !oqaq) + RETURN(-EINVAL); + LASSERT_SEM_LOCKED(&dquot->dq_sem); + LASSERT(oqaq->qaq_iunit_sz); + LASSERT(oqaq->qaq_bunit_sz); + + /* don't change qunit size */ + if (!qctxt->lqc_switch_qs) + RETURN(rc); + + bunit_curr_o = oqaq->qaq_bunit_sz; + iunit_curr_o = oqaq->qaq_iunit_sz; + + if (dquot->dq_type == GRPQUOTA) + QAQ_SET_GRP(oqaq); + + if ((type & LQUOTA_FLAGS_ADJBLK) && blimit) { + __u64 b_limitation = + oqaq->qaq_bunit_sz * (ost_num + 1) * shrink_qunit_limit; + /* enlarge block qunit size */ + while (blimit > + QUSG(dquot->dq_dqb.dqb_curspace + 2 * b_limitation, 1)) { + oqaq->qaq_bunit_sz = + QUSG(oqaq->qaq_bunit_sz * cqs_factor, 1) + << QUOTABLOCK_BITS; + b_limitation = oqaq->qaq_bunit_sz * (ost_num + 1) * + shrink_qunit_limit; + } + + if (oqaq->qaq_bunit_sz > qctxt->lqc_bunit_sz) + oqaq->qaq_bunit_sz = qctxt->lqc_bunit_sz; + + /* shrink block qunit size */ + while (blimit < + QUSG(dquot->dq_dqb.dqb_curspace + b_limitation, 1)) { + do_div(oqaq->qaq_bunit_sz , cqs_factor); + oqaq->qaq_bunit_sz = QUSG(oqaq->qaq_bunit_sz, 1) << + QUOTABLOCK_BITS; + b_limitation = oqaq->qaq_bunit_sz * (ost_num + 1) * + shrink_qunit_limit; + if (oqaq->qaq_bunit_sz < qctxt->lqc_cqs_least_bunit) + break; + } + + if (oqaq->qaq_bunit_sz < qctxt->lqc_cqs_least_bunit) + oqaq->qaq_bunit_sz = qctxt->lqc_cqs_least_bunit; + + if (bunit_curr_o != oqaq->qaq_bunit_sz) + QAQ_SET_ADJBLK(oqaq); + + } + + if ((type & LQUOTA_FLAGS_ADJINO) && ilimit) { + __u64 i_limitation = + oqaq->qaq_iunit_sz * mdt_num * shrink_qunit_limit; + /* enlarge file qunit size */ + while (ilimit > dquot->dq_dqb.dqb_curinodes + + 2 * i_limitation) { + oqaq->qaq_iunit_sz = oqaq->qaq_iunit_sz * cqs_factor; + i_limitation = oqaq->qaq_iunit_sz * mdt_num * + shrink_qunit_limit; + } + + if (oqaq->qaq_iunit_sz > qctxt->lqc_iunit_sz) + oqaq->qaq_iunit_sz = qctxt->lqc_iunit_sz; + + /* shrink file qunit size */ + while (ilimit < dquot->dq_dqb.dqb_curinodes + + i_limitation) { + do_div(oqaq->qaq_iunit_sz, cqs_factor); + i_limitation = oqaq->qaq_iunit_sz * mdt_num * + shrink_qunit_limit; + if (oqaq->qaq_iunit_sz < qctxt->lqc_cqs_least_iunit) + break; + } + + if (oqaq->qaq_iunit_sz < qctxt->lqc_cqs_least_iunit) + oqaq->qaq_iunit_sz = qctxt->lqc_cqs_least_iunit; + + if (iunit_curr_o != oqaq->qaq_iunit_sz) + QAQ_SET_ADJINO(oqaq); + + } + + if (!dquot->dq_dqb.dqb_bhardlimit && !dquot->dq_dqb.dqb_bsoftlimit && + !dquot->dq_dqb.dqb_ihardlimit && !dquot->dq_dqb.dqb_isoftlimit) { + oqaq->qaq_bunit_sz = 0; + oqaq->qaq_iunit_sz = 0; + QAQ_SET_ADJBLK(oqaq); + QAQ_SET_ADJINO(oqaq); + } + + QAQ_DEBUG(oqaq, "the oqaq computed\n"); + + RETURN(rc); +} + static int mds_init_slave_ilimits(struct obd_device *obd, - struct obd_quotactl *oqctl, int set) + struct obd_quotactl *oqctl, int set, + struct quota_adjust_qunit *oqaq) { /* XXX: for file limits only adjust local now */ - unsigned int uid = 0, gid = 0; + struct obd_device_target *obt = &obd->u.obt; + struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt; + unsigned int id[MAXQUOTAS] = { 0, 0 }; struct obd_quotactl *ioqc = NULL; int flag; int rc; @@ -655,21 +1097,29 @@ static int mds_init_slave_ilimits(struct obd_device *obd, /* if we are going to set zero limit, needn't init slaves */ if (!oqctl->qc_dqblk.dqb_ihardlimit && !oqctl->qc_dqblk.dqb_isoftlimit && - set) + !set) RETURN(0); OBD_ALLOC_PTR(ioqc); if (!ioqc) RETURN(-ENOMEM); - - flag = oqctl->qc_dqblk.dqb_ihardlimit || - oqctl->qc_dqblk.dqb_isoftlimit || set; + + flag = oqctl->qc_dqblk.dqb_ihardlimit || + oqctl->qc_dqblk.dqb_isoftlimit || !set; ioqc->qc_cmd = flag ? Q_INITQUOTA : Q_SETQUOTA; ioqc->qc_id = oqctl->qc_id; ioqc->qc_type = oqctl->qc_type; ioqc->qc_dqblk.dqb_valid = QIF_ILIMITS; ioqc->qc_dqblk.dqb_ihardlimit = flag ? MIN_QLIMIT : 0; + if (QAQ_IS_ADJINO(oqaq)) { + /* adjust the mds slave's inode qunit size */ + rc = quota_adjust_slave_lqs(oqaq, qctxt); + if (rc < 0) + CDEBUG(D_ERROR, "adjust mds slave's inode qunit size \ + failed! (rc:%d)\n", rc); + } + /* set local limit to MIN_QLIMIT */ rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, ioqc); if (rc) @@ -677,13 +1127,18 @@ static int mds_init_slave_ilimits(struct obd_device *obd, /* trigger local qunit pre-acquire */ if (oqctl->qc_type == USRQUOTA) - uid = oqctl->qc_id; + id[USRQUOTA] = oqctl->qc_id; else - gid = oqctl->qc_id; + id[GRPQUOTA] = oqctl->qc_id; - rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 0, 0); + rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, id, 0, 0, NULL); + if (rc == -EDQUOT || rc == -EBUSY) { + CDEBUG(D_QUOTA, "rc: %d.\n", rc); + rc = 0; + } if (rc) { - CERROR("error mds adjust local file quota! (rc:%d)\n", rc); + CDEBUG(D_QUOTA,"error mds adjust local file quota! (rc:%d)\n", + rc); GOTO(out, rc); } /* FIXME initialize all slaves in CMD */ @@ -695,31 +1150,41 @@ out: } static int mds_init_slave_blimits(struct obd_device *obd, - struct obd_quotactl *oqctl, int set) + struct obd_quotactl *oqctl, int set, + struct quota_adjust_qunit *oqaq) { + struct obd_device_target *obt = &obd->u.obt; + struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt; struct mds_obd *mds = &obd->u.mds; struct obd_quotactl *ioqc; - unsigned int uid = 0, gid = 0; + unsigned int id[MAXQUOTAS] = { 0, 0 }; + int rc, rc1 = 0; int flag; - int rc; ENTRY; /* if we are going to set zero limit, needn't init slaves */ if (!oqctl->qc_dqblk.dqb_bhardlimit && !oqctl->qc_dqblk.dqb_bsoftlimit && - set) + !set) RETURN(0); OBD_ALLOC_PTR(ioqc); if (!ioqc) RETURN(-ENOMEM); - flag = oqctl->qc_dqblk.dqb_bhardlimit || - oqctl->qc_dqblk.dqb_bsoftlimit || set; + flag = oqctl->qc_dqblk.dqb_bhardlimit || + oqctl->qc_dqblk.dqb_bsoftlimit || !set; ioqc->qc_cmd = flag ? Q_INITQUOTA : Q_SETQUOTA; ioqc->qc_id = oqctl->qc_id; ioqc->qc_type = oqctl->qc_type; ioqc->qc_dqblk.dqb_valid = QIF_BLIMITS; ioqc->qc_dqblk.dqb_bhardlimit = flag ? MIN_QLIMIT : 0; + if (QAQ_IS_ADJBLK(oqaq)) { + /* adjust the mds slave's block qunit size */ + rc1 = quota_adjust_slave_lqs(oqaq, qctxt); + if (rc1 < 0) + CERROR("adjust mds slave's block qunit size failed!" + "(rc:%d)\n", rc1); + } rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, ioqc); if (rc) @@ -727,18 +1192,29 @@ static int mds_init_slave_blimits(struct obd_device *obd, /* trigger local qunit pre-acquire */ if (oqctl->qc_type == USRQUOTA) - uid = oqctl->qc_id; + id[USRQUOTA] = oqctl->qc_id; else - gid = oqctl->qc_id; + id[GRPQUOTA] = oqctl->qc_id; + + /* initialize all slave's limit */ + rc = obd_quotactl(mds->mds_osc_exp, ioqc); - rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 1, 0); + rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, id, 1, 0, NULL); + if (rc == -EDQUOT || rc == -EBUSY) { + CDEBUG(D_QUOTA, "rc: %d.\n", rc); + rc = 0; + } if (rc) { CERROR("error mds adjust local block quota! (rc:%d)\n", rc); GOTO(out, rc); } - /* initialize all slave's limit */ - rc = obd_quotactl(mds->mds_osc_exp, ioqc); + /* adjust all slave's qunit size when setting quota + * this is will create a lqs for every ost, which will present + * certain uid/gid is set quota or not */ + QAQ_SET_ADJBLK(oqaq); + rc = obd_quota_adjust_qunit(mds->mds_osc_exp, oqaq, qctxt); + EXIT; out: OBD_FREE_PTR(ioqc); @@ -748,15 +1224,31 @@ out: int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) { struct mds_obd *mds = &obd->u.mds; + struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt; + struct obd_device *lov_obd = class_exp2obd(mds->mds_osc_exp); + struct lov_obd *lov = &lov_obd->u.lov; + struct quota_adjust_qunit *oqaq = NULL; struct lustre_quota_info *qinfo = &mds->mds_quota_info; - __u32 ihardlimit, isoftlimit, bhardlimit, bsoftlimit; + __u64 ihardlimit, isoftlimit, bhardlimit, bsoftlimit; time_t btime, itime; struct lustre_dquot *dquot; struct obd_dqblk *dqblk = &oqctl->qc_dqblk; - int set, rc; + /* orig_set means if quota was set before; now_set means we are + * setting/cancelling quota */ + int orig_set, now_set; + int rc, rc2 = 0, flag = 0; ENTRY; + if (oqctl->qc_type != USRQUOTA && + oqctl->qc_type != GRPQUOTA) + RETURN(-EINVAL); + + OBD_ALLOC_PTR(oqaq); + if (!oqaq) + RETURN(-ENOMEM); down(&mds->mds_qonoff_sem); + init_oqaq(oqaq, qctxt, oqctl->qc_id, oqctl->qc_type); + if (qinfo->qi_files[oqctl->qc_type] == NULL) GOTO(out_sem, rc = -ESRCH); @@ -791,18 +1283,20 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) dquot->dq_dqb.dqb_bhardlimit = dqblk->dqb_bhardlimit; dquot->dq_dqb.dqb_bsoftlimit = dqblk->dqb_bsoftlimit; /* clear usage (limit pool) */ - if (!dquot->dq_dqb.dqb_bhardlimit && + if (!dquot->dq_dqb.dqb_bhardlimit && !dquot->dq_dqb.dqb_bsoftlimit) dquot->dq_dqb.dqb_curspace = 0; /* clear grace time */ - if (!dqblk->dqb_bsoftlimit || + if (!dqblk->dqb_bsoftlimit || toqb(dquot->dq_dqb.dqb_curspace) <= dqblk->dqb_bsoftlimit) dquot->dq_dqb.dqb_btime = 0; /* set grace only if user hasn't provided his own */ else if (!(dqblk->dqb_valid & QIF_BTIME)) - dquot->dq_dqb.dqb_btime = cfs_time_current_sec() + + dquot->dq_dqb.dqb_btime = cfs_time_current_sec() + qinfo->qi_info[dquot->dq_type].dqi_bgrace; + + flag |= LQUOTA_FLAGS_ADJBLK; } if (dqblk->dqb_valid & QIF_ILIMITS) { @@ -819,7 +1313,16 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) else if (!(dqblk->dqb_valid & QIF_ITIME)) dquot->dq_dqb.dqb_itime = cfs_time_current_sec() + qinfo->qi_info[dquot->dq_type].dqi_igrace; + + flag |= LQUOTA_FLAGS_ADJINO; } + QAQ_DEBUG(oqaq, "before dquot_create_oqaq\n"); + rc = dquot_create_oqaq(qctxt, dquot, lov->desc.ld_tgt_count, 1, + flag, oqaq); + QAQ_DEBUG(oqaq, "after dquot_create_oqaq\n"); + if (rc < 0) + CDEBUG(D_QUOTA, "adjust qunit size failed! (rc:%d)\n", rc); + rc = fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT); @@ -831,38 +1334,47 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) } up(&mds->mds_qonoff_sem); - if (dqblk->dqb_valid & QIF_ILIMITS) { - set = !(ihardlimit || isoftlimit); - rc = mds_init_slave_ilimits(obd, oqctl, set); + orig_set = ihardlimit || isoftlimit; + now_set = dqblk->dqb_ihardlimit || dqblk->dqb_isoftlimit; + if (dqblk->dqb_valid & QIF_ILIMITS && orig_set != now_set) { + down(&dquot->dq_sem); + dquot->dq_dqb.dqb_curinodes = 0; + up(&dquot->dq_sem); + rc = mds_init_slave_ilimits(obd, oqctl, orig_set, oqaq); if (rc) { CERROR("init slave ilimits failed! (rc:%d)\n", rc); goto revoke_out; } } - if (dqblk->dqb_valid & QIF_BLIMITS) { - set = !(bhardlimit || bsoftlimit); - rc = mds_init_slave_blimits(obd, oqctl, set); + orig_set = bhardlimit || bsoftlimit; + now_set = dqblk->dqb_bhardlimit || dqblk->dqb_bsoftlimit; + if (dqblk->dqb_valid & QIF_BLIMITS && orig_set != now_set) { + down(&dquot->dq_sem); + dquot->dq_dqb.dqb_curspace = 0; + up(&dquot->dq_sem); + rc = mds_init_slave_blimits(obd, oqctl, orig_set, oqaq); if (rc) { CERROR("init slave blimits failed! (rc:%d)\n", rc); goto revoke_out; } } - down(&mds->mds_qonoff_sem); revoke_out: + down(&mds->mds_qonoff_sem); + down(&dquot->dq_sem); if (rc) { /* cancel previous setting */ - down(&dquot->dq_sem); dquot->dq_dqb.dqb_ihardlimit = ihardlimit; dquot->dq_dqb.dqb_isoftlimit = isoftlimit; dquot->dq_dqb.dqb_bhardlimit = bhardlimit; dquot->dq_dqb.dqb_bsoftlimit = bsoftlimit; dquot->dq_dqb.dqb_btime = btime; dquot->dq_dqb.dqb_itime = itime; - fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT); - up(&dquot->dq_sem); } + rc2 = fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT); + up(&dquot->dq_sem); + out: down(&dquot->dq_sem); dquot->dq_status &= ~DQ_STATUS_SET; @@ -871,14 +1383,18 @@ out: EXIT; out_sem: up(&mds->mds_qonoff_sem); - return rc; + + if (oqaq) + OBD_FREE_PTR(oqaq); + + return rc ? rc : rc2; } static int mds_get_space(struct obd_device *obd, struct obd_quotactl *oqctl) { struct obd_quotactl *soqc; struct lvfs_run_ctxt saved; - int rc; + int rc, rc1; ENTRY; OBD_ALLOC_PTR(soqc); @@ -889,26 +1405,29 @@ static int mds_get_space(struct obd_device *obd, struct obd_quotactl *oqctl) soqc->qc_id = oqctl->qc_id; soqc->qc_type = oqctl->qc_type; + /* get block usage from OSS */ + soqc->qc_dqblk.dqb_curspace = 0; rc = obd_quotactl(obd->u.mds.mds_osc_exp, soqc); - if (rc) - GOTO(out, rc); - - oqctl->qc_dqblk.dqb_curspace = soqc->qc_dqblk.dqb_curspace; + if (!rc || rc == -EREMOTEIO) { + oqctl->qc_dqblk.dqb_curspace = soqc->qc_dqblk.dqb_curspace; + oqctl->qc_dqblk.dqb_valid |= QIF_SPACE; + } - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + /* get block/inode usage from MDS */ soqc->qc_dqblk.dqb_curspace = 0; - rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, soqc); + soqc->qc_dqblk.dqb_curinodes = 0; + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + rc1 = fsfilt_quotactl(obd, obd->u.obt.obt_sb, soqc); pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + if (!rc1) { + oqctl->qc_dqblk.dqb_curspace += soqc->qc_dqblk.dqb_curspace; + oqctl->qc_dqblk.dqb_curinodes = soqc->qc_dqblk.dqb_curinodes; + oqctl->qc_dqblk.dqb_valid |= QIF_INODES; + } - if (rc) - GOTO(out, rc); - - oqctl->qc_dqblk.dqb_curinodes += soqc->qc_dqblk.dqb_curinodes; - oqctl->qc_dqblk.dqb_curspace += soqc->qc_dqblk.dqb_curspace; - EXIT; -out: OBD_FREE_PTR(soqc); - return rc; + + RETURN(rc ? : rc1); } int mds_get_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) @@ -920,7 +1439,12 @@ int mds_get_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) int rc; ENTRY; + if (oqctl->qc_type != USRQUOTA && + oqctl->qc_type != GRPQUOTA) + RETURN(-EINVAL); + down(&mds->mds_qonoff_sem); + dqblk->dqb_valid = 0; if (qinfo->qi_files[oqctl->qc_type] == NULL) GOTO(out, rc = -ESRCH); @@ -935,6 +1459,7 @@ int mds_get_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) dqblk->dqb_bsoftlimit = dquot->dq_dqb.dqb_bsoftlimit; dqblk->dqb_btime = dquot->dq_dqb.dqb_btime; dqblk->dqb_itime = dquot->dq_dqb.dqb_itime; + dqblk->dqb_valid |= QIF_LIMITS | QIF_TIMES; up(&dquot->dq_sem); lustre_dqput(dquot); @@ -969,7 +1494,7 @@ static int dquot_recovery(struct obd_device *obd, unsigned int id, unsigned short type) { struct mds_obd *mds = &obd->u.mds; - struct lustre_quota_info *qinfo= &obd->u.mds.mds_quota_info; + struct lustre_quota_info *qinfo= &mds->mds_quota_info; struct lustre_dquot *dquot; struct obd_quotactl *qctl; __u64 total_limits = 0; @@ -1002,7 +1527,7 @@ dquot_recovery(struct obd_device *obd, unsigned int id, unsigned short type) qctl->qc_type = type; qctl->qc_id = id; qctl->qc_stat = QUOTA_RECOVERING; - rc = obd_quotactl(obd->u.mds.mds_osc_exp, qctl); + rc = obd_quotactl(mds->mds_osc_exp, qctl); if (rc) GOTO(out, rc); total_limits = qctl->qc_dqblk.dqb_bhardlimit; @@ -1046,17 +1571,22 @@ static int qmaster_recovery_main(void *arg) { struct qmaster_recov_thread_data *data = arg; struct obd_device *obd = data->obd; + struct mds_obd *mds = &obd->u.mds; + struct lustre_quota_info *qinfo = &mds->mds_quota_info; int rc = 0; unsigned short type; ENTRY; ptlrpc_daemonize("qmaster_recovd"); + /* for mds */ + class_incref(obd, "qmaster_recovd_mds", obd); + /* for lov */ + class_incref(mds->mds_osc_obd, "qmaster_recovd_lov", mds->mds_osc_obd); + complete(&data->comp); for (type = USRQUOTA; type < MAXQUOTAS; type++) { - struct mds_obd *mds = &obd->u.mds; - struct lustre_quota_info *qinfo = &mds->mds_quota_info; struct list_head id_list; struct dquot_id *dqid, *tmp; @@ -1065,8 +1595,8 @@ static int qmaster_recovery_main(void *arg) up(&mds->mds_qonoff_sem); continue; } - INIT_LIST_HEAD(&id_list); - rc = fsfilt_qids(obd, qinfo->qi_files[type], NULL, type, + CFS_INIT_LIST_HEAD(&id_list); + rc = fsfilt_qids(obd, qinfo->qi_files[type], NULL, type, &id_list); up(&mds->mds_qonoff_sem); @@ -1086,23 +1616,30 @@ free: kfree(dqid); } } + class_decref(mds->mds_osc_obd, "qmaster_recovd_lov", mds->mds_osc_obd); + class_decref(obd, "qmaster_recovd_mds", obd); RETURN(rc); } int mds_quota_recovery(struct obd_device *obd) { - struct lov_obd *lov = &obd->u.mds.mds_osc_obd->u.lov; + struct mds_obd *mds = &obd->u.mds; struct qmaster_recov_thread_data data; int rc = 0; ENTRY; - mutex_down(&lov->lov_lock); - if (lov->desc.ld_tgt_count != lov->desc.ld_active_tgt_count) { - CWARN("Not all osts are active, abort quota recovery\n"); - mutex_up(&lov->lov_lock); + if (unlikely(!mds->mds_quota || obd->obd_stopping)) + RETURN(rc); + + mutex_down(&obd->obd_dev_sem); + if (mds->mds_lov_desc.ld_active_tgt_count != mds->mds_lov_objid_count) { + CWARN("Only %u/%u OSTs are active, abort quota recovery\n", + mds->mds_lov_desc.ld_active_tgt_count, + mds->mds_lov_objid_count); + mutex_up(&obd->obd_dev_sem); RETURN(rc); } - mutex_up(&lov->lov_lock); + mutex_up(&obd->obd_dev_sem); data.obd = obd; init_completion(&data.comp); @@ -1114,3 +1651,5 @@ int mds_quota_recovery(struct obd_device *obd) wait_for_completion(&data.comp); RETURN(rc); } + +#endif /* HAVE_QUOTA_SUPPORT */