From: Landen
Date: Thu, 25 Nov 2010 09:03:37 +0000 (+0800)
Subject: b=24188 rehash patch makes oops in quota_search_lqs
X-Git-Tag: 2.1.57.0~15
X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=9364f1f12c830985b4d262c6a963bc0949facd17;hp=5700f5bc3c5bb0e65ca84f34c7b642066d8489e2

b=24188 rehash patch makes oops in quota_search_lqs

The problem: we must call "cfs_hash_getref(qctxt->lqc_lqs_hash)" both when
we search for and when we create a new quota hash entry. This patch also
streamlines the lquota reference counting so that lquota holds one less
reference on each lqs.

i=landen
i=vitaly.fertman
---

diff --git a/lustre/include/lustre_quota.h b/lustre/include/lustre_quota.h
index 1e12858..a153d6b 100644
--- a/lustre/include/lustre_quota.h
+++ b/lustre/include/lustre_quota.h
@@ -397,39 +397,25 @@ struct lustre_qunit_size {
 #define LQS_KEY_ID(key)  (key & 0xffffffff)
 #define LQS_KEY_GRP(key) (key >> 32)
 
-static inline void __lqs_getref(struct lustre_qunit_size *lqs)
+static inline void lqs_getref(struct lustre_qunit_size *lqs)
 {
         int count = cfs_atomic_inc_return(&lqs->lqs_refcount);
 
-        if (count == 2) /* quota_create_lqs */
-                cfs_atomic_inc(&lqs->lqs_ctxt->lqc_lqs);
         CDEBUG(D_INFO, "lqs=%p refcount %d\n", lqs, count);
 }
 
-static inline void lqs_getref(struct lustre_qunit_size *lqs)
+static inline void lqs_putref(struct lustre_qunit_size *lqs)
 {
-        __lqs_getref(lqs);
-}
+        int count = cfs_atomic_read(&lqs->lqs_refcount);
 
-static inline void __lqs_putref(struct lustre_qunit_size *lqs)
-{
-        LASSERT(cfs_atomic_read(&lqs->lqs_refcount) > 0);
+        LASSERT(count > 0);
+        CDEBUG(D_INFO, "lqs=%p refcount %d\n", lqs, count - 1);
 
-        if (cfs_atomic_dec_return(&lqs->lqs_refcount) == 1)
+        if (cfs_atomic_dec_and_test(&lqs->lqs_refcount)) {
                 if (cfs_atomic_dec_and_test(&lqs->lqs_ctxt->lqc_lqs))
                         cfs_waitq_signal(&lqs->lqs_ctxt->lqc_lqs_waitq);
-        CDEBUG(D_INFO, "lqs=%p refcount %d\n",
-               lqs, cfs_atomic_read(&lqs->lqs_refcount));
-}
-
-static inline void lqs_putref(struct lustre_qunit_size *lqs)
-{
-        __lqs_putref(lqs);
-}
-
-static inline void lqs_initref(struct lustre_qunit_size *lqs)
-{
-        cfs_atomic_set(&lqs->lqs_refcount, 0);
+                OBD_FREE_PTR(lqs);
+        }
 }
 
 #else
diff --git a/lustre/quota/quota_adjust_qunit.c b/lustre/quota/quota_adjust_qunit.c
index 8f01e0d..45f8c38 100644
--- a/lustre/quota/quota_adjust_qunit.c
+++ b/lustre/quota/quota_adjust_qunit.c
@@ -90,20 +90,45 @@ void quota_compute_lqs(struct qunit_data *qdata, struct lustre_qunit_size *lqs,
 }
 
-static struct lustre_qunit_size *
-quota_create_lqs(unsigned long long lqs_key, struct lustre_quota_ctxt *qctxt)
+struct lustre_qunit_size *quota_search_lqs(unsigned long long lqs_key,
+                                           struct lustre_quota_ctxt *qctxt,
+                                           int create)
 {
-        struct lustre_qunit_size *lqs = NULL;
+        struct lustre_qunit_size *lqs;
+        struct lustre_qunit_size *lqs2;
         cfs_hash_t *hs = NULL;
         int rc = 0;
 
+        cfs_spin_lock(&qctxt->lqc_lock);
+        if (qctxt->lqc_valid) {
+                LASSERT(qctxt->lqc_lqs_hash != NULL);
+                hs = cfs_hash_getref(qctxt->lqc_lqs_hash);
+        }
+        cfs_spin_unlock(&qctxt->lqc_lock);
+
+        if (hs == NULL) {
+                rc = -EBUSY;
+                goto out;
+        }
+
+        /* cfs_hash_lookup will +1 refcount for caller */
+        lqs = cfs_hash_lookup(qctxt->lqc_lqs_hash, &lqs_key);
+        if (lqs != NULL) /* found */
+                goto out_put;
+
+        if (!create)
+                goto out_put;
+
         OBD_ALLOC_PTR(lqs);
-        if (!lqs)
-                GOTO(out, rc = -ENOMEM);
+        if (!lqs) {
+                rc = -ENOMEM;
+                goto out_put;
+        }
 
         lqs->lqs_key = lqs_key;
         cfs_spin_lock_init(&lqs->lqs_lock);
+
         lqs->lqs_bwrite_pending = 0;
         lqs->lqs_iwrite_pending = 0;
         lqs->lqs_ino_rec = 0;
@@ -114,77 +139,38 @@ quota_create_lqs(unsigned long long lqs_key, struct lustre_quota_ctxt *qctxt)
         lqs->lqs_iunit_sz = qctxt->lqc_iunit_sz;
         lqs->lqs_btune_sz = qctxt->lqc_btune_sz;
         lqs->lqs_itune_sz = qctxt->lqc_itune_sz;
-        lqs->lqs_ctxt = qctxt;
         if (qctxt->lqc_handler) {
                 lqs->lqs_last_bshrink = 0;
                 lqs->lqs_last_ishrink = 0;
         }
 
-        lqs_initref(lqs);
-        cfs_spin_lock(&qctxt->lqc_lock);
-        if (qctxt->lqc_valid)
-                hs = cfs_hash_getref(qctxt->lqc_lqs_hash);
-        cfs_spin_unlock(&qctxt->lqc_lock);
+        lqs->lqs_ctxt = qctxt; /* must be called before lqs_initref */
+        cfs_atomic_set(&lqs->lqs_refcount, 1); /* 1 for caller */
+        cfs_atomic_inc(&lqs->lqs_ctxt->lqc_lqs);
 
-        if (hs) {
-                lqs_getref(lqs);
-                rc = cfs_hash_add_unique(qctxt->lqc_lqs_hash,
-                                         &lqs->lqs_key, &lqs->lqs_hash);
-                if (rc)
-                        lqs_putref(lqs);
-                cfs_hash_putref(hs);
-        } else {
-                rc = -EBUSY;
-        }
+        /* lqc_lqs_hash will take +1 refcount on lqs on adding */
+        lqs2 = cfs_hash_findadd_unique(qctxt->lqc_lqs_hash,
+                                       &lqs->lqs_key, &lqs->lqs_hash);
+        if (lqs2 == lqs) /* added to hash */
+                goto out_put;
 
- out:
-        if (rc && lqs)
-                OBD_FREE_PTR(lqs);
+        create = 0;
+        lqs_putref(lqs);
+        lqs = lqs2;
 
-        if (rc)
+ out_put:
+        cfs_hash_putref(hs);
+ out:
+        if (rc != 0) { /* error */
+                CERROR("get lqs error(rc: %d)\n", rc);
                 return ERR_PTR(rc);
-        else
-                return lqs;
-}
-
-struct lustre_qunit_size *quota_search_lqs(unsigned long long lqs_key,
-                                           struct lustre_quota_ctxt *qctxt,
-                                           int create)
-{
-        struct lustre_qunit_size *lqs;
-        int rc = 0;
-
- search_lqs:
-        lqs = cfs_hash_lookup(qctxt->lqc_lqs_hash, &lqs_key);
-        if (IS_ERR(lqs))
-                GOTO(out, rc = PTR_ERR(lqs));
-
-        if (create && lqs == NULL) {
-                /* if quota_create_lqs is successful, it will get a
-                 * ref to the lqs. The ref will be released when
-                 * qctxt_cleanup() or quota is nullified */
-                lqs = quota_create_lqs(lqs_key, qctxt);
-                if (IS_ERR(lqs))
-                        rc = PTR_ERR(lqs);
-                if (rc == -EALREADY)
-                        GOTO(search_lqs, rc = 0);
-                /* get a reference for the caller when creating lqs
-                 * successfully */
-                if (rc == 0)
-                        lqs_getref(lqs);
-        }
 
-        if (lqs && rc == 0)
+        if (lqs != NULL) {
                 LQS_DEBUG(lqs, "%s\n", (create == 1 ?
"create lqs" : "search lqs")); - - out: - if (rc == 0) { - return lqs; - } else { - CERROR("get lqs error(rc: %d)\n", rc); - return ERR_PTR(rc); } + return lqs; } int quota_adjust_slave_lqs(struct quota_adjust_qunit *oqaq, diff --git a/lustre/quota/quota_context.c b/lustre/quota/quota_context.c index c3e0a64..ace8caa 100644 --- a/lustre/quota/quota_context.c +++ b/lustre/quota/quota_context.c @@ -1302,12 +1302,11 @@ static int check_lqs(struct lustre_quota_ctxt *qctxt) RETURN(rc); } - -int hash_put_lqs(cfs_hash_t *hs, cfs_hash_bd_t *bd, +int qctxt_del_lqs(cfs_hash_t *hs, cfs_hash_bd_t *bd, cfs_hlist_node_t *hnode, void *data) - { - lqs_putref((struct lustre_qunit_size *)cfs_hash_object(hs, hnode)); + /* remove from hash and -1 refcount */ + cfs_hash_bd_del_locked(hs, bd, hnode); return 0; } @@ -1356,7 +1355,9 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) cfs_time_seconds(1)); } - cfs_hash_for_each_safe(qctxt->lqc_lqs_hash, hash_put_lqs, NULL); + /* release refcount on lustre_qunit_size holding by lqs_hash */ + cfs_hash_for_each_safe(qctxt->lqc_lqs_hash, qctxt_del_lqs, NULL); + l_wait_event(qctxt->lqc_lqs_waitq, check_lqs(qctxt), &lwi); cfs_down_write(&obt->obt_rwsem); cfs_hash_putref(qctxt->lqc_lqs_hash); @@ -1629,7 +1630,7 @@ lqs_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode) struct lustre_qunit_size *q = cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); - __lqs_getref(q); + lqs_getref(q); } static void @@ -1638,24 +1639,13 @@ lqs_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode) struct lustre_qunit_size *q = cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); - __lqs_putref(q); + lqs_putref(q); } static void lqs_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode) { - struct lustre_qunit_size *q = - cfs_hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); - - /* - * Nothing should be left. User of lqs put it and - * lqs also was deleted from table by this time - * so we should have 0 refs. - */ - LASSERTF(cfs_atomic_read(&q->lqs_refcount) == 0, - "Busy lqs %p with %d refs\n", q, - cfs_atomic_read(&q->lqs_refcount)); - OBD_FREE_PTR(q); + CERROR("It should not have any item left to be handled by this!"); } static cfs_hash_ops_t lqs_hash_ops = {