X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fquota%2Fquota_master.c;h=591a13078bd0d69f810ef2c096adb854fbc7f644;hb=fb7c149abff469705f7997d1b7e9f73573bd3442;hp=cc90f761a5e47f01ca7f15dde37880ff24641c61;hpb=cd40f5454d9fefcbdf0a2eb50dee78be5d19e72c;p=fs%2Flustre-release.git diff --git a/lustre/quota/quota_master.c b/lustre/quota/quota_master.c index cc90f76..591a130 100644 --- a/lustre/quota/quota_master.c +++ b/lustre/quota/quota_master.c @@ -26,7 +26,7 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. */ /* @@ -64,9 +64,9 @@ #ifdef HAVE_QUOTA_SUPPORT -/* lock ordering: mds->mds_qonoff_sem > dquot->dq_sem */ +/* lock ordering: mds->mds_qonoff_sem > dquot->dq_sem > lqs->lqs_lock */ static cfs_list_t lustre_dquot_hash[NR_DQHASH]; -static cfs_spinlock_t dquot_hash_lock = CFS_SPIN_LOCK_UNLOCKED; +static cfs_rwlock_t dquot_hash_lock = CFS_RW_LOCK_UNLOCKED; cfs_mem_cache_t *lustre_dquot_cachep; @@ -126,7 +126,6 @@ static struct lustre_dquot *find_dquot(int hashent, struct lustre_dquot *dquot; ENTRY; - LASSERT_SPIN_LOCKED(&dquot_hash_lock); cfs_list_for_each_entry(dquot, &lustre_dquot_hash[hashent], dq_hash) { if (dquot->dq_info == lqi && dquot->dq_id == id && dquot->dq_type == type) @@ -147,11 +146,10 @@ static struct lustre_dquot *alloc_dquot(struct lustre_quota_info *lqi, CFS_INIT_LIST_HEAD(&dquot->dq_hash); cfs_init_mutex_locked(&dquot->dq_sem); - dquot->dq_refcnt = 1; + cfs_atomic_set(&dquot->dq_refcnt, 1); dquot->dq_info = lqi; dquot->dq_id = id; dquot->dq_type = type; - dquot->dq_status = DQ_STATUS_AVAIL; RETURN(dquot); } @@ -178,52 +176,72 @@ static void remove_dquot_nolock(struct lustre_dquot *dquot) static void lustre_dqput(struct lustre_dquot *dquot) { ENTRY; - cfs_spin_lock(&dquot_hash_lock); - LASSERT(dquot->dq_refcnt); - dquot->dq_refcnt--; - if (!dquot->dq_refcnt) { + cfs_write_lock(&dquot_hash_lock); + LASSERT(cfs_atomic_read(&dquot->dq_refcnt)); + cfs_atomic_dec(&dquot->dq_refcnt); + if (cfs_atomic_read(&dquot->dq_refcnt) == 0) { remove_dquot_nolock(dquot); free_dquot(dquot); } - cfs_spin_unlock(&dquot_hash_lock); + cfs_write_unlock(&dquot_hash_lock); EXIT; } static struct lustre_dquot *lustre_dqget(struct obd_device *obd, struct lustre_quota_info *lqi, - qid_t id, int type) + qid_t id, int type, int can_fake) { unsigned int hashent = dquot_hashfn(lqi, id, type); struct lustre_dquot *dquot, *empty; + int free_dq = 0; ENTRY; if ((empty = alloc_dquot(lqi, id, type)) == NULL) RETURN(ERR_PTR(-ENOMEM)); - cfs_spin_lock(&dquot_hash_lock); + cfs_read_lock(&dquot_hash_lock); if ((dquot = find_dquot(hashent, lqi, id, type)) != NULL) { - dquot->dq_refcnt++; - cfs_spin_unlock(&dquot_hash_lock); - free_dquot(empty); + cfs_atomic_inc(&dquot->dq_refcnt); + cfs_read_unlock(&dquot_hash_lock); + free_dq = 1; } else { int rc; - dquot = empty; - insert_dquot_nolock(dquot); - cfs_spin_unlock(&dquot_hash_lock); + cfs_read_unlock(&dquot_hash_lock); + dquot = empty; rc = fsfilt_dquot(obd, dquot, QFILE_RD_DQUOT); cfs_up(&dquot->dq_sem); if (rc) { CERROR("can't read dquot from admin quotafile! " "(rc:%d)\n", rc); - lustre_dqput(dquot); + free_dquot(dquot); RETURN(ERR_PTR(rc)); + } else { + cfs_write_lock(&dquot_hash_lock); + if ((dquot = find_dquot(hashent, lqi, id, type)) != NULL) { + cfs_atomic_inc(&dquot->dq_refcnt); + free_dq = 1; + } else { + dquot = empty; + insert_dquot_nolock(dquot); + } + cfs_write_unlock(&dquot_hash_lock); } } LASSERT(dquot); + if (!can_fake && cfs_test_bit(DQ_FAKE_B, &dquot->dq_flags)) { + DQUOT_DEBUG(dquot, "It is a fake dquot: unexpected!\n"); + lustre_dqput(dquot); + dquot = ERR_PTR(-ENOENT); + } + + if (free_dq) + free_dquot(empty); + + RETURN(dquot); } @@ -265,10 +283,12 @@ int dqacq_adjust_qunit_sz(struct obd_device *obd, qid_t id, int type, ENTRY; LASSERT(mds); - dquot = lustre_dqget(obd, info, id, type); + cfs_down_read(&mds->mds_qonoff_sem); + dquot = lustre_dqget(obd, info, id, type, 0); if (IS_ERR(dquot)) RETURN(PTR_ERR(dquot)); + cfs_up_read(&mds->mds_qonoff_sem); OBD_ALLOC_PTR(oqaq); if (!oqaq) GOTO(out, rc = -ENOMEM); @@ -322,7 +342,7 @@ int dqacq_adjust_qunit_sz(struct obd_device *obd, qid_t id, int type, /* only when block qunit is reduced, boardcast to osts */ if ((adjust_res & LQS_BLK_DECREASE) && QAQ_IS_ADJBLK(oqaq)) - rc = obd_quota_adjust_qunit(mds->mds_osc_exp, oqaq, qctxt); + rc = obd_quota_adjust_qunit(mds->mds_lov_exp, oqaq, qctxt, NULL); out: lustre_dqput(dquot); @@ -352,21 +372,39 @@ int dqacq_handler(struct obd_device *obd, struct qunit_data *qdata, int opc) if (OBD_FAIL_CHECK(OBD_FAIL_OBD_DQACQ)) RETURN(-EIO); - dquot = lustre_dqget(obd, info, qdata->qd_id, QDATA_IS_GRP(qdata)); - if (IS_ERR(dquot)) - RETURN(PTR_ERR(dquot)); + if (!ll_sb_has_quota_active(qctxt->lqc_sb, + QDATA_IS_GRP(qdata) ? GRPQUOTA : USRQUOTA)) + RETURN(-EIO); + + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), + qctxt, 0); + if (lqs == NULL) + rc = -ENOENT; + if (IS_ERR(lqs)) + rc = PTR_ERR(lqs); + if (rc) + RETURN(rc); + + cfs_spin_lock(&lqs->lqs_lock); + if (LQS_IS_RECOVERY(lqs)) { + cfs_spin_unlock(&lqs->lqs_lock); + LQS_DEBUG(lqs, "this lqs is under recovery\n"); + GOTO(skip, rc = -EBUSY); + } + cfs_spin_unlock(&lqs->lqs_lock); + + cfs_down_write(&mds->mds_qonoff_sem); + dquot = lustre_dqget(obd, info, qdata->qd_id, QDATA_IS_GRP(qdata), 0); + if (IS_ERR(dquot)) { + cfs_up_write(&mds->mds_qonoff_sem); + GOTO(skip, rc = PTR_ERR(dquot)); + } DQUOT_DEBUG(dquot, "get dquot in dqacq_handler\n"); QINFO_DEBUG(dquot->dq_info, "get dquot in dqadq_handler\n"); - cfs_down(&mds->mds_qonoff_sem); cfs_down(&dquot->dq_sem); - if (dquot->dq_status & DQ_STATUS_RECOVERY) { - DQUOT_DEBUG(dquot, "this dquot is under recovering.\n"); - GOTO(out, rc = -EBUSY); - } - if (QDATA_IS_BLK(qdata)) { grace = info->qi_info[QDATA_IS_GRP(qdata)].dqi_bgrace; usage = &dquot->dq_dqb.dqb_curspace; @@ -416,7 +454,7 @@ int dqacq_handler(struct obd_device *obd, struct qunit_data *qdata, int opc) case QUOTA_DQREL: /* The usage in administrative file might be incorrect before * recovery done */ - if (*usage - qdata->qd_count < 0) + if (*usage < qdata->qd_count) *usage = 0; else *usage -= qdata->qd_count; @@ -433,24 +471,16 @@ int dqacq_handler(struct obd_device *obd, struct qunit_data *qdata, int opc) EXIT; out: cfs_up(&dquot->dq_sem); - cfs_up(&mds->mds_qonoff_sem); + cfs_up_write(&mds->mds_qonoff_sem); lustre_dqput(dquot); if (rc != -EDQUOT) dqacq_adjust_qunit_sz(obd, qdata->qd_id, QDATA_IS_GRP(qdata), QDATA_IS_BLK(qdata)); - lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), - qctxt, 0); - if (lqs == NULL || IS_ERR(lqs)) { - CDEBUG(D_INFO, "Can't find the lustre qunit size!\n"); - qdata->qd_qunit = QDATA_IS_BLK(qdata) ? qctxt->lqc_bunit_sz : - qctxt->lqc_iunit_sz; - } else { - cfs_spin_lock(&lqs->lqs_lock); - qdata->qd_qunit = QDATA_IS_BLK(qdata) ? lqs->lqs_bunit_sz : - lqs->lqs_iunit_sz; - cfs_spin_unlock(&lqs->lqs_lock); - } + cfs_spin_lock(&lqs->lqs_lock); + qdata->qd_qunit = QDATA_IS_BLK(qdata) ? lqs->lqs_bunit_sz : + lqs->lqs_iunit_sz; + cfs_spin_unlock(&lqs->lqs_lock); if (QDATA_IS_BLK(qdata)) QDATA_SET_ADJBLK(qdata); @@ -458,8 +488,8 @@ out: QDATA_SET_ADJINO(qdata); QDATA_DEBUG(qdata, "alloc/release qunit in dqacq_handler\n"); - if (lqs) - lqs_putref(lqs); +skip: + lqs_putref(lqs); return rc; } @@ -597,7 +627,7 @@ int mds_quota_invalidate(struct obd_device *obd, struct obd_quotactl *oqctl) cfs_down(&obt->obt_quotachecking); push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - cfs_down(&mds->mds_qonoff_sem); + cfs_down_write(&mds->mds_qonoff_sem); for (i = 0; i < MAXQUOTAS; i++) { struct file *fp; @@ -625,7 +655,7 @@ int mds_quota_invalidate(struct obd_device *obd, struct obd_quotactl *oqctl) filp_close(fp, 0); } - cfs_up(&mds->mds_qonoff_sem); + cfs_up_write(&mds->mds_qonoff_sem); pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); cfs_up(&obt->obt_quotachecking); RETURN(rc ? : rc1); @@ -645,16 +675,19 @@ int mds_quota_finvalidate(struct obd_device *obd, struct obd_quotactl *oqctl) RETURN(-EINVAL); cfs_down(&obt->obt_quotachecking); + if (obt->obt_qctxt.lqc_flags & UGQUOTA2LQC(oqctl->qc_type)) + GOTO(out, rc = -EBUSY); push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - cfs_down(&mds->mds_qonoff_sem); + cfs_down_write(&mds->mds_qonoff_sem); oqctl->qc_cmd = Q_FINVALIDATE; rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl); if (!rc) - rc = obd_quotactl(mds->mds_osc_exp, oqctl); + rc = obd_quotactl(mds->mds_lov_exp, oqctl); - cfs_up(&mds->mds_qonoff_sem); + cfs_up_write(&mds->mds_qonoff_sem); pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); +out: cfs_up(&obt->obt_quotachecking); RETURN(rc); } @@ -672,8 +705,7 @@ int init_admin_quotafiles(struct obd_device *obd, struct obd_quotactl *oqctl) LASSERT(qinfo->qi_version == LUSTRE_QUOTA_V2); push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - - cfs_down(&mds->mds_qonoff_sem); + cfs_down_write(&mds->mds_qonoff_sem); for (i = 0; i < MAXQUOTAS && !rc; i++) { struct file *fp; @@ -704,7 +736,7 @@ int init_admin_quotafiles(struct obd_device *obd, struct obd_quotactl *oqctl) } qinfo->qi_files[i] = fp; rc = fsfilt_quotainfo(obd, qinfo, i, QFILE_CHK); - qinfo->qi_files[i] = 0; + qinfo->qi_files[i] = NULL; filp_close(fp, 0); } else @@ -742,8 +774,8 @@ int init_admin_quotafiles(struct obd_device *obd, struct obd_quotactl *oqctl) filp_close(fp, 0); qinfo->qi_files[i] = NULL; } - cfs_up(&mds->mds_qonoff_sem); + cfs_up_write(&mds->mds_qonoff_sem); pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); RETURN(rc); } @@ -783,19 +815,12 @@ int mds_admin_quota_on(struct obd_device *obd, struct obd_quotactl *oqctl) for (i = 0; i < MAXQUOTAS; i++) { struct file *fp; - if (!Q_TYPESET(oqctl, i)) + if (!Q_TYPESET(oqctl, i) || qinfo->qi_files[i] != NULL) continue; LASSERT(strlen(quotafile[i]) + sizeof(prefix) <= sizeof(name)); sprintf(name, "%s%s", prefix, quotafile[i]); - - if (qinfo->qi_files[i] != NULL) { - CWARN("quota[%d] is on already\n", i); - rc1 = -EALREADY; - continue; - } - fp = filp_open(name, O_RDWR, 0); if (IS_ERR(fp) || !S_ISREG(fp->f_dentry->d_inode->i_mode)) { rc = IS_ERR(fp) ? PTR_ERR(fp) : -EINVAL; @@ -824,26 +849,9 @@ int mds_admin_quota_on(struct obd_device *obd, struct obd_quotactl *oqctl) RETURN(rc ? : rc1); } -int mds_admin_quota_off(struct obd_device *obd, - struct obd_quotactl *oqctl) -{ - struct mds_obd *mds = &obd->u.mds; - struct lustre_quota_info *qinfo = &mds->mds_quota_info; - int rc; - ENTRY; - - /* close admin quota files */ - rc = close_quota_files(oqctl, qinfo); - RETURN(rc); -} - int mds_quota_on(struct obd_device *obd, struct obd_quotactl *oqctl) { - struct mds_obd *mds = &obd->u.mds; - struct obd_device_target *obt = &obd->u.obt; - struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt; - struct lvfs_run_ctxt saved; - int rc = 0, rc1 = 0, rc2 = 0; + int rc; ENTRY; if (oqctl->qc_type != USRQUOTA && @@ -851,57 +859,26 @@ int mds_quota_on(struct obd_device *obd, struct obd_quotactl *oqctl) oqctl->qc_type != UGQUOTA) RETURN(-EINVAL); - cfs_down(&obt->obt_quotachecking); - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - cfs_down(&mds->mds_qonoff_sem); - rc2 = mds_admin_quota_on(obd, oqctl); - if (rc2 && rc2 != -EALREADY) { - CWARN("mds quota[%d] is failed to be on for %d\n", oqctl->qc_type, rc2); - GOTO(out, rc2); - } + rc = generic_quota_on(obd, oqctl, 1); - rc1 = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl); - if (!rc1) { - qctxt->lqc_flags |= UGQUOTA2LQC(oqctl->qc_type); - /* when quotaon, create lqs for every quota uid/gid b=18574 */ - build_lqs(obd); - } else if (rc1 == -EBUSY && quota_is_on(qctxt, oqctl)) { - CWARN("mds local quota[%d] is on already\n", oqctl->qc_type); - rc1 = -EALREADY; - } else { - if (rc2 != -EALREADY) { - CWARN("mds local quota[%d] is failed to be on for %d\n", - oqctl->qc_type, rc1); - oqctl->qc_cmd = Q_QUOTAOFF; - mds_admin_quota_off(obd, oqctl); - oqctl->qc_cmd = Q_QUOTAON; - } - GOTO(out, rc1); - } + RETURN(rc); +} - rc = obd_quotactl(mds->mds_osc_exp, oqctl); - if (rc && rc != -EALREADY) { - CWARN("mds remote quota[%d] is failed to be on for %d\n", - oqctl->qc_type, rc); - oqctl->qc_cmd = Q_QUOTAOFF; - if (rc2 != -EALREADY) - mds_admin_quota_off(obd, oqctl); - if (rc1 != -EALREADY) { - fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl); - qctxt->lqc_flags &= ~UGQUOTA2LQC(oqctl->qc_type); - } - oqctl->qc_cmd = Q_QUOTAON; - } - EXIT; +int mds_admin_quota_off(struct obd_device *obd, + struct obd_quotactl *oqctl) +{ + struct mds_obd *mds = &obd->u.mds; + struct lustre_quota_info *qinfo = &mds->mds_quota_info; + int rc; + ENTRY; -out: - cfs_up(&mds->mds_qonoff_sem); - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - cfs_up(&obt->obt_quotachecking); - return rc ? : (rc1 ? : rc2); + /* close admin quota files */ + rc = close_quota_files(oqctl, qinfo); + RETURN(rc); } + /* with obt->obt_quotachecking held */ int do_mds_quota_off(struct obd_device *obd, struct obd_quotactl *oqctl) { @@ -920,7 +897,7 @@ int do_mds_quota_off(struct obd_device *obd, struct obd_quotactl *oqctl) RETURN(-EINVAL); push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - cfs_down(&mds->mds_qonoff_sem); + cfs_down_write(&mds->mds_qonoff_sem); /* close admin quota files */ rc2 = mds_admin_quota_off(obd, oqctl); if (rc2 && rc2 != -EALREADY) { @@ -945,7 +922,7 @@ int do_mds_quota_off(struct obd_device *obd, struct obd_quotactl *oqctl) GOTO(out, rc1); } - rc = obd_quotactl(mds->mds_osc_exp, oqctl); + rc = obd_quotactl(mds->mds_lov_exp, oqctl); if (rc && rc != -EALREADY) { CWARN("mds remote quota[%d] is failed to be off for %d\n", oqctl->qc_type, rc); @@ -961,7 +938,9 @@ int do_mds_quota_off(struct obd_device *obd, struct obd_quotactl *oqctl) EXIT; out: - cfs_up(&mds->mds_qonoff_sem); + CDEBUG(D_QUOTA, "%s: quotaoff type:flags:rc %u:%lu:%d\n", + obd->obd_name, oqctl->qc_type, qctxt->lqc_flags, rc); + cfs_up_write(&mds->mds_qonoff_sem); pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); return rc ? : (rc1 ? : rc2); } @@ -990,7 +969,7 @@ int mds_set_dqinfo(struct obd_device *obd, struct obd_quotactl *oqctl) oqctl->qc_type != GRPQUOTA) RETURN(-EINVAL); - cfs_down(&mds->mds_qonoff_sem); + cfs_down_write(&mds->mds_qonoff_sem); if (qinfo->qi_files[oqctl->qc_type] == NULL) { CWARN("quota[%u] is off\n", oqctl->qc_type); GOTO(out, rc = -ESRCH); @@ -1004,7 +983,7 @@ int mds_set_dqinfo(struct obd_device *obd, struct obd_quotactl *oqctl) EXIT; out: - cfs_up(&mds->mds_qonoff_sem); + cfs_up_write(&mds->mds_qonoff_sem); return rc; } @@ -1020,7 +999,7 @@ int mds_get_dqinfo(struct obd_device *obd, struct obd_quotactl *oqctl) oqctl->qc_type != GRPQUOTA) RETURN(-EINVAL); - cfs_down(&mds->mds_qonoff_sem); + cfs_down_read(&mds->mds_qonoff_sem); if (qinfo->qi_files[oqctl->qc_type] == NULL) { CWARN("quota[%u] is off\n", oqctl->qc_type); GOTO(out, rc = -ESRCH); @@ -1032,7 +1011,7 @@ int mds_get_dqinfo(struct obd_device *obd, struct obd_quotactl *oqctl) EXIT; out: - cfs_up(&mds->mds_qonoff_sem); + cfs_up_read(&mds->mds_qonoff_sem); return rc; } @@ -1075,6 +1054,8 @@ int dquot_create_oqaq(struct lustre_quota_ctxt *qctxt, oqaq->qaq_bunit_sz = QUSG(oqaq->qaq_bunit_sz * cqs_factor, 1) << QUOTABLOCK_BITS; + if (oqaq->qaq_bunit_sz >= qctxt->lqc_bunit_sz) + break; b_limitation = oqaq->qaq_bunit_sz * (ost_num + 1) * shrink_qunit_limit; } @@ -1109,6 +1090,8 @@ int dquot_create_oqaq(struct lustre_quota_ctxt *qctxt, while (ilimit > dquot->dq_dqb.dqb_curinodes + 2 * i_limitation) { oqaq->qaq_iunit_sz = oqaq->qaq_iunit_sz * cqs_factor; + if (oqaq->qaq_iunit_sz >= qctxt->lqc_iunit_sz) + break; i_limitation = oqaq->qaq_iunit_sz * mdt_num * shrink_qunit_limit; } @@ -1273,7 +1256,7 @@ static int mds_init_slave_blimits(struct obd_device *obd, id[GRPQUOTA] = oqctl->qc_id; /* initialize all slave's limit */ - rc = obd_quotactl(mds->mds_osc_exp, ioqc); + rc = obd_quotactl(mds->mds_lov_exp, ioqc); rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, id, 1, 0, NULL); if (rc == -EDQUOT || rc == -EBUSY) { @@ -1304,7 +1287,7 @@ static void adjust_lqs(struct obd_device *obd, struct quota_adjust_qunit *qaq) /* adjust remote lqs */ if (QAQ_IS_ADJBLK(qaq)) { - rc = obd_quota_adjust_qunit(obd->u.mds.mds_osc_exp, qaq, qctxt); + rc = obd_quota_adjust_qunit(obd->u.mds.mds_lov_exp, qaq, qctxt, NULL); if (rc < 0) CERROR("adjust slaves' qunit size failed!(rc=%d)\n", rc); @@ -1315,7 +1298,7 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) { struct mds_obd *mds = &obd->u.mds; struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt; - struct obd_device *lov_obd = class_exp2obd(mds->mds_osc_exp); + struct obd_device *lov_obd = class_exp2obd(mds->mds_lov_exp); struct lov_obd *lov = &lov_obd->u.lov; struct quota_adjust_qunit *oqaq = NULL; struct lustre_quota_info *qinfo = &mds->mds_quota_info; @@ -1326,7 +1309,8 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) /* orig_set means if quota was set before; now_set means we are * setting/cancelling quota */ int orig_set, now_set; - int rc, rc2 = 0, flag = 0; + struct lustre_qunit_size *lqs; + int rc = 0, rc2 = 0, flag = 0; ENTRY; if (oqctl->qc_type != USRQUOTA && @@ -1336,7 +1320,8 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) OBD_ALLOC_PTR(oqaq); if (!oqaq) RETURN(-ENOMEM); - cfs_down(&mds->mds_qonoff_sem); + + cfs_down_write(&mds->mds_qonoff_sem); init_oqaq(oqaq, qctxt, oqctl->qc_id, oqctl->qc_type); if (qinfo->qi_files[oqctl->qc_type] == NULL) { @@ -1344,20 +1329,29 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) GOTO(out_sem, rc = -ESRCH); } - dquot = lustre_dqget(obd, qinfo, oqctl->qc_id, oqctl->qc_type); + dquot = lustre_dqget(obd, qinfo, oqctl->qc_id, oqctl->qc_type, 1); if (IS_ERR(dquot)) GOTO(out_sem, rc = PTR_ERR(dquot)); DQUOT_DEBUG(dquot, "get dquot in mds_set_blk\n"); QINFO_DEBUG(dquot->dq_info, "get dquot in mds_set_blk\n"); - cfs_down(&dquot->dq_sem); + lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id), qctxt, 1); + if (lqs == NULL) + rc = -ENOENT; + if (IS_ERR(lqs)) + rc = PTR_ERR(lqs); + if (rc) + GOTO(out, rc); - if (dquot->dq_status) { + cfs_down(&dquot->dq_sem); + cfs_spin_lock(&lqs->lqs_lock); + if (LQS_IS_SETQUOTA(lqs) || LQS_IS_RECOVERY(lqs)) { + cfs_spin_unlock(&lqs->lqs_lock); cfs_up(&dquot->dq_sem); - lustre_dqput(dquot); - GOTO(out_sem, rc = -EBUSY); + GOTO(skip, rc = -EBUSY); } - dquot->dq_status |= DQ_STATUS_SET; + LQS_SET_SETQUOTA(lqs); + cfs_spin_unlock(&lqs->lqs_lock); ihardlimit = dquot->dq_dqb.dqb_ihardlimit; isoftlimit = dquot->dq_dqb.dqb_isoftlimit; @@ -1422,11 +1416,10 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) if (rc) { CERROR("set limit failed! (rc:%d)\n", rc); - goto out; + GOTO(update_fail, rc); } - cfs_up(&mds->mds_qonoff_sem); - + cfs_up_write(&mds->mds_qonoff_sem); adjust_lqs(obd, oqaq); orig_set = ihardlimit || isoftlimit; @@ -1456,7 +1449,7 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) } revoke_out: - cfs_down(&mds->mds_qonoff_sem); + cfs_down_write(&mds->mds_qonoff_sem); cfs_down(&dquot->dq_sem); if (rc) { /* cancel previous setting */ @@ -1469,15 +1462,17 @@ revoke_out: } rc2 = fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT); cfs_up(&dquot->dq_sem); - +update_fail: + cfs_spin_lock(&lqs->lqs_lock); + LQS_CLEAR_SETQUOTA(lqs); + cfs_spin_unlock(&lqs->lqs_lock); +skip: + lqs_putref(lqs); out: - cfs_down(&dquot->dq_sem); - dquot->dq_status &= ~DQ_STATUS_SET; - cfs_up(&dquot->dq_sem); lustre_dqput(dquot); EXIT; out_sem: - cfs_up(&mds->mds_qonoff_sem); + cfs_up_write(&mds->mds_qonoff_sem); if (oqaq) OBD_FREE_PTR(oqaq); @@ -1502,7 +1497,7 @@ static int mds_get_space(struct obd_device *obd, struct obd_quotactl *oqctl) /* get block usage from OSS */ soqc->qc_dqblk.dqb_curspace = 0; - rc = obd_quotactl(obd->u.mds.mds_osc_exp, soqc); + rc = obd_quotactl(obd->u.mds.mds_lov_exp, soqc); if (!rc || rc == -EREMOTEIO) { oqctl->qc_dqblk.dqb_curspace = soqc->qc_dqblk.dqb_curspace; oqctl->qc_dqblk.dqb_valid |= QIF_SPACE; @@ -1538,14 +1533,14 @@ int mds_get_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) oqctl->qc_type != GRPQUOTA) RETURN(-EINVAL); - cfs_down(&mds->mds_qonoff_sem); + cfs_down_read(&mds->mds_qonoff_sem); dqblk->dqb_valid = 0; if (qinfo->qi_files[oqctl->qc_type] == NULL) { CWARN("quota[%u] is off\n", oqctl->qc_type); GOTO(out, rc = -ESRCH); } - dquot = lustre_dqget(obd, qinfo, oqctl->qc_id, oqctl->qc_type); + dquot = lustre_dqget(obd, qinfo, oqctl->qc_id, oqctl->qc_type, 1); if (IS_ERR(dquot)) GOTO(out, rc = PTR_ERR(dquot)); @@ -1560,16 +1555,17 @@ int mds_get_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) cfs_up(&dquot->dq_sem); lustre_dqput(dquot); - cfs_up(&mds->mds_qonoff_sem); + cfs_up_read(&mds->mds_qonoff_sem); /* the usages in admin quota file is inaccurate */ dqblk->dqb_curinodes = 0; dqblk->dqb_curspace = 0; rc = mds_get_space(obd, oqctl); - EXIT; - return rc; + + RETURN(rc); + out: - cfs_up(&mds->mds_qonoff_sem); + cfs_up_read(&mds->mds_qonoff_sem); return rc; } @@ -1593,40 +1589,59 @@ static int dquot_recovery(struct obd_device *obd, unsigned int id, unsigned short type) { struct mds_obd *mds = &obd->u.mds; - struct lustre_quota_info *qinfo= &mds->mds_quota_info; + struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt; + struct lustre_quota_info *qinfo = &mds->mds_quota_info; + struct lustre_qunit_size *lqs; struct lustre_dquot *dquot; struct obd_quotactl *qctl; __u64 total_limits = 0; - int rc; + int rc = 0; ENTRY; OBD_ALLOC_PTR(qctl); if (qctl == NULL) RETURN(-ENOMEM); - dquot = lustre_dqget(obd, qinfo, id, type); + dquot = lustre_dqget(obd, qinfo, id, type, 0); if (IS_ERR(dquot)) { CERROR("Get dquot failed. (rc:%ld)\n", PTR_ERR(dquot)); OBD_FREE_PTR(qctl); RETURN(PTR_ERR(dquot)); } + lqs = quota_search_lqs(LQS_KEY(type, id), qctxt, 1); + if (lqs == NULL) + rc = -ENOENT; + if (IS_ERR(lqs)) + rc = PTR_ERR(lqs); + if (rc) + GOTO(skip, rc); + cfs_down(&dquot->dq_sem); - /* don't recovery the dquot without limits or under setting */ + /* don't recover the dquot without limits or quota is setting or + * another recovery is already going on */ if (!(dquot->dq_dqb.dqb_bhardlimit || dquot->dq_dqb.dqb_bsoftlimit) || - dquot->dq_status) - GOTO(skip, rc = 0); - dquot->dq_status |= DQ_STATUS_RECOVERY; + LQS_IS_SETQUOTA(lqs) || LQS_IS_RECOVERY(lqs)) { + cfs_up(&dquot->dq_sem); + GOTO(skip1, rc = 0); + } + cfs_spin_lock(&lqs->lqs_lock); + LQS_SET_RECOVERY(lqs); + cfs_spin_unlock(&lqs->lqs_lock); cfs_up(&dquot->dq_sem); + /* release mds_qonoff_sem during obd_quotactl ops here */ + cfs_up_write(&mds->mds_qonoff_sem); + /* get real bhardlimit from all slaves. */ qctl->qc_cmd = Q_GETOQUOTA; qctl->qc_type = type; qctl->qc_id = id; qctl->qc_stat = QUOTA_RECOVERING; - rc = obd_quotactl(mds->mds_osc_exp, qctl); + rc = obd_quotactl(mds->mds_lov_exp, qctl); + cfs_down_write(&mds->mds_qonoff_sem); if (rc) GOTO(out, rc); total_limits = qctl->qc_dqblk.dqb_bhardlimit; @@ -1638,7 +1653,6 @@ dquot_recovery(struct obd_device *obd, unsigned int id, unsigned short type) total_limits += qctl->qc_dqblk.dqb_bhardlimit; /* amend the usage of the administrative quotafile */ - cfs_down(&mds->mds_qonoff_sem); cfs_down(&dquot->dq_sem); dquot->dq_dqb.dqb_curspace = total_limits << QUOTABLOCK_BITS; @@ -1648,14 +1662,14 @@ dquot_recovery(struct obd_device *obd, unsigned int id, unsigned short type) CERROR("write dquot failed! (rc:%d)\n", rc); cfs_up(&dquot->dq_sem); - cfs_up(&mds->mds_qonoff_sem); EXIT; out: - cfs_down(&dquot->dq_sem); - dquot->dq_status &= ~DQ_STATUS_RECOVERY; + cfs_spin_lock(&lqs->lqs_lock); + LQS_CLEAR_RECOVERY(lqs); + cfs_spin_unlock(&lqs->lqs_lock); +skip1: + lqs_putref(lqs); skip: - cfs_up(&dquot->dq_sem); - lustre_dqput(dquot); OBD_FREE_PTR(qctl); return rc; @@ -1681,24 +1695,21 @@ static int qmaster_recovery_main(void *arg) /* for mds */ class_incref(obd, "qmaster_recovd_mds", obd); /* for lov */ - class_incref(mds->mds_osc_obd, "qmaster_recovd_lov", mds->mds_osc_obd); + class_incref(mds->mds_lov_obd, "qmaster_recovd_lov", mds->mds_lov_obd); cfs_complete(&data->comp); + cfs_down_write(&mds->mds_qonoff_sem); for (type = USRQUOTA; type < MAXQUOTAS; type++) { cfs_list_t id_list; struct dquot_id *dqid, *tmp; - cfs_down(&mds->mds_qonoff_sem); - if (qinfo->qi_files[type] == NULL) { - cfs_up(&mds->mds_qonoff_sem); + if (qinfo->qi_files[type] == NULL) continue; - } + CFS_INIT_LIST_HEAD(&id_list); rc = fsfilt_qids(obd, qinfo->qi_files[type], NULL, type, &id_list); - cfs_up(&mds->mds_qonoff_sem); - if (rc) CERROR("error get ids from admin quotafile.(%d)\n", rc); @@ -1716,7 +1727,8 @@ free: OBD_FREE_PTR(dqid); } } - class_decref(mds->mds_osc_obd, "qmaster_recovd_lov", mds->mds_osc_obd); + cfs_up_write(&mds->mds_qonoff_sem); + class_decref(mds->mds_lov_obd, "qmaster_recovd_lov", mds->mds_lov_obd); class_decref(obd, "qmaster_recovd_mds", obd); RETURN(rc); } @@ -1728,6 +1740,9 @@ int mds_quota_recovery(struct obd_device *obd) int rc = 0; ENTRY; + if (!ll_sb_any_quota_active(obd->u.obt.obt_qctxt.lqc_sb)) + RETURN(0); + if (unlikely(!mds->mds_quota || obd->obd_stopping)) RETURN(rc); @@ -1744,8 +1759,8 @@ int mds_quota_recovery(struct obd_device *obd) data.obd = obd; cfs_init_completion(&data.comp); - rc = cfs_kernel_thread(qmaster_recovery_main, &data, - CLONE_VM|CLONE_FILES); + rc = cfs_create_thread(qmaster_recovery_main, &data, + CFS_DAEMON_FLAGS); if (rc < 0) CERROR("%s: cannot start quota recovery thread: rc %d\n", obd->obd_name, rc);