Whamcloud - gitweb
b=18630 avoid the race of acquiring/releasing quota
authorLanden <tianzy@sun.com>
Wed, 23 Dec 2009 07:52:25 +0000 (10:52 +0300)
committerMike Tappro <tappro@garden.(none)>
Wed, 23 Dec 2009 19:52:10 +0000 (22:52 +0300)
Before this patch, checking and computing the info of lqs
are located in two different functions(check_cur_qunit()
and schedule_dqacq()). It is racy. With this patch, they
will all happen in one function(check_cur_qunit()).

i=johann
i=panda

lustre/include/obd_support.h
lustre/quota/quota_context.c
lustre/tests/sanity-quota.sh

index 295ab00..4661626 100644 (file)
@@ -373,6 +373,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 
 #define OBD_FAIL_QUOTA_RET_QDATA         0xA02
 #define OBD_FAIL_QUOTA_DELAY_REL         0xA03
+#define OBD_FAIL_QUOTA_DELAY_SD          0xA04
 
 #define OBD_FAIL_LPROC_REMOVE            0xB00
 
index acfe3e5..0a19113 100644 (file)
@@ -337,11 +337,17 @@ check_cur_qunit(struct obd_device *obd,
                 ret = 2;
                 /* if there are other pending writes for this uid/gid, releasing
                  * quota is put off until the last pending write b=16645 */
-                if (ret == 2 && pending_write) {
+                /* if there is an ongoing quota request, a releasing request is aborted.
+                 * That ongoing quota request will call this function again when
+                 * it returned b=18630 */
+                if (pending_write || record) {
                         CDEBUG(D_QUOTA, "delay quota release\n");
                         ret = 0;
                 }
         }
+        if (ret > 0)
+                quota_compute_lqs(qdata, lqs, 1, (ret == 1) ? 1 : 0);
+
         CDEBUG(D_QUOTA, "type: %c, limit: "LPU64", usage: "LPU64
                ", pending_write: "LPU64", record: "LPD64
                ", qunit_sz: %lu, tune_sz: %lu, ret: %d.\n",
@@ -913,6 +919,16 @@ static int got_qunit(struct lustre_qunit *qunit, int is_master)
         RETURN(rc);
 }
 
+static inline void
+revoke_lqs_rec(struct lustre_qunit_size *lqs, struct qunit_data *qdata, int opc)
+{
+        /* revoke lqs_xxx_rec which is computed in check_cur_qunit
+         * b=18630 */
+        spin_lock(&lqs->lqs_lock);
+        quota_compute_lqs(qdata, lqs, 0, (opc == QUOTA_DQACQ) ? 1 : 0);
+        spin_unlock(&lqs->lqs_lock);
+}
+
 static int
 schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                struct qunit_data *qdata, int opc, int wait,
@@ -932,8 +948,22 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
 
         LASSERT(opc == QUOTA_DQACQ || opc == QUOTA_DQREL);
         do_gettimeofday(&work_start);
-        if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL)
+
+        lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
+                               qctxt, 0);
+        if (lqs == NULL || IS_ERR(lqs)) {
+                CERROR("Can't find the lustre qunit size!\n");
+                RETURN(-EPERM);
+        }
+
+        if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL) {
+                revoke_lqs_rec(lqs, qdata, opc);
+                /* this is for quota_search_lqs */
+                lqs_putref(lqs);
                 RETURN(-ENOMEM);
+        }
+
+        OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_SD, 5);
 
         spin_lock(&qunit_hash_lock);
         qunit = dqacq_in_flight(qctxt, qdata);
@@ -941,6 +971,9 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                 spin_unlock(&qunit_hash_lock);
                 qunit_put(empty);
 
+                revoke_lqs_rec(lqs, qdata, opc);
+                /* this is for quota_search_lqs */
+                lqs_putref(lqs);
                 goto wait_completion;
         }
         qunit = empty;
@@ -948,19 +981,12 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         insert_qunit_nolock(qctxt, qunit);
         spin_unlock(&qunit_hash_lock);
 
-        lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
-                               qctxt, 0);
-        if (lqs && !IS_ERR(lqs)) {
-                spin_lock(&lqs->lqs_lock);
-                quota_compute_lqs(qdata, lqs, 1, (opc == QUOTA_DQACQ) ? 1 : 0);
-                /* when this qdata returned from mds, it will call lqs_putref */
-                lqs_getref(lqs);
-                spin_unlock(&lqs->lqs_lock);
-                /* this is for quota_search_lqs */
-                lqs_putref(lqs);
-        } else {
-                CERROR("Can't find the lustre qunit size!\n");
-        }
+        /* From here, the quota request will be sent anyway.
+         * When this qdata request returned or is cancelled,
+         * lqs_putref will be called at that time */
+        lqs_getref(lqs);
+        /* this is for quota_search_lqs */
+        lqs_putref(lqs);
 
         QDATA_DEBUG(qdata, "obd(%s): send %s quota req\n",
                     obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel");
index 4a0b5b7..3ef3ca7 100644 (file)
@@ -2128,6 +2128,54 @@ test_30()
 }
 run_test_with_stat 30 "hard limit updates should not reset grace times ================"
 
+# test duplicate quota releases b=18630
+test_31() {
+        mkdir -p $DIR/$tdir
+        chmod 0777 $DIR/$tdir
+
+        LIMIT=$(( $BUNIT_SZ * $(($OSTCOUNT + 1)) * 10)) # 10 bunits each sever
+        TESTFILE="$DIR/$tdir/$tfile-0"
+        TESTFILE2="$DIR/$tdir/$tfile-1"
+
+        wait_delete_completed
+
+        log "   User quota (limit: $LIMIT kbytes)"
+        $LFS setquota -u $TSTUSR -b 0 -B $LIMIT -i 0 -I 0 $DIR
+
+        $LFS setstripe $TESTFILE -i 0 -c 1
+        chown $TSTUSR.$TSTUSR $TESTFILE
+        $LFS setstripe $TESTFILE2 -i 0 -c 1
+        chown $TSTUSR.$TSTUSR $TESTFILE2
+
+        log "   step1: write out of block quota ..."
+        $RUNAS dd if=/dev/zero of=$TESTFILE bs=$BLK_SZ count=5120
+        $RUNAS dd if=/dev/zero of=$TESTFILE2 bs=$BLK_SZ count=5120
+
+        #define OBD_FAIL_QUOTA_DELAY_SD      0xA04
+        #define OBD_FAIL_SOME        0x10000000 /* fail N times */
+        lustre_fail ost $((0x00000A04 | 0x10000000)) 1
+
+        log "   step2: delete two files so that triggering duplicate quota release ..."
+        rm -f $TESTFILE $TESTFILE2
+        sync; sleep 5; sync      #  OBD_FAIL_QUOTA_DELAY_SD will delay for 5 seconds
+        wait_delete_completed
+
+        log "   step3: verify if the ost failed"
+        do_facet ost1 dmesg > $TMP/lustre-log-${TESTNAME}.log
+        watchdog=`awk '/test 31/ {start = 1;}
+                       /release quota error/ {
+                               if (start) {
+                                       print;
+                               }
+                       }' $TMP/lustre-log-${TESTNAME}.log`
+        [ "$watchdog" ] && error "$watchdog"
+        rm -f $TMP/lustre-log-${TESTNAME}.log
+
+        lustre_fail ost 0
+        resetquota -u $TSTUSR
+}
+run_test_with_stat 31 "test duplicate quota releases ==="
+
 # turn off quota
 quota_fini()
 {