Whamcloud - gitweb
b=18630 avoid the race of acquiring/releasing quota
[fs/lustre-release.git] / lustre / quota / quota_context.c
index 29e2f3a..0a19113 100644 (file)
 #include <obd_class.h>
 #include <lustre_quota.h>
 #include <lustre_fsfilt.h>
-#include <class_hash.h>
 #include <lprocfs_status.h>
 #include "quota_internal.h"
 
 #ifdef HAVE_QUOTA_SUPPORT
 
-static lustre_hash_ops_t lqs_hash_ops;
+static cfs_hash_ops_t lqs_hash_ops;
 
 unsigned long default_bunit_sz = 128 * 1024 * 1024; /* 128M bytes */
 unsigned long default_btune_ratio = 50;             /* 50 percentage */
@@ -247,7 +246,7 @@ check_cur_qunit(struct obd_device *obd,
         int ret = 0;
         ENTRY;
 
-        if (!sb_any_quota_enabled(sb))
+        if (!ll_sb_any_quota_active(sb))
                 RETURN(0);
 
         spin_lock(&qctxt->lqc_lock);
@@ -290,8 +289,8 @@ check_cur_qunit(struct obd_device *obd,
         lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
                                qctxt, 0);
         if (IS_ERR(lqs) || lqs == NULL) {
-                CDEBUG(D_ERROR, "fail to find a lqs(%s id: %u)!\n",
-                       QDATA_IS_GRP(qdata) ? "group" : "user", qdata->qd_id);
+                CERROR("fail to find a lqs for %sid: %u)!\n",
+                       QDATA_IS_GRP(qdata) ? "g" : "u", qdata->qd_id);
                 GOTO (out, ret = 0);
         }
         spin_lock(&lqs->lqs_lock);
@@ -338,11 +337,17 @@ check_cur_qunit(struct obd_device *obd,
                 ret = 2;
                 /* if there are other pending writes for this uid/gid, releasing
                  * quota is put off until the last pending write b=16645 */
-                if (ret == 2 && pending_write) {
+                /* if there is an ongoing quota request, a releasing request is aborted.
+                 * That ongoing quota request will call this function again when
+                 * it returned b=18630 */
+                if (pending_write || record) {
                         CDEBUG(D_QUOTA, "delay quota release\n");
                         ret = 0;
                 }
         }
+        if (ret > 0)
+                quota_compute_lqs(qdata, lqs, 1, (ret == 1) ? 1 : 0);
+
         CDEBUG(D_QUOTA, "type: %c, limit: "LPU64", usage: "LPU64
                ", pending_write: "LPU64", record: "LPD64
                ", qunit_sz: %lu, tune_sz: %lu, ret: %d.\n",
@@ -371,7 +376,7 @@ int compute_remquota(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         int ret = QUOTA_RET_OK;
         ENTRY;
 
-        if (!sb_any_quota_enabled(sb))
+        if (!ll_sb_any_quota_active(sb))
                 RETURN(QUOTA_RET_NOQUOTA);
 
         /* ignore root user */
@@ -520,8 +525,9 @@ void* quota_barrier(struct lustre_quota_ctxt *qctxt,
 
         OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit));
         if (qunit == NULL) {
-                CERROR("locating qunit failed.(id=%u isblk=%d %s)\n",
-                       oqctl->qc_id, isblk, oqctl->qc_type ? "grp" : "usr");
+                CERROR("locating %sunit failed for %sid %u\n",
+                       isblk ? "b" : "i", oqctl->qc_type ? "g" : "u",
+                       oqctl->qc_id);
                 qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id,
                                          oqctl->qc_type, isblk);
                 return NULL;
@@ -726,10 +732,8 @@ out:
 
         /* this is for dqacq_in_flight() */
         qunit_put(qunit);
-        /* this is for alloc_qunit() */
-        qunit_put(qunit);
         if (rc < 0 && rc != -EDQUOT)
-                 RETURN(err);
+                GOTO(out1, err);
 
         /* don't reschedule in such cases:
          *   - acq/rel failure and qunit isn't changed,
@@ -739,21 +743,21 @@ out:
          */
          OBD_ALLOC_PTR(oqaq);
          if (!oqaq)
-                 RETURN(-ENOMEM);
+                 GOTO(out1, err = -ENOMEM);
          qdata_to_oqaq(qdata, oqaq);
          /* adjust the qunit size in slaves */
          rc1 = quota_adjust_slave_lqs(oqaq, qctxt);
          OBD_FREE_PTR(oqaq);
          if (rc1 < 0) {
                  CERROR("adjust slave's qunit size failed!(rc:%d)\n", rc1);
-                 RETURN(rc1);
+                 GOTO(out1, err = rc1);
          }
          if (err || (rc < 0 && rc != -EBUSY && rc1 == 0) || is_master(qctxt))
-                 RETURN(err);
+                 GOTO(out1, err);
 
          if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880 &&
              OBD_FAIL_CHECK(OBD_FAIL_QUOTA_DELAY_REL))
-                 RETURN(err);
+                 GOTO(out1, err);
 
         /* reschedule another dqacq/dqrel if needed */
         qdata->qd_count = 0;
@@ -765,6 +769,9 @@ out:
                 rc1 = schedule_dqacq(obd, qctxt, qdata, opc, 0, NULL);
                 QDATA_DEBUG(qdata, "reschedudle opc(%d) rc(%d)\n", opc, rc1);
         }
+ out1:
+        /* this is for alloc_qunit() */
+        qunit_put(qunit);
         RETURN(err);
 }
 
