Whamcloud - gitweb
b=18317
authoranserper <anserper>
Wed, 1 Apr 2009 21:33:55 +0000 (21:33 +0000)
committeranserper <anserper>
Wed, 1 Apr 2009 21:33:55 +0000 (21:33 +0000)
i=Johann Lombardi
i=ZhiYong Tian

wait until all lqs have been freed on cleanup

lustre/include/lustre_quota.h
lustre/quota/quota_adjust_qunit.c
lustre/quota/quota_context.c

index 43a0d31..5871586 100644 (file)
@@ -233,6 +233,9 @@ struct lustre_quota_ctxt {
                                              * b=14840 */
         struct proc_dir_entry *lqc_proc_dir;
         struct lprocfs_stats  *lqc_stats; /* lquota statistics */
+
+        atomic_t      lqc_lqs;              /* the number of used hashed lqs */
+        cfs_waitq_t   lqc_lqs_waitq;        /* no lqs are in use */
 };
 
 #define QUOTA_MASTER_READY(qctxt)   (qctxt)->lqc_setup = 1
@@ -274,7 +277,9 @@ struct lustre_qunit_size {
 
 static inline void lqs_getref(struct lustre_qunit_size *lqs)
 {
-        atomic_inc(&lqs->lqs_refcount);
+        if (atomic_inc_return(&lqs->lqs_refcount) == 2) /* quota_create_lqs */
+                atomic_inc(&lqs->lqs_ctxt->lqc_lqs);
+
         CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
                lqs, atomic_read(&lqs->lqs_refcount));
 }
@@ -289,7 +294,9 @@ static inline void lqs_putref(struct lustre_qunit_size *lqs)
                                 &lqs->lqs_key, &lqs->lqs_hash);
                 OBD_FREE_PTR(lqs);
         } else {
-                atomic_dec(&lqs->lqs_refcount);
+                if (atomic_dec_return(&lqs->lqs_refcount) == 1)
+                        if (atomic_dec_and_test(&lqs->lqs_ctxt->lqc_lqs))
+                                cfs_waitq_signal(&lqs->lqs_ctxt->lqc_lqs_waitq);
                 CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
                        lqs, atomic_read(&lqs->lqs_refcount));
 
index 8f61bdf..05d6fe4 100644 (file)
@@ -188,8 +188,15 @@ int quota_create_lqs(struct qunit_data *qdata, struct quota_adjust_qunit *oqaq,
                 lqs->lqs_last_ishrink  = 0;
         }
         lqs_initref(lqs);
-        rc = lustre_hash_add_unique(qctxt->lqc_lqs_hash,
+
+        spin_lock(&qctxt->lqc_lock);
+        if (!qctxt->lqc_valid)
+                rc = -EBUSY;
+        else
+                rc = lustre_hash_add_unique(qctxt->lqc_lqs_hash,
                                     &lqs->lqs_key, &lqs->lqs_hash);
+        spin_unlock(&qctxt->lqc_lock);
+
         LQS_DEBUG(lqs, "create lqs\n");
         if (!rc) {
                 lqs_getref(lqs);
@@ -297,7 +304,6 @@ search_lqs:
         spin_unlock(&lqs->lqs_lock);
 
         lqs_putref(lqs);
-
         if (b_tmp > 0)
                 rc |= LQS_BLK_DECREASE;
         else if (b_tmp < 0)
index dcebb74..d435d8e 100644 (file)
@@ -350,6 +350,7 @@ check_cur_qunit(struct obd_device *obd,
 
         spin_unlock(&lqs->lqs_lock);
         lqs_putref(lqs);
+
         EXIT;
  out:
         OBD_FREE_PTR(qctl);
@@ -1161,6 +1162,8 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
                 RETURN(rc);
 
         cfs_waitq_init(&qctxt->lqc_wait_for_qmaster);
+        cfs_waitq_init(&qctxt->lqc_lqs_waitq);
+        atomic_set(&qctxt->lqc_lqs, 0);
         spin_lock_init(&qctxt->lqc_lock);
         spin_lock(&qctxt->lqc_lock);
         qctxt->lqc_handler = handler;
@@ -1197,10 +1200,21 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
         RETURN(rc);
 }
 
+static int check_lqs(struct lustre_quota_ctxt *qctxt)
+{
+        int rc;
+        ENTRY;
+
+        rc = !atomic_read(&qctxt->lqc_lqs);
+
+        RETURN(rc);
+}
+
 void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
 {
         struct lustre_qunit *qunit, *tmp;
         struct list_head tmp_list;
+        struct l_wait_info lwi = { 0 };
         int i;
         ENTRY;
 
@@ -1231,8 +1245,6 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
                 qunit_put(qunit);
         }
 
-        lustre_hash_exit(qctxt->lqc_lqs_hash);
-
         /* after qctxt_cleanup, qctxt might be freed, then check_qm() is
          * unpredicted. So we must wait until lqc_wait_for_qmaster is empty */
         while (cfs_waitq_active(&qctxt->lqc_wait_for_qmaster)) {
@@ -1241,6 +1253,9 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
                                      cfs_time_seconds(1));
         }
 
+        l_wait_event(qctxt->lqc_lqs_waitq, check_lqs(qctxt), &lwi);
+        lustre_hash_exit(qctxt->lqc_lqs_hash);
+
         ptlrpcd_decref();
 
 #ifdef LPROCFS
@@ -1409,7 +1424,8 @@ lqs_get(struct hlist_node *hnode)
             hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
         ENTRY;
 
-        atomic_inc(&q->lqs_refcount);
+        if (atomic_inc_return(&q->lqs_refcount) == 2) /* quota_search_lqs */
+                atomic_inc(&q->lqs_ctxt->lqc_lqs);
         CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
                q, atomic_read(&q->lqs_refcount));
 
@@ -1424,7 +1440,11 @@ lqs_put(struct hlist_node *hnode)
         ENTRY;
 
         LASSERT(atomic_read(&q->lqs_refcount) > 0);
-        atomic_dec(&q->lqs_refcount);
+
+        if (atomic_dec_return(&q->lqs_refcount) == 1)
+                if (atomic_dec_and_test(&q->lqs_ctxt->lqc_lqs))
+                        cfs_waitq_signal(&q->lqs_ctxt->lqc_lqs_waitq);
+
         CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
                q, atomic_read(&q->lqs_refcount));