Whamcloud - gitweb
LU-882 quota: Quota code compares unsigned < 0
[fs/lustre-release.git] / lustre / quota / quota_context.c
index 9f9035a..f1be4f8 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
 #include <lprocfs_status.h>
 #include "quota_internal.h"
 
+static int hash_lqs_cur_bits = HASH_LQS_CUR_BITS;
+CFS_MODULE_PARM(hash_lqs_cur_bits, "i", int, 0444,
+                "the current bits of lqs hash");
+
 #ifdef HAVE_QUOTA_SUPPORT
 
 static cfs_hash_ops_t lqs_hash_ops;
@@ -318,7 +325,7 @@ check_cur_qunit(struct obd_device *obd,
         limit_org = limit;
         /* when a releasing quota req is sent, before it returned
            limit is assigned a small value. limit will overflow */
-        if (limit + record < 0)
+        if (record < 0)
                 usage -= record;
         else
                 limit += record;
@@ -1036,17 +1043,19 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                 cfs_spin_lock(&qctxt->lqc_lock);
                 if (wait && !qctxt->lqc_import) {
                         cfs_spin_unlock(&qctxt->lqc_lock);
-
-                        LASSERT(oti && oti->oti_thread &&
-                                oti->oti_thread->t_watchdog);
-
-                        lc_watchdog_disable(oti->oti_thread->t_watchdog);
+                        LASSERT(oti && oti->oti_thread);
+                        /* The recovery thread doesn't have watchdog
+                         * attached. LU-369 */
+                        if (oti->oti_thread->t_watchdog)
+                                lc_watchdog_disable(oti->oti_thread->\
+                                                t_watchdog);
                         CDEBUG(D_QUOTA, "sleep for quota master\n");
                         l_wait_event(qctxt->lqc_wait_for_qmaster,
                                      check_qm(qctxt), &lwi);
                         CDEBUG(D_QUOTA, "wake up when quota master is back\n");
-                        lc_watchdog_touch(oti->oti_thread->t_watchdog,
-                                 CFS_GET_TIMEOUT(oti->oti_thread->t_svc));
+                        if (oti->oti_thread->t_watchdog)
+                                lc_watchdog_touch(oti->oti_thread->t_watchdog,
+                                      CFS_GET_TIMEOUT(oti->oti_thread->t_svc));
                 } else {
                         cfs_spin_unlock(&qctxt->lqc_lock);
                 }
@@ -1088,7 +1097,7 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         aa->aa_qunit = qunit;
 
         req->rq_interpret_reply = dqacq_interpret;
-        ptlrpcd_add_req(req, PSCOPE_OTHER);
+        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
 
         QDATA_DEBUG(qdata, "%s scheduled.\n",
                     opc == QUOTA_DQACQ ? "DQACQ" : "DQREL");
@@ -1267,9 +1276,13 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
         cfs_spin_unlock(&qctxt->lqc_lock);
 
         qctxt->lqc_lqs_hash = cfs_hash_create("LQS_HASH",
-                                              HASH_LQS_CUR_BITS,
+                                              hash_lqs_cur_bits,
                                               HASH_LQS_MAX_BITS,
-                                              &lqs_hash_ops, CFS_HASH_REHASH);
+                                              min(hash_lqs_cur_bits,
+                                                  HASH_LQS_BKT_BITS),
+                                              0, CFS_HASH_MIN_THETA,
+                                              CFS_HASH_MAX_THETA,
+                                              &lqs_hash_ops, CFS_HASH_DEFAULT);
         if (!qctxt->lqc_lqs_hash) {
                 CERROR("initialize hash lqs for %s error!\n", obd->obd_name);
                 RETURN(-ENOMEM);
@@ -1294,10 +1307,12 @@ static int check_lqs(struct lustre_quota_ctxt *qctxt)
         RETURN(rc);
 }
 
-
-void hash_put_lqs(void *obj, void *data)
+int qctxt_del_lqs(cfs_hash_t *hs, cfs_hash_bd_t *bd,
+                 cfs_hlist_node_t *hnode, void *data)
 {
-        lqs_putref((struct lustre_qunit_size *)obj);
+        /* remove from hash and -1 refcount */
+        cfs_hash_bd_del_locked(hs, bd, hnode);
+        return 0;
 }
 
 void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
@@ -1345,7 +1360,9 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
                                                    cfs_time_seconds(1));
         }
 
-        cfs_hash_for_each_safe(qctxt->lqc_lqs_hash, hash_put_lqs, NULL);
+        /* release refcount on lustre_qunit_size holding by lqs_hash */
+        cfs_hash_for_each_safe(qctxt->lqc_lqs_hash, qctxt_del_lqs, NULL);
+
         l_wait_event(qctxt->lqc_lqs_waitq, check_lqs(qctxt), &lwi);
         cfs_down_write(&obt->obt_rwsem);
         cfs_hash_putref(qctxt->lqc_lqs_hash);
@@ -1473,8 +1490,8 @@ qslave_start_recovery(struct obd_device *obd, struct lustre_quota_ctxt *qctxt)
         data.qctxt = qctxt;
         cfs_init_completion(&data.comp);
 
