From: Landen Date: Mon, 13 Sep 2010 10:52:24 +0000 (+0400) Subject: b=23541 move DQ_STATUS_SET and DQ_STATUS_RECOVERY to X-Git-Tag: 2.0.53.0~60 X-Git-Url: https://git.whamcloud.com/gitweb?a=commitdiff_plain;h=aa9f441514640d7253ae59e35b0b0804c9ce792e;p=fs%2Flustre-release.git b=23541 move DQ_STATUS_SET and DQ_STATUS_RECOVERY to i=johann i=andrew.perepechko --- diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index f56d210..0ce99a9 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -1679,6 +1679,8 @@ extern void lustre_swab_quota_adjust_qunit(struct quota_adjust_qunit *q); #define LQUOTA_FLAGS_ADJINO 8UL /* adjust the inode qunit size */ #define LQUOTA_FLAGS_CHG_QS 16UL /* indicate whether it has capability of * OBD_CONNECT_CHANGE_QS */ +#define LQUOTA_FLAGS_RECOVERY 32UL /* recovery is going on a uid/gid */ +#define LQUOTA_FLAGS_SETQUOTA 64UL /* being setquota on a uid/gid */ /* flags is specific for quota_adjust_qunit */ #define LQUOTA_QAQ_CREATE_LQS (1 << 31) /* when it is set, need create lqs */ diff --git a/lustre/include/lustre_quota.h b/lustre/include/lustre_quota.h index 6da7ce8..1e12858 100644 --- a/lustre/include/lustre_quota.h +++ b/lustre/include/lustre_quota.h @@ -162,10 +162,6 @@ struct lustre_quota_info { lustre_quota_version_t qi_version; }; -#define DQ_STATUS_AVAIL 0x0 /* Available dquot */ -#define DQ_STATUS_SET 0x01 /* Sombody is setting dquot */ -#define DQ_STATUS_RECOVERY 0x02 /* dquot is in recovery */ - struct lustre_mem_dqblk { __u64 dqb_bhardlimit; /**< absolute limit on disk blks alloc */ __u64 dqb_bsoftlimit; /**< preferred limit on disk blks */ @@ -183,7 +179,7 @@ struct lustre_dquot { /** Protect the data in lustre_dquot */ cfs_semaphore_t dq_sem; /** Use count */ - int dq_refcnt; + cfs_atomic_t dq_refcnt; /** Pointer of quota info it belongs to */ struct lustre_quota_info *dq_info; /** Offset of dquot on disk */ @@ -192,8 +188,6 @@ struct lustre_dquot { unsigned int dq_id; /** Type fo quota (USRQUOTA, GRPQUOUTA) */ int dq_type; - /** See DQ_STATUS_ */ - unsigned short dq_status; /** See DQ_ in quota.h */ unsigned long dq_flags; /** Diskquota usage */ @@ -381,13 +375,20 @@ struct lustre_qunit_size { struct lustre_quota_ctxt *lqs_ctxt; /** quota ctxt */ }; -#define LQS_IS_GRP(lqs) ((lqs)->lqs_flags & LQUOTA_FLAGS_GRP) -#define LQS_IS_ADJBLK(lqs) ((lqs)->lqs_flags & LQUOTA_FLAGS_ADJBLK) -#define LQS_IS_ADJINO(lqs) ((lqs)->lqs_flags & LQUOTA_FLAGS_ADJINO) +#define LQS_IS_GRP(lqs) ((lqs)->lqs_flags & LQUOTA_FLAGS_GRP) +#define LQS_IS_ADJBLK(lqs) ((lqs)->lqs_flags & LQUOTA_FLAGS_ADJBLK) +#define LQS_IS_ADJINO(lqs) ((lqs)->lqs_flags & LQUOTA_FLAGS_ADJINO) +#define LQS_IS_RECOVERY(lqs) ((lqs)->lqs_flags & LQUOTA_FLAGS_RECOVERY) +#define LQS_IS_SETQUOTA(lqs) ((lqs)->lqs_flags & LQUOTA_FLAGS_SETQUOTA) + +#define LQS_SET_GRP(lqs) ((lqs)->lqs_flags |= LQUOTA_FLAGS_GRP) +#define LQS_SET_ADJBLK(lqs) ((lqs)->lqs_flags |= LQUOTA_FLAGS_ADJBLK) +#define LQS_SET_ADJINO(lqs) ((lqs)->lqs_flags |= LQUOTA_FLAGS_ADJINO) +#define LQS_SET_RECOVERY(lqs) ((lqs)->lqs_flags |= LQUOTA_FLAGS_RECOVERY) +#define LQS_SET_SETQUOTA(lqs) ((lqs)->lqs_flags |= LQUOTA_FLAGS_SETQUOTA) -#define LQS_SET_GRP(lqs) ((lqs)->lqs_flags |= LQUOTA_FLAGS_GRP) -#define LQS_SET_ADJBLK(lqs) ((lqs)->lqs_flags |= LQUOTA_FLAGS_ADJBLK) -#define LQS_SET_ADJINO(lqs) ((lqs)->lqs_flags |= LQUOTA_FLAGS_ADJINO) +#define LQS_CLEAR_RECOVERY(lqs) ((lqs)->lqs_flags &= ~LQUOTA_FLAGS_RECOVERY) +#define LQS_CLEAR_SETQUOTA(lqs) ((lqs)->lqs_flags &= ~LQUOTA_FLAGS_SETQUOTA) /* In the hash for lustre_qunit_size, the key is decided by * grp_or_usr and uid/gid, in here, I combine these two values, diff --git a/lustre/quota/quota_internal.h b/lustre/quota/quota_internal.h index 042bbe2..3cf35d0 100644 --- a/lustre/quota/quota_internal.h +++ b/lustre/quota/quota_internal.h @@ -54,7 +54,7 @@ #define DQUOT_DEBUG(dquot, fmt, arg...) \ CDEBUG(D_QUOTA, "refcnt(%u) id(%u) type(%u) off(%llu) flags(%lu) " \ "bhardlimit("LPU64") curspace("LPU64") ihardlimit("LPU64") " \ - "curinodes("LPU64"): " fmt, dquot->dq_refcnt, \ + "curinodes("LPU64"): " fmt, cfs_atomic_read(&dquot->dq_refcnt),\ dquot->dq_id, dquot->dq_type, dquot->dq_off, dquot->dq_flags, \ dquot->dq_dqb.dqb_bhardlimit, dquot->dq_dqb.dqb_curspace, \ dquot->dq_dqb.dqb_ihardlimit, dquot->dq_dqb.dqb_curinodes, \ diff --git a/lustre/quota/quota_master.c b/lustre/quota/quota_master.c index 0f66753..d8d14d7 100644 --- a/lustre/quota/quota_master.c +++ b/lustre/quota/quota_master.c @@ -64,9 +64,9 @@ #ifdef HAVE_QUOTA_SUPPORT -/* lock ordering: mds->mds_qonoff_sem > dquot->dq_sem */ +/* lock ordering: mds->mds_qonoff_sem > dquot->dq_sem > lqs->lqs_lock */ static cfs_list_t lustre_dquot_hash[NR_DQHASH]; -static cfs_spinlock_t dquot_hash_lock = CFS_SPIN_LOCK_UNLOCKED; +static cfs_rwlock_t dquot_hash_lock = CFS_RW_LOCK_UNLOCKED; cfs_mem_cache_t *lustre_dquot_cachep; @@ -126,7 +126,6 @@ static struct lustre_dquot *find_dquot(int hashent, struct lustre_dquot *dquot; ENTRY; - LASSERT_SPIN_LOCKED(&dquot_hash_lock); cfs_list_for_each_entry(dquot, &lustre_dquot_hash[hashent], dq_hash) { if (dquot->dq_info == lqi && dquot->dq_id == id && dquot->dq_type == type) @@ -147,11 +146,10 @@ static struct lustre_dquot *alloc_dquot(struct lustre_quota_info *lqi, CFS_INIT_LIST_HEAD(&dquot->dq_hash); cfs_init_mutex_locked(&dquot->dq_sem); - dquot->dq_refcnt = 1; + cfs_atomic_set(&dquot->dq_refcnt, 1); dquot->dq_info = lqi; dquot->dq_id = id; dquot->dq_type = type; - dquot->dq_status = DQ_STATUS_AVAIL; RETURN(dquot); } @@ -178,40 +176,40 @@ static void remove_dquot_nolock(struct lustre_dquot *dquot) static void lustre_dqput(struct lustre_dquot *dquot) { ENTRY; - cfs_spin_lock(&dquot_hash_lock); - LASSERT(dquot->dq_refcnt); - dquot->dq_refcnt--; - if (!dquot->dq_refcnt) { + cfs_write_lock(&dquot_hash_lock); + LASSERT(cfs_atomic_read(&dquot->dq_refcnt)); + cfs_atomic_dec(&dquot->dq_refcnt); + if (cfs_atomic_read(&dquot->dq_refcnt) == 0) { remove_dquot_nolock(dquot); free_dquot(dquot); } - cfs_spin_unlock(&dquot_hash_lock); + cfs_write_unlock(&dquot_hash_lock); EXIT; } static struct lustre_dquot *lustre_dqget(struct obd_device *obd, struct lustre_quota_info *lqi, - qid_t id, int type) + qid_t id, int type, int can_fake) { unsigned int hashent = dquot_hashfn(lqi, id, type); struct lustre_dquot *dquot, *empty; + int free_dq = 0; ENTRY; if ((empty = alloc_dquot(lqi, id, type)) == NULL) RETURN(ERR_PTR(-ENOMEM)); - cfs_spin_lock(&dquot_hash_lock); + cfs_read_lock(&dquot_hash_lock); if ((dquot = find_dquot(hashent, lqi, id, type)) != NULL) { - dquot->dq_refcnt++; - cfs_spin_unlock(&dquot_hash_lock); - free_dquot(empty); + cfs_atomic_inc(&dquot->dq_refcnt); + cfs_read_unlock(&dquot_hash_lock); + free_dq = 1; } else { int rc; - dquot = empty; - insert_dquot_nolock(dquot); - cfs_spin_unlock(&dquot_hash_lock); + cfs_read_unlock(&dquot_hash_lock); + dquot = empty; rc = fsfilt_dquot(obd, dquot, QFILE_RD_DQUOT); cfs_up(&dquot->dq_sem); if (rc) { @@ -219,11 +217,31 @@ static struct lustre_dquot *lustre_dqget(struct obd_device *obd, "(rc:%d)\n", rc); lustre_dqput(dquot); RETURN(ERR_PTR(rc)); + } else { + cfs_write_lock(&dquot_hash_lock); + if ((dquot = find_dquot(hashent, lqi, id, type)) != NULL) { + cfs_atomic_inc(&dquot->dq_refcnt); + free_dq = 1; + } else { + dquot = empty; + insert_dquot_nolock(dquot); + } + cfs_write_unlock(&dquot_hash_lock); } } LASSERT(dquot); + if (!can_fake && cfs_test_bit(DQ_FAKE_B, &dquot->dq_flags)) { + DQUOT_DEBUG(dquot, "It is a fake dquot: unexpected!\n"); + lustre_dqput(dquot); + dquot = ERR_PTR(-ENOENT); + } + + if (free_dq) + free_dquot(empty); + + RETURN(dquot); } @@ -266,7 +284,7 @@ int dqacq_adjust_qunit_sz(struct obd_device *obd, qid_t id, int type, LASSERT(mds); cfs_down_read(&mds->mds_qonoff_sem); - dquot = lustre_dqget(obd, info, id, type); + dquot = lustre_dqget(obd, info, id, type, 0); if (IS_ERR(dquot)) RETURN(PTR_ERR(dquot)); @@ -354,11 +372,28 @@ int dqacq_handler(struct obd_device *obd, struct qunit_data *qdata, int opc) if (OBD_FAIL_CHECK(OBD_FAIL_OBD_DQACQ)) RETURN(-EIO); + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), + qctxt, 0); + if (lqs == NULL) + rc = -ENOENT; + if (IS_ERR(lqs)) + rc = PTR_ERR(lqs); + if (rc) + RETURN(rc); + + cfs_spin_lock(&lqs->lqs_lock); + if (LQS_IS_RECOVERY(lqs)) { + cfs_spin_unlock(&lqs->lqs_lock); + LQS_DEBUG(lqs, "this lqs is under recovery\n"); + GOTO(skip, rc = -EBUSY); + } + cfs_spin_unlock(&lqs->lqs_lock); + cfs_down_write(&mds->mds_qonoff_sem); - dquot = lustre_dqget(obd, info, qdata->qd_id, QDATA_IS_GRP(qdata)); + dquot = lustre_dqget(obd, info, qdata->qd_id, QDATA_IS_GRP(qdata), 0); if (IS_ERR(dquot)) { cfs_up_write(&mds->mds_qonoff_sem); - RETURN(PTR_ERR(dquot)); + GOTO(skip, rc = PTR_ERR(dquot)); } DQUOT_DEBUG(dquot, "get dquot in dqacq_handler\n"); @@ -366,11 +401,6 @@ int dqacq_handler(struct obd_device *obd, struct qunit_data *qdata, int opc) cfs_down(&dquot->dq_sem); - if (dquot->dq_status & DQ_STATUS_RECOVERY) { - DQUOT_DEBUG(dquot, "this dquot is under recovering.\n"); - GOTO(out, rc = -EBUSY); - } - if (QDATA_IS_BLK(qdata)) { grace = info->qi_info[QDATA_IS_GRP(qdata)].dqi_bgrace; usage = &dquot->dq_dqb.dqb_curspace; @@ -442,18 +472,11 @@ out: if (rc != -EDQUOT) dqacq_adjust_qunit_sz(obd, qdata->qd_id, QDATA_IS_GRP(qdata), QDATA_IS_BLK(qdata)); - lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), - qctxt, 0); - if (lqs == NULL || IS_ERR(lqs)) { - CDEBUG(D_INFO, "Can't find the lustre qunit size!\n"); - qdata->qd_qunit = QDATA_IS_BLK(qdata) ? qctxt->lqc_bunit_sz : - qctxt->lqc_iunit_sz; - } else { - cfs_spin_lock(&lqs->lqs_lock); - qdata->qd_qunit = QDATA_IS_BLK(qdata) ? lqs->lqs_bunit_sz : - lqs->lqs_iunit_sz; - cfs_spin_unlock(&lqs->lqs_lock); - } + + cfs_spin_lock(&lqs->lqs_lock); + qdata->qd_qunit = QDATA_IS_BLK(qdata) ? lqs->lqs_bunit_sz : + lqs->lqs_iunit_sz; + cfs_spin_unlock(&lqs->lqs_lock); if (QDATA_IS_BLK(qdata)) QDATA_SET_ADJBLK(qdata); @@ -461,8 +484,8 @@ out: QDATA_SET_ADJINO(qdata); QDATA_DEBUG(qdata, "alloc/release qunit in dqacq_handler\n"); - if (lqs) - lqs_putref(lqs); +skip: + lqs_putref(lqs); return rc; } @@ -1276,7 +1299,8 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) /* orig_set means if quota was set before; now_set means we are * setting/cancelling quota */ int orig_set, now_set; - int rc, rc2 = 0, flag = 0; + struct lustre_qunit_size *lqs; + int rc = 0, rc2 = 0, flag = 0; ENTRY; if (oqctl->qc_type != USRQUOTA && @@ -1295,20 +1319,29 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) GOTO(out_sem, rc = -ESRCH); } - dquot = lustre_dqget(obd, qinfo, oqctl->qc_id, oqctl->qc_type); + dquot = lustre_dqget(obd, qinfo, oqctl->qc_id, oqctl->qc_type, 1); if (IS_ERR(dquot)) GOTO(out_sem, rc = PTR_ERR(dquot)); DQUOT_DEBUG(dquot, "get dquot in mds_set_blk\n"); QINFO_DEBUG(dquot->dq_info, "get dquot in mds_set_blk\n"); - cfs_down(&dquot->dq_sem); + lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id), qctxt, 1); + if (lqs == NULL) + rc = -ENOENT; + if (IS_ERR(lqs)) + rc = PTR_ERR(lqs); + if (rc) + GOTO(out, rc); - if (dquot->dq_status) { + cfs_down(&dquot->dq_sem); + cfs_spin_lock(&lqs->lqs_lock); + if (LQS_IS_SETQUOTA(lqs) || LQS_IS_RECOVERY(lqs)) { + cfs_spin_unlock(&lqs->lqs_lock); cfs_up(&dquot->dq_sem); - lustre_dqput(dquot); - GOTO(out_sem, rc = -EBUSY); + GOTO(skip, rc = -EBUSY); } - dquot->dq_status |= DQ_STATUS_SET; + LQS_SET_SETQUOTA(lqs); + cfs_spin_unlock(&lqs->lqs_lock); ihardlimit = dquot->dq_dqb.dqb_ihardlimit; isoftlimit = dquot->dq_dqb.dqb_isoftlimit; @@ -1373,7 +1406,7 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) if (rc) { CERROR("set limit failed! (rc:%d)\n", rc); - goto out; + GOTO(update_fail, rc); } cfs_up_write(&mds->mds_qonoff_sem); @@ -1419,11 +1452,13 @@ revoke_out: } rc2 = fsfilt_dquot(obd, dquot, QFILE_WR_DQUOT); cfs_up(&dquot->dq_sem); - +update_fail: + cfs_spin_lock(&lqs->lqs_lock); + LQS_CLEAR_SETQUOTA(lqs); + cfs_spin_unlock(&lqs->lqs_lock); +skip: + lqs_putref(lqs); out: - cfs_down(&dquot->dq_sem); - dquot->dq_status &= ~DQ_STATUS_SET; - cfs_up(&dquot->dq_sem); lustre_dqput(dquot); EXIT; out_sem: @@ -1495,7 +1530,7 @@ int mds_get_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl) GOTO(out, rc = -ESRCH); } - dquot = lustre_dqget(obd, qinfo, oqctl->qc_id, oqctl->qc_type); + dquot = lustre_dqget(obd, qinfo, oqctl->qc_id, oqctl->qc_type, 1); if (IS_ERR(dquot)) GOTO(out, rc = PTR_ERR(dquot)); @@ -1555,32 +1590,47 @@ static int dquot_recovery(struct obd_device *obd, unsigned int id, unsigned short type) { struct mds_obd *mds = &obd->u.mds; - struct lustre_quota_info *qinfo= &mds->mds_quota_info; + struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt; + struct lustre_quota_info *qinfo = &mds->mds_quota_info; + struct lustre_qunit_size *lqs; struct lustre_dquot *dquot; struct obd_quotactl *qctl; __u64 total_limits = 0; - int rc; + int rc = 0; ENTRY; OBD_ALLOC_PTR(qctl); if (qctl == NULL) RETURN(-ENOMEM); - dquot = lustre_dqget(obd, qinfo, id, type); + dquot = lustre_dqget(obd, qinfo, id, type, 0); if (IS_ERR(dquot)) { CERROR("Get dquot failed. (rc:%ld)\n", PTR_ERR(dquot)); OBD_FREE_PTR(qctl); RETURN(PTR_ERR(dquot)); } + lqs = quota_search_lqs(LQS_KEY(type, id), qctxt, 1); + if (lqs == NULL) + rc = -ENOENT; + if (IS_ERR(lqs)) + rc = PTR_ERR(lqs); + if (rc) + GOTO(skip, rc); + cfs_down(&dquot->dq_sem); - /* don't recovery the dquot without limits or under setting */ + /* don't recover the dquot without limits or quota is setting or + * another recovery is already going on */ if (!(dquot->dq_dqb.dqb_bhardlimit || dquot->dq_dqb.dqb_bsoftlimit) || - dquot->dq_status) - GOTO(skip, rc = 0); - dquot->dq_status |= DQ_STATUS_RECOVERY; + LQS_IS_SETQUOTA(lqs) || LQS_IS_RECOVERY(lqs)) { + cfs_up(&dquot->dq_sem); + GOTO(skip1, rc = 0); + } + cfs_spin_lock(&lqs->lqs_lock); + LQS_SET_RECOVERY(lqs); + cfs_spin_unlock(&lqs->lqs_lock); cfs_up(&dquot->dq_sem); /* release mds_qonoff_sem during obd_quotactl ops here */ @@ -1615,11 +1665,12 @@ dquot_recovery(struct obd_device *obd, unsigned int id, unsigned short type) cfs_up(&dquot->dq_sem); EXIT; out: - cfs_down(&dquot->dq_sem); - dquot->dq_status &= ~DQ_STATUS_RECOVERY; + cfs_spin_lock(&lqs->lqs_lock); + LQS_CLEAR_RECOVERY(lqs); + cfs_spin_unlock(&lqs->lqs_lock); +skip1: + lqs_putref(lqs); skip: - cfs_up(&dquot->dq_sem); - lustre_dqput(dquot); OBD_FREE_PTR(qctl); return rc;