Whamcloud - gitweb
LU-550 incorrect quota hash function
[fs/lustre-release.git] / lustre / quota / quota_context.c
index 859deee..181474b 100644 (file)
@@ -26,7 +26,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  */
 /*
 #include <lprocfs_status.h>
 #include "quota_internal.h"
 
+static int hash_lqs_cur_bits = HASH_LQS_CUR_BITS;
+CFS_MODULE_PARM(hash_lqs_cur_bits, "i", int, 0444,
+                "the current bits of lqs hash");
+
 #ifdef HAVE_QUOTA_SUPPORT
 
 static cfs_hash_ops_t lqs_hash_ops;
@@ -1036,17 +1040,19 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                 cfs_spin_lock(&qctxt->lqc_lock);
                 if (wait && !qctxt->lqc_import) {
                         cfs_spin_unlock(&qctxt->lqc_lock);
-
-                        LASSERT(oti && oti->oti_thread &&
-                                oti->oti_thread->t_watchdog);
-
-                        lc_watchdog_disable(oti->oti_thread->t_watchdog);
+                        LASSERT(oti && oti->oti_thread);
+                        /* The recovery thread doesn't have watchdog
+                         * attached. LU-369 */
+                        if (oti->oti_thread->t_watchdog)
+                                lc_watchdog_disable(oti->oti_thread->\
+                                                t_watchdog);
                         CDEBUG(D_QUOTA, "sleep for quota master\n");
                         l_wait_event(qctxt->lqc_wait_for_qmaster,
                                      check_qm(qctxt), &lwi);
                         CDEBUG(D_QUOTA, "wake up when quota master is back\n");
-                        lc_watchdog_touch(oti->oti_thread->t_watchdog,
-                                 CFS_GET_TIMEOUT(oti->oti_thread->t_svc));
+                        if (oti->oti_thread->t_watchdog)
+                                lc_watchdog_touch(oti->oti_thread->t_watchdog,
+                                      CFS_GET_TIMEOUT(oti->oti_thread->t_svc));
                 } else {
                         cfs_spin_unlock(&qctxt->lqc_lock);
                 }
@@ -1267,9 +1273,13 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
         cfs_spin_unlock(&qctxt->lqc_lock);
 
         qctxt->lqc_lqs_hash = cfs_hash_create("LQS_HASH",
-                                              HASH_LQS_CUR_BITS,
+                                              hash_lqs_cur_bits,
                                               HASH_LQS_MAX_BITS,
-                                              &lqs_hash_ops, CFS_HASH_REHASH);
+                                              min(hash_lqs_cur_bits,
+                                                  HASH_LQS_BKT_BITS),
+                                              0, CFS_HASH_MIN_THETA,
+                                              CFS_HASH_MAX_THETA,
+                                              &lqs_hash_ops, CFS_HASH_DEFAULT);
         if (!qctxt->lqc_lqs_hash) {
                 CERROR("initialize hash lqs for %s error!\n", obd->obd_name);
                 RETURN(-ENOMEM);
@@ -1294,10 +1304,12 @@ static int check_lqs(struct lustre_quota_ctxt *qctxt)
         RETURN(rc);
 }
 
-
-void hash_put_lqs(void *obj, void *data)
+int qctxt_del_lqs(cfs_hash_t *hs, cfs_hash_bd_t *bd,
+                 cfs_hlist_node_t *hnode, void *data)
 {
-        lqs_putref((struct lustre_qunit_size *)obj);
+        /* remove from hash and -1 refcount */
+        cfs_hash_bd_del_locked(hs, bd, hnode);
+        return 0;
 }
 
 void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
@@ -1345,10 +1357,12 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
                                                    cfs_time_seconds(1));
         }
 
-        cfs_hash_for_each_safe(qctxt->lqc_lqs_hash, hash_put_lqs, NULL);
+        /* release refcount on lustre_qunit_size holding by lqs_hash */
+        cfs_hash_for_each_safe(qctxt->lqc_lqs_hash, qctxt_del_lqs, NULL);
+
         l_wait_event(qctxt->lqc_lqs_waitq, check_lqs(qctxt), &lwi);
         cfs_down_write(&obt->obt_rwsem);
-        cfs_hash_destroy(qctxt->lqc_lqs_hash);
+        cfs_hash_putref(qctxt->lqc_lqs_hash);
         qctxt->lqc_lqs_hash = NULL;
         cfs_up_write(&obt->obt_rwsem);
 
@@ -1473,8 +1487,8 @@ qslave_start_recovery(struct obd_device *obd, struct lustre_quota_ctxt *qctxt)
         data.qctxt = qctxt;
         cfs_init_completion(&data.comp);
 
-        rc = cfs_kernel_thread(qslave_recovery_main, &data,
-                               CLONE_VM|CLONE_FILES);
+        rc = cfs_create_thread(qslave_recovery_main, &data,
+                               CFS_DAEMON_FLAGS);
         if (rc < 0) {
                 CERROR("Cannot start quota recovery thread: rc %d\n", rc);
                 goto exit;
@@ -1484,30 +1498,17 @@ exit:
         EXIT;
 }
 
