/* when latest acquire RPC completed */
__u64 lse_acq_time;
+
+ /* when latest edquot set */
+ __u64 lse_edquot_time;
};
/* In-memory entry for each enforced quota id
#define lqe_lockh u.se.lse_lockh
#define lqe_acq_rc u.se.lse_acq_rc
#define lqe_acq_time u.se.lse_acq_time
+#define lqe_edquot_time u.se.lse_edquot_time
#define LQUOTA_BUMP_VER 0x1
#define LQUOTA_SET_VER 0x2
if (ret == 0 && lvb != 0) {
if (lvb->lvb_id_qunit != 0)
qsd_set_qunit(lqe, lvb->lvb_id_qunit);
- if (lvb->lvb_flags & LQUOTA_FL_EDQUOT)
- lqe->lqe_edquot = true;
- else
- lqe->lqe_edquot = false;
+ qsd_set_edquot(lqe, !!(lvb->lvb_flags & LQUOTA_FL_EDQUOT));
} else if (repbody != NULL && repbody->qb_qunit != 0) {
qsd_set_qunit(lqe, repbody->qb_qunit);
}
lqe->lqe_pending_write += space;
lqe->lqe_waiting_write -= space;
rc = 0;
- } else if (lqe->lqe_edquot) {
+ /* lqe_edquot flag is used to avoid flooding dqacq requests when
+ * the user is over quota, however, the lqe_edquot could be stale
+ * sometimes due to the race reply of dqacq vs. id lock glimpse
+ * (see LU-4505), so we revalidate it every 5 seconds. */
+ } else if (lqe->lqe_edquot &&
+ cfs_time_before_64(cfs_time_shift_64(-5),
+ lqe->lqe_edquot_time)) {
rc = -EDQUOT;
- } else {
+ }else {
rc = -EAGAIN;
}
lqe_write_unlock(lqe);
lqe->lqe_nopreacq = false;
}
+/* helper function to set/clear edquot flag */
+static inline void qsd_set_edquot(struct lquota_entry *lqe, bool edquot)
+{
+ lqe->lqe_edquot = edquot;
+ if (edquot)
+ lqe->lqe_edquot_time = cfs_time_current_64();
+}
+
#define QSD_WB_INTERVAL 60 /* 60 seconds */
/* helper function calculating how long a service thread should be waiting for
/* Clear lqe_lockh & reset qunit to 0 */
qsd_set_qunit(lqe, 0);
memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
- lqe->lqe_edquot = false;
+ qsd_set_edquot(lqe, false);
rel = true;
}
lqe_write_unlock(lqe);
}
}
- lqe->lqe_edquot = !!(desc->gl_flags & LQUOTA_FL_EDQUOT);
+ qsd_set_edquot(lqe, !!(desc->gl_flags & LQUOTA_FL_EDQUOT));
lqe_write_unlock(lqe);
if (wakeup)
if (lustre_handle_is_used(&qti->qti_lockh)) {
memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
qsd_set_qunit(lqe, 0);
- lqe->lqe_edquot = false;
+ qsd_set_edquot(lqe, false);
}
lqe_write_unlock(lqe);