From e76824ef85947c10cba883fdae576b4182840518 Mon Sep 17 00:00:00 2001 From: anserper Date: Wed, 1 Apr 2009 21:33:55 +0000 Subject: [PATCH] b=18317 i=Johann Lombardi i=ZhiYong Tian wait until all lqs have been freed on cleanup --- lustre/include/lustre_quota.h | 11 +++++++++-- lustre/quota/quota_adjust_qunit.c | 10 ++++++++-- lustre/quota/quota_context.c | 28 ++++++++++++++++++++++++---- 3 files changed, 41 insertions(+), 8 deletions(-) diff --git a/lustre/include/lustre_quota.h b/lustre/include/lustre_quota.h index 43a0d31..5871586 100644 --- a/lustre/include/lustre_quota.h +++ b/lustre/include/lustre_quota.h @@ -233,6 +233,9 @@ struct lustre_quota_ctxt { * b=14840 */ struct proc_dir_entry *lqc_proc_dir; struct lprocfs_stats *lqc_stats; /* lquota statistics */ + + atomic_t lqc_lqs; /* the number of used hashed lqs */ + cfs_waitq_t lqc_lqs_waitq; /* no lqs are in use */ }; #define QUOTA_MASTER_READY(qctxt) (qctxt)->lqc_setup = 1 @@ -274,7 +277,9 @@ struct lustre_qunit_size { static inline void lqs_getref(struct lustre_qunit_size *lqs) { - atomic_inc(&lqs->lqs_refcount); + if (atomic_inc_return(&lqs->lqs_refcount) == 2) /* quota_create_lqs */ + atomic_inc(&lqs->lqs_ctxt->lqc_lqs); + CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", lqs, atomic_read(&lqs->lqs_refcount)); } @@ -289,7 +294,9 @@ static inline void lqs_putref(struct lustre_qunit_size *lqs) &lqs->lqs_key, &lqs->lqs_hash); OBD_FREE_PTR(lqs); } else { - atomic_dec(&lqs->lqs_refcount); + if (atomic_dec_return(&lqs->lqs_refcount) == 1) + if (atomic_dec_and_test(&lqs->lqs_ctxt->lqc_lqs)) + cfs_waitq_signal(&lqs->lqs_ctxt->lqc_lqs_waitq); CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", lqs, atomic_read(&lqs->lqs_refcount)); diff --git a/lustre/quota/quota_adjust_qunit.c b/lustre/quota/quota_adjust_qunit.c index 8f61bdf..05d6fe4 100644 --- a/lustre/quota/quota_adjust_qunit.c +++ b/lustre/quota/quota_adjust_qunit.c @@ -188,8 +188,15 @@ int quota_create_lqs(struct qunit_data *qdata, struct quota_adjust_qunit *oqaq, lqs->lqs_last_ishrink = 0; } lqs_initref(lqs); - rc = lustre_hash_add_unique(qctxt->lqc_lqs_hash, + + spin_lock(&qctxt->lqc_lock); + if (!qctxt->lqc_valid) + rc = -EBUSY; + else + rc = lustre_hash_add_unique(qctxt->lqc_lqs_hash, &lqs->lqs_key, &lqs->lqs_hash); + spin_unlock(&qctxt->lqc_lock); + LQS_DEBUG(lqs, "create lqs\n"); if (!rc) { lqs_getref(lqs); @@ -297,7 +304,6 @@ search_lqs: spin_unlock(&lqs->lqs_lock); lqs_putref(lqs); - if (b_tmp > 0) rc |= LQS_BLK_DECREASE; else if (b_tmp < 0) diff --git a/lustre/quota/quota_context.c b/lustre/quota/quota_context.c index dcebb74..d435d8e 100644 --- a/lustre/quota/quota_context.c +++ b/lustre/quota/quota_context.c @@ -350,6 +350,7 @@ check_cur_qunit(struct obd_device *obd, spin_unlock(&lqs->lqs_lock); lqs_putref(lqs); + EXIT; out: OBD_FREE_PTR(qctl); @@ -1161,6 +1162,8 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler) RETURN(rc); cfs_waitq_init(&qctxt->lqc_wait_for_qmaster); + cfs_waitq_init(&qctxt->lqc_lqs_waitq); + atomic_set(&qctxt->lqc_lqs, 0); spin_lock_init(&qctxt->lqc_lock); spin_lock(&qctxt->lqc_lock); qctxt->lqc_handler = handler; @@ -1197,10 +1200,21 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler) RETURN(rc); } +static int check_lqs(struct lustre_quota_ctxt *qctxt) +{ + int rc; + ENTRY; + + rc = !atomic_read(&qctxt->lqc_lqs); + + RETURN(rc); +} + void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) { struct lustre_qunit *qunit, *tmp; struct list_head tmp_list; + struct l_wait_info lwi = { 0 }; int i; ENTRY; @@ -1231,8 +1245,6 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) qunit_put(qunit); } - lustre_hash_exit(qctxt->lqc_lqs_hash); - /* after qctxt_cleanup, qctxt might be freed, then check_qm() is * unpredicted. So we must wait until lqc_wait_for_qmaster is empty */ while (cfs_waitq_active(&qctxt->lqc_wait_for_qmaster)) { @@ -1241,6 +1253,9 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) cfs_time_seconds(1)); } + l_wait_event(qctxt->lqc_lqs_waitq, check_lqs(qctxt), &lwi); + lustre_hash_exit(qctxt->lqc_lqs_hash); + ptlrpcd_decref(); #ifdef LPROCFS @@ -1409,7 +1424,8 @@ lqs_get(struct hlist_node *hnode) hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); ENTRY; - atomic_inc(&q->lqs_refcount); + if (atomic_inc_return(&q->lqs_refcount) == 2) /* quota_search_lqs */ + atomic_inc(&q->lqs_ctxt->lqc_lqs); CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", q, atomic_read(&q->lqs_refcount)); @@ -1424,7 +1440,11 @@ lqs_put(struct hlist_node *hnode) ENTRY; LASSERT(atomic_read(&q->lqs_refcount) > 0); - atomic_dec(&q->lqs_refcount); + + if (atomic_dec_return(&q->lqs_refcount) == 1) + if (atomic_dec_and_test(&q->lqs_ctxt->lqc_lqs)) + cfs_waitq_signal(&q->lqs_ctxt->lqc_lqs_waitq); + CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", q, atomic_read(&q->lqs_refcount)); -- 1.8.3.1