@@ -799,7 +806,7 @@ static int dqacq_interpret(const struct lu_env *env,
                 DEBUG_REQ(D_ERROR, req,
                           "error unpacking qunit_data(rc: %ld)\n",
                           PTR_ERR(qdata));
-                RETURN(PTR_ERR(qdata));
+                qdata = &qunit->lq_data;
         }
 
         QDATA_DEBUG(qdata, "qdata: interpret rc(%d).\n", rc);
@@ -807,14 +814,14 @@ static int dqacq_interpret(const struct lu_env *env,
 
         if (qdata->qd_id != qunit->lq_data.qd_id ||
             OBD_FAIL_CHECK(OBD_FAIL_QUOTA_RET_QDATA)) {
-                CDEBUG(D_ERROR, "the returned qd_id isn't expected!"
+                CERROR("the returned qd_id isn't expected!"
                        "(qdata: %u, lq_data: %u)\n", qdata->qd_id,
                        qunit->lq_data.qd_id);
                 qdata->qd_id = qunit->lq_data.qd_id;
                 rc = -EPROTO;
         }
         if (QDATA_IS_GRP(qdata) != QDATA_IS_GRP(&qunit->lq_data)) {
-                CDEBUG(D_ERROR, "the returned grp/usr isn't expected!"
+                CERROR("the returned grp/usr isn't expected!"
                        "(qdata: %u, lq_data: %u)\n", qdata->qd_flags,
                        qunit->lq_data.qd_flags);
                 if (QDATA_IS_GRP(&qunit->lq_data))
@@ -824,12 +831,16 @@ static int dqacq_interpret(const struct lu_env *env,
                 rc = -EPROTO;
         }
         if (qdata->qd_count > qunit->lq_data.qd_count) {
-                CDEBUG(D_ERROR, "the returned qd_count isn't expected!"
+                CERROR("the returned qd_count isn't expected!"
                        "(qdata: "LPU64", lq_data: "LPU64")\n", qdata->qd_count,
                        qunit->lq_data.qd_count);
                 rc = -EPROTO;
         }
 
+        if (unlikely(rc == -ESRCH))
+                CERROR("quota for %s has been enabled by master, but disabled "
+                       "by slave.\n", QDATA_IS_GRP(qdata) ? "group" : "user");
+
         rc = dqacq_completion(obd, qctxt, qdata, rc,
                               lustre_msg_get_opc(req->rq_reqmsg));
 
@@ -908,6 +919,16 @@ static int got_qunit(struct lustre_qunit *qunit, int is_master)
         RETURN(rc);
 }
 
+static inline void
+revoke_lqs_rec(struct lustre_qunit_size *lqs, struct qunit_data *qdata, int opc)
+{
+        /* revoke lqs_xxx_rec which is computed in check_cur_qunit
+         * b=18630 */
+        spin_lock(&lqs->lqs_lock);
+        quota_compute_lqs(qdata, lqs, 0, (opc == QUOTA_DQACQ) ? 1 : 0);
+        spin_unlock(&lqs->lqs_lock);
+}
+
 static int
 schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                struct qunit_data *qdata, int opc, int wait,
@@ -927,8 +948,22 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
 
         LASSERT(opc == QUOTA_DQACQ || opc == QUOTA_DQREL);
         do_gettimeofday(&work_start);
-        if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL)
+
+        lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
+                               qctxt, 0);
+        if (lqs == NULL || IS_ERR(lqs)) {
+                CERROR("Can't find the lustre qunit size!\n");
+                RETURN(-EPERM);
+        }
+
+        if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL) {
+                revoke_lqs_rec(lqs, qdata, opc);
+                /* this is for quota_search_lqs */
+                lqs_putref(lqs);
                 RETURN(-ENOMEM);
+        }
+
+        OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_SD, 5);
 
         spin_lock(&qunit_hash_lock);
         qunit = dqacq_in_flight(qctxt, qdata);
