Whamcloud - gitweb
Branch HEAD
authorfanyong <fanyong>
Fri, 26 Jun 2009 06:01:04 +0000 (06:01 +0000)
committerfanyong <fanyong>
Fri, 26 Jun 2009 06:01:04 +0000 (06:01 +0000)
b=18317
i=tianzy
i=andrew.perepechko

Wait until all lqs have been freed on cleanup.

lustre/include/lustre_quota.h
lustre/quota/quota_adjust_qunit.c
lustre/quota/quota_context.c
lustre/tests/sanity-quota.sh

index 5c154ad..7cdf1d1 100644 (file)
@@ -312,6 +312,10 @@ struct lustre_quota_ctxt {
         struct proc_dir_entry *lqc_proc_dir;
         /** lquota statistics */
         struct lprocfs_stats  *lqc_stats;
+        /** the number of used hashed lqs */
+        atomic_t      lqc_lqs;
+        /** no lqs are in use */
+        cfs_waitq_t   lqc_lqs_waitq;
 };
 
 #define QUOTA_MASTER_READY(qctxt)   (qctxt)->lqc_setup = 1
@@ -363,30 +367,49 @@ struct lustre_qunit_size {
 #define LQS_KEY_ID(key)      (key & 0xffffffff)
 #define LQS_KEY_GRP(key)     (key >> 32)
 
-static inline void lqs_getref(struct lustre_qunit_size *lqs)
+static inline void __lqs_getref(struct lustre_qunit_size *lqs)
 {
-        atomic_inc(&lqs->lqs_refcount);
-        CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
-               lqs, atomic_read(&lqs->lqs_refcount));
+        int count = atomic_inc_return(&lqs->lqs_refcount);
+
+        if (count == 2) /* quota_create_lqs */
+                atomic_inc(&lqs->lqs_ctxt->lqc_lqs);
+        CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", lqs, count);
 }
 
-static inline void lqs_putref(struct lustre_qunit_size *lqs)
+static inline void lqs_getref(struct lustre_qunit_size *lqs)
 {
-        LASSERT(atomic_read(&lqs->lqs_refcount) > 0);
+        __lqs_getref(lqs);
+}
 
-        /* killing last ref, let's let hash table kill it */
-        if (atomic_read(&lqs->lqs_refcount) == 1) {
-                lustre_hash_del(lqs->lqs_ctxt->lqc_lqs_hash,
-                                &lqs->lqs_key, &lqs->lqs_hash);
-                OBD_FREE_PTR(lqs);
+static inline void __lqs_putref(struct lustre_qunit_size *lqs, int del)
+{
+        int count = atomic_read(&lqs->lqs_refcount);
+
+        LASSERT(count > 0);
+        if (count == 1) {
+                CDEBUG(D_QUOTA, "lqs=%p refcount to be 0\n", lqs);
+                if (del) {
+                        /* killing last ref, let's let hash table kill it */
+                        lustre_hash_del(lqs->lqs_ctxt->lqc_lqs_hash,
+                                        &lqs->lqs_key, &lqs->lqs_hash);
+                        OBD_FREE_PTR(lqs);
+                } else {
+                        atomic_dec(&lqs->lqs_refcount);
+                }
         } else {
-                atomic_dec(&lqs->lqs_refcount);
-                CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
-                       lqs, atomic_read(&lqs->lqs_refcount));
-
+                count = atomic_dec_return(&lqs->lqs_refcount);
+                if (count == 1)
+                        if (atomic_dec_and_test(&lqs->lqs_ctxt->lqc_lqs))
+                                cfs_waitq_signal(&lqs->lqs_ctxt->lqc_lqs_waitq);
+                CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", lqs, count);
         }
 }
 
+static inline void lqs_putref(struct lustre_qunit_size *lqs)
+{
+        __lqs_putref(lqs, 1);
+}
+
 static inline void lqs_initref(struct lustre_qunit_size *lqs)
 {
         atomic_set(&lqs->lqs_refcount, 0);
index 2c217c0..568f62f 100644 (file)
@@ -120,8 +120,15 @@ quota_create_lqs(unsigned long long lqs_key, struct lustre_quota_ctxt *qctxt)
                 lqs->lqs_last_ishrink  = 0;
         }
         lqs_initref(lqs);
-        rc = lustre_hash_add_unique(qctxt->lqc_lqs_hash, &lqs->lqs_key,
-                                    &lqs->lqs_hash);
+
+        spin_lock(&qctxt->lqc_lock);
+        if (!qctxt->lqc_valid)
+                rc = -EBUSY;
+        else
+                rc = lustre_hash_add_unique(qctxt->lqc_lqs_hash,
+                                            &lqs->lqs_key, &lqs->lqs_hash);
+        spin_unlock(&qctxt->lqc_lock);
+
         if (!rc)
                 lqs_getref(lqs);
 
