Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / quota / quota_context.c
index 4aac3ae..258b809 100644 (file)
@@ -302,7 +302,7 @@ check_cur_qunit(struct obd_device *obd,
         if (QDATA_IS_BLK(qdata)) {
                 qunit_sz = lqs->lqs_bunit_sz;
                 tune_sz  = lqs->lqs_btune_sz;
-                pending_write = lqs->lqs_bwrite_pending * CFS_PAGE_SIZE;
+                pending_write = lqs->lqs_bwrite_pending;
                 record   = lqs->lqs_blk_rec;
                 LASSERT(!(qunit_sz % QUOTABLOCK_SIZE));
         } else {
@@ -417,26 +417,13 @@ out:
         return ret;
 }
 
-/* caller must hold qunit_hash_lock */
-static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt,
-                                            struct qunit_data *qdata)
-{
-        unsigned int hashent = qunit_hashfn(qctxt, qdata);
-        struct lustre_qunit *qunit;
-        ENTRY;
-
-        LASSERT_SPIN_LOCKED(&qunit_hash_lock);
-        qunit = find_qunit(hashent, qctxt, qdata);
-        RETURN(qunit);
-}
-
 static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt,
                                         struct qunit_data *qdata, int opc)
 {
         struct lustre_qunit *qunit = NULL;
         ENTRY;
 
-        OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit));
+        OBD_SLAB_ALLOC_PTR_GFP(qunit, qunit_cachep, CFS_ALLOC_IO);
         if (qunit == NULL)
                 RETURN(NULL);
 
@@ -468,6 +455,21 @@ static void qunit_put(struct lustre_qunit *qunit)
                 free_qunit(qunit);
 }
 
+/* caller must hold qunit_hash_lock and release ref of qunit after using it */
+static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt,
+                                            struct qunit_data *qdata)
+{
+        unsigned int hashent = qunit_hashfn(qctxt, qdata);
+        struct lustre_qunit *qunit;
+        ENTRY;
+
+        LASSERT_SPIN_LOCKED(&qunit_hash_lock);
+        qunit = find_qunit(hashent, qctxt, qdata);
+        if (qunit)
+                qunit_get(qunit);
+        RETURN(qunit);
+}
+
 static void
 insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit)
 {
@@ -497,7 +499,6 @@ static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit)
                 /* this is for schedule_dqacq */
                 lqs_putref(lqs);
         }
-
 }
 
 static void remove_qunit_nolock(struct lustre_qunit *qunit)
@@ -539,6 +540,10 @@ dqacq_completion(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         QDATA_DEBUG(qdata, "obd(%s): complete %s quota req\n",
                     obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel");
 
+        /* do it only when a releasing quota req more than 5MB b=18491 */
+        if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880)
+                OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_REL, 5);
+
         /* update local operational quota file */
         if (rc == 0) {
                 __u64 count = QUSG(qdata->qd_count, QDATA_IS_BLK(qdata));
@@ -634,10 +639,15 @@ out:
 
         compute_lqs_after_removing_qunit(qunit);
 
-        /* wake up all waiters */
+        if (rc == 0)
+                rc = QUOTA_REQ_RETURNED;
         QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, rc);
+        /* wake up all waiters */
         wake_up_all(&qunit->lq_waitq);
 
+        /* this is for dqacq_in_flight() */
+        qunit_put(qunit);
+        /* this is for alloc_qunit() */
         qunit_put(qunit);
         if (rc < 0 && rc != -EDQUOT)
                  RETURN(err);
@@ -659,8 +669,12 @@ out:
                  CERROR("adjust slave's qunit size failed!(rc:%d)\n", rc1);
                  RETURN(rc1);
          }
-         if (err || (rc && rc != -EBUSY && rc1 == 0) || is_master(qctxt))
-                RETURN(err);
+         if (err || (rc < 0 && rc != -EBUSY && rc1 == 0) || is_master(qctxt))
+                 RETURN(err);
+
+         if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880 &&
+             OBD_FAIL_CHECK(OBD_FAIL_QUOTA_DELAY_REL))
+                 RETURN(err);
 
         /* reschedule another dqacq/dqrel if needed */
         qdata->qd_count = 0;
@@ -769,25 +783,56 @@ int check_qm(struct lustre_quota_ctxt *qctxt)
         RETURN(rc);
 }
 
