From: fanyong Date: Fri, 26 Jun 2009 06:01:04 +0000 (+0000) Subject: Branch HEAD X-Git-Tag: v1_9_220~92 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=a4f8b35cb2fdbf28a5c98c372506ad13ce84d4f1 Branch HEAD b=18317 i=tianzy i=andrew.perepechko Wait until all lqs have been freed on cleanup. --- diff --git a/lustre/include/lustre_quota.h b/lustre/include/lustre_quota.h index 5c154ad..7cdf1d1 100644 --- a/lustre/include/lustre_quota.h +++ b/lustre/include/lustre_quota.h @@ -312,6 +312,10 @@ struct lustre_quota_ctxt { struct proc_dir_entry *lqc_proc_dir; /** lquota statistics */ struct lprocfs_stats *lqc_stats; + /** the number of used hashed lqs */ + atomic_t lqc_lqs; + /** no lqs are in use */ + cfs_waitq_t lqc_lqs_waitq; }; #define QUOTA_MASTER_READY(qctxt) (qctxt)->lqc_setup = 1 @@ -363,30 +367,49 @@ struct lustre_qunit_size { #define LQS_KEY_ID(key) (key & 0xffffffff) #define LQS_KEY_GRP(key) (key >> 32) -static inline void lqs_getref(struct lustre_qunit_size *lqs) +static inline void __lqs_getref(struct lustre_qunit_size *lqs) { - atomic_inc(&lqs->lqs_refcount); - CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", - lqs, atomic_read(&lqs->lqs_refcount)); + int count = atomic_inc_return(&lqs->lqs_refcount); + + if (count == 2) /* quota_create_lqs */ + atomic_inc(&lqs->lqs_ctxt->lqc_lqs); + CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", lqs, count); } -static inline void lqs_putref(struct lustre_qunit_size *lqs) +static inline void lqs_getref(struct lustre_qunit_size *lqs) { - LASSERT(atomic_read(&lqs->lqs_refcount) > 0); + __lqs_getref(lqs); +} - /* killing last ref, let's let hash table kill it */ - if (atomic_read(&lqs->lqs_refcount) == 1) { - lustre_hash_del(lqs->lqs_ctxt->lqc_lqs_hash, - &lqs->lqs_key, &lqs->lqs_hash); - OBD_FREE_PTR(lqs); +static inline void __lqs_putref(struct lustre_qunit_size *lqs, int del) +{ + int count = atomic_read(&lqs->lqs_refcount); + + LASSERT(count > 0); + if (count == 1) { + CDEBUG(D_QUOTA, "lqs=%p refcount to be 0\n", lqs); + if (del) { + /* killing last ref, let's let hash table kill it */ + lustre_hash_del(lqs->lqs_ctxt->lqc_lqs_hash, + &lqs->lqs_key, &lqs->lqs_hash); + OBD_FREE_PTR(lqs); + } else { + atomic_dec(&lqs->lqs_refcount); + } } else { - atomic_dec(&lqs->lqs_refcount); - CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", - lqs, atomic_read(&lqs->lqs_refcount)); - + count = atomic_dec_return(&lqs->lqs_refcount); + if (count == 1) + if (atomic_dec_and_test(&lqs->lqs_ctxt->lqc_lqs)) + cfs_waitq_signal(&lqs->lqs_ctxt->lqc_lqs_waitq); + CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", lqs, count); } } +static inline void lqs_putref(struct lustre_qunit_size *lqs) +{ + __lqs_putref(lqs, 1); +} + static inline void lqs_initref(struct lustre_qunit_size *lqs) { atomic_set(&lqs->lqs_refcount, 0); diff --git a/lustre/quota/quota_adjust_qunit.c b/lustre/quota/quota_adjust_qunit.c index 2c217c0..568f62f 100644 --- a/lustre/quota/quota_adjust_qunit.c +++ b/lustre/quota/quota_adjust_qunit.c @@ -120,8 +120,15 @@ quota_create_lqs(unsigned long long lqs_key, struct lustre_quota_ctxt *qctxt) lqs->lqs_last_ishrink = 0; } lqs_initref(lqs); - rc = lustre_hash_add_unique(qctxt->lqc_lqs_hash, &lqs->lqs_key, - &lqs->lqs_hash); + + spin_lock(&qctxt->lqc_lock); + if (!qctxt->lqc_valid) + rc = -EBUSY; + else + rc = lustre_hash_add_unique(qctxt->lqc_lqs_hash, + &lqs->lqs_key, &lqs->lqs_hash); + spin_unlock(&qctxt->lqc_lock); + if (!rc) lqs_getref(lqs); diff --git a/lustre/quota/quota_context.c b/lustre/quota/quota_context.c index 847851d..fd85997 100644 --- a/lustre/quota/quota_context.c +++ b/lustre/quota/quota_context.c @@ -352,6 +352,7 @@ check_cur_qunit(struct obd_device *obd, spin_unlock(&lqs->lqs_lock); lqs_putref(lqs); + EXIT; out: OBD_FREE_PTR(qctl); @@ -1207,6 +1208,8 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler) RETURN(rc); cfs_waitq_init(&qctxt->lqc_wait_for_qmaster); + cfs_waitq_init(&qctxt->lqc_lqs_waitq); + atomic_set(&qctxt->lqc_lqs, 0); spin_lock_init(&qctxt->lqc_lock); spin_lock(&qctxt->lqc_lock); qctxt->lqc_handler = handler; @@ -1249,6 +1252,16 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler) RETURN(rc); } +static int check_lqs(struct lustre_quota_ctxt *qctxt) +{ + int rc; + ENTRY; + + rc = !atomic_read(&qctxt->lqc_lqs); + + RETURN(rc); +} + void hash_put_lqs(void *obj, void *data) { @@ -1259,6 +1272,7 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) { struct lustre_qunit *qunit, *tmp; struct list_head tmp_list; + struct l_wait_info lwi = { 0 }; struct obd_device_target *obt = qctxt->lqc_obt; int i; ENTRY; @@ -1290,12 +1304,6 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) qunit_put(qunit); } - lustre_hash_for_each_safe(qctxt->lqc_lqs_hash, hash_put_lqs, NULL); - down_write(&obt->obt_rwsem); - lustre_hash_exit(qctxt->lqc_lqs_hash); - qctxt->lqc_lqs_hash = NULL; - up_write(&obt->obt_rwsem); - /* after qctxt_cleanup, qctxt might be freed, then check_qm() is * unpredicted. So we must wait until lqc_wait_for_qmaster is empty */ while (cfs_waitq_active(&qctxt->lqc_wait_for_qmaster)) { @@ -1304,6 +1312,13 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) cfs_time_seconds(1)); } + lustre_hash_for_each_safe(qctxt->lqc_lqs_hash, hash_put_lqs, NULL); + l_wait_event(qctxt->lqc_lqs_waitq, check_lqs(qctxt), &lwi); + down_write(&obt->obt_rwsem); + lustre_hash_exit(qctxt->lqc_lqs_hash); + qctxt->lqc_lqs_hash = NULL; + up_write(&obt->obt_rwsem); + ptlrpcd_decref(); #ifdef LPROCFS @@ -1479,12 +1494,10 @@ static void * lqs_get(struct hlist_node *hnode) { struct lustre_qunit_size *q = - hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); + hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); ENTRY; - atomic_inc(&q->lqs_refcount); - CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", - q, atomic_read(&q->lqs_refcount)); + __lqs_getref(q); RETURN(q); } @@ -1493,13 +1506,10 @@ static void * lqs_put(struct hlist_node *hnode) { struct lustre_qunit_size *q = - hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); + hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); ENTRY; - LASSERT(atomic_read(&q->lqs_refcount) > 0); - atomic_dec(&q->lqs_refcount); - CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", - q, atomic_read(&q->lqs_refcount)); + __lqs_putref(q, 0); RETURN(q); } @@ -1507,10 +1517,10 @@ lqs_put(struct hlist_node *hnode) static void lqs_exit(struct hlist_node *hnode) { - struct lustre_qunit_size *q; + struct lustre_qunit_size *q = + hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); ENTRY; - q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); /* * Nothing should be left. User of lqs put it and * lqs also was deleted from table by this time diff --git a/lustre/tests/sanity-quota.sh b/lustre/tests/sanity-quota.sh index ce2dc4c..ecb9b2c 100644 --- a/lustre/tests/sanity-quota.sh +++ b/lustre/tests/sanity-quota.sh @@ -83,7 +83,8 @@ SHOW_QUOTA_USER="$LFS quota -v -u $TSTUSR $DIR" SHOW_QUOTA_USER2="$LFS quota -v -u $TSTUSR2 $DIR" SHOW_QUOTA_GROUP="$LFS quota -v -g $TSTUSR $DIR" SHOW_QUOTA_GROUP2="$LFS quota -v -g $TSTUSR2 $DIR" -SHOW_QUOTA_INFO="$LFS quota -t -u $DIR; $LFS quota -t -g $DIR" +SHOW_QUOTA_INFO_USER="$LFS quota -t -u $DIR" +SHOW_QUOTA_INFO_GROUP="$LFS quota -t -g $DIR" # control the time of tests cycle=30 @@ -298,8 +299,10 @@ test_1_sub() { $RUNAS dd if=/dev/zero of=$TESTFILE bs=$BLK_SZ count=$(($LIMIT/2)) || quota_error u $TSTUSR "(usr) write failure, but expect success" etime=`date +%s` delta=$((etime - stime)) - rate=$((BLK_SZ * LIMIT / 2 / delta / 1024)) - [ $rate -gt 1024 ] || error "SLOW IO for $TSTUSR (user): $rate KB/sec" + if [ $delta -gt 0 ]; then + rate=$((BLK_SZ * LIMIT / 2 / delta / 1024)) + [ $rate -gt 1024 ] || error "SLOW IO for $TSTUSR (user): $rate KB/sec" + fi log " Done" log " Write out of block quota ..." # this time maybe cache write, ignore it's failure @@ -362,7 +365,7 @@ test_1() { blk_qunit=$(( $RANDOM % 3072 + 1024 )) blk_qtune=$(( $RANDOM % $blk_qunit )) # other osts and mds will occupy at 1M blk quota - b_limit=$(( ($RANDOM - 16384) / 8 + $OSTCOUNT * $blk_qunit * 4 )) + b_limit=$(( ($RANDOM - 16384) / 8 + ($OSTCOUNT + 1) * $blk_qunit * 4 )) set_blk_tunesz $blk_qtune set_blk_unitsz $blk_qunit echo "cycle: $i(total $cycle) bunit:$blk_qunit, btune:$blk_qtune, blimit:$b_limit" @@ -480,7 +483,8 @@ test_block_soft() { $SHOW_QUOTA_USER $SHOW_QUOTA_GROUP - $SHOW_QUOTA_INFO + $SHOW_QUOTA_INFO_USER + $SHOW_QUOTA_INFO_GROUP echo " Write before timer goes off" $RUNDD count=$BUNIT_SZ seek=$OFFSET || \ @@ -494,7 +498,8 @@ test_block_soft() { $SHOW_QUOTA_USER $SHOW_QUOTA_GROUP - $SHOW_QUOTA_INFO + $SHOW_QUOTA_INFO_USER + $SHOW_QUOTA_INFO_GROUP echo " Write after timer goes off" # maybe cache write, ignore. @@ -506,7 +511,8 @@ test_block_soft() { $SHOW_QUOTA_USER $SHOW_QUOTA_GROUP - $SHOW_QUOTA_INFO + $SHOW_QUOTA_INFO_USER + $SHOW_QUOTA_INFO_GROUP echo " Unlink file to stop timer" rm -f $TESTFILE @@ -515,7 +521,8 @@ test_block_soft() { $SHOW_QUOTA_USER $SHOW_QUOTA_GROUP - $SHOW_QUOTA_INFO + $SHOW_QUOTA_INFO_USER + $SHOW_QUOTA_INFO_GROUP echo " Write ..." $RUNDD count=$BUNIT_SZ || quota_error a $TSTUSR "write failure, but expect success" @@ -585,7 +592,8 @@ test_file_soft() { $SHOW_QUOTA_USER $SHOW_QUOTA_GROUP - $SHOW_QUOTA_INFO + $SHOW_QUOTA_INFO_USER + $SHOW_QUOTA_INFO_GROUP echo " Create file after timer goes off" # the least of inode qunit is 2, so there are at most 3(qunit:2+qtune:1) @@ -598,7 +606,8 @@ test_file_soft() { $SHOW_QUOTA_USER $SHOW_QUOTA_GROUP - $SHOW_QUOTA_INFO + $SHOW_QUOTA_INFO_USER + $SHOW_QUOTA_INFO_GROUP echo " Unlink files to stop timer" find `dirname $TESTFILE` -name "`basename ${TESTFILE}`*" | xargs rm -f @@ -2009,7 +2018,7 @@ test_26() { wait_delete_completed # every quota slave gets 20MB - b_limit=$((OSTCOUNT * 20 * 1024)) + b_limit=$(((OSTCOUNT + 1) * 20 * 1024)) log "limit: ${b_limit}KB" $LFS setquota -u $TSTUSR -b 0 -B $b_limit -i 0 -I 0 $DIR sleep 3