-int quota_is_on(struct lustre_quota_ctxt *qctxt, struct obd_quotactl *oqctl)
+inline int quota_is_on(struct lustre_quota_ctxt *qctxt,
+                       struct obd_quotactl *oqctl)
 {
-        unsigned int type;
-
-        for (type = USRQUOTA; type < MAXQUOTAS; type++) {
-                if (!Q_TYPESET(oqctl, type))
-                        continue;
-                if (!(qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type)))
-                        return 0;
-        }
-        return 1;
+        return ((qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type)) ==
+                UGQUOTA2LQC(oqctl->qc_type));
 }
 
-int quota_is_off(struct lustre_quota_ctxt *qctxt, struct obd_quotactl *oqctl)
+inline int quota_is_off(struct lustre_quota_ctxt *qctxt,
+                        struct obd_quotactl *oqctl)
 {
-        unsigned int type;
-
-        for (type = USRQUOTA; type < MAXQUOTAS; type++) {
-                if (!Q_TYPESET(oqctl, type))
-                        continue;
-                if (qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type))
-                        return 0;
-        }
-        return 1;
+        return !(qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type));
 }
 
 /**
@@ -1574,15 +1575,15 @@ void build_lqs(struct obd_device *obd)
  * string hashing using djb2 hash algorithm
  */
 static unsigned
-lqs_hash(cfs_hash_t *hs, void *key, unsigned mask)
+lqs_hash(cfs_hash_t *hs, const void *key, unsigned mask)
 {
-        struct quota_adjust_qunit *lqs_key;
+        unsigned long long id;
         unsigned hash;
         ENTRY;
 
         LASSERT(key);
-        lqs_key = (struct quota_adjust_qunit *)key;
-        hash = (QAQ_IS_GRP(lqs_key) ? 5381 : 5387) * lqs_key->qaq_id;
+        id = *((unsigned long long *)key);
+        hash = (LQS_KEY_GRP(id) ? 5381 : 5387) * (unsigned)LQS_KEY_ID(id);
 
         RETURN(hash & mask);
 }
@@ -1598,71 +1599,51 @@ lqs_key(cfs_hlist_node_t *hnode)
 }
 
 static int
-lqs_compare(void *key, cfs_hlist_node_t *hnode)
+lqs_keycmp(const void *key, cfs_hlist_node_t *hnode)
 {
-        struct lustre_qunit_size *q;
-        int rc;
-        ENTRY;
-
-        LASSERT(key);
-        q = cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
-
-        cfs_spin_lock(&q->lqs_lock);
-        rc = (q->lqs_key == *((unsigned long long *)key));
-        cfs_spin_unlock(&q->lqs_lock);
+        struct lustre_qunit_size *q =
+                cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
 
-        RETURN(rc);
+        RETURN(q->lqs_key == *((unsigned long long *)key));
 }
 
 static void *
-lqs_get(cfs_hlist_node_t *hnode)
+lqs_object(cfs_hlist_node_t *hnode)
 {
-        struct lustre_qunit_size *q =
-                cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
-        ENTRY;
-
-        __lqs_getref(q);
-
-        RETURN(q);
+        return cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
 }
 
-static void *
-lqs_put(cfs_hlist_node_t *hnode)
+static void
+lqs_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
 {
         struct lustre_qunit_size *q =
                 cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
-        ENTRY;
-
-        __lqs_putref(q);
 
-        RETURN(q);
+        lqs_getref(q);
 }
 
 static void
-lqs_exit(cfs_hlist_node_t *hnode)
+lqs_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
 {
         struct lustre_qunit_size *q =
                 cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
-        ENTRY;
 
-        /*
-         * Nothing should be left. User of lqs put it and
-         * lqs also was deleted from table by this time
-         * so we should have 0 refs.
-         */
-        LASSERTF(cfs_atomic_read(&q->lqs_refcount) == 0,
-                 "Busy lqs %p with %d refs\n", q,
-                 cfs_atomic_read(&q->lqs_refcount));
-        OBD_FREE_PTR(q);
-        EXIT;
+        lqs_putref(q);
+}
+
+static void
+lqs_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+        CERROR("It should not have any item left to be handled by this!");
 }
 
 static cfs_hash_ops_t lqs_hash_ops = {
-        .hs_hash    = lqs_hash,
-        .hs_key     = lqs_key,
-        .hs_compare = lqs_compare,
-        .hs_get     = lqs_get,
-        .hs_put     = lqs_put,
-        .hs_exit    = lqs_exit
+        .hs_hash        = lqs_hash,
+        .hs_key         = lqs_key,
+        .hs_keycmp      = lqs_keycmp,
+        .hs_object      = lqs_object,
+        .hs_get         = lqs_get,
+        .hs_put_locked  = lqs_put_locked,
+        .hs_exit        = lqs_exit
 };
 #endif /* HAVE_QUOTA_SUPPORT */