+/* wake up all waiting threads when lqc_import is NULL */
+void dqacq_interrupt(struct lustre_quota_ctxt *qctxt)
+{
+        struct lustre_qunit *qunit, *tmp;
+        int i;
+        ENTRY;
+
+        spin_lock(&qunit_hash_lock);
+        for (i = 0; i < NR_DQHASH; i++) {
+                list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], lq_hash) {
+                        if (qunit->lq_ctxt != qctxt)
+                                continue;
+
+                        /* Wake up all waiters. Do not change lq_state.
+                         * The waiters will check lq_rc which is kept as 0
+                         * if no others change it, then the waiters will return
+                         * -EAGAIN to caller who can perform related quota
+                         * acq/rel if necessary. */
+                        wake_up_all(&qunit->lq_waitq);
+                }
+        }
+        spin_unlock(&qunit_hash_lock);
+        EXIT;
+}
+
 static int got_qunit(struct lustre_qunit *qunit)
 {
-        int rc;
+        struct lustre_quota_ctxt *qctxt = qunit->lq_ctxt;
+        int rc = 0;
         ENTRY;
 
         spin_lock(&qunit->lq_lock);
         switch (qunit->lq_state) {
         case QUNIT_IN_HASH:
         case QUNIT_RM_FROM_HASH:
-                rc = 0;
                 break;
         case QUNIT_FINISHED:
                 rc = 1;
                 break;
         default:
-                rc = 0;
                 CERROR("invalid qunit state %d\n", qunit->lq_state);
         }
         spin_unlock(&qunit->lq_lock);
+
+        if (!rc) {
+                spin_lock(&qctxt->lqc_lock);
+                rc = !qctxt->lqc_import || !qctxt->lqc_valid;
+                spin_unlock(&qctxt->lqc_lock);
+        }
+
         RETURN(rc);
 }
 
@@ -816,19 +861,16 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         spin_lock(&qunit_hash_lock);
         qunit = dqacq_in_flight(qctxt, qdata);
         if (qunit) {
-                if (wait)
-                        qunit_get(qunit);
                 spin_unlock(&qunit_hash_lock);
                 qunit_put(empty);
 
                 goto wait_completion;
         }
         qunit = empty;
+        qunit_get(qunit);
         insert_qunit_nolock(qctxt, qunit);
         spin_unlock(&qunit_hash_lock);
 
-        LASSERT(qunit);
-
         quota_search_lqs(qdata, NULL, qctxt, &lqs);
         if (lqs) {
                 spin_lock(&lqs->lqs_lock);
@@ -852,6 +894,9 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                 QDATA_SET_CHANGE_QS(qdata);
                 rc = qctxt->lqc_handler(obd, qdata, opc);
                 rc2 = dqacq_completion(obd, qctxt, qdata, rc, opc);
+                /* this is for qunit_get() */
+                qunit_put(qunit);
+
                 do_gettimeofday(&work_end);
                 timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
                 if (opc == QUOTA_DQACQ)
@@ -879,6 +924,9 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                 QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, -EAGAIN);
                 wake_up_all(&qunit->lq_waitq);
 
+                /* this is for qunit_get() */
+                qunit_put(qunit);
+                /* this for alloc_qunit() */
                 qunit_put(qunit);
                 spin_lock(&qctxt->lqc_lock);
                 if (wait && !qctxt->lqc_import) {
@@ -892,7 +940,8 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                         l_wait_event(qctxt->lqc_wait_for_qmaster,
                                      check_qm(qctxt), &lwi);
                         CDEBUG(D_QUOTA, "wake up when quota master is back\n");
-                        lc_watchdog_touch(oti->oti_thread->t_watchdog);
+                        lc_watchdog_touch(oti->oti_thread->t_watchdog,
+                                 GET_TIMEOUT(oti->oti_thread->t_svc));
                 } else {
                         spin_unlock(&qctxt->lqc_lock);
                 }
@@ -911,6 +960,8 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         if (req == NULL) {
                 CDEBUG(D_ERROR, "Can't alloc request\n");
                 dqacq_completion(obd, qctxt, qdata, -ENOMEM, opc);
+                /* this is for qunit_get() */
+                qunit_put(qunit);
                 RETURN(-ENOMEM);
         }
 
@@ -921,12 +972,11 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                 CDEBUG(D_ERROR, "Can't pack qunit_data(rc: %d)\n", rc);
                 ptlrpc_req_finished(req);
                 dqacq_completion(obd, qctxt, qdata, -EPROTO, opc);
+                /* this is for qunit_get() */
+                qunit_put(qunit);
                 RETURN(rc);
         }
 