index 847851d..fd85997 100644 (file)
@@ -352,6 +352,7 @@ check_cur_qunit(struct obd_device *obd,
 
         spin_unlock(&lqs->lqs_lock);
         lqs_putref(lqs);
+
         EXIT;
  out:
         OBD_FREE_PTR(qctl);
@@ -1207,6 +1208,8 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
                 RETURN(rc);
 
         cfs_waitq_init(&qctxt->lqc_wait_for_qmaster);
+        cfs_waitq_init(&qctxt->lqc_lqs_waitq);
+        atomic_set(&qctxt->lqc_lqs, 0);
         spin_lock_init(&qctxt->lqc_lock);
         spin_lock(&qctxt->lqc_lock);
         qctxt->lqc_handler = handler;
@@ -1249,6 +1252,16 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
         RETURN(rc);
 }
 
+static int check_lqs(struct lustre_quota_ctxt *qctxt)
+{
+        int rc;
+        ENTRY;
+
+        rc = !atomic_read(&qctxt->lqc_lqs);
+
+        RETURN(rc);
+}
+
 
 void hash_put_lqs(void *obj, void *data)
 {
@@ -1259,6 +1272,7 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
 {
         struct lustre_qunit *qunit, *tmp;
         struct list_head tmp_list;
+        struct l_wait_info lwi = { 0 };
         struct obd_device_target *obt = qctxt->lqc_obt;
         int i;
         ENTRY;
@@ -1290,12 +1304,6 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
                 qunit_put(qunit);
         }
 
-        lustre_hash_for_each_safe(qctxt->lqc_lqs_hash, hash_put_lqs, NULL);
-        down_write(&obt->obt_rwsem);
-        lustre_hash_exit(qctxt->lqc_lqs_hash);
-        qctxt->lqc_lqs_hash = NULL;
-        up_write(&obt->obt_rwsem);
-
         /* after qctxt_cleanup, qctxt might be freed, then check_qm() is
          * unpredicted. So we must wait until lqc_wait_for_qmaster is empty */
         while (cfs_waitq_active(&qctxt->lqc_wait_for_qmaster)) {
@@ -1304,6 +1312,13 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
                                      cfs_time_seconds(1));
         }
 
+        lustre_hash_for_each_safe(qctxt->lqc_lqs_hash, hash_put_lqs, NULL);
+        l_wait_event(qctxt->lqc_lqs_waitq, check_lqs(qctxt), &lwi);
+        down_write(&obt->obt_rwsem);
+        lustre_hash_exit(qctxt->lqc_lqs_hash);
+        qctxt->lqc_lqs_hash = NULL;
+        up_write(&obt->obt_rwsem);
+
         ptlrpcd_decref();
 
 #ifdef LPROCFS
@@ -1479,12 +1494,10 @@ static void *
 lqs_get(struct hlist_node *hnode)
 {
         struct lustre_qunit_size *q =
-            hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
+                hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
         ENTRY;
 
-        atomic_inc(&q->lqs_refcount);
-        CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
-               q, atomic_read(&q->lqs_refcount));
+        __lqs_getref(q);
 
         RETURN(q);
 }
@@ -1493,13 +1506,10 @@ static void *
 lqs_put(struct hlist_node *hnode)
 {
         struct lustre_qunit_size *q =
-            hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
+                hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
         ENTRY;
 
-        LASSERT(atomic_read(&q->lqs_refcount) > 0);
-        atomic_dec(&q->lqs_refcount);
-        CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
-               q, atomic_read(&q->lqs_refcount));
+        __lqs_putref(q, 0);
 
         RETURN(q);
 }