-        rc = cfs_kernel_thread(qslave_recovery_main, &data,
-                               CLONE_VM|CLONE_FILES);
+        rc = cfs_create_thread(qslave_recovery_main, &data,
+                               CFS_DAEMON_FLAGS);
         if (rc < 0) {
                 CERROR("Cannot start quota recovery thread: rc %d\n", rc);
                 goto exit;
@@ -1484,30 +1501,17 @@ exit:
         EXIT;
 }
 
-int quota_is_on(struct lustre_quota_ctxt *qctxt, struct obd_quotactl *oqctl)
+inline int quota_is_on(struct lustre_quota_ctxt *qctxt,
+                       struct obd_quotactl *oqctl)
 {
-        unsigned int type;
-
-        for (type = USRQUOTA; type < MAXQUOTAS; type++) {
-                if (!Q_TYPESET(oqctl, type))
-                        continue;
-                if (!(qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type)))
-                        return 0;
-        }
-        return 1;
+        return ((qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type)) ==
+                UGQUOTA2LQC(oqctl->qc_type));
 }
 
-int quota_is_off(struct lustre_quota_ctxt *qctxt, struct obd_quotactl *oqctl)
+inline int quota_is_off(struct lustre_quota_ctxt *qctxt,
+                        struct obd_quotactl *oqctl)
 {
-        unsigned int type;
-
-        for (type = USRQUOTA; type < MAXQUOTAS; type++) {
-                if (!Q_TYPESET(oqctl, type))
-                        continue;
-                if (qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type))
-                        return 0;
-        }
-        return 1;
+        return !(qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type));
 }
 
 /**
@@ -1574,15 +1578,15 @@ void build_lqs(struct obd_device *obd)
  * string hashing using djb2 hash algorithm
  */
 static unsigned
-lqs_hash(cfs_hash_t *hs, void *key, unsigned mask)
+lqs_hash(cfs_hash_t *hs, const void *key, unsigned mask)
 {
-        struct quota_adjust_qunit *lqs_key;
+        unsigned long long id;
         unsigned hash;
         ENTRY;
 
         LASSERT(key);
-        lqs_key = (struct quota_adjust_qunit *)key;
-        hash = (QAQ_IS_GRP(lqs_key) ? 5381 : 5387) * lqs_key->qaq_id;
+        id = *((unsigned long long *)key);
+        hash = (LQS_KEY_GRP(id) ? 5381 : 5387) * (unsigned)LQS_KEY_ID(id);
 
         RETURN(hash & mask);
 }
@@ -1598,71 +1602,51 @@ lqs_key(cfs_hlist_node_t *hnode)
 }
 
 static int
-lqs_compare(void *key, cfs_hlist_node_t *hnode)
+lqs_keycmp(const void *key, cfs_hlist_node_t *hnode)
 {
-        struct lustre_qunit_size *q;
-        int rc;
-        ENTRY;
-
-        LASSERT(key);
-        q = cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
-
-        cfs_spin_lock(&q->lqs_lock);
-        rc = (q->lqs_key == *((unsigned long long *)key));
-        cfs_spin_unlock(&q->lqs_lock);
+        struct lustre_qunit_size *q =
+                cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
 
-        RETURN(rc);
+        RETURN(q->lqs_key == *((unsigned long long *)key));
 }
 
 static void *
-lqs_get(cfs_hlist_node_t *hnode)
+lqs_object(cfs_hlist_node_t *hnode)
 {
-        struct lustre_qunit_size *q =
-                cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
-        ENTRY;
-
-        __lqs_getref(q);
-
-        RETURN(q);
+        return cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
 }
 
-static void *
-lqs_put(cfs_hlist_node_t *hnode)
+static void
+lqs_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
 {
         struct lustre_qunit_size *q =
                 cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
-        ENTRY;
-
-        __lqs_putref(q);
 
-        RETURN(q);
+        lqs_getref(q);
 }
 
 static void
-lqs_exit(cfs_hlist_node_t *hnode)
+lqs_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
 {
         struct lustre_qunit_size *q =
                 cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
-        ENTRY;
 
-        /*
-         * Nothing should be left. User of lqs put it and
-         * lqs also was deleted from table by this time
-         * so we should have 0 refs.
-         */
-        LASSERTF(cfs_atomic_read(&q->lqs_refcount) == 0,
-                 "Busy lqs %p with %d refs\n", q,
-                 cfs_atomic_read(&q->lqs_refcount));
-        OBD_FREE_PTR(q);
-        EXIT;
+        lqs_putref(q);
+}
+
+static void
+lqs_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+        CERROR("It should not have any item left to be handled by this!");
 }
 
 static cfs_hash_ops_t lqs_hash_ops = {
-        .hs_hash    = lqs_hash,
-        .hs_key     = lqs_key,
-        .hs_compare = lqs_compare,
-        .hs_get     = lqs_get,
-        .hs_put     = lqs_put,
-        .hs_exit    = lqs_exit
+        .hs_hash        = lqs_hash,
+        .hs_key         = lqs_key,
+        .hs_keycmp      = lqs_keycmp,
+        .hs_object      = lqs_object,
+        .hs_get         = lqs_get,
+        .hs_put_locked  = lqs_put_locked,
+        .hs_exit        = lqs_exit
 };
 #endif /* HAVE_QUOTA_SUPPORT */