All fields of an lqe, especially the flags, must only be changed
under lock protection. Flags such as lqe_gl, lqe_is_reset and
lqe_edquot are bitfields packed into the same unsigned long (8 bytes)
in struct lquota_entry. A single bit cannot be changed in memory
directly: the compiler loads a byte into a register (e.g. AL),
modifies it there (AND, OR) and stores it back. This read-modify-write
sequence is a race window and may corrupt the neighbouring flags. This
patch therefore adds locking in the remaining places where the flags,
or any other lqe members, are changed without it. One of the known
issues this patch aims to fix is the following assertion:
(qmt_lock.c:973:qmt_id_lock_glimpse()) ASSERTION( lqe->lqe_gl )
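The race can be reproduced outside Lustre with two threads toggling
adjacent bitfields. Below is a minimal standalone sketch, not Lustre
code: flags_demo, flag_a, flag_b and toggler() are hypothetical
stand-ins for struct lquota_entry and its flags, and a pthread mutex
stands in for lqe_write_lock()/lqe_write_unlock().

/* Demonstrates the read-modify-write race on adjacent bitfields.
 * Build with: cc -O2 -pthread bitfield_race.c */
#include <pthread.h>
#include <stdio.h>

struct flags_demo {
	unsigned long flag_a:1;		/* stand-in for e.g. lqe_gl */
	unsigned long flag_b:1;		/* stand-in for e.g. lqe_edquot */
};

static struct flags_demo fd;
static pthread_mutex_t fd_lock = PTHREAD_MUTEX_INITIALIZER;
static int use_lock;

/* Toggle one bitfield many times; each toggle is a load/modify/store
 * of the whole storage unit shared by both flags. */
static void *toggler(void *arg)
{
	int a = (arg != NULL);

	for (int i = 0; i < 1000000; i++) {
		if (use_lock)
			pthread_mutex_lock(&fd_lock);
		if (a)
			fd.flag_a = !fd.flag_a;
		else
			fd.flag_b = !fd.flag_b;
		if (use_lock)
			pthread_mutex_unlock(&fd_lock);
	}
	return NULL;
}

static void run(const char *label)
{
	pthread_t ta, tb;

	fd.flag_a = 0;
	fd.flag_b = 0;
	pthread_create(&ta, NULL, toggler, &fd);	/* toggles flag_a */
	pthread_create(&tb, NULL, toggler, NULL);	/* toggles flag_b */
	pthread_join(ta, NULL);
	pthread_join(tb, NULL);
	/* An even number of toggles must leave both flags at 0; without
	 * the lock a stale store from one thread can revert the other's
	 * flag, so either may end up 1. */
	printf("%s: flag_a=%u flag_b=%u (expected 0 0)\n", label,
	       (unsigned)fd.flag_a, (unsigned)fd.flag_b);
}

int main(void)
{
	run("unlocked");
	use_lock = 1;
	run("locked");
	return 0;
}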
Restore an old assertion in qmt_id_lock_glimpse() that was removed by
mistake in commit 16ee9ecdb0: qmt_reba_thread() is the only thread
that may change the lqe_gl flag. Also check that lqe_gl is not set
when an lqe is freed.
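The restored invariant can be summarised in a small standalone
program. Again a sketch, not Lustre code: struct entry, glimpse() and
entry_free() are hypothetical stand-ins for struct lquota_entry,
qmt_id_lock_glimpse() and the kref release callback in the hunks
below, with a pthread rwlock standing in for lqe_write_lock().

#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

struct entry {
	pthread_rwlock_t e_lock;	/* stand-in for the lqe lock */
	bool e_gl;			/* stand-in for lqe_gl */
};

/* Only the "rebalance" path may set or clear e_gl, always under the
 * write lock. */
static void glimpse(struct entry *e)
{
	pthread_rwlock_wrlock(&e->e_lock);
	assert(!e->e_gl);		/* restored: no glimpse in flight */
	e->e_gl = true;
	pthread_rwlock_unlock(&e->e_lock);

	/* ... glimpse callbacks would be sent to slaves here ... */

	pthread_rwlock_wrlock(&e->e_lock);
	assert(e->e_gl);
	e->e_gl = false;
	pthread_rwlock_unlock(&e->e_lock);
}

/* Free path: the flag must already be clear, mirroring the new
 * LASSERT(!lqe->lqe_gl) before OBD_SLAB_FREE_PTR(). */
static void entry_free(struct entry *e)
{
	assert(!e->e_gl);
	pthread_rwlock_destroy(&e->e_lock);
	free(e);
}

int main(void)
{
	struct entry *e = calloc(1, sizeof(*e));

	pthread_rwlock_init(&e->e_lock, NULL);
	glimpse(e);
	entry_free(e);
	return 0;
}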
Fixes: 16ee9ecdb0 ("LU-16913 quota: fix ASSERTION(lqe->lqe_gl)")
Fixes: 78be823f33 ("LU-15218 quota: delete unused quota ID")
Signed-off-by: Sergey Cheremencev <scherementsev@ddn.com>
Change-Id: I954ed8aa23be01846e479a64c376c4b37af05268
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/58612
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Hongchao Zhang <hongchao@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
static inline void lqe_set_deleted(struct lquota_entry *lqe)
{
+ lqe_write_lock(lqe);
lqe->lqe_enforced = 0;
lqe->lqe_edquot = 0;
lqe->lqe_is_default = 0;
lqe->lqe_gracetime = 0;
lqe->lqe_is_deleted = 1;
+ lqe_write_unlock(lqe);
}
/* lquota_disk.c */
struct lquota_entry *lqe = container_of(kref, struct lquota_entry,
lqe_ref);
+ LASSERT(!lqe->lqe_gl);
OBD_SLAB_FREE_PTR(lqe, lqe_kmem);
}
if (IS_ERR(lqe))
RETURN(PTR_ERR(lqe));
+ lqe_write_lock(lqe);
lqe->lqe_is_deleted = 0;
lqe->lqe_is_reset = 0;
+ lqe_write_unlock(lqe);
rc = qmt_set_with_lqe(env, qmt, lqe, hard, soft, time, valid,
is_default, is_updated);
- if (rc == 0)
- lqe->lqe_is_deleted = 0;
-
lqe_putref(lqe);
RETURN(rc);
}
if (CFS_FAIL_CHECK(OBD_FAIL_QUOTA_NOSYNC))
th->th_sync = 0;
- lqe_write_lock(lqe);
rc = lquota_disk_delete(env, th,
qpi->qpi_glb_obj[qtype], qid, &ver);
-
- lqe_write_unlock(lqe);
dt_trans_stop(env, qmt->qmt_child, th);
if (rc == 0) {
need_revoke = qmt_clear_lgeg_arr_nu(lqe, stype, idx);
if (lvb->lvb_id_rel == 0) {
/* nothing to release */
- if (lvb->lvb_id_may_rel != 0)
+ if (lvb->lvb_id_may_rel != 0) {
/* but might still release later ... */
+ lqe_write_lock(lqe);
lqe->lqe_may_rel += lvb->lvb_id_may_rel;
+ lqe_write_unlock(lqe);
+ }
}
if (!need_revoke && lvb->lvb_id_rel == 0)
}
lqe_write_lock(lqe);
- /*
- * It is possible to add an lqe in a 2nd time while the same lqe
- * from the 1st time is still sending glimpse
- */
- if (lqe->lqe_gl)
- GOTO(out, 0);
/* The purpose of glimpse callback on per-ID lock is twofold:
* - notify slaves of new qunit value and hope they will release some
* spare quota space in return
* replies if needed */
lqe->lqe_may_rel = 0;
+ /* The rebalance thread is the only thread which can issue glimpses */
+ LASSERT(!lqe->lqe_gl);
lqe->lqe_gl = true;
lqe_write_unlock(lqe);
}
LASSERT(lqe->lqe_gl);
lqe->lqe_gl = false;
-out:
lqe_write_unlock(lqe);
ldlm_resource_putref(res);
EXIT;
rc = 0;
break;
case 0:
+ lqe_write_lock(lqe);
if (qqi->qqi_qsd->qsd_is_md)
lqe->lqe_usage = rec->ispace;
else
lqe->lqe_usage = toqb(rec->bspace);
+ lqe_write_unlock(lqe);
break;
default:
LQUOTA_ERROR(lqe, "failed to read disk usage, rc:%d", rc);
GOTO(out, rc = PTR_ERR(lqe));
}
+ lqe_write_lock(lqe);
lqe->lqe_is_reset = false;
lqe->lqe_is_deleted = 0;
+ lqe_write_unlock(lqe);
/* The in-memory lqe update for slave index copy isn't deferred,
* we shouldn't touch it here. */
spin_unlock(&qsd->qsd_adjust_lock);
if (LQUOTA_FLAG(upd->qur_rec.lqr_glb_rec.qbr_time) &
- LQUOTA_FLAG_REVOKE)
+ LQUOTA_FLAG_REVOKE) {
+ lqe_write_lock(lqe);
lqe->lqe_revoke = 1;
+ lqe_write_unlock(lqe);
+ }
/* Report usage asynchronously */
rc = qsd_adjust(env, lqe);
+ lqe_write_lock(lqe);
lqe->lqe_revoke = 0;
+ lqe_write_unlock(lqe);
if (rc)
LQUOTA_ERROR(lqe, "failed to report usage, rc:%d", rc);
}
upd->qur_rec.lqr_glb_rec.qbr_hardlimit == 0 &&
(LQUOTA_FLAG(upd->qur_rec.lqr_glb_rec.qbr_time) &
LQUOTA_FLAG_DEFAULT)) {
+ lqe_write_lock(lqe);
lqe->lqe_is_default = true;
if (qqi->qqi_default_softlimit == 0 &&
qqi->qqi_default_hardlimit == 0)
lqe->lqe_enforced = false;
else
lqe->lqe_enforced = true;
+ lqe_write_unlock(lqe);
LQUOTA_DEBUG(lqe, "update to use default quota");
}
LQUOTA_FLAG_RESET)) {
struct lquota_slv_rec srec;
+ lqe_write_lock(lqe);
lqe->lqe_granted = 0;
lqe->lqe_softlimit = 0;
lqe->lqe_hardlimit = 0;
lqe->lqe_is_default = false;
lqe->lqe_is_reset = true;
+ lqe_write_unlock(lqe);
memset(&srec, 0, sizeof(srec));
rc = qsd_update_index(env, qqi, &upd->qur_qid, false, 0, &srec);