Whamcloud - gitweb
b=18630 avoid the race of acquiring/releasing quota
[fs/lustre-release.git] / lustre / quota / quota_context.c
index acfe3e5..0a19113 100644 (file)
@@ -337,11 +337,17 @@ check_cur_qunit(struct obd_device *obd,
                 ret = 2;
                 /* if there are other pending writes for this uid/gid, releasing
                  * quota is put off until the last pending write b=16645 */
-                if (ret == 2 && pending_write) {
+                /* if there is an ongoing quota request, a releasing request is aborted.
+                 * That ongoing quota request will call this function again when
+                 * it returned b=18630 */
+                if (pending_write || record) {
                         CDEBUG(D_QUOTA, "delay quota release\n");
                         ret = 0;
                 }
         }
+        if (ret > 0)
+                quota_compute_lqs(qdata, lqs, 1, (ret == 1) ? 1 : 0);
+
         CDEBUG(D_QUOTA, "type: %c, limit: "LPU64", usage: "LPU64
                ", pending_write: "LPU64", record: "LPD64
                ", qunit_sz: %lu, tune_sz: %lu, ret: %d.\n",
@@ -913,6 +919,16 @@ static int got_qunit(struct lustre_qunit *qunit, int is_master)
         RETURN(rc);
 }
 
+static inline void
+revoke_lqs_rec(struct lustre_qunit_size *lqs, struct qunit_data *qdata, int opc)
+{
+        /* revoke lqs_xxx_rec which is computed in check_cur_qunit
+         * b=18630 */
+        spin_lock(&lqs->lqs_lock);
+        quota_compute_lqs(qdata, lqs, 0, (opc == QUOTA_DQACQ) ? 1 : 0);
+        spin_unlock(&lqs->lqs_lock);
+}
+
 static int
 schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                struct qunit_data *qdata, int opc, int wait,
@@ -932,8 +948,22 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
 
         LASSERT(opc == QUOTA_DQACQ || opc == QUOTA_DQREL);
         do_gettimeofday(&work_start);
-        if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL)
+
+        lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
+                               qctxt, 0);
+        if (lqs == NULL || IS_ERR(lqs)) {
+                CERROR("Can't find the lustre qunit size!\n");
+                RETURN(-EPERM);
+        }
+
+        if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL) {
+                revoke_lqs_rec(lqs, qdata, opc);
+                /* this is for quota_search_lqs */
+                lqs_putref(lqs);
                 RETURN(-ENOMEM);
+        }
+
+        OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_SD, 5);
 
         spin_lock(&qunit_hash_lock);
         qunit = dqacq_in_flight(qctxt, qdata);
@@ -941,6 +971,9 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                 spin_unlock(&qunit_hash_lock);
                 qunit_put(empty);
 
+                revoke_lqs_rec(lqs, qdata, opc);
+                /* this is for quota_search_lqs */
+                lqs_putref(lqs);
                 goto wait_completion;
         }
         qunit = empty;
@@ -948,19 +981,12 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         insert_qunit_nolock(qctxt, qunit);
         spin_unlock(&qunit_hash_lock);
 
-        lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
-                               qctxt, 0);
-        if (lqs && !IS_ERR(lqs)) {
-                spin_lock(&lqs->lqs_lock);
-                quota_compute_lqs(qdata, lqs, 1, (opc == QUOTA_DQACQ) ? 1 : 0);
-                /* when this qdata returned from mds, it will call lqs_putref */
-                lqs_getref(lqs);
-                spin_unlock(&lqs->lqs_lock);
-                /* this is for quota_search_lqs */
-                lqs_putref(lqs);
-        } else {
-                CERROR("Can't find the lustre qunit size!\n");
-        }
+        /* From here, the quota request will be sent anyway.
+         * When this qdata request returned or is cancelled,
+         * lqs_putref will be called at that time */
+        lqs_getref(lqs);
+        /* this is for quota_search_lqs */
+        lqs_putref(lqs);
 
         QDATA_DEBUG(qdata, "obd(%s): send %s quota req\n",
                     obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel");