Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / quota / quota_context.c
index f7e3665..1e928c3 100644 (file)
@@ -417,19 +417,6 @@ out:
         return ret;
 }
 
-/* caller must hold qunit_hash_lock */
-static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt,
-                                            struct qunit_data *qdata)
-{
-        unsigned int hashent = qunit_hashfn(qctxt, qdata);
-        struct lustre_qunit *qunit;
-        ENTRY;
-
-        LASSERT_SPIN_LOCKED(&qunit_hash_lock);
-        qunit = find_qunit(hashent, qctxt, qdata);
-        RETURN(qunit);
-}
-
 static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt,
                                         struct qunit_data *qdata, int opc)
 {
@@ -468,6 +455,21 @@ static void qunit_put(struct lustre_qunit *qunit)
                 free_qunit(qunit);
 }
 
+/* caller must hold qunit_hash_lock and release ref of qunit after using it */
+static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt,
+                                            struct qunit_data *qdata)
+{
+        unsigned int hashent = qunit_hashfn(qctxt, qdata);
+        struct lustre_qunit *qunit;
+        ENTRY;
+
+        LASSERT_SPIN_LOCKED(&qunit_hash_lock);
+        qunit = find_qunit(hashent, qctxt, qdata);
+        if (qunit)
+                qunit_get(qunit);
+        RETURN(qunit);
+}
+
 static void
 insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit)
 {
@@ -638,6 +640,9 @@ out:
         QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, rc);
         wake_up_all(&qunit->lq_waitq);
 
+        /* this is for dqacq_in_flight() */
+        qunit_put(qunit);
+        /* this is for alloc_qunit() */
         qunit_put(qunit);
         if (rc < 0 && rc != -EDQUOT)
                  RETURN(err);
@@ -816,19 +821,16 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         spin_lock(&qunit_hash_lock);
         qunit = dqacq_in_flight(qctxt, qdata);
         if (qunit) {
-                if (wait)
-                        qunit_get(qunit);
                 spin_unlock(&qunit_hash_lock);
                 qunit_put(empty);
 
                 goto wait_completion;
         }
         qunit = empty;
+        qunit_get(qunit);
         insert_qunit_nolock(qctxt, qunit);
         spin_unlock(&qunit_hash_lock);
 
-        LASSERT(qunit);
-
         quota_search_lqs(qdata, NULL, qctxt, &lqs);
         if (lqs) {
                 spin_lock(&lqs->lqs_lock);
@@ -852,6 +854,9 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                 QDATA_SET_CHANGE_QS(qdata);
                 rc = qctxt->lqc_handler(obd, qdata, opc);
                 rc2 = dqacq_completion(obd, qctxt, qdata, rc, opc);
+                /* this is for qunit_get() */
+                qunit_put(qunit);
+
                 do_gettimeofday(&work_end);
                 timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
                 if (opc == QUOTA_DQACQ)
@@ -879,6 +884,9 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                 QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, -EAGAIN);
                 wake_up_all(&qunit->lq_waitq);
 
+                /* this is for qunit_get() */
+                qunit_put(qunit);
+                /* this for alloc_qunit() */
                 qunit_put(qunit);
                 spin_lock(&qctxt->lqc_lock);
                 if (wait && !qctxt->lqc_import) {
@@ -911,6 +919,8 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         if (req == NULL) {
                 CDEBUG(D_ERROR, "Can't alloc request\n");
                 dqacq_completion(obd, qctxt, qdata, -ENOMEM, opc);
+                /* this is for qunit_get() */
+                qunit_put(qunit);
                 RETURN(-ENOMEM);
         }
 
@@ -921,12 +931,11 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                 CDEBUG(D_ERROR, "Can't pack qunit_data(rc: %d)\n", rc);
                 ptlrpc_req_finished(req);
                 dqacq_completion(obd, qctxt, qdata, -EPROTO, opc);
+                /* this is for qunit_get() */
+                qunit_put(qunit);
                 RETURN(rc);
         }
 
-        if (wait && qunit)
-                qunit_get(qunit);
-
         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
         aa = ptlrpc_req_async_args(req);
         aa->aa_ctxt = qctxt;
@@ -956,9 +965,9 @@ wait_completion:
                 spin_unlock(&qunit->lq_lock);
                 CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
                        qunit, rc);
-                qunit_put(qunit);
         }
 
+        qunit_put(qunit);
         do_gettimeofday(&work_end);
         timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
         if (opc == QUOTA_DQACQ)
@@ -1037,11 +1046,6 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
 
         spin_lock(&qunit_hash_lock);
         qunit = dqacq_in_flight(qctxt, &qdata);
-        if (qunit)
-                /* grab reference on this qunit to handle races with
-                 * dqacq_completion(). Otherwise, this qunit could be freed just
-                 * after we release the qunit_hash_lock */
-                qunit_get(qunit);
         spin_unlock(&qunit_hash_lock);
 
         if (qunit) {
@@ -1058,6 +1062,7 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
                 else
                         rc = qunit->lq_rc;
                 spin_unlock(&qunit->lq_lock);
+                /* this is for dqacq_in_flight() */
                 qunit_put(qunit);
                 do_gettimeofday(&work_end);
                 timediff = cfs_timeval_sub(&work_end, &work_start, NULL);