Whamcloud - gitweb
LU-4505 quota: race of edquot updating 54/8954/3
authorNiu Yawei <yawei.niu@intel.com>
Wed, 22 Jan 2014 04:24:00 +0000 (23:24 -0500)
committerOleg Drokin <oleg.drokin@intel.com>
Fri, 21 Feb 2014 03:42:48 +0000 (03:42 +0000)
The slave edquot flag could be set mistakenly as following:

- slave A acquires quota from master, master found that the
  user is running out of quota, set edquot in reply;
- another slave deletes files and release quota to master,
  master clears edquot and notify all slaves by glimpse;
- glimpse reaches slave A before the reply of dqacq, so
  edquot flag will be set on slave A at the end.

Given that edquot can't be fully trusted, it should only be
revalidated every 5 seconds on the sync acquire path.

Signed-off-by: Niu Yawei <yawei.niu@intel.com>
Change-Id: Id4db47462bdf620a42cd31f75726fbcaff869179
Reviewed-on: http://review.whamcloud.com/8954
Tested-by: Jenkins
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: Johann Lombardi <johann.lombardi@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
lustre/quota/lquota_internal.h
lustre/quota/qsd_handler.c
lustre/quota/qsd_internal.h
lustre/quota/qsd_lock.c

index e80b25b..dc9bbac 100644 (file)
@@ -129,6 +129,9 @@ struct lquota_slv_entry {
 
        /* when latest acquire RPC completed */
        __u64                   lse_acq_time;
+
+       /* when latest edquot set */
+       __u64                   lse_edquot_time;
 };
 
 /* In-memory entry for each enforced quota id
@@ -208,6 +211,7 @@ struct lquota_site {
 #define lqe_lockh              u.se.lse_lockh
 #define lqe_acq_rc             u.se.lse_acq_rc
 #define lqe_acq_time           u.se.lse_acq_time
+#define lqe_edquot_time                u.se.lse_edquot_time
 
 #define LQUOTA_BUMP_VER 0x1
 #define LQUOTA_SET_VER  0x2
index 627f897..22a5d9d 100644 (file)
@@ -388,10 +388,7 @@ static void qsd_req_completion(const struct lu_env *env,
        if (ret == 0 && lvb != 0) {
                if (lvb->lvb_id_qunit != 0)
                        qsd_set_qunit(lqe, lvb->lvb_id_qunit);
-               if (lvb->lvb_flags & LQUOTA_FL_EDQUOT)
-                       lqe->lqe_edquot = true;
-               else
-                       lqe->lqe_edquot = false;
+               qsd_set_edquot(lqe, !!(lvb->lvb_flags & LQUOTA_FL_EDQUOT));
        } else if (repbody != NULL && repbody->qb_qunit != 0) {
                qsd_set_qunit(lqe, repbody->qb_qunit);
        }
@@ -461,9 +458,15 @@ static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
                lqe->lqe_pending_write += space;
                lqe->lqe_waiting_write -= space;
                rc = 0;
-       } else if (lqe->lqe_edquot) {
+       /* lqe_edquot flag is used to avoid flooding dqacq requests when
+        * the user is over quota, however, the lqe_edquot could be stale
+        * sometimes due to the race reply of dqacq vs. id lock glimpse
+        * (see LU-4505), so we revalidate it every 5 seconds. */
+       } else if (lqe->lqe_edquot &&
+                  cfs_time_before_64(cfs_time_shift_64(-5),
+                                     lqe->lqe_edquot_time)) {
                rc = -EDQUOT;
-       } else {
+       }else {
                rc = -EAGAIN;
        }
        lqe_write_unlock(lqe);
index a518f4a..b1b150f 100644 (file)
@@ -317,6 +317,14 @@ static inline void qsd_set_qunit(struct lquota_entry *lqe, __u64 qunit)
        lqe->lqe_nopreacq = false;
 }
 
+/* helper function to set/clear edquot flag */
+static inline void qsd_set_edquot(struct lquota_entry *lqe, bool edquot)
+{
+       lqe->lqe_edquot = edquot;
+       if (edquot)
+               lqe->lqe_edquot_time = cfs_time_current_64();
+}
+
 #define QSD_WB_INTERVAL        60 /* 60 seconds */
 
 /* helper function calculating how long a service thread should be waiting for
index f6790a9..d609830 100644 (file)
@@ -348,7 +348,7 @@ static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *de
                        /* Clear lqe_lockh & reset qunit to 0 */
                        qsd_set_qunit(lqe, 0);
                        memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
-                       lqe->lqe_edquot = false;
+                       qsd_set_edquot(lqe, false);
                        rel = true;
                }
                lqe_write_unlock(lqe);
@@ -443,7 +443,7 @@ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
                }
        }
 
-       lqe->lqe_edquot = !!(desc->gl_flags & LQUOTA_FL_EDQUOT);
+       qsd_set_edquot(lqe, !!(desc->gl_flags & LQUOTA_FL_EDQUOT));
        lqe_write_unlock(lqe);
 
        if (wakeup)
@@ -512,7 +512,7 @@ int qsd_id_lock_cancel(const struct lu_env *env, struct lquota_entry *lqe)
        if (lustre_handle_is_used(&qti->qti_lockh)) {
                memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
                qsd_set_qunit(lqe, 0);
-               lqe->lqe_edquot = false;
+               qsd_set_edquot(lqe, false);
        }
        lqe_write_unlock(lqe);