struct proc_dir_entry *lqc_proc_dir;
/** lquota statistics */
struct lprocfs_stats *lqc_stats;
+ /** the number of used hashed lqs */
+ atomic_t lqc_lqs;
+ /** no lqs are in use */
+ cfs_waitq_t lqc_lqs_waitq;
};
#define QUOTA_MASTER_READY(qctxt) (qctxt)->lqc_setup = 1
#define LQS_KEY_ID(key) (key & 0xffffffff)
#define LQS_KEY_GRP(key) (key >> 32)
-static inline void lqs_getref(struct lustre_qunit_size *lqs)
+/**
+ * Take a reference on @lqs.
+ *
+ * On the 1 -> 2 refcount transition (the first user after
+ * quota_create_lqs() installed the entry in the hash with its initial
+ * ref) also bump the per-context lqc_lqs counter of in-use entries,
+ * which qctxt cleanup waits on via lqc_lqs_waitq.
+ */
+static inline void __lqs_getref(struct lustre_qunit_size *lqs)
{
- atomic_inc(&lqs->lqs_refcount);
- CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
- lqs, atomic_read(&lqs->lqs_refcount));
+ int count = atomic_inc_return(&lqs->lqs_refcount);
+
+ if (count == 2) /* quota_create_lqs */
+ atomic_inc(&lqs->lqs_ctxt->lqc_lqs);
+ CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", lqs, count);
}
-static inline void lqs_putref(struct lustre_qunit_size *lqs)
+/** External interface: take a reference on @lqs. */
+static inline void lqs_getref(struct lustre_qunit_size *lqs)
{
- LASSERT(atomic_read(&lqs->lqs_refcount) > 0);
+ __lqs_getref(lqs);
+}
- /* killing last ref, let's let hash table kill it */
- if (atomic_read(&lqs->lqs_refcount) == 1) {
- lustre_hash_del(lqs->lqs_ctxt->lqc_lqs_hash,
- &lqs->lqs_key, &lqs->lqs_hash);
- OBD_FREE_PTR(lqs);
+/**
+ * Drop a reference on @lqs.
+ *
+ * @del selects last-reference behaviour: non-zero (the external
+ * lqs_putref() path) removes the entry from lqc_lqs_hash and frees it;
+ * zero (the hash table's own lqs_put callback) only decrements.  When
+ * the count falls back to 1 (only the hash's initial ref remains),
+ * decrement the per-context lqc_lqs counter and, if it reaches zero,
+ * wake lqc_lqs_waitq so qctxt cleanup can proceed.
+ *
+ * NOTE(review): the atomic_read()-then-act on lqs_refcount assumes
+ * gets/puts on the same entry are serialised by the callers -- confirm
+ * against the lustre_hash locking.
+ */
+static inline void __lqs_putref(struct lustre_qunit_size *lqs, int del)
+{
+ int count = atomic_read(&lqs->lqs_refcount);
+
+ LASSERT(count > 0);
+ if (count == 1) {
+ CDEBUG(D_QUOTA, "lqs=%p refcount to be 0\n", lqs);
+ if (del) {
+ /* killing last ref, let's let hash table kill it */
+ lustre_hash_del(lqs->lqs_ctxt->lqc_lqs_hash,
+ &lqs->lqs_key, &lqs->lqs_hash);
+ OBD_FREE_PTR(lqs);
+ } else {
+ atomic_dec(&lqs->lqs_refcount);
+ }
} else {
- atomic_dec(&lqs->lqs_refcount);
- CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
- lqs, atomic_read(&lqs->lqs_refcount));
-
+ count = atomic_dec_return(&lqs->lqs_refcount);
+ if (count == 1)
+ if (atomic_dec_and_test(&lqs->lqs_ctxt->lqc_lqs))
+ cfs_waitq_signal(&lqs->lqs_ctxt->lqc_lqs_waitq);
+ CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", lqs, count);
}
}
+/** External interface: drop a reference; deletes and frees on last put. */
+static inline void lqs_putref(struct lustre_qunit_size *lqs)
+{
+ __lqs_putref(lqs, 1);
+}
+
+
static inline void lqs_initref(struct lustre_qunit_size *lqs)
{
atomic_set(&lqs->lqs_refcount, 0);
lqs->lqs_last_ishrink = 0;
}
lqs_initref(lqs);
- rc = lustre_hash_add_unique(qctxt->lqc_lqs_hash, &lqs->lqs_key,
- &lqs->lqs_hash);
+
+ spin_lock(&qctxt->lqc_lock);
+ if (!qctxt->lqc_valid)
+ rc = -EBUSY;
+ else
+ rc = lustre_hash_add_unique(qctxt->lqc_lqs_hash,
+ &lqs->lqs_key, &lqs->lqs_hash);
+ spin_unlock(&qctxt->lqc_lock);
+
if (!rc)
lqs_getref(lqs);
spin_unlock(&lqs->lqs_lock);
lqs_putref(lqs);
+
EXIT;
out:
OBD_FREE_PTR(qctl);
RETURN(rc);
cfs_waitq_init(&qctxt->lqc_wait_for_qmaster);
+ cfs_waitq_init(&qctxt->lqc_lqs_waitq);
+ atomic_set(&qctxt->lqc_lqs, 0);
spin_lock_init(&qctxt->lqc_lock);
spin_lock(&qctxt->lqc_lock);
qctxt->lqc_handler = handler;
RETURN(rc);
}
+/**
+ * l_wait_event() condition used during qctxt cleanup: true once no
+ * hashed lqs entries are in use (lqc_lqs has dropped to zero).
+ */
+static int check_lqs(struct lustre_quota_ctxt *qctxt)
+{
+ int rc;
+ ENTRY;
+
+ rc = !atomic_read(&qctxt->lqc_lqs);
+
+ RETURN(rc);
+}
+
void hash_put_lqs(void *obj, void *data)
{
{
struct lustre_qunit *qunit, *tmp;
struct list_head tmp_list;
+ struct l_wait_info lwi = { 0 };
struct obd_device_target *obt = qctxt->lqc_obt;
int i;
ENTRY;
qunit_put(qunit);
}
- lustre_hash_for_each_safe(qctxt->lqc_lqs_hash, hash_put_lqs, NULL);
- down_write(&obt->obt_rwsem);
- lustre_hash_exit(qctxt->lqc_lqs_hash);
- qctxt->lqc_lqs_hash = NULL;
- up_write(&obt->obt_rwsem);
-
/* after qctxt_cleanup, qctxt might be freed, then check_qm() is
* unpredicted. So we must wait until lqc_wait_for_qmaster is empty */
while (cfs_waitq_active(&qctxt->lqc_wait_for_qmaster)) {
cfs_time_seconds(1));
}
+ lustre_hash_for_each_safe(qctxt->lqc_lqs_hash, hash_put_lqs, NULL);
+ l_wait_event(qctxt->lqc_lqs_waitq, check_lqs(qctxt), &lwi);
+ down_write(&obt->obt_rwsem);
+ lustre_hash_exit(qctxt->lqc_lqs_hash);
+ qctxt->lqc_lqs_hash = NULL;
+ up_write(&obt->obt_rwsem);
+
ptlrpcd_decref();
#ifdef LPROCFS
lqs_get(struct hlist_node *hnode)
{
struct lustre_qunit_size *q =
- hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
+ hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
ENTRY;
- atomic_inc(&q->lqs_refcount);
- CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
- q, atomic_read(&q->lqs_refcount));
+ __lqs_getref(q);
RETURN(q);
}
lqs_put(struct hlist_node *hnode)
{
struct lustre_qunit_size *q =
- hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
+ hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
ENTRY;
- LASSERT(atomic_read(&q->lqs_refcount) > 0);
- atomic_dec(&q->lqs_refcount);
- CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
- q, atomic_read(&q->lqs_refcount));
+ __lqs_putref(q, 0);
RETURN(q);
}
static void
lqs_exit(struct hlist_node *hnode)
{
- struct lustre_qunit_size *q;
+ struct lustre_qunit_size *q =
+ hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
ENTRY;
- q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
/*
* Nothing should be left. User of lqs put it and
* lqs also was deleted from table by this time
SHOW_QUOTA_USER2="$LFS quota -v -u $TSTUSR2 $DIR"
SHOW_QUOTA_GROUP="$LFS quota -v -g $TSTUSR $DIR"
SHOW_QUOTA_GROUP2="$LFS quota -v -g $TSTUSR2 $DIR"
-SHOW_QUOTA_INFO="$LFS quota -t -u $DIR; $LFS quota -t -g $DIR"
+SHOW_QUOTA_INFO_USER="$LFS quota -t -u $DIR"
+SHOW_QUOTA_INFO_GROUP="$LFS quota -t -g $DIR"
# control the time of tests
cycle=30
$RUNAS dd if=/dev/zero of=$TESTFILE bs=$BLK_SZ count=$(($LIMIT/2)) || quota_error u $TSTUSR "(usr) write failure, but expect success"
etime=`date +%s`
delta=$((etime - stime))
- rate=$((BLK_SZ * LIMIT / 2 / delta / 1024))
- [ $rate -gt 1024 ] || error "SLOW IO for $TSTUSR (user): $rate KB/sec"
+ if [ $delta -gt 0 ]; then
+ rate=$((BLK_SZ * LIMIT / 2 / delta / 1024))
+ [ $rate -gt 1024 ] || error "SLOW IO for $TSTUSR (user): $rate KB/sec"
+ fi
log " Done"
log " Write out of block quota ..."
# this time maybe cache write, ignore it's failure
blk_qunit=$(( $RANDOM % 3072 + 1024 ))
blk_qtune=$(( $RANDOM % $blk_qunit ))
# other osts and mds will occupy at 1M blk quota
- b_limit=$(( ($RANDOM - 16384) / 8 + $OSTCOUNT * $blk_qunit * 4 ))
+ b_limit=$(( ($RANDOM - 16384) / 8 + ($OSTCOUNT + 1) * $blk_qunit * 4 ))
set_blk_tunesz $blk_qtune
set_blk_unitsz $blk_qunit
echo "cycle: $i(total $cycle) bunit:$blk_qunit, btune:$blk_qtune, blimit:$b_limit"
$SHOW_QUOTA_USER
$SHOW_QUOTA_GROUP
- $SHOW_QUOTA_INFO
+ $SHOW_QUOTA_INFO_USER
+ $SHOW_QUOTA_INFO_GROUP
echo " Write before timer goes off"
$RUNDD count=$BUNIT_SZ seek=$OFFSET || \
$SHOW_QUOTA_USER
$SHOW_QUOTA_GROUP
- $SHOW_QUOTA_INFO
+ $SHOW_QUOTA_INFO_USER
+ $SHOW_QUOTA_INFO_GROUP
echo " Write after timer goes off"
# maybe cache write, ignore.
$SHOW_QUOTA_USER
$SHOW_QUOTA_GROUP
- $SHOW_QUOTA_INFO
+ $SHOW_QUOTA_INFO_USER
+ $SHOW_QUOTA_INFO_GROUP
echo " Unlink file to stop timer"
rm -f $TESTFILE
$SHOW_QUOTA_USER
$SHOW_QUOTA_GROUP
- $SHOW_QUOTA_INFO
+ $SHOW_QUOTA_INFO_USER
+ $SHOW_QUOTA_INFO_GROUP
echo " Write ..."
$RUNDD count=$BUNIT_SZ || quota_error a $TSTUSR "write failure, but expect success"
$SHOW_QUOTA_USER
$SHOW_QUOTA_GROUP
- $SHOW_QUOTA_INFO
+ $SHOW_QUOTA_INFO_USER
+ $SHOW_QUOTA_INFO_GROUP
echo " Create file after timer goes off"
# the least of inode qunit is 2, so there are at most 3(qunit:2+qtune:1)
$SHOW_QUOTA_USER
$SHOW_QUOTA_GROUP
- $SHOW_QUOTA_INFO
+ $SHOW_QUOTA_INFO_USER
+ $SHOW_QUOTA_INFO_GROUP
echo " Unlink files to stop timer"
find `dirname $TESTFILE` -name "`basename ${TESTFILE}`*" | xargs rm -f
wait_delete_completed
# every quota slave gets 20MB
- b_limit=$((OSTCOUNT * 20 * 1024))
+ b_limit=$(((OSTCOUNT + 1) * 20 * 1024))
log "limit: ${b_limit}KB"
$LFS setquota -u $TSTUSR -b 0 -B $b_limit -i 0 -I 0 $DIR
sleep 3