Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / quota / quota_context.c
index bb0bbd2..6bcbcc5 100644 (file)
@@ -423,7 +423,7 @@ static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt,
         struct lustre_qunit *qunit = NULL;
         ENTRY;
 
-        OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit));
+        OBD_SLAB_ALLOC_PTR_GFP(qunit, qunit_cachep, CFS_ALLOC_IO);
         if (qunit == NULL)
                 RETURN(NULL);
 
@@ -541,6 +541,10 @@ dqacq_completion(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         QDATA_DEBUG(qdata, "obd(%s): complete %s quota req\n",
                     obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel");
 
+        /* do it only when a releasing quota req more than 5MB b=18491 */
+        if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880)
+                OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_REL, 5);
+
         /* update local operational quota file */
         if (rc == 0) {
                 __u64 count = QUSG(qdata->qd_count, QDATA_IS_BLK(qdata));
@@ -636,8 +640,10 @@ out:
 
         compute_lqs_after_removing_qunit(qunit);
 
-        /* wake up all waiters */
+        if (rc == 0)
+                rc = QUOTA_REQ_RETURNED;
         QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, rc);
+        /* wake up all waiters */
         wake_up_all(&qunit->lq_waitq);
 
         /* this is for dqacq_in_flight() */
@@ -664,8 +670,12 @@ out:
                  CERROR("adjust slave's qunit size failed!(rc:%d)\n", rc1);
                  RETURN(rc1);
          }
-         if (err || (rc && rc != -EBUSY && rc1 == 0) || is_master(qctxt))
-                RETURN(err);
+         if (err || (rc < 0 && rc != -EBUSY && rc1 == 0) || is_master(qctxt))
+                 RETURN(err);
+
+         if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880 &&
+             OBD_FAIL_CHECK(OBD_FAIL_QUOTA_DELAY_REL))
+                 RETURN(err);
 
         /* reschedule another dqacq/dqrel if needed */
         qdata->qd_count = 0;
@@ -774,25 +784,56 @@ int check_qm(struct lustre_quota_ctxt *qctxt)
         RETURN(rc);
 }
 
+/* wake up all waiting threads when lqc_import is NULL */
+void dqacq_interrupt(struct lustre_quota_ctxt *qctxt)
+{
+        struct lustre_qunit *qunit, *tmp;
+        int i;
+        ENTRY;
+
+        spin_lock(&qunit_hash_lock);
+        for (i = 0; i < NR_DQHASH; i++) {
+                list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], lq_hash) {
+                        if (qunit->lq_ctxt != qctxt)
+                                continue;
+
+                        /* Wake up all waiters. Do not change lq_state.
+                         * The waiters will check lq_rc which is kept as 0
+                         * if no others change it, then the waiters will return
+                         * -EAGAIN to caller who can perform related quota
+                         * acq/rel if necessary. */
+                        wake_up_all(&qunit->lq_waitq);
+                }
+        }
+        spin_unlock(&qunit_hash_lock);
+        EXIT;
+}
+
 static int got_qunit(struct lustre_qunit *qunit)
 {
-        int rc;
+        struct lustre_quota_ctxt *qctxt = qunit->lq_ctxt;
+        int rc = 0;
         ENTRY;
 
         spin_lock(&qunit->lq_lock);
         switch (qunit->lq_state) {
         case QUNIT_IN_HASH:
         case QUNIT_RM_FROM_HASH:
-                rc = 0;
                 break;
         case QUNIT_FINISHED:
                 rc = 1;
                 break;
         default:
-                rc = 0;
                 CERROR("invalid qunit state %d\n", qunit->lq_state);
         }
         spin_unlock(&qunit->lq_lock);
+
+        if (!rc) {
+                spin_lock(&qctxt->lqc_lock);
+                rc = !qctxt->lqc_import || !qctxt->lqc_valid;
+                spin_unlock(&qctxt->lqc_lock);
+        }
+
         RETURN(rc);
 }
 
@@ -952,16 +993,14 @@ wait_completion:
 
                 QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
                 l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
-                /* rc = -EAGAIN, it means a quota req is finished;
+                /* rc = -EAGAIN, it means the quota master isn't ready yet
+                 * rc = QUOTA_REQ_RETURNED, it means a quota req is finished;
                  * rc = -EDQUOT, it means out of quota
                  * rc = -EBUSY, it means recovery is happening
                  * other rc < 0, it means real errors, functions who call
                  * schedule_dqacq should take care of this */
                 spin_lock(&qunit->lq_lock);
-                if (qunit->lq_rc == 0)
-                        rc = -EAGAIN;
-                else
-                        rc = qunit->lq_rc;
+                rc = qunit->lq_rc;
                 spin_unlock(&qunit->lq_lock);
                 CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
                        qunit, rc);
@@ -1057,10 +1096,7 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
                        qunit, qunit->lq_rc);
                 /* keep same as schedule_dqacq() b=17030 */
                 spin_lock(&qunit->lq_lock);
-                if (qunit->lq_rc == 0)
-                        rc = -EAGAIN;
-                else
-                        rc = qunit->lq_rc;
+                rc = qunit->lq_rc;
                 spin_unlock(&qunit->lq_lock);
                 /* this is for dqacq_in_flight() */
                 qunit_put(qunit);