* b=14840 */
struct proc_dir_entry *lqc_proc_dir;
struct lprocfs_stats *lqc_stats; /* lquota statistics */
+
+ atomic_t lqc_lqs; /* the number of used hashed lqs */
+ cfs_waitq_t lqc_lqs_waitq; /* no lqs are in use */
};
#define QUOTA_MASTER_READY(qctxt) (qctxt)->lqc_setup = 1
static inline void lqs_getref(struct lustre_qunit_size *lqs)
{
- atomic_inc(&lqs->lqs_refcount);
+ if (atomic_inc_return(&lqs->lqs_refcount) == 2) /* quota_create_lqs */
+ atomic_inc(&lqs->lqs_ctxt->lqc_lqs);
+
CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
lqs, atomic_read(&lqs->lqs_refcount));
}
&lqs->lqs_key, &lqs->lqs_hash);
OBD_FREE_PTR(lqs);
} else {
- atomic_dec(&lqs->lqs_refcount);
+ if (atomic_dec_return(&lqs->lqs_refcount) == 1)
+ if (atomic_dec_and_test(&lqs->lqs_ctxt->lqc_lqs))
+ cfs_waitq_signal(&lqs->lqs_ctxt->lqc_lqs_waitq);
CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
lqs, atomic_read(&lqs->lqs_refcount));
lqs->lqs_last_ishrink = 0;
}
lqs_initref(lqs);
- rc = lustre_hash_add_unique(qctxt->lqc_lqs_hash,
+
+ spin_lock(&qctxt->lqc_lock);
+ if (!qctxt->lqc_valid)
+ rc = -EBUSY;
+ else
+ rc = lustre_hash_add_unique(qctxt->lqc_lqs_hash,
&lqs->lqs_key, &lqs->lqs_hash);
+ spin_unlock(&qctxt->lqc_lock);
+
LQS_DEBUG(lqs, "create lqs\n");
if (!rc) {
lqs_getref(lqs);
spin_unlock(&lqs->lqs_lock);
lqs_putref(lqs);
-
if (b_tmp > 0)
rc |= LQS_BLK_DECREASE;
else if (b_tmp < 0)
spin_unlock(&lqs->lqs_lock);
lqs_putref(lqs);
+
EXIT;
out:
OBD_FREE_PTR(qctl);
RETURN(rc);
cfs_waitq_init(&qctxt->lqc_wait_for_qmaster);
+ cfs_waitq_init(&qctxt->lqc_lqs_waitq);
+ atomic_set(&qctxt->lqc_lqs, 0);
spin_lock_init(&qctxt->lqc_lock);
spin_lock(&qctxt->lqc_lock);
qctxt->lqc_handler = handler;
RETURN(rc);
}
+static int check_lqs(struct lustre_quota_ctxt *qctxt)
+{
+ int rc;
+ ENTRY;
+
+ rc = !atomic_read(&qctxt->lqc_lqs);
+
+ RETURN(rc);
+}
+
void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
{
struct lustre_qunit *qunit, *tmp;
struct list_head tmp_list;
+ struct l_wait_info lwi = { 0 };
int i;
ENTRY;
qunit_put(qunit);
}
- lustre_hash_exit(qctxt->lqc_lqs_hash);
-
/* after qctxt_cleanup, qctxt might be freed, then check_qm() is
* unpredicted. So we must wait until lqc_wait_for_qmaster is empty */
while (cfs_waitq_active(&qctxt->lqc_wait_for_qmaster)) {
cfs_time_seconds(1));
}
+ l_wait_event(qctxt->lqc_lqs_waitq, check_lqs(qctxt), &lwi);
+ lustre_hash_exit(qctxt->lqc_lqs_hash);
+
ptlrpcd_decref();
#ifdef LPROCFS
hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
ENTRY;
- atomic_inc(&q->lqs_refcount);
+ if (atomic_inc_return(&q->lqs_refcount) == 2) /* quota_search_lqs */
+ atomic_inc(&q->lqs_ctxt->lqc_lqs);
CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
q, atomic_read(&q->lqs_refcount));
ENTRY;
LASSERT(atomic_read(&q->lqs_refcount) > 0);
- atomic_dec(&q->lqs_refcount);
+
+ if (atomic_dec_return(&q->lqs_refcount) == 1)
+ if (atomic_dec_and_test(&q->lqs_ctxt->lqc_lqs))
+ cfs_waitq_signal(&q->lqs_ctxt->lqc_lqs_waitq);
+
CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
q, atomic_read(&q->lqs_refcount));