From: Niu Yawei Date: Wed, 22 Jan 2014 04:24:00 +0000 (-0500) Subject: LU-4505 quota: race of edquot updating X-Git-Tag: 2.5.56~31 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=4d8a24a319a980650897596b064f85578059caa9 LU-4505 quota: race of edquot updating The slave edquot flag could be set mistakenly as following: - slave A acquires quota from master, master found that the user is running out of quota, set edquot in reply; - another slave deletes files and release quota to master, master clears edquot and notify all slaves by glimpse; - glimpse reaches slave A before the reply of dqacq, so edquot flag will be set on slave A at the end. Given that edquot can't be fully trusted, it should only be revalidated every 5 seconds on the sync acquire path. Signed-off-by: Niu Yawei Change-Id: Id4db47462bdf620a42cd31f75726fbcaff869179 Reviewed-on: http://review.whamcloud.com/8954 Tested-by: Jenkins Reviewed-by: Fan Yong Reviewed-by: Johann Lombardi Tested-by: Maloo --- diff --git a/lustre/quota/lquota_internal.h b/lustre/quota/lquota_internal.h index e80b25b..dc9bbac 100644 --- a/lustre/quota/lquota_internal.h +++ b/lustre/quota/lquota_internal.h @@ -129,6 +129,9 @@ struct lquota_slv_entry { /* when latest acquire RPC completed */ __u64 lse_acq_time; + + /* when latest edquot set */ + __u64 lse_edquot_time; }; /* In-memory entry for each enforced quota id @@ -208,6 +211,7 @@ struct lquota_site { #define lqe_lockh u.se.lse_lockh #define lqe_acq_rc u.se.lse_acq_rc #define lqe_acq_time u.se.lse_acq_time +#define lqe_edquot_time u.se.lse_edquot_time #define LQUOTA_BUMP_VER 0x1 #define LQUOTA_SET_VER 0x2 diff --git a/lustre/quota/qsd_handler.c b/lustre/quota/qsd_handler.c index 627f897..22a5d9d 100644 --- a/lustre/quota/qsd_handler.c +++ b/lustre/quota/qsd_handler.c @@ -388,10 +388,7 @@ static void qsd_req_completion(const struct lu_env *env, if (ret == 0 && lvb != 0) { if (lvb->lvb_id_qunit != 0) qsd_set_qunit(lqe, lvb->lvb_id_qunit); - if (lvb->lvb_flags & LQUOTA_FL_EDQUOT) - lqe->lqe_edquot = true; - else - lqe->lqe_edquot = false; + qsd_set_edquot(lqe, !!(lvb->lvb_flags & LQUOTA_FL_EDQUOT)); } else if (repbody != NULL && repbody->qb_qunit != 0) { qsd_set_qunit(lqe, repbody->qb_qunit); } @@ -461,9 +458,15 @@ static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space) lqe->lqe_pending_write += space; lqe->lqe_waiting_write -= space; rc = 0; - } else if (lqe->lqe_edquot) { + /* lqe_edquot flag is used to avoid flooding dqacq requests when + * the user is over quota, however, the lqe_edquot could be stale + * sometimes due to the race reply of dqacq vs. id lock glimpse + * (see LU-4505), so we revalidate it every 5 seconds. */ + } else if (lqe->lqe_edquot && + cfs_time_before_64(cfs_time_shift_64(-5), + lqe->lqe_edquot_time)) { rc = -EDQUOT; - } else { + }else { rc = -EAGAIN; } lqe_write_unlock(lqe); diff --git a/lustre/quota/qsd_internal.h b/lustre/quota/qsd_internal.h index a518f4a..b1b150f 100644 --- a/lustre/quota/qsd_internal.h +++ b/lustre/quota/qsd_internal.h @@ -317,6 +317,14 @@ static inline void qsd_set_qunit(struct lquota_entry *lqe, __u64 qunit) lqe->lqe_nopreacq = false; } +/* helper function to set/clear edquot flag */ +static inline void qsd_set_edquot(struct lquota_entry *lqe, bool edquot) +{ + lqe->lqe_edquot = edquot; + if (edquot) + lqe->lqe_edquot_time = cfs_time_current_64(); +} + #define QSD_WB_INTERVAL 60 /* 60 seconds */ /* helper function calculating how long a service thread should be waiting for diff --git a/lustre/quota/qsd_lock.c b/lustre/quota/qsd_lock.c index f6790a9..d609830 100644 --- a/lustre/quota/qsd_lock.c +++ b/lustre/quota/qsd_lock.c @@ -348,7 +348,7 @@ static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *de /* Clear lqe_lockh & reset qunit to 0 */ qsd_set_qunit(lqe, 0); memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh)); - lqe->lqe_edquot = false; + qsd_set_edquot(lqe, false); rel = true; } lqe_write_unlock(lqe); @@ -443,7 +443,7 @@ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data) } } - lqe->lqe_edquot = !!(desc->gl_flags & LQUOTA_FL_EDQUOT); + qsd_set_edquot(lqe, !!(desc->gl_flags & LQUOTA_FL_EDQUOT)); lqe_write_unlock(lqe); if (wakeup) @@ -512,7 +512,7 @@ int qsd_id_lock_cancel(const struct lu_env *env, struct lquota_entry *lqe) if (lustre_handle_is_used(&qti->qti_lockh)) { memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh)); qsd_set_qunit(lqe, 0); - lqe->lqe_edquot = false; + qsd_set_edquot(lqe, false); } lqe_write_unlock(lqe);