X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fquota%2Fquota_master.c;h=f675772688f4778e1cf76de023fff5b0e3f8eb3b;hb=408c61bd36c1cf41a432e152de9891bcd0c3e49f;hp=bd6e966525b8f01fb3fa146387fc2176b60a9153;hpb=eb7c28ff977f4e0a280558aa74e23f2a9ab0ea0c;p=fs%2Flustre-release.git diff --git a/lustre/quota/quota_master.c b/lustre/quota/quota_master.c index bd6e966..f675772 100644 --- a/lustre/quota/quota_master.c +++ b/lustre/quota/quota_master.c @@ -64,9 +64,9 @@ #ifdef HAVE_QUOTA_SUPPORT -/* lock ordering: mds->mds_qonoff_sem > dquot->dq_sem */ +/* lock ordering: mds->mds_qonoff_sem > dquot->dq_sem > lqs->lqs_lock */ static cfs_list_t lustre_dquot_hash[NR_DQHASH]; -static cfs_spinlock_t dquot_hash_lock = CFS_SPIN_LOCK_UNLOCKED; +static cfs_rwlock_t dquot_hash_lock = CFS_RW_LOCK_UNLOCKED; cfs_mem_cache_t *lustre_dquot_cachep; @@ -126,7 +126,6 @@ static struct lustre_dquot *find_dquot(int hashent, struct lustre_dquot *dquot; ENTRY; - LASSERT_SPIN_LOCKED(&dquot_hash_lock); cfs_list_for_each_entry(dquot, &lustre_dquot_hash[hashent], dq_hash) { if (dquot->dq_info == lqi && dquot->dq_id == id && dquot->dq_type == type) @@ -147,11 +146,10 @@ static struct lustre_dquot *alloc_dquot(struct lustre_quota_info *lqi, CFS_INIT_LIST_HEAD(&dquot->dq_hash); cfs_init_mutex_locked(&dquot->dq_sem); - dquot->dq_refcnt = 1; + cfs_atomic_set(&dquot->dq_refcnt, 1); dquot->dq_info = lqi; dquot->dq_id = id; dquot->dq_type = type; - dquot->dq_status = DQ_STATUS_AVAIL; RETURN(dquot); } @@ -178,52 +176,72 @@ static void remove_dquot_nolock(struct lustre_dquot *dquot) static void lustre_dqput(struct lustre_dquot *dquot) { ENTRY; - cfs_spin_lock(&dquot_hash_lock); - LASSERT(dquot->dq_refcnt); - dquot->dq_refcnt--; - if (!dquot->dq_refcnt) { + cfs_write_lock(&dquot_hash_lock); + LASSERT(cfs_atomic_read(&dquot->dq_refcnt)); + cfs_atomic_dec(&dquot->dq_refcnt); + if (cfs_atomic_read(&dquot->dq_refcnt) == 0) { remove_dquot_nolock(dquot); free_dquot(dquot); } - cfs_spin_unlock(&dquot_hash_lock); + cfs_write_unlock(&dquot_hash_lock); EXIT; } static struct lustre_dquot *lustre_dqget(struct obd_device *obd, struct lustre_quota_info *lqi, - qid_t id, int type) + qid_t id, int type, int can_fake) { unsigned int hashent = dquot_hashfn(lqi, id, type); struct lustre_dquot *dquot, *empty; + int free_dq = 0; ENTRY; if ((empty = alloc_dquot(lqi, id, type)) == NULL) RETURN(ERR_PTR(-ENOMEM)); - cfs_spin_lock(&dquot_hash_lock); + cfs_read_lock(&dquot_hash_lock); if ((dquot = find_dquot(hashent, lqi, id, type)) != NULL) { - dquot->dq_refcnt++; - cfs_spin_unlock(&dquot_hash_lock); - free_dquot(empty); + cfs_atomic_inc(&dquot->dq_refcnt); + cfs_read_unlock(&dquot_hash_lock); + free_dq = 1; } else { int rc; - dquot = empty; - insert_dquot_nolock(dquot); - cfs_spin_unlock(&dquot_hash_lock); + cfs_read_unlock(&dquot_hash_lock); + dquot = empty; rc = fsfilt_dquot(obd, dquot, QFILE_RD_DQUOT); cfs_up(&dquot->dq_sem); if (rc) { CERROR("can't read dquot from admin quotafile! " "(rc:%d)\n", rc); - lustre_dqput(dquot); + free_dquot(dquot); RETURN(ERR_PTR(rc)); + } else { + cfs_write_lock(&dquot_hash_lock); + if ((dquot = find_dquot(hashent, lqi, id, type)) != NULL) { + cfs_atomic_inc(&dquot->dq_refcnt); + free_dq = 1; + } else { + dquot = empty; + insert_dquot_nolock(dquot); + } + cfs_write_unlock(&dquot_hash_lock); } } LASSERT(dquot); + if (!can_fake && cfs_test_bit(DQ_FAKE_B, &dquot->dq_flags)) { + DQUOT_DEBUG(dquot, "It is a fake dquot: unexpected!\n"); + lustre_dqput(dquot); + dquot = ERR_PTR(-ENOENT); + } + + if (free_dq) + free_dquot(empty); + + RETURN(dquot); } @@ -266,7 +284,7 @@ int dqacq_adjust_qunit_sz(struct obd_device *obd, qid_t id, int type, LASSERT(mds); cfs_down_read(&mds->mds_qonoff_sem); - dquot = lustre_dqget(obd, info, id, type); + dquot = lustre_dqget(obd, info, id, type, 0); if (IS_ERR(dquot)) RETURN(PTR_ERR(dquot)); @@ -324,7 +342,7 @@ int dqacq_adjust_qunit_sz(struct obd_device *obd, qid_t id, int type, /* only when block qunit is reduced, boardcast to osts */ if ((adjust_res & LQS_BLK_DECREASE) && QAQ_IS_ADJBLK(oqaq)) - rc = obd_quota_adjust_qunit(mds->mds_lov_exp, oqaq, qctxt); + rc = obd_quota_adjust_qunit(mds->mds_lov_exp, oqaq, qctxt, NULL); out: lustre_dqput(dquot); @@ -354,11 +372,32 @@ int dqacq_handler(struct obd_device *obd, struct qunit_data *qdata, int opc) if (OBD_FAIL_CHECK(OBD_FAIL_OBD_DQACQ)) RETURN(-EIO); + if (!ll_sb_has_quota_active(qctxt->lqc_sb, + QDATA_IS_GRP(qdata) ? GRPQUOTA : USRQUOTA)) + RETURN(-EIO); + + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), + qctxt, 0); + if (lqs == NULL) + rc = -ENOENT; + if (IS_ERR(lqs)) + rc = PTR_ERR(lqs); + if (rc) + RETURN(rc); + + cfs_spin_lock(&lqs->lqs_lock); + if (LQS_IS_RECOVERY(lqs)) { + cfs_spin_unlock(&lqs->lqs_lock); + LQS_DEBUG(lqs, "this lqs is under recovery\n"); + GOTO(skip, rc = -EBUSY); + } + cfs_spin_unlock(&lqs->lqs_lock); + cfs_down_write(&mds->mds_qonoff_sem); - dquot = lustre_dqget(obd, info, qdata->qd_id, QDATA_IS_GRP(qdata)); + dquot = lustre_dqget(obd, info, qdata->qd_id, QDATA_IS_GRP(qdata), 0); if (IS_ERR(dquot)) { cfs_up_write(&mds->mds_qonoff_sem); - RETURN(PTR_ERR(dquot)); + GOTO(skip, rc = PTR_ERR(dquot)); } DQUOT_DEBUG(dquot, "get dquot in dqacq_handler\n"); @@ -366,11 +405,6 @@ int dqacq_handler(struct obd_device *obd, struct qunit_data *qdata, int opc) cfs_down(&dquot->dq_sem); - if (dquot->dq_status & DQ_STATUS_RECOVERY) { - DQUOT_DEBUG(dquot, "this dquot is under recovering.\n"); - GOTO(out, rc = -EBUSY); - } - if (QDATA_IS_BLK(qdata)) { grace = info->qi_info[QDATA_IS_GRP(qdata)].dqi_bgrace; usage = &dquot->dq_dqb.dqb_curspace; @@ -442,18 +476,11 @@ out: if (rc != -EDQUOT) dqacq_adjust_qunit_sz(obd, qdata->qd_id, QDATA_IS_GRP(qdata), QDATA_IS_BLK(qdata)); - lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), - qctxt, 0); - if (lqs == NULL || IS_ERR(lqs)) { - CDEBUG(D_INFO, "Can't find the lustre qunit size!\n"); - qdata->qd_qunit = QDATA_IS_BLK(qdata) ? qctxt->lqc_bunit_sz : - qctxt->lqc_iunit_sz; - } else { - cfs_spin_lock(&lqs->lqs_lock); - qdata->qd_qunit = QDATA_IS_BLK(qdata) ? lqs->lqs_bunit_sz : - lqs->lqs_iunit_sz; - cfs_spin_unlock(&lqs->lqs_lock); - } + + cfs_spin_lock(&lqs->lqs_lock); + qdata->qd_qunit = QDATA_IS_BLK(qdata) ? lqs->lqs_bunit_sz : + lqs->lqs_iunit_sz; + cfs_spin_unlock(&lqs->lqs_lock); if (QDATA_IS_BLK(qdata)) QDATA_SET_ADJBLK(qdata); @@ -461,8 +488,8 @@ out: QDATA_SET_ADJINO(qdata); QDATA_DEBUG(qdata, "alloc/release qunit in dqacq_handler\n"); - if (lqs) - lqs_putref(lqs); +skip: + lqs_putref(lqs); return rc; } @@ -911,6 +938,8 @@ int do_mds_quota_off(struct obd_device *obd, struct obd_quotactl *oqctl) EXIT; out: + CDEBUG(D_QUOTA, "%s: quotaoff type:flags:rc %u:%lu:%d\n", + obd->obd_name, oqctl->qc_type, qctxt->lqc_flags, rc); cfs_up_write(&mds->mds_qonoff_sem); pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); return rc ? : (rc1 ? : rc2); @@ -1254,7 +1283,7 @@ static void adjust_lqs(struct obd_device *obd, struct quota_adjust_qunit *qaq) /* adjust remote lqs */ if (QAQ_IS_ADJBLK(qaq)) { - rc = obd_quota_adjust_qunit(obd->u.mds.mds_lov_exp, qaq, qctxt); + rc = obd_quota_adjust_qunit(obd->u.mds.mds_lov_exp, qaq, qctxt, NULL); if (rc < 0) CERROR("adjust slaves' qunit size failed!(rc=%d)\n", rc); @@ -1276,7 +1305,8 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) /* orig_set means if quota was set before; now_set means we are * setting/cancelling quota */ int orig_set, now_set; - int rc, rc2 = 0, flag = 0; + struct lustre_qunit_size *lqs; + int rc = 0, rc2 = 0, flag = 0; ENTRY; if (oqctl->qc_type != USRQUOTA && @@ -1295,20 +1325,29 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) GOTO(out_sem, rc = -ESRCH); } - dquot = lustre_dqget(obd, qinfo, oqctl->qc_id, oqctl->qc_type); + dquot = lustre_dqget(obd, qinfo, oqctl->qc_id, oqctl->qc_type, 1); if (IS_ERR(dquot)) GOTO(out_sem, rc = PTR_ERR(dquot)); DQUOT_DEBUG(dquot, "get dquot in mds_set_blk\n"); QINFO_DEBUG(dquot->dq_info, "get dquot in mds_set_blk\n"); - cfs_down(&dquot->dq_sem); + lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id), qctxt, 1); + if (lqs == NULL) + rc = -ENOENT; + if (IS_ERR(lqs)) + rc = PTR_ERR(lqs); + if (rc) + GOTO(out, rc); - if (dquot->dq_status) { + cfs_down(&dquot->dq_sem); + cfs_spin_lock(&lqs->lqs_lock); + if (LQS_IS_SETQUOTA(lqs) || LQS_IS_RECOVERY(lqs)) { + cfs_spin_unlock(&lqs->lqs_lock); cfs_up(&dquot->dq_sem); - lustre_dqput(dquot); - GOTO(out_sem, rc = -EBUSY); + GOTO(skip, rc = -EBUSY); } - dquot->dq_status |= DQ_STATUS_SET; + LQS_SET_SETQUOTA(lqs); + cfs_spin_unlock(&lqs->lqs_lock); ihardlimit = dquot->dq_dqb.dqb_ihardlimit; isoftlimit = dquot->dq_dqb.dqb_isoftlimit; @@ -1373,7 +1412,7 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) if (rc) { CERROR("set limit failed! (rc:%d)\n", rc); - goto out; + GOTO(update_fail, rc); } cfs_up_write(&mds->mds_qonoff_sem); @@ -1419,11 +1458,13 @@ revoke_out: } rc2 = fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT); cfs_up(&dquot->dq_sem); - +update_fail: + cfs_spin_lock(&lqs->lqs_lock); + LQS_CLEAR_SETQUOTA(lqs); + cfs_spin_unlock(&lqs->lqs_lock); +skip: + lqs_putref(lqs); out: - cfs_down(&dquot->dq_sem); - dquot->dq_status &= ~DQ_STATUS_SET; - cfs_up(&dquot->dq_sem); lustre_dqput(dquot); EXIT; out_sem: @@ -1495,7 +1536,7 @@ int mds_get_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) GOTO(out, rc = -ESRCH); } - dquot = lustre_dqget(obd, qinfo, oqctl->qc_id, oqctl->qc_type); + dquot = lustre_dqget(obd, qinfo, oqctl->qc_id, oqctl->qc_type, 1); if (IS_ERR(dquot)) GOTO(out, rc = PTR_ERR(dquot)); @@ -1516,6 +1557,18 @@ int mds_get_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) dqblk->dqb_curinodes = 0; dqblk->dqb_curspace = 0; rc = mds_get_space(obd, oqctl); + + /* + * Querying of curinodes and/or curspace may have failed, administrative + * quota data are likely to be better approximation to the real usage in + * this case. + */ + if (!(dqblk->dqb_valid & QIF_INODES) && dquot->dq_dqb.dqb_curinodes > 0) + dqblk->dqb_curinodes = dquot->dq_dqb.dqb_curinodes; + + if (!(dqblk->dqb_valid & QIF_SPACE) && dquot->dq_dqb.dqb_curspace > 0) + dqblk->dqb_curspace = dquot->dq_dqb.dqb_curspace; + RETURN(rc); out: @@ -1543,32 +1596,47 @@ static int dquot_recovery(struct obd_device *obd, unsigned int id, unsigned short type) { struct mds_obd *mds = &obd->u.mds; - struct lustre_quota_info *qinfo= &mds->mds_quota_info; + struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt; + struct lustre_quota_info *qinfo = &mds->mds_quota_info; + struct lustre_qunit_size *lqs; struct lustre_dquot *dquot; struct obd_quotactl *qctl; __u64 total_limits = 0; - int rc; + int rc = 0; ENTRY; OBD_ALLOC_PTR(qctl); if (qctl == NULL) RETURN(-ENOMEM); - dquot = lustre_dqget(obd, qinfo, id, type); + dquot = lustre_dqget(obd, qinfo, id, type, 0); if (IS_ERR(dquot)) { CERROR("Get dquot failed. (rc:%ld)\n", PTR_ERR(dquot)); OBD_FREE_PTR(qctl); RETURN(PTR_ERR(dquot)); } + lqs = quota_search_lqs(LQS_KEY(type, id), qctxt, 1); + if (lqs == NULL) + rc = -ENOENT; + if (IS_ERR(lqs)) + rc = PTR_ERR(lqs); + if (rc) + GOTO(skip, rc); + cfs_down(&dquot->dq_sem); - /* don't recovery the dquot without limits or under setting */ + /* don't recover the dquot without limits or quota is setting or + * another recovery is already going on */ if (!(dquot->dq_dqb.dqb_bhardlimit || dquot->dq_dqb.dqb_bsoftlimit) || - dquot->dq_status) - GOTO(skip, rc = 0); - dquot->dq_status |= DQ_STATUS_RECOVERY; + LQS_IS_SETQUOTA(lqs) || LQS_IS_RECOVERY(lqs)) { + cfs_up(&dquot->dq_sem); + GOTO(skip1, rc = 0); + } + cfs_spin_lock(&lqs->lqs_lock); + LQS_SET_RECOVERY(lqs); + cfs_spin_unlock(&lqs->lqs_lock); cfs_up(&dquot->dq_sem); /* release mds_qonoff_sem during obd_quotactl ops here */ @@ -1603,11 +1671,12 @@ dquot_recovery(struct obd_device *obd, unsigned int id, unsigned short type) cfs_up(&dquot->dq_sem); EXIT; out: - cfs_down(&dquot->dq_sem); - dquot->dq_status &= ~DQ_STATUS_RECOVERY; + cfs_spin_lock(&lqs->lqs_lock); + LQS_CLEAR_RECOVERY(lqs); + cfs_spin_unlock(&lqs->lqs_lock); +skip1: + lqs_putref(lqs); skip: - cfs_up(&dquot->dq_sem); - lustre_dqput(dquot); OBD_FREE_PTR(qctl); return rc; @@ -1678,6 +1747,9 @@ int mds_quota_recovery(struct obd_device *obd) int rc = 0; ENTRY; + if (!ll_sb_any_quota_active(obd->u.obt.obt_qctxt.lqc_sb)) + RETURN(0); + if (unlikely(!mds->mds_quota || obd->obd_stopping)) RETURN(rc);