@@ -1507,10 +1517,10 @@ lqs_put(struct hlist_node *hnode)
 static void
 lqs_exit(struct hlist_node *hnode)
 {
-        struct lustre_qunit_size *q;
+        struct lustre_qunit_size *q =
+                hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
         ENTRY;
 
-        q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
         /*
          * Nothing should be left. User of lqs put it and
          * lqs also was deleted from table by this time
index ce2dc4c..ecb9b2c 100644 (file)
@@ -83,7 +83,8 @@ SHOW_QUOTA_USER="$LFS quota -v -u $TSTUSR $DIR"
 SHOW_QUOTA_USER2="$LFS quota -v -u $TSTUSR2 $DIR"
 SHOW_QUOTA_GROUP="$LFS quota -v -g $TSTUSR $DIR"
 SHOW_QUOTA_GROUP2="$LFS quota -v -g $TSTUSR2 $DIR"
-SHOW_QUOTA_INFO="$LFS quota -t -u $DIR; $LFS quota -t -g $DIR"
+SHOW_QUOTA_INFO_USER="$LFS quota -t -u $DIR"
+SHOW_QUOTA_INFO_GROUP="$LFS quota -t -g $DIR"
 
 # control the time of tests
 cycle=30
@@ -298,8 +299,10 @@ test_1_sub() {
        $RUNAS dd if=/dev/zero of=$TESTFILE bs=$BLK_SZ count=$(($LIMIT/2)) || quota_error u $TSTUSR "(usr) write failure, but expect success"
         etime=`date +%s`
         delta=$((etime - stime))
-        rate=$((BLK_SZ * LIMIT / 2 / delta / 1024))
-        [ $rate -gt 1024 ] || error "SLOW IO for $TSTUSR (user): $rate KB/sec"
+        if [ $delta -gt 0 ]; then
+                rate=$((BLK_SZ * LIMIT / 2 / delta / 1024))
+                [ $rate -gt 1024 ] || error "SLOW IO for $TSTUSR (user): $rate KB/sec"
+        fi
         log "    Done"
         log "    Write out of block quota ..."
        # this time maybe cache write,  ignore it's failure
@@ -362,7 +365,7 @@ test_1() {
            blk_qunit=$(( $RANDOM % 3072 + 1024 ))
            blk_qtune=$(( $RANDOM % $blk_qunit ))
            # other osts and mds will occupy at 1M blk quota
-           b_limit=$(( ($RANDOM - 16384) / 8 +  $OSTCOUNT * $blk_qunit * 4 ))
+           b_limit=$(( ($RANDOM - 16384) / 8 +  ($OSTCOUNT + 1) * $blk_qunit * 4 ))
            set_blk_tunesz $blk_qtune
            set_blk_unitsz $blk_qunit
            echo "cycle: $i(total $cycle) bunit:$blk_qunit, btune:$blk_qtune, blimit:$b_limit"
@@ -480,7 +483,8 @@ test_block_soft() {
 
        $SHOW_QUOTA_USER
        $SHOW_QUOTA_GROUP
-       $SHOW_QUOTA_INFO
+       $SHOW_QUOTA_INFO_USER
+       $SHOW_QUOTA_INFO_GROUP
 
        echo "    Write before timer goes off"
        $RUNDD count=$BUNIT_SZ seek=$OFFSET || \
@@ -494,7 +498,8 @@ test_block_soft() {
 
         $SHOW_QUOTA_USER
         $SHOW_QUOTA_GROUP
-        $SHOW_QUOTA_INFO
+        $SHOW_QUOTA_INFO_USER
+        $SHOW_QUOTA_INFO_GROUP
 
        echo "    Write after timer goes off"
        # maybe cache write, ignore.
@@ -506,7 +511,8 @@ test_block_soft() {
 
         $SHOW_QUOTA_USER
         $SHOW_QUOTA_GROUP
-        $SHOW_QUOTA_INFO
+        $SHOW_QUOTA_INFO_USER
+        $SHOW_QUOTA_INFO_GROUP
 
        echo "    Unlink file to stop timer"
        rm -f $TESTFILE
@@ -515,7 +521,8 @@ test_block_soft() {
 
         $SHOW_QUOTA_USER
         $SHOW_QUOTA_GROUP
-        $SHOW_QUOTA_INFO
+        $SHOW_QUOTA_INFO_USER
+        $SHOW_QUOTA_INFO_GROUP
 
        echo "    Write ..."
        $RUNDD count=$BUNIT_SZ || quota_error a $TSTUSR "write failure, but expect success"
@@ -585,7 +592,8 @@ test_file_soft() {
 
        $SHOW_QUOTA_USER
        $SHOW_QUOTA_GROUP
-       $SHOW_QUOTA_INFO
+       $SHOW_QUOTA_INFO_USER
+       $SHOW_QUOTA_INFO_GROUP
 
        echo "    Create file after timer goes off"
        # the least of inode qunit is 2, so there are at most 3(qunit:2+qtune:1)
@@ -598,7 +606,8 @@ test_file_soft() {
 
        $SHOW_QUOTA_USER
        $SHOW_QUOTA_GROUP
-       $SHOW_QUOTA_INFO
+       $SHOW_QUOTA_INFO_USER
+       $SHOW_QUOTA_INFO_GROUP
 
        echo "    Unlink files to stop timer"
        find `dirname $TESTFILE` -name "`basename ${TESTFILE}`*" | xargs rm -f
@@ -2009,7 +2018,7 @@ test_26() {
        wait_delete_completed
 
        # every quota slave gets 20MB
-       b_limit=$((OSTCOUNT * 20 * 1024))
+       b_limit=$(((OSTCOUNT + 1) * 20 * 1024))
        log "limit: ${b_limit}KB"
        $LFS setquota -u $TSTUSR -b 0 -B $b_limit -i 0 -I 0 $DIR
        sleep 3