-        if (wait && qunit)
-                qunit_get(qunit);
-
         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
         aa = ptlrpc_req_async_args(req);
         aa->aa_ctxt = qctxt;
@@ -943,22 +993,20 @@ wait_completion:
 
                 QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
                 l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
-                /* rc = -EAGAIN, it means a quota req is finished;
+                /* rc = -EAGAIN, it means the quota master isn't ready yet
+                 * rc = QUOTA_REQ_RETURNED, it means a quota req is finished;
                  * rc = -EDQUOT, it means out of quota
                  * rc = -EBUSY, it means recovery is happening
                  * other rc < 0, it means real errors, functions who call
                  * schedule_dqacq should take care of this */
                 spin_lock(&qunit->lq_lock);
-                if (qunit->lq_rc == 0)
-                        rc = -EAGAIN;
-                else
-                        rc = qunit->lq_rc;
+                rc = qunit->lq_rc;
                 spin_unlock(&qunit->lq_lock);
                 CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
                        qunit, rc);
-                qunit_put(qunit);
         }
 
+        qunit_put(qunit);
         do_gettimeofday(&work_end);
         timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
         if (opc == QUOTA_DQACQ)
@@ -1037,11 +1085,6 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
 
         spin_lock(&qunit_hash_lock);
         qunit = dqacq_in_flight(qctxt, &qdata);
-        if (qunit)
-                /* grab reference on this qunit to handle races with
-                 * dqacq_completion(). Otherwise, this qunit could be freed just
-                 * after we release the qunit_hash_lock */
-                qunit_get(qunit);
         spin_unlock(&qunit_hash_lock);
 
         if (qunit) {
@@ -1051,6 +1094,11 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
                 l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
                 CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
                        qunit, qunit->lq_rc);
+                /* keep same as schedule_dqacq() b=17030 */
+                spin_lock(&qunit->lq_lock);
+                rc = qunit->lq_rc;
+                spin_unlock(&qunit->lq_lock);
+                /* this is for dqacq_in_flight() */
                 qunit_put(qunit);
                 do_gettimeofday(&work_end);
                 timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
@@ -1058,13 +1106,6 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
                                     isblk ? LQUOTA_WAIT_PENDING_BLK_QUOTA :
                                             LQUOTA_WAIT_PENDING_INO_QUOTA,
                                     timediff);
-                /* keep same as schedule_dqacq() b=17030 */
-                spin_lock(&qunit->lq_lock);
-                if (qunit->lq_rc == 0)
-                        rc = -EAGAIN;
-                else
-                        rc = qunit->lq_rc;
-                spin_unlock(&qunit->lq_lock);
         } else {
                 do_gettimeofday(&work_end);
                 timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
@@ -1117,7 +1158,9 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
         qctxt->lqc_sync_blk = 0;
         spin_unlock(&qctxt->lqc_lock);
 
-        qctxt->lqc_lqs_hash = lustre_hash_init("LQS_HASH", 7, 7,
+        qctxt->lqc_lqs_hash = lustre_hash_init("LQS_HASH",
+                                               HASH_LQS_CUR_BITS,
+                                               HASH_LQS_MAX_BITS,
                                                &lqs_hash_ops, 0);
         if (!qctxt->lqc_lqs_hash) {
                 CERROR("initialize hash lqs for %s error!\n", obd->obd_name);
@@ -1346,7 +1389,7 @@ lqs_compare(void *key, struct hlist_node *hnode)
 static void *
 lqs_get(struct hlist_node *hnode)
 {
-        struct lustre_qunit_size *q = 
+        struct lustre_qunit_size *q =
             hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
         ENTRY;
 
@@ -1360,7 +1403,7 @@ lqs_get(struct hlist_node *hnode)
 static void *
 lqs_put(struct hlist_node *hnode)
 {
-        struct lustre_qunit_size *q = 
+        struct lustre_qunit_size *q =
             hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
         ENTRY;
 
@@ -1379,12 +1422,12 @@ lqs_exit(struct hlist_node *hnode)
         ENTRY;
 
         q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
-        /* 
+        /*
          * Nothing should be left. User of lqs put it and
          * lqs also was deleted from table by this time
          * so we should have 0 refs.
          */
-        LASSERTF(atomic_read(&q->lqs_refcount) == 0, 
+        LASSERTF(atomic_read(&q->lqs_refcount) == 0,
                  "Busy lqs %p with %d refs\n", q,
                  atomic_read(&q->lqs_refcount));
         OBD_FREE_PTR(q);