@@ -936,6 +971,9 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                 spin_unlock(&qunit_hash_lock);
                 qunit_put(empty);
 
+                revoke_lqs_rec(lqs, qdata, opc);
+                /* this is for quota_search_lqs */
+                lqs_putref(lqs);
                 goto wait_completion;
         }
         qunit = empty;
@@ -943,19 +981,12 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         insert_qunit_nolock(qctxt, qunit);
         spin_unlock(&qunit_hash_lock);
 
-        lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
-                               qctxt, 0);
-        if (lqs && !IS_ERR(lqs)) {
-                spin_lock(&lqs->lqs_lock);
-                quota_compute_lqs(qdata, lqs, 1, (opc == QUOTA_DQACQ) ? 1 : 0);
-                /* when this qdata returned from mds, it will call lqs_putref */
-                lqs_getref(lqs);
-                spin_unlock(&lqs->lqs_lock);
-                /* this is for quota_search_lqs */
-                lqs_putref(lqs);
-        } else {
-                CDEBUG(D_ERROR, "Can't find the lustre qunit size!\n");
-        }
+        /* From here, the quota request will be sent anyway.
+         * When this qdata request returned or is cancelled,
+         * lqs_putref will be called at that time */
+        lqs_getref(lqs);
+        /* this is for quota_search_lqs */
+        lqs_putref(lqs);
 
         QDATA_DEBUG(qdata, "obd(%s): send %s quota req\n",
                     obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel");
@@ -1031,7 +1062,7 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                                         LUSTRE_MDS_VERSION, opc);
         class_import_put(imp);
         if (req == NULL) {
-                CDEBUG(D_ERROR, "Can't alloc request\n");
+                CERROR("Can't alloc request\n");
                 dqacq_completion(obd, qctxt, qdata, -ENOMEM, opc);
                 /* this is for qunit_get() */
                 qunit_put(qunit);
@@ -1042,7 +1073,7 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         req->rq_no_resend = req->rq_no_delay = 1;
         rc = quota_copy_qdata(req, qdata, QUOTA_REQUEST, QUOTA_IMPORT);
         if (rc < 0) {
-                CDEBUG(D_ERROR, "Can't pack qunit_data(rc: %d)\n", rc);
+                CERROR("Can't pack qunit_data(rc: %d)\n", rc);
                 ptlrpc_req_finished(req);
                 dqacq_completion(obd, qctxt, qdata, -EPROTO, opc);
                 /* this is for qunit_get() */
@@ -1234,10 +1265,10 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
         qctxt->lqc_sync_blk = 0;
         spin_unlock(&qctxt->lqc_lock);
 
-        qctxt->lqc_lqs_hash = lustre_hash_init("LQS_HASH",
-                                               HASH_LQS_CUR_BITS,
-                                               HASH_LQS_MAX_BITS,
-                                               &lqs_hash_ops, 0);
+        qctxt->lqc_lqs_hash = cfs_hash_create("LQS_HASH",
+                                              HASH_LQS_CUR_BITS,
+                                              HASH_LQS_MAX_BITS,
+                                              &lqs_hash_ops, CFS_HASH_REHASH);
         if (!qctxt->lqc_lqs_hash) {
                 CERROR("initialize hash lqs for %s error!\n", obd->obd_name);
                 RETURN(-ENOMEM);
@@ -1312,10 +1343,10 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
                                      cfs_time_seconds(1));
         }
 
-        lustre_hash_for_each_safe(qctxt->lqc_lqs_hash, hash_put_lqs, NULL);
+        cfs_hash_for_each_safe(qctxt->lqc_lqs_hash, hash_put_lqs, NULL);
         l_wait_event(qctxt->lqc_lqs_waitq, check_lqs(qctxt), &lwi);
         down_write(&obt->obt_rwsem);
-        lustre_hash_exit(qctxt->lqc_lqs_hash);
+        cfs_hash_destroy(qctxt->lqc_lqs_hash);
         qctxt->lqc_lqs_hash = NULL;
         up_write(&obt->obt_rwsem);
 
@@ -1370,7 +1401,7 @@ static int qslave_recovery_main(void *arg)
                 int ret;
 
                 LOCK_DQONOFF_MUTEX(dqopt);
-                if (!sb_has_quota_enabled(qctxt->lqc_sb, type)) {
+                if (!ll_sb_has_quota_active(qctxt->lqc_sb, type)) {
                         UNLOCK_DQONOFF_MUTEX(dqopt);
                         break;
                 }
@@ -1411,9 +1442,8 @@ static int qslave_recovery_main(void *arg)
                                 rc = 0;
                         }
 
-                        if (rc)
-                                CDEBUG(rc == -EBUSY ? D_QUOTA : D_ERROR,
-                                       "qslave recovery failed! (id:%d type:%d "
+                        if (rc && rc != -EBUSY)
+                                CERROR("qslave recovery failed! (id:%d type:%d "
                                        " rc:%d)\n", dqid->di_id, type, rc);
 free:
                         OBD_FREE_PTR(dqid);
@@ -1434,7 +1464,7 @@ qslave_start_recovery(struct obd_device *obd, struct lustre_quota_ctxt *qctxt)
         int rc;
         ENTRY;
 
-        if (!sb_any_quota_enabled(qctxt->lqc_sb))
+        if (!ll_sb_any_quota_active(qctxt->lqc_sb))
                 goto exit;
 
         data.obd = obd;
@@ -1496,6 +1526,9 @@ void build_lqs(struct obd_device *obd)
         for (i = 0; i < MAXQUOTAS; i++) {
                 struct dquot_id *dqid, *tmp;
 
+                if (sb_dqopt(qctxt->lqc_sb)->files[i] == NULL)
+                        continue;
+
 #ifndef KERNEL_SUPPORTS_QUOTA_READ
                 rc = fsfilt_qids(obd, sb_dqopt(qctxt->lqc_sb)->files[i], NULL,
                                  i, &id_list);
@@ -1504,7 +1537,7 @@ void build_lqs(struct obd_device *obd)
                                  i, &id_list);
 #endif
                 if (rc) {
-                        CDEBUG(D_ERROR, "fail to get %s qids!\n",
+                        CERROR("%s: failed to get %s qids!\n", obd->obd_name,
                                i ? "group" : "user");
                         continue;
                 }
@@ -1520,8 +1553,8 @@ void build_lqs(struct obd_device *obd)
                                 lqs->lqs_flags |= dqid->di_flag;
                                 lqs_putref(lqs);
                         } else {
-                                CDEBUG(D_ERROR, "fail to create a lqs"
-                                       "(%s id: %u)!\n", i ? "group" : "user",
+                                CERROR("%s: failed to create a lqs for %sid %u"
+                                       "\n", obd->obd_name, i ? "g" : "u",
                                        dqid->di_id);
                         }
 
@@ -1538,7 +1571,7 @@ void build_lqs(struct obd_device *obd)
  * string hashing using djb2 hash algorithm
  */
 static unsigned
-lqs_hash(lustre_hash_t *lh, void *key, unsigned mask)
+lqs_hash(cfs_hash_t *hs, void *key, unsigned mask)
 {
         struct quota_adjust_qunit *lqs_key;
         unsigned hash;
@@ -1587,7 +1620,7 @@ lqs_put(struct hlist_node *hnode)
                 hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
         ENTRY;
 
-        __lqs_putref(q, 0);
+        __lqs_putref(q);
 
         RETURN(q);
 }
@@ -1611,11 +1644,11 @@ lqs_exit(struct hlist_node *hnode)
         EXIT;
 }
 
-static lustre_hash_ops_t lqs_hash_ops = {
-        .lh_hash    = lqs_hash,
-        .lh_compare = lqs_compare,
-        .lh_get     = lqs_get,
-        .lh_put     = lqs_put,
-        .lh_exit    = lqs_exit
+static cfs_hash_ops_t lqs_hash_ops = {
+        .hs_hash    = lqs_hash,
+        .hs_compare = lqs_compare,
+        .hs_get     = lqs_get,
+        .hs_put     = lqs_put,
+        .hs_exit    = lqs_exit
 };
 #endif /* HAVE_QUOTA_SUPPORT */