X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fquota%2Fquota_master.c;h=4c68085f44ab76c115488c4461fd3eb74eab62a5;hb=e7f3c2955a426bbe3e39b32208056b9ab8667190;hp=6057238738e3e3845248f47b22ff07c08c292945;hpb=70e80ade90af09300396706b8910e196a7928520;p=fs%2Flustre-release.git diff --git a/lustre/quota/quota_master.c b/lustre/quota/quota_master.c index 6057238..4c68085 100644 --- a/lustre/quota/quota_master.c +++ b/lustre/quota/quota_master.c @@ -16,8 +16,8 @@ * in the LICENSE file that accompanied this code). * * You should have received a copy of the GNU General Public License - * version 2 along with this program; If not, see [sun.com URL with a - * copy of GPLv2]. + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf * * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, * CA 95054 USA or visit www.sun.com if you need additional information or @@ -44,7 +44,7 @@ # define EXPORT_SYMTAB #endif -#define DEBUG_SUBSYSTEM S_MDS +#define DEBUG_SUBSYSTEM S_LQUOTA #include #include @@ -62,8 +62,9 @@ #include "quota_internal.h" -/* lock ordering: - * mds->mds_qonoff_sem > dquot->dq_sem */ +#ifdef HAVE_QUOTA_SUPPORT + +/* lock ordering: mds->mds_qonoff_sem > dquot->dq_sem */ static struct list_head lustre_dquot_hash[NR_DQHASH]; static spinlock_t dquot_hash_lock = SPIN_LOCK_UNLOCKED; @@ -140,7 +141,7 @@ static struct lustre_dquot *alloc_dquot(struct lustre_quota_info *lqi, struct lustre_dquot *dquot = NULL; ENTRY; - OBD_SLAB_ALLOC(dquot, lustre_dquot_cachep, CFS_ALLOC_IO, sizeof(*dquot)); + OBD_SLAB_ALLOC_PTR_GFP(dquot, lustre_dquot_cachep, CFS_ALLOC_IO); if (dquot == NULL) RETURN(NULL); @@ -198,7 +199,7 @@ static struct lustre_dquot *lustre_dqget(struct obd_device *obd, if ((empty = alloc_dquot(lqi, id, type)) == NULL) RETURN(ERR_PTR(-ENOMEM)); - + spin_lock(&dquot_hash_lock); if ((dquot = find_dquot(hashent, lqi, id, type)) != NULL) { dquot->dq_refcnt++; @@ -226,24 +227,134 @@ static struct lustre_dquot *lustre_dqget(struct obd_device *obd, RETURN(dquot); } +static void init_oqaq(struct quota_adjust_qunit *oqaq, + struct lustre_quota_ctxt *qctxt, + qid_t id, int type) +{ + struct lustre_qunit_size *lqs = NULL; + + oqaq->qaq_id = id; + oqaq->qaq_flags = type; + lqs = quota_search_lqs(LQS_KEY(type, id), qctxt, 0); + if (lqs && !IS_ERR(lqs)) { + spin_lock(&lqs->lqs_lock); + oqaq->qaq_bunit_sz = lqs->lqs_bunit_sz; + oqaq->qaq_iunit_sz = lqs->lqs_iunit_sz; + oqaq->qaq_flags = lqs->lqs_flags; + spin_unlock(&lqs->lqs_lock); + lqs_putref(lqs); + } else { + CDEBUG(D_QUOTA, "Can't find the lustre qunit size!\n"); + oqaq->qaq_bunit_sz = qctxt->lqc_bunit_sz; + oqaq->qaq_iunit_sz = qctxt->lqc_iunit_sz; + } +} + +int dqacq_adjust_qunit_sz(struct obd_device *obd, qid_t id, int type, + __u32 is_blk) +{ + struct mds_obd *mds = &obd->u.mds; + struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt; + struct obd_device *lov_mds_obd = class_exp2obd(mds->mds_osc_exp); + struct lov_obd *lov = &lov_mds_obd->u.lov; + __u32 ost_num = lov->desc.ld_tgt_count, mdt_num = 1; + struct quota_adjust_qunit *oqaq = NULL; + unsigned int qid[MAXQUOTAS] = { 0, 0 }; + struct lustre_quota_info *info = &mds->mds_quota_info; + struct lustre_dquot *dquot = NULL; + int adjust_res = 0; + int rc = 0; + ENTRY; + + LASSERT(mds); + dquot = lustre_dqget(obd, info, id, type); + if (IS_ERR(dquot)) + RETURN(PTR_ERR(dquot)); + + OBD_ALLOC_PTR(oqaq); + if (!oqaq) + GOTO(out, rc = -ENOMEM); + + down(&dquot->dq_sem); + init_oqaq(oqaq, qctxt, id, type); + + rc = dquot_create_oqaq(qctxt, dquot, ost_num, mdt_num, + is_blk ? LQUOTA_FLAGS_ADJBLK : + LQUOTA_FLAGS_ADJINO, oqaq); + + if (rc < 0) { + CDEBUG(D_ERROR, "create oqaq failed! (rc:%d)\n", rc); + GOTO(out_sem, rc); + } + QAQ_DEBUG(oqaq, "show oqaq.\n") + + if (!QAQ_IS_ADJBLK(oqaq) && !QAQ_IS_ADJINO(oqaq)) + GOTO(out_sem, rc); + + /* adjust the mds slave qunit size */ + adjust_res = quota_adjust_slave_lqs(oqaq, qctxt); + if (adjust_res <= 0) { + if (adjust_res < 0) { + rc = adjust_res; + CDEBUG(D_ERROR, "adjust mds slave's qunit size failed! \ + (rc:%d)\n", rc); + } else { + CDEBUG(D_QUOTA, "qunit doesn't need to be adjusted.\n"); + } + GOTO(out_sem, rc); + } + + if (type) + qid[GRPQUOTA] = dquot->dq_id; + else + qid[USRQUOTA] = dquot->dq_id; + + up(&dquot->dq_sem); + + rc = qctxt_adjust_qunit(obd, qctxt, qid, is_blk, 0, NULL); + if (rc == -EDQUOT || rc == -EBUSY) { + CDEBUG(D_QUOTA, "rc: %d.\n", rc); + rc = 0; + } + if (rc) { + CDEBUG(D_ERROR, "mds fail to adjust file quota! \ + (rc:%d)\n", rc); + GOTO(out, rc); + } + + /* only when block qunit is reduced, boardcast to osts */ + if ((adjust_res & LQS_BLK_DECREASE) && QAQ_IS_ADJBLK(oqaq)) + rc = obd_quota_adjust_qunit(mds->mds_osc_exp, oqaq, qctxt); + +out: + lustre_dqput(dquot); + if (oqaq) + OBD_FREE_PTR(oqaq); + + RETURN(rc); +out_sem: + up(&dquot->dq_sem); + goto out; +} + int dqacq_handler(struct obd_device *obd, struct qunit_data *qdata, int opc) { struct mds_obd *mds = &obd->u.mds; + struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt; struct lustre_quota_info *info = &mds->mds_quota_info; struct lustre_dquot *dquot = NULL; __u64 *usage = NULL; - __u32 hlimit = 0, slimit = 0; - __u32 qdata_type = qdata->qd_flags & QUOTA_IS_GRP; - __u32 is_blk = (qdata->qd_flags & QUOTA_IS_BLOCK) >> 1; + __u64 hlimit = 0, slimit = 0; time_t *time = NULL; unsigned int grace = 0; + struct lustre_qunit_size *lqs = NULL; int rc = 0; ENTRY; if (OBD_FAIL_CHECK(OBD_FAIL_OBD_DQACQ)) RETURN(-EIO); - dquot = lustre_dqget(obd, info, qdata->qd_id, qdata_type); + dquot = lustre_dqget(obd, info, qdata->qd_id, QDATA_IS_GRP(qdata)); if (IS_ERR(dquot)) RETURN(PTR_ERR(dquot)); @@ -258,14 +369,14 @@ int dqacq_handler(struct obd_device *obd, struct qunit_data *qdata, int opc) GOTO(out, rc = -EBUSY); } - if (is_blk) { - grace = info->qi_info[qdata_type].dqi_bgrace; + if (QDATA_IS_BLK(qdata)) { + grace = info->qi_info[QDATA_IS_GRP(qdata)].dqi_bgrace; usage = &dquot->dq_dqb.dqb_curspace; hlimit = dquot->dq_dqb.dqb_bhardlimit; slimit = dquot->dq_dqb.dqb_bsoftlimit; time = &dquot->dq_dqb.dqb_btime; } else { - grace = info->qi_info[qdata_type].dqi_igrace; + grace = info->qi_info[QDATA_IS_GRP(qdata)].dqi_igrace; usage = (__u64 *) & dquot->dq_dqb.dqb_curinodes; hlimit = dquot->dq_dqb.dqb_ihardlimit; slimit = dquot->dq_dqb.dqb_isoftlimit; @@ -281,12 +392,21 @@ int dqacq_handler(struct obd_device *obd, struct qunit_data *qdata, int opc) switch (opc) { case QUOTA_DQACQ: - if (hlimit && - QUSG(*usage + qdata->qd_count, is_blk) > hlimit) - GOTO(out, rc = -EDQUOT); + if (hlimit && + QUSG(*usage + qdata->qd_count, QDATA_IS_BLK(qdata)) > hlimit) + { + if (QDATA_IS_CHANGE_QS(qdata) && + QUSG(*usage, QDATA_IS_BLK(qdata)) < hlimit) + qdata->qd_count = (hlimit - + QUSG(*usage, QDATA_IS_BLK(qdata))) + * (QDATA_IS_BLK(qdata) ? + QUOTABLOCK_SIZE : 1); + else + GOTO(out, rc = -EDQUOT); + } if (slimit && - QUSG(*usage + qdata->qd_count, is_blk) > slimit) { + QUSG(*usage + qdata->qd_count, QDATA_IS_BLK(qdata)) > slimit) { if (*time && cfs_time_current_sec() >= *time) GOTO(out, rc = -EDQUOT); else if (!*time) @@ -304,7 +424,7 @@ int dqacq_handler(struct obd_device *obd, struct qunit_data *qdata, int opc) *usage -= qdata->qd_count; /* (usage <= soft limit) but not (usage < soft limit) */ - if (!slimit || QUSG(*usage, is_blk) <= slimit) + if (!slimit || QUSG(*usage, QDATA_IS_BLK(qdata)) <= slimit) *time = 0; break; default: @@ -317,35 +437,95 @@ out: up(&dquot->dq_sem); up(&mds->mds_qonoff_sem); lustre_dqput(dquot); + if (rc != -EDQUOT) + dqacq_adjust_qunit_sz(obd, qdata->qd_id, QDATA_IS_GRP(qdata), + QDATA_IS_BLK(qdata)); + + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), + qctxt, 0); + if (lqs == NULL || IS_ERR(lqs)) { + CDEBUG(D_INFO, "Can't find the lustre qunit size!\n"); + qdata->qd_qunit = QDATA_IS_BLK(qdata) ? qctxt->lqc_bunit_sz : + qctxt->lqc_iunit_sz; + } else { + spin_lock(&lqs->lqs_lock); + qdata->qd_qunit = QDATA_IS_BLK(qdata) ? lqs->lqs_bunit_sz : + lqs->lqs_iunit_sz; + spin_unlock(&lqs->lqs_lock); + } + + if (QDATA_IS_BLK(qdata)) + QDATA_SET_ADJBLK(qdata); + else + QDATA_SET_ADJINO(qdata); + + QDATA_DEBUG(qdata, "alloc/release qunit in dqacq_handler\n"); + if (lqs) + lqs_putref(lqs); + return rc; } -int mds_quota_adjust(struct obd_device *obd, unsigned int qcids[], - unsigned int qpids[], int rc, int opc) +int mds_quota_adjust(struct obd_device *obd, const unsigned int qcids[], + const unsigned int qpids[], int rc, int opc) { struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt; int rc2 = 0; ENTRY; - if (rc && rc != -EDQUOT) + if (rc && rc != -EDQUOT && rc != ENOLCK) RETURN(0); switch (opc) { - case FSFILT_OP_RENAME: - /* acquire/release block quota on owner of original parent */ - rc2 = qctxt_adjust_qunit(obd, qctxt, qpids[2], qpids[3], 1, 0); - /* fall-through */ case FSFILT_OP_SETATTR: - /* acquire/release file quota on original owner */ - rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 0, 0); - /* fall-through */ - case FSFILT_OP_CREATE: + /* release file quota on original owner */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids, 0, 0, NULL); + /* release block quota on original owner */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids, 1, 0, NULL); + /* acquire file quota on current owner */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids, 0, 0, NULL); + /* acquire block quota on current owner */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids, 1, 0, NULL); + break; + case FSFILT_OP_UNLINK_PARTIAL_CHILD: + /* release file quota on child */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids, 0, 0, NULL); + /* rlease block quota on child */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids, 1, 0, NULL); + break; + case FSFILT_OP_CREATE_PARTIAL_CHILD: + /* acquire file quota on child */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids, 0, 0, NULL); + /* acquire block quota on child */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids, 1, 0, NULL); + break; + case FSFILT_OP_LINK: + /* acquire block quota on parent */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids, 1, 0, NULL); + break; case FSFILT_OP_UNLINK: - /* acquire/release file/block quota on owner of child (or current owner) */ - rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 0, 0); - rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0); - /* acquire/release block quota on owner of parent (or original owner) */ - rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 1, 0); + /* release block quota on parent */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids, 1, 0, NULL); + /* release file quota on child */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids, 0, 0, NULL); + if (qpids[0] != qcids[0] || qpids[1] != qcids[1]) + /* release block quota on child */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids, 1, 0, + NULL); + break; + case FSFILT_OP_UNLINK_PARTIAL_PARENT: + /* release block quota on parent */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids, 1, 0, NULL); + break; + case FSFILT_OP_CREATE: + /* acquire block quota on parent */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids, 1, 0, NULL); + /* acquire file quota on child */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids, 0, 0, NULL); + if (qpids[0] != qcids[0] || qpids[1] != qcids[1]) + /* acquire block quota on child */ + rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids, 1, 0, + NULL); break; default: LBUG(); @@ -353,12 +533,14 @@ int mds_quota_adjust(struct obd_device *obd, unsigned int qcids[], } if (rc2) - CERROR("mds adjust qunit failed! (opc:%d rc:%d)\n", opc, rc2); + CDEBUG(rc2 == QUOTA_REQ_RETURNED ? D_QUOTA: D_ERROR, + "mds adjust qunit %ssuccessfully! (opc:%d rc:%d)\n", + rc2 == QUOTA_REQ_RETURNED ? "" : "un", opc, rc2); RETURN(0); } -int filter_quota_adjust(struct obd_device *obd, unsigned int qcids[], - unsigned int qpids[], int rc, int opc) +int filter_quota_adjust(struct obd_device *obd, const unsigned int qcids[], + const unsigned int qpids[], int rc, int opc) { struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt; int rc2 = 0; @@ -370,50 +552,132 @@ int filter_quota_adjust(struct obd_device *obd, unsigned int qcids[], switch (opc) { case FSFILT_OP_SETATTR: /* acquire/release block quota on original & current owner */ - rc = qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0); - rc2 = qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 1, 0); + rc = qctxt_adjust_qunit(obd, qctxt, qcids, 1, 0, NULL); + rc2 = qctxt_adjust_qunit(obd, qctxt, qpids, 1, 0, NULL); break; case FSFILT_OP_UNLINK: /* release block quota on this owner */ case FSFILT_OP_CREATE: /* XXX for write operation on obdfilter */ /* acquire block quota on this owner */ - rc = qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0); + rc = qctxt_adjust_qunit(obd, qctxt, qcids, 1, 0, NULL); break; default: LBUG(); break; } - if (rc || rc2) - CERROR("filter adjust qunit failed! (opc:%d rc%d)\n", - opc, rc ?: rc2); + if (rc || rc2) { + if (!rc) + rc = rc2; + CDEBUG(rc == QUOTA_REQ_RETURNED ? D_QUOTA: D_ERROR, + "filter adjust qunit %ssuccessfully! (opc:%d rc%d)\n", + QUOTA_REQ_RETURNED ? "" : "un", opc, rc); + } + RETURN(0); } -#define LUSTRE_ADMIN_QUOTAFILES {\ - "admin_quotafile.usr", /* user admin quotafile */\ - "admin_quotafile.grp" /* group admin quotafile */\ -} static const char prefix[] = "OBJECTS/"; +int mds_quota_invalidate(struct obd_device *obd, struct obd_quotactl *oqctl) +{ + struct mds_obd *mds = &obd->u.mds; + struct lustre_quota_info *qinfo = &mds->mds_quota_info; + struct obd_device_target *obt = &obd->u.obt; + int rc = 0, i, rc1 = 0; + char *quotafile[] = LUSTRE_ADMIN_QUOTAFILES_V2; + char name[64]; + struct lvfs_run_ctxt saved; + ENTRY; + + LASSERT(qinfo->qi_version == LUSTRE_QUOTA_V2); + + if (oqctl->qc_type != USRQUOTA && + oqctl->qc_type != GRPQUOTA && + oqctl->qc_type != UGQUOTA) + RETURN(-EINVAL); + + down(&obt->obt_quotachecking); + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + down(&mds->mds_qonoff_sem); + + for (i = 0; i < MAXQUOTAS; i++) { + struct file *fp; + + if (!Q_TYPESET(oqctl, i)) + continue; + + /* quota file has been opened ? */ + if (qinfo->qi_files[i]) { + CWARN("quota[%d] is on yet\n", i); + rc1 = -EBUSY; + continue; + } + + LASSERT(strlen(quotafile[i]) + sizeof(prefix) <= sizeof(name)); + sprintf(name, "%s%s", prefix, quotafile[i]); + + fp = filp_open(name, O_CREAT | O_TRUNC | O_RDWR, 0644); + if (IS_ERR(fp)) { + rc = PTR_ERR(fp); + CERROR("error invalidating admin quotafile %s (rc:%d)\n", + name, rc); + } + else + filp_close(fp, 0); + } + + up(&mds->mds_qonoff_sem); + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + up(&obt->obt_quotachecking); + RETURN(rc ? : rc1); +} + +int mds_quota_finvalidate(struct obd_device *obd, struct obd_quotactl *oqctl) +{ + struct mds_obd *mds = &obd->u.mds; + struct obd_device_target *obt = &obd->u.obt; + int rc; + struct lvfs_run_ctxt saved; + ENTRY; + + if (oqctl->qc_type != USRQUOTA && + oqctl->qc_type != GRPQUOTA && + oqctl->qc_type != UGQUOTA) + RETURN(-EINVAL); + + down(&obt->obt_quotachecking); + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + down(&mds->mds_qonoff_sem); + + oqctl->qc_cmd = Q_FINVALIDATE; + rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl); + if (!rc) + rc = obd_quotactl(mds->mds_osc_exp, oqctl); + + up(&mds->mds_qonoff_sem); + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + up(&obt->obt_quotachecking); + RETURN(rc); +} + int init_admin_quotafiles(struct obd_device *obd, struct obd_quotactl *oqctl) { struct mds_obd *mds = &obd->u.mds; struct lustre_quota_info *qinfo = &mds->mds_quota_info; - const char *quotafiles[] = LUSTRE_ADMIN_QUOTAFILES; + const char *quotafile[] = LUSTRE_ADMIN_QUOTAFILES_V2; struct lvfs_run_ctxt saved; char name[64]; int i, rc = 0; - struct dentry *dparent = mds->mds_objects_dir; - struct inode *iparent = dparent->d_inode; ENTRY; - LASSERT(iparent); + LASSERT(qinfo->qi_version == LUSTRE_QUOTA_V2); + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); down(&mds->mds_qonoff_sem); - for (i = 0; i < MAXQUOTAS; i++) { - struct dentry *de; + + for (i = 0; i < MAXQUOTAS && !rc; i++) { struct file *fp; if (!Q_TYPESET(oqctl, i)) @@ -426,33 +690,44 @@ int init_admin_quotafiles(struct obd_device *obd, struct obd_quotactl *oqctl) continue; } - /* lookup quota file */ - rc = 0; - LOCK_INODE_MUTEX(iparent); - de = lookup_one_len(quotafiles[i], dparent, - strlen(quotafiles[i])); - UNLOCK_INODE_MUTEX(iparent); - if (IS_ERR(de) || de->d_inode == NULL || - !S_ISREG(de->d_inode->i_mode)) - rc = IS_ERR(de) ? PTR_ERR(de) : -ENOENT; - if (!IS_ERR(de)) - dput(de); - - if (rc && rc != -ENOENT) { - CERROR("error lookup quotafile %s! (rc:%d)\n", + LASSERT(strlen(quotafile[i]) + sizeof(prefix) <= sizeof(name)); + sprintf(name, "%s%s", prefix, quotafile[i]); + + /* check if quota file exists and is correct */ + fp = filp_open(name, O_RDONLY, 0); + if (!IS_ERR(fp)) { + /* irregular file is not the right place for quota */ + if (!S_ISREG(fp->f_dentry->d_inode->i_mode)) { + CERROR("admin quota file %s is not " + "regular!", name); + filp_close(fp, 0); + rc = -EINVAL; + break; + } + qinfo->qi_files[i] = fp; + rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_CHK); + qinfo->qi_files[i] = 0; + filp_close(fp, 0); + } + else + rc = PTR_ERR(fp); + + if (!rc) + continue; + + /* -EINVAL may be returned by quotainfo for bad quota file */ + if (rc != -ENOENT && rc != -EINVAL) { + CERROR("error opening old quota file %s (%d)\n", name, rc); break; - } else if (!rc) { - continue; } - LASSERT(strlen(quotafiles[i]) + sizeof(prefix) <= sizeof(name)); - sprintf(name, "%s%s", prefix, quotafiles[i]); + CDEBUG(D_INFO, "%s new quota file %s\n", name, + rc == -ENOENT ? "creating" : "overwriting"); - LASSERT(rc == -ENOENT); - /* create quota file */ - fp = filp_open(name, O_CREAT | O_EXCL, 0644); - if (IS_ERR(fp) || !S_ISREG(fp->f_dentry->d_inode->i_mode)) { + /* create quota file overwriting old if needed */ + fp = filp_open(name, O_CREAT | O_TRUNC | O_RDWR, 0644); + if (IS_ERR(fp)) { rc = PTR_ERR(fp); CERROR("error creating admin quotafile %s (rc:%d)\n", name, rc); @@ -460,15 +735,14 @@ int init_admin_quotafiles(struct obd_device *obd, struct obd_quotactl *oqctl) } qinfo->qi_files[i] = fp; - rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_INIT_INFO); - filp_close(fp, 0); - qinfo->qi_files[i] = NULL; - if (rc) { + rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_INIT_INFO); + if (rc) CERROR("error init %s admin quotafile! (rc:%d)\n", i == USRQUOTA ? "user" : "group", rc); - break; - } + + filp_close(fp, 0); + qinfo->qi_files[i] = NULL; } up(&mds->mds_qonoff_sem); @@ -476,7 +750,7 @@ int init_admin_quotafiles(struct obd_device *obd, struct obd_quotactl *oqctl) RETURN(rc); } -static int close_quota_files(struct obd_quotactl *oqctl, +static int close_quota_files(struct obd_quotactl *oqctl, struct lustre_quota_info *qinfo) { int i, rc = 0; @@ -486,7 +760,8 @@ static int close_quota_files(struct obd_quotactl *oqctl, if (!Q_TYPESET(oqctl, i)) continue; if (qinfo->qi_files[i] == NULL) { - rc = -ESRCH; + CWARN("quota[%d] is off already\n", i); + rc = -EALREADY; continue; } filp_close(qinfo->qi_files[i], 0); @@ -499,13 +774,12 @@ int mds_admin_quota_on(struct obd_device *obd, struct obd_quotactl *oqctl) { struct mds_obd *mds = &obd->u.mds; struct lustre_quota_info *qinfo = &mds->mds_quota_info; - const char *quotafiles[] = LUSTRE_ADMIN_QUOTAFILES; + const char *quotafile[] = LUSTRE_ADMIN_QUOTAFILES_V2; char name[64]; - int i, rc = 0; - struct inode *iparent = mds->mds_objects_dir->d_inode; + int i, rc = 0, rc1 = 0; ENTRY; - LASSERT(iparent); + LASSERT(qinfo->qi_version == LUSTRE_QUOTA_V2); /* open admin quota files and read quotafile info */ for (i = 0; i < MAXQUOTAS; i++) { @@ -514,39 +788,46 @@ int mds_admin_quota_on(struct obd_device *obd, struct obd_quotactl *oqctl) if (!Q_TYPESET(oqctl, i)) continue; - LASSERT(strlen(quotafiles[i]) + sizeof(prefix) <= sizeof(name)); - sprintf(name, "%s%s", prefix, quotafiles[i]); + LASSERT(strlen(quotafile[i]) + + sizeof(prefix) <= sizeof(name)); + sprintf(name, "%s%s", prefix, quotafile[i]); if (qinfo->qi_files[i] != NULL) { - rc = -EBUSY; - break; + CWARN("quota[%d] is on already\n", i); + rc1 = -EALREADY; + continue; } - fp = filp_open(name, O_RDWR | O_EXCL, 0644); + fp = filp_open(name, O_RDWR, 0); if (IS_ERR(fp) || !S_ISREG(fp->f_dentry->d_inode->i_mode)) { - rc = PTR_ERR(fp); - CDEBUG(rc == -ENOENT ? D_QUOTA : D_ERROR, - "open %s failed! (rc:%d)\n", name, rc); + rc = IS_ERR(fp) ? PTR_ERR(fp) : -EINVAL; + CERROR("error open/create %s! (rc:%d)\n", name, rc); break; } qinfo->qi_files[i] = fp; + rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_CHK); + if (rc) { + CERROR("invalid quota file %s! (rc:%d)\n", name, rc); + break; + } + rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_RD_INFO); if (rc) { - CERROR("error read quotainfo of %s! (rc:%d)\n", - name, rc); + CERROR("error read quotainfo of %s! (rc:%d)\n", name, + rc); break; } } - if (rc && rc != -EBUSY) + if (rc && rc1 != -EALREADY) close_quota_files(oqctl, qinfo); - RETURN(rc); + RETURN(rc ? : rc1); } -static int mds_admin_quota_off(struct obd_device *obd, - struct obd_quotactl *oqctl) +int mds_admin_quota_off(struct obd_device *obd, + struct obd_quotactl *oqctl) { struct mds_obd *mds = &obd->u.mds; struct lustre_quota_info *qinfo = &mds->mds_quota_info; @@ -562,65 +843,155 @@ int mds_quota_on(struct obd_device *obd, struct obd_quotactl *oqctl) { struct mds_obd *mds = &obd->u.mds; struct obd_device_target *obt = &obd->u.obt; + struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt; struct lvfs_run_ctxt saved; - int rc; + int rc = 0, rc1 = 0, rc2 = 0; ENTRY; - if (!atomic_dec_and_test(&obt->obt_quotachecking)) { - CDEBUG(D_INFO, "other people are doing quotacheck\n"); - atomic_inc(&obt->obt_quotachecking); - RETURN(-EBUSY); + if (oqctl->qc_type != USRQUOTA && + oqctl->qc_type != GRPQUOTA && + oqctl->qc_type != UGQUOTA) + RETURN(-EINVAL); + + down(&obt->obt_quotachecking); + if (obt->obt_qctxt.lqc_immutable) { + LCONSOLE_ERROR("Failed to turn Quota on, immutable mode " + "(is SOM enabled?)\n"); + up(&obt->obt_quotachecking); + RETURN(-ECANCELED); } - down(&mds->mds_qonoff_sem); push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - rc = mds_admin_quota_on(obd, oqctl); - if (rc) - goto out; + down(&mds->mds_qonoff_sem); + rc2 = mds_admin_quota_on(obd, oqctl); + if (rc2 && rc2 != -EALREADY) { + CWARN("mds quota[%d] is failed to be on for %d\n", oqctl->qc_type, rc2); + GOTO(out, rc2); + } + + rc1 = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl); + if (!rc1) { + qctxt->lqc_flags |= UGQUOTA2LQC(oqctl->qc_type); + /* when quotaon, create lqs for every quota uid/gid b=18574 */ + build_lqs(obd); + } else if (rc1 == -EBUSY && quota_is_on(qctxt, oqctl)) { + CWARN("mds local quota[%d] is on already\n", oqctl->qc_type); + rc1 = -EALREADY; + } else { + if (rc2 != -EALREADY) { + CWARN("mds local quota[%d] is failed to be on for %d\n", + oqctl->qc_type, rc1); + oqctl->qc_cmd = Q_QUOTAOFF; + mds_admin_quota_off(obd, oqctl); + oqctl->qc_cmd = Q_QUOTAON; + } + GOTO(out, rc1); + } rc = obd_quotactl(mds->mds_osc_exp, oqctl); - if (rc) - goto out; + if (rc && rc != -EALREADY) { + CWARN("mds remote quota[%d] is failed to be on for %d\n", + oqctl->qc_type, rc); + oqctl->qc_cmd = Q_QUOTAOFF; + if (rc2 != -EALREADY) + mds_admin_quota_off(obd, oqctl); + if (rc1 != -EALREADY) { + fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl); + qctxt->lqc_flags &= ~UGQUOTA2LQC(oqctl->qc_type); + } + oqctl->qc_cmd = Q_QUOTAON; + } + + EXIT; - rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl); - if (!rc) - obt->obt_qctxt.lqc_status = 1; out: - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); up(&mds->mds_qonoff_sem); - atomic_inc(&obt->obt_quotachecking); - RETURN(rc); + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + up(&obt->obt_quotachecking); + return rc ? : (rc1 ? : rc2); } -int mds_quota_off(struct obd_device *obd, struct obd_quotactl *oqctl) +/* with obt->obt_quotachecking held */ +int do_mds_quota_off(struct obd_device *obd, struct obd_quotactl *oqctl) { struct mds_obd *mds = &obd->u.mds; struct obd_device_target *obt = &obd->u.obt; + struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt; struct lvfs_run_ctxt saved; - int rc, rc2; + int rc = 0, rc1 = 0, rc2 = 0, imm; ENTRY; - if (!atomic_dec_and_test(&obt->obt_quotachecking)) { - CDEBUG(D_INFO, "other people are doing quotacheck\n"); - atomic_inc(&obt->obt_quotachecking); - RETURN(-EBUSY); - } + LASSERT_SEM_LOCKED(&obt->obt_quotachecking); + + imm = oqctl->qc_type & IMMQUOTA; + oqctl->qc_type &= ~IMMQUOTA; + + if (oqctl->qc_type != USRQUOTA && + oqctl->qc_type != GRPQUOTA && + oqctl->qc_type != UGQUOTA) + RETURN(-EINVAL); + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); down(&mds->mds_qonoff_sem); /* close admin quota files */ - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - mds_admin_quota_off(obd, oqctl); + rc2 = mds_admin_quota_off(obd, oqctl); + if (rc2 && rc2 != -EALREADY) { + CWARN("mds quota[%d] is failed to be off for %d\n", oqctl->qc_type, rc2); + GOTO(out, rc2); + } + + rc1 = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl); + if (!rc1) { + if (imm) + obt->obt_qctxt.lqc_immutable = 1; + obt->obt_qctxt.lqc_flags &= ~UGQUOTA2LQC(oqctl->qc_type); + } else if (quota_is_off(qctxt, oqctl)) { + CWARN("mds local quota[%d] is off already\n", oqctl->qc_type); + rc1 = -EALREADY; + } else { + if (rc2 != -EALREADY) { + CWARN("mds local quota[%d] is failed to be off for %d\n", + oqctl->qc_type, rc1); + oqctl->qc_cmd = Q_QUOTAON; + mds_admin_quota_on(obd, oqctl); + oqctl->qc_cmd = Q_QUOTAOFF; + } + GOTO(out, rc1); + } rc = obd_quotactl(mds->mds_osc_exp, oqctl); - rc2 = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl); - if (!rc2) - obt->obt_qctxt.lqc_status = 0; + if (rc && rc != -EALREADY) { + CWARN("mds remote quota[%d] is failed to be off for %d\n", + oqctl->qc_type, rc); + oqctl->qc_cmd = Q_QUOTAON; + if (rc2 != -EALREADY) + mds_admin_quota_on(obd, oqctl); + if (rc1 != -EALREADY) { + fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl); + if (imm) + obt->obt_qctxt.lqc_immutable = 0; + qctxt->lqc_flags |= UGQUOTA2LQC(oqctl->qc_type); + } + oqctl->qc_cmd = Q_QUOTAOFF; + } + EXIT; - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); +out: up(&mds->mds_qonoff_sem); - atomic_inc(&obt->obt_quotachecking); + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + return rc ? : (rc1 ? : rc2); +} + +int mds_quota_off(struct obd_device *obd, struct obd_quotactl *oqctl) +{ + struct obd_device_target *obt = &obd->u.obt; + int rc; + ENTRY; - RETURN(rc ?: rc2); + down(&obt->obt_quotachecking); + rc = do_mds_quota_off(obd, oqctl); + up(&obt->obt_quotachecking); + RETURN(rc); } int mds_set_dqinfo(struct obd_device *obd, struct obd_quotactl *oqctl) @@ -631,10 +1002,14 @@ int mds_set_dqinfo(struct obd_device *obd, struct obd_quotactl *oqctl) int rc; ENTRY; + if (oqctl->qc_type != USRQUOTA && + oqctl->qc_type != GRPQUOTA) + RETURN(-EINVAL); + down(&mds->mds_qonoff_sem); if (qinfo->qi_files[oqctl->qc_type] == NULL) { - rc = -ESRCH; - goto out; + CWARN("quota[%u] is off\n", oqctl->qc_type); + GOTO(out, rc = -ESRCH); } qinfo->qi_info[oqctl->qc_type].dqi_bgrace = dqinfo->dqi_bgrace; @@ -642,10 +1017,11 @@ int mds_set_dqinfo(struct obd_device *obd, struct obd_quotactl *oqctl) qinfo->qi_info[oqctl->qc_type].dqi_flags = dqinfo->dqi_flags; rc = fsfilt_quotainfo(obd, qinfo, oqctl->qc_type, QFILE_WR_INFO); + EXIT; out: up(&mds->mds_qonoff_sem); - RETURN(rc); + return rc; } int mds_get_dqinfo(struct obd_device *obd, struct obd_quotactl *oqctl) @@ -656,18 +1032,126 @@ int mds_get_dqinfo(struct obd_device *obd, struct obd_quotactl *oqctl) int rc = 0; ENTRY; + if (oqctl->qc_type != USRQUOTA && + oqctl->qc_type != GRPQUOTA) + RETURN(-EINVAL); + down(&mds->mds_qonoff_sem); if (qinfo->qi_files[oqctl->qc_type] == NULL) { - rc = -ESRCH; - goto out; + CWARN("quota[%u] is off\n", oqctl->qc_type); + GOTO(out, rc = -ESRCH); } dqinfo->dqi_bgrace = qinfo->qi_info[oqctl->qc_type].dqi_bgrace; dqinfo->dqi_igrace = qinfo->qi_info[oqctl->qc_type].dqi_igrace; dqinfo->dqi_flags = qinfo->qi_info[oqctl->qc_type].dqi_flags; + EXIT; out: up(&mds->mds_qonoff_sem); + return rc; +} + +int dquot_create_oqaq(struct lustre_quota_ctxt *qctxt, + struct lustre_dquot *dquot, __u32 ost_num, __u32 mdt_num, + int type, struct quota_adjust_qunit *oqaq) +{ + __u64 bunit_curr_o, iunit_curr_o; + unsigned long shrink_qunit_limit = qctxt->lqc_cqs_boundary_factor; + unsigned long cqs_factor = qctxt->lqc_cqs_qs_factor; + __u64 blimit = dquot->dq_dqb.dqb_bhardlimit ? + dquot->dq_dqb.dqb_bhardlimit : dquot->dq_dqb.dqb_bsoftlimit; + __u64 ilimit = dquot->dq_dqb.dqb_ihardlimit ? + dquot->dq_dqb.dqb_ihardlimit : dquot->dq_dqb.dqb_isoftlimit; + int rc = 0; + ENTRY; + + if (!dquot || !oqaq) + RETURN(-EINVAL); + LASSERT_SEM_LOCKED(&dquot->dq_sem); + LASSERT(oqaq->qaq_iunit_sz); + LASSERT(oqaq->qaq_bunit_sz); + + /* don't change qunit size */ + if (!qctxt->lqc_switch_qs) + RETURN(rc); + + bunit_curr_o = oqaq->qaq_bunit_sz; + iunit_curr_o = oqaq->qaq_iunit_sz; + + if (dquot->dq_type == GRPQUOTA) + QAQ_SET_GRP(oqaq); + + if ((type & LQUOTA_FLAGS_ADJBLK) && blimit) { + __u64 b_limitation = + oqaq->qaq_bunit_sz * (ost_num + 1) * shrink_qunit_limit; + /* enlarge block qunit size */ + while (blimit > + QUSG(dquot->dq_dqb.dqb_curspace + 2 * b_limitation, 1)) { + oqaq->qaq_bunit_sz = + QUSG(oqaq->qaq_bunit_sz * cqs_factor, 1) + << QUOTABLOCK_BITS; + b_limitation = oqaq->qaq_bunit_sz * (ost_num + 1) * + shrink_qunit_limit; + } + + if (oqaq->qaq_bunit_sz > qctxt->lqc_bunit_sz) + oqaq->qaq_bunit_sz = qctxt->lqc_bunit_sz; + + /* shrink block qunit size */ + while (blimit < + QUSG(dquot->dq_dqb.dqb_curspace + b_limitation, 1)) { + do_div(oqaq->qaq_bunit_sz , cqs_factor); + oqaq->qaq_bunit_sz = QUSG(oqaq->qaq_bunit_sz, 1) << + QUOTABLOCK_BITS; + b_limitation = oqaq->qaq_bunit_sz * (ost_num + 1) * + shrink_qunit_limit; + if (oqaq->qaq_bunit_sz < qctxt->lqc_cqs_least_bunit) + break; + } + + if (oqaq->qaq_bunit_sz < qctxt->lqc_cqs_least_bunit) + oqaq->qaq_bunit_sz = qctxt->lqc_cqs_least_bunit; + + if (bunit_curr_o != oqaq->qaq_bunit_sz) + QAQ_SET_ADJBLK(oqaq); + + } + + if ((type & LQUOTA_FLAGS_ADJINO) && ilimit) { + __u64 i_limitation = + oqaq->qaq_iunit_sz * mdt_num * shrink_qunit_limit; + /* enlarge file qunit size */ + while (ilimit > dquot->dq_dqb.dqb_curinodes + + 2 * i_limitation) { + oqaq->qaq_iunit_sz = oqaq->qaq_iunit_sz * cqs_factor; + i_limitation = oqaq->qaq_iunit_sz * mdt_num * + shrink_qunit_limit; + } + + if (oqaq->qaq_iunit_sz > qctxt->lqc_iunit_sz) + oqaq->qaq_iunit_sz = qctxt->lqc_iunit_sz; + + /* shrink file qunit size */ + while (ilimit < dquot->dq_dqb.dqb_curinodes + + i_limitation) { + do_div(oqaq->qaq_iunit_sz, cqs_factor); + i_limitation = oqaq->qaq_iunit_sz * mdt_num * + shrink_qunit_limit; + if (oqaq->qaq_iunit_sz < qctxt->lqc_cqs_least_iunit) + break; + } + + if (oqaq->qaq_iunit_sz < qctxt->lqc_cqs_least_iunit) + oqaq->qaq_iunit_sz = qctxt->lqc_cqs_least_iunit; + + if (iunit_curr_o != oqaq->qaq_iunit_sz) + QAQ_SET_ADJINO(oqaq); + + } + + QAQ_DEBUG(oqaq, "the oqaq computed\n"); + RETURN(rc); } @@ -675,29 +1159,49 @@ static int mds_init_slave_ilimits(struct obd_device *obd, struct obd_quotactl *oqctl, int set) { /* XXX: for file limits only adjust local now */ - unsigned int uid = 0, gid = 0; + struct obd_device_target *obt = &obd->u.obt; + struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt; + unsigned int id[MAXQUOTAS] = { 0, 0 }; struct obd_quotactl *ioqc = NULL; + struct lustre_qunit_size *lqs; int flag; int rc; ENTRY; /* if we are going to set zero limit, needn't init slaves */ if (!oqctl->qc_dqblk.dqb_ihardlimit && !oqctl->qc_dqblk.dqb_isoftlimit && - set) + !set) RETURN(0); OBD_ALLOC_PTR(ioqc); if (!ioqc) RETURN(-ENOMEM); - - flag = oqctl->qc_dqblk.dqb_ihardlimit || - oqctl->qc_dqblk.dqb_isoftlimit || set; + + flag = oqctl->qc_dqblk.dqb_ihardlimit || + oqctl->qc_dqblk.dqb_isoftlimit || !set; ioqc->qc_cmd = flag ? Q_INITQUOTA : Q_SETQUOTA; ioqc->qc_id = oqctl->qc_id; ioqc->qc_type = oqctl->qc_type; ioqc->qc_dqblk.dqb_valid = QIF_ILIMITS; ioqc->qc_dqblk.dqb_ihardlimit = flag ? MIN_QLIMIT : 0; + /* build lqs for mds */ + lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id), + qctxt, flag ? 1 : 0); + if (lqs && !IS_ERR(lqs)) { + if (flag) + lqs->lqs_flags |= QI_SET; + else + lqs->lqs_flags &= ~QI_SET; + lqs_putref(lqs); + } else { + CERROR("fail to %s lqs for inode(%s id: %u)!\n", + flag ? "create" : "search", + oqctl->qc_type ? "group" : "user", + oqctl->qc_id); + GOTO(out, rc = PTR_ERR(lqs)); + } + /* set local limit to MIN_QLIMIT */ rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, ioqc); if (rc) @@ -705,13 +1209,18 @@ static int mds_init_slave_ilimits(struct obd_device *obd, /* trigger local qunit pre-acquire */ if (oqctl->qc_type == USRQUOTA) - uid = oqctl->qc_id; + id[USRQUOTA] = oqctl->qc_id; else - gid = oqctl->qc_id; + id[GRPQUOTA] = oqctl->qc_id; - rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 0, 0); + rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, id, 0, 0, NULL); + if (rc == -EDQUOT || rc == -EBUSY) { + CDEBUG(D_QUOTA, "rc: %d.\n", rc); + rc = 0; + } if (rc) { - CERROR("error mds adjust local file quota! (rc:%d)\n", rc); + CDEBUG(D_QUOTA,"error mds adjust local file quota! (rc:%d)\n", + rc); GOTO(out, rc); } /* FIXME initialize all slaves in CMD */ @@ -725,68 +1234,131 @@ out: static int mds_init_slave_blimits(struct obd_device *obd, struct obd_quotactl *oqctl, int set) { + struct obd_device_target *obt = &obd->u.obt; + struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt; struct mds_obd *mds = &obd->u.mds; struct obd_quotactl *ioqc; - unsigned int uid = 0, gid = 0; - int flag; + struct lustre_qunit_size *lqs; + unsigned int id[MAXQUOTAS] = { 0, 0 }; int rc; + int flag; ENTRY; /* if we are going to set zero limit, needn't init slaves */ if (!oqctl->qc_dqblk.dqb_bhardlimit && !oqctl->qc_dqblk.dqb_bsoftlimit && - set) + !set) RETURN(0); OBD_ALLOC_PTR(ioqc); if (!ioqc) RETURN(-ENOMEM); - flag = oqctl->qc_dqblk.dqb_bhardlimit || - oqctl->qc_dqblk.dqb_bsoftlimit || set; + flag = oqctl->qc_dqblk.dqb_bhardlimit || + oqctl->qc_dqblk.dqb_bsoftlimit || !set; ioqc->qc_cmd = flag ? Q_INITQUOTA : Q_SETQUOTA; ioqc->qc_id = oqctl->qc_id; ioqc->qc_type = oqctl->qc_type; ioqc->qc_dqblk.dqb_valid = QIF_BLIMITS; ioqc->qc_dqblk.dqb_bhardlimit = flag ? MIN_QLIMIT : 0; + /* build lqs for mds */ + lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id), + qctxt, flag ? 1 : 0); + if (lqs && !IS_ERR(lqs)) { + if (flag) + lqs->lqs_flags |= QB_SET; + else + lqs->lqs_flags &= ~QB_SET; + lqs_putref(lqs); + } else { + CERROR("fail to %s lqs for block(%s id: %u)!\n", + flag ? "create" : "search", + oqctl->qc_type ? "group" : "user", + oqctl->qc_id); + GOTO(out, rc = PTR_ERR(lqs)); + } + rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, ioqc); if (rc) GOTO(out, rc); /* trigger local qunit pre-acquire */ if (oqctl->qc_type == USRQUOTA) - uid = oqctl->qc_id; + id[USRQUOTA] = oqctl->qc_id; else - gid = oqctl->qc_id; + id[GRPQUOTA] = oqctl->qc_id; + + /* initialize all slave's limit */ + rc = obd_quotactl(mds->mds_osc_exp, ioqc); - rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 1, 0); + rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, id, 1, 0, NULL); + if (rc == -EDQUOT || rc == -EBUSY) { + CDEBUG(D_QUOTA, "rc: %d.\n", rc); + rc = 0; + } if (rc) { CERROR("error mds adjust local block quota! (rc:%d)\n", rc); GOTO(out, rc); } - /* initialize all slave's limit */ - rc = obd_quotactl(mds->mds_osc_exp, ioqc); EXIT; out: OBD_FREE_PTR(ioqc); return rc; } +static void adjust_lqs(struct obd_device *obd, struct quota_adjust_qunit *qaq) +{ + struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt; + int rc = 0; + + QAQ_SET_CREATE_LQS(qaq); + /* adjust local lqs */ + rc = quota_adjust_slave_lqs(qaq, qctxt); + if (rc < 0) + CERROR("adjust master's qunit size failed!(rc=%d)\n", rc); + + /* adjust remote lqs */ + if (QAQ_IS_ADJBLK(qaq)) { + rc = obd_quota_adjust_qunit(obd->u.mds.mds_osc_exp, qaq, qctxt); + if (rc < 0) + CERROR("adjust slaves' qunit size failed!(rc=%d)\n", rc); + + } +} + int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) { struct mds_obd *mds = &obd->u.mds; + struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt; + struct obd_device *lov_obd = class_exp2obd(mds->mds_osc_exp); + struct lov_obd *lov = &lov_obd->u.lov; + struct quota_adjust_qunit *oqaq = NULL; struct lustre_quota_info *qinfo = &mds->mds_quota_info; - __u32 ihardlimit, isoftlimit, bhardlimit, bsoftlimit; + __u64 ihardlimit, isoftlimit, bhardlimit, bsoftlimit; time_t btime, itime; struct lustre_dquot *dquot; struct obd_dqblk *dqblk = &oqctl->qc_dqblk; - int set, rc; + /* orig_set means if quota was set before; now_set means we are + * setting/cancelling quota */ + int orig_set, now_set; + int rc, rc2 = 0, flag = 0; ENTRY; + if (oqctl->qc_type != USRQUOTA && + oqctl->qc_type != GRPQUOTA) + RETURN(-EINVAL); + + OBD_ALLOC_PTR(oqaq); + if (!oqaq) + RETURN(-ENOMEM); down(&mds->mds_qonoff_sem); - if (qinfo->qi_files[oqctl->qc_type] == NULL) + init_oqaq(oqaq, qctxt, oqctl->qc_id, oqctl->qc_type); + + if (qinfo->qi_files[oqctl->qc_type] == NULL) { + CWARN("quota[%u] is off\n", oqctl->qc_type); GOTO(out_sem, rc = -ESRCH); + } dquot = lustre_dqget(obd, qinfo, oqctl->qc_id, oqctl->qc_type); if (IS_ERR(dquot)) @@ -819,18 +1391,20 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) dquot->dq_dqb.dqb_bhardlimit = dqblk->dqb_bhardlimit; dquot->dq_dqb.dqb_bsoftlimit = dqblk->dqb_bsoftlimit; /* clear usage (limit pool) */ - if (!dquot->dq_dqb.dqb_bhardlimit && + if (!dquot->dq_dqb.dqb_bhardlimit && !dquot->dq_dqb.dqb_bsoftlimit) dquot->dq_dqb.dqb_curspace = 0; /* clear grace time */ - if (!dqblk->dqb_bsoftlimit || + if (!dqblk->dqb_bsoftlimit || toqb(dquot->dq_dqb.dqb_curspace) <= dqblk->dqb_bsoftlimit) dquot->dq_dqb.dqb_btime = 0; /* set grace only if user hasn't provided his own */ else if (!(dqblk->dqb_valid & QIF_BTIME)) - dquot->dq_dqb.dqb_btime = cfs_time_current_sec() + + dquot->dq_dqb.dqb_btime = cfs_time_current_sec() + qinfo->qi_info[dquot->dq_type].dqi_bgrace; + + flag |= LQUOTA_FLAGS_ADJBLK; } if (dqblk->dqb_valid & QIF_ILIMITS) { @@ -847,7 +1421,16 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) else if (!(dqblk->dqb_valid & QIF_ITIME)) dquot->dq_dqb.dqb_itime = cfs_time_current_sec() + qinfo->qi_info[dquot->dq_type].dqi_igrace; + + flag |= LQUOTA_FLAGS_ADJINO; } + QAQ_DEBUG(oqaq, "before dquot_create_oqaq\n"); + rc = dquot_create_oqaq(qctxt, dquot, lov->desc.ld_tgt_count, 1, + flag, oqaq); + QAQ_DEBUG(oqaq, "after dquot_create_oqaq\n"); + if (rc < 0) + CDEBUG(D_QUOTA, "adjust qunit size failed! (rc:%d)\n", rc); + rc = fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT); @@ -859,38 +1442,50 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) } up(&mds->mds_qonoff_sem); - if (dqblk->dqb_valid & QIF_ILIMITS) { - set = !(ihardlimit || isoftlimit); - rc = mds_init_slave_ilimits(obd, oqctl, set); + + adjust_lqs(obd, oqaq); + + orig_set = ihardlimit || isoftlimit; + now_set = dqblk->dqb_ihardlimit || dqblk->dqb_isoftlimit; + if (dqblk->dqb_valid & QIF_ILIMITS && orig_set != now_set) { + down(&dquot->dq_sem); + dquot->dq_dqb.dqb_curinodes = 0; + up(&dquot->dq_sem); + rc = mds_init_slave_ilimits(obd, oqctl, orig_set); if (rc) { CERROR("init slave ilimits failed! (rc:%d)\n", rc); goto revoke_out; } } - if (dqblk->dqb_valid & QIF_BLIMITS) { - set = !(bhardlimit || bsoftlimit); - rc = mds_init_slave_blimits(obd, oqctl, set); + orig_set = bhardlimit || bsoftlimit; + now_set = dqblk->dqb_bhardlimit || dqblk->dqb_bsoftlimit; + if (dqblk->dqb_valid & QIF_BLIMITS && orig_set != now_set) { + down(&dquot->dq_sem); + dquot->dq_dqb.dqb_curspace = 0; + up(&dquot->dq_sem); + rc = mds_init_slave_blimits(obd, oqctl, orig_set); if (rc) { CERROR("init slave blimits failed! (rc:%d)\n", rc); goto revoke_out; } } - down(&mds->mds_qonoff_sem); revoke_out: + down(&mds->mds_qonoff_sem); + down(&dquot->dq_sem); if (rc) { /* cancel previous setting */ - down(&dquot->dq_sem); dquot->dq_dqb.dqb_ihardlimit = ihardlimit; dquot->dq_dqb.dqb_isoftlimit = isoftlimit; dquot->dq_dqb.dqb_bhardlimit = bhardlimit; dquot->dq_dqb.dqb_bsoftlimit = bsoftlimit; dquot->dq_dqb.dqb_btime = btime; dquot->dq_dqb.dqb_itime = itime; - fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT); - up(&dquot->dq_sem); } + rc2 = fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT); + up(&dquot->dq_sem); + out: down(&dquot->dq_sem); dquot->dq_status &= ~DQ_STATUS_SET; @@ -899,14 +1494,18 @@ out: EXIT; out_sem: up(&mds->mds_qonoff_sem); - return rc; + + if (oqaq) + OBD_FREE_PTR(oqaq); + + return rc ? rc : rc2; } static int mds_get_space(struct obd_device *obd, struct obd_quotactl *oqctl) { struct obd_quotactl *soqc; struct lvfs_run_ctxt saved; - int rc; + int rc, rc1; ENTRY; OBD_ALLOC_PTR(soqc); @@ -917,26 +1516,29 @@ static int mds_get_space(struct obd_device *obd, struct obd_quotactl *oqctl) soqc->qc_id = oqctl->qc_id; soqc->qc_type = oqctl->qc_type; + /* get block usage from OSS */ + soqc->qc_dqblk.dqb_curspace = 0; rc = obd_quotactl(obd->u.mds.mds_osc_exp, soqc); - if (rc) - GOTO(out, rc); - - oqctl->qc_dqblk.dqb_curspace = soqc->qc_dqblk.dqb_curspace; + if (!rc || rc == -EREMOTEIO) { + oqctl->qc_dqblk.dqb_curspace = soqc->qc_dqblk.dqb_curspace; + oqctl->qc_dqblk.dqb_valid |= QIF_SPACE; + } - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + /* get block/inode usage from MDS */ soqc->qc_dqblk.dqb_curspace = 0; - rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, soqc); + soqc->qc_dqblk.dqb_curinodes = 0; + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + rc1 = fsfilt_quotactl(obd, obd->u.obt.obt_sb, soqc); pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + if (!rc1) { + oqctl->qc_dqblk.dqb_curspace += soqc->qc_dqblk.dqb_curspace; + oqctl->qc_dqblk.dqb_curinodes = soqc->qc_dqblk.dqb_curinodes; + oqctl->qc_dqblk.dqb_valid |= QIF_INODES; + } - if (rc) - GOTO(out, rc); - - oqctl->qc_dqblk.dqb_curinodes += soqc->qc_dqblk.dqb_curinodes; - oqctl->qc_dqblk.dqb_curspace += soqc->qc_dqblk.dqb_curspace; - EXIT; -out: OBD_FREE_PTR(soqc); - return rc; + + RETURN(rc ? : rc1); } int mds_get_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) @@ -948,9 +1550,16 @@ int mds_get_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) int rc; ENTRY; + if (oqctl->qc_type != USRQUOTA && + oqctl->qc_type != GRPQUOTA) + RETURN(-EINVAL); + down(&mds->mds_qonoff_sem); - if (qinfo->qi_files[oqctl->qc_type] == NULL) + dqblk->dqb_valid = 0; + if (qinfo->qi_files[oqctl->qc_type] == NULL) { + CWARN("quota[%u] is off\n", oqctl->qc_type); GOTO(out, rc = -ESRCH); + } dquot = lustre_dqget(obd, qinfo, oqctl->qc_id, oqctl->qc_type); if (IS_ERR(dquot)) @@ -963,15 +1572,18 @@ int mds_get_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) dqblk->dqb_bsoftlimit = dquot->dq_dqb.dqb_bsoftlimit; dqblk->dqb_btime = dquot->dq_dqb.dqb_btime; dqblk->dqb_itime = dquot->dq_dqb.dqb_itime; + dqblk->dqb_valid |= QIF_LIMITS | QIF_TIMES; up(&dquot->dq_sem); lustre_dqput(dquot); + up(&mds->mds_qonoff_sem); /* the usages in admin quota file is inaccurate */ dqblk->dqb_curinodes = 0; dqblk->dqb_curspace = 0; rc = mds_get_space(obd, oqctl); EXIT; + return rc; out: up(&mds->mds_qonoff_sem); return rc; @@ -997,7 +1609,7 @@ static int dquot_recovery(struct obd_device *obd, unsigned int id, unsigned short type) { struct mds_obd *mds = &obd->u.mds; - struct lustre_quota_info *qinfo= &obd->u.mds.mds_quota_info; + struct lustre_quota_info *qinfo= &mds->mds_quota_info; struct lustre_dquot *dquot; struct obd_quotactl *qctl; __u64 total_limits = 0; @@ -1030,7 +1642,7 @@ dquot_recovery(struct obd_device *obd, unsigned int id, unsigned short type) qctl->qc_type = type; qctl->qc_id = id; qctl->qc_stat = QUOTA_RECOVERING; - rc = obd_quotactl(obd->u.mds.mds_osc_exp, qctl); + rc = obd_quotactl(mds->mds_osc_exp, qctl); if (rc) GOTO(out, rc); total_limits = qctl->qc_dqblk.dqb_bhardlimit; @@ -1074,17 +1686,22 @@ static int qmaster_recovery_main(void *arg) { struct qmaster_recov_thread_data *data = arg; struct obd_device *obd = data->obd; + struct mds_obd *mds = &obd->u.mds; + struct lustre_quota_info *qinfo = &mds->mds_quota_info; int rc = 0; unsigned short type; ENTRY; - ptlrpc_daemonize("qmaster_recovd"); + cfs_daemonize_ctxt("qmaster_recovd"); + + /* for mds */ + class_incref(obd, "qmaster_recovd_mds", obd); + /* for lov */ + class_incref(mds->mds_osc_obd, "qmaster_recovd_lov", mds->mds_osc_obd); complete(&data->comp); for (type = USRQUOTA; type < MAXQUOTAS; type++) { - struct mds_obd *mds = &obd->u.mds; - struct lustre_quota_info *qinfo = &mds->mds_quota_info; struct list_head id_list; struct dquot_id *dqid, *tmp; @@ -1094,7 +1711,7 @@ static int qmaster_recovery_main(void *arg) continue; } CFS_INIT_LIST_HEAD(&id_list); - rc = fsfilt_qids(obd, qinfo->qi_files[type], NULL, type, + rc = fsfilt_qids(obd, qinfo->qi_files[type], NULL, type, &id_list); up(&mds->mds_qonoff_sem); @@ -1111,26 +1728,33 @@ static int qmaster_recovery_main(void *arg) CERROR("qmaster recovery failed! (id:%d type:%d" " rc:%d)\n", dqid->di_id, type, rc); free: - kfree(dqid); + OBD_FREE_PTR(dqid); } } + class_decref(mds->mds_osc_obd, "qmaster_recovd_lov", mds->mds_osc_obd); + class_decref(obd, "qmaster_recovd_mds", obd); RETURN(rc); } int mds_quota_recovery(struct obd_device *obd) { - struct lov_obd *lov = &obd->u.mds.mds_osc_obd->u.lov; + struct mds_obd *mds = &obd->u.mds; struct qmaster_recov_thread_data data; int rc = 0; ENTRY; - mutex_down(&lov->lov_lock); - if (lov->desc.ld_tgt_count != lov->desc.ld_active_tgt_count) { - CWARN("Not all osts are active, abort quota recovery\n"); - mutex_up(&lov->lov_lock); + if (unlikely(!mds->mds_quota || obd->obd_stopping)) + RETURN(rc); + + mutex_down(&obd->obd_dev_sem); + if (mds->mds_lov_desc.ld_active_tgt_count != mds->mds_lov_objid_count) { + CWARN("Only %u/%u OSTs are active, abort quota recovery\n", + mds->mds_lov_desc.ld_active_tgt_count, + mds->mds_lov_objid_count); + mutex_up(&obd->obd_dev_sem); RETURN(rc); } - mutex_up(&lov->lov_lock); + mutex_up(&obd->obd_dev_sem); data.obd = obd; init_completion(&data.comp); @@ -1142,3 +1766,5 @@ int mds_quota_recovery(struct obd_device *obd) wait_for_completion(&data.comp); RETURN(rc); } + +#endif /* HAVE_QUOTA_SUPPORT */