struct list_head qunit_hash[NR_DQHASH];
spinlock_t qunit_hash_lock = SPIN_LOCK_UNLOCKED;
+/* please sync qunit_state with qunit_state_names */
+enum qunit_state {
+ QUNIT_CREATED = 0, /* a qunit is created */
+ QUNIT_IN_HASH = 1, /* a qunit is added into qunit hash, that means
+ * a quota req will be sent or is flying */
+ QUNIT_RM_FROM_HASH = 2, /* a qunit is removed from qunit hash, that
+ * means a quota req is handled and comes
+ * back */
+ QUNIT_FINISHED = 3, /* qunit can wake up all threads waiting
+ * for it */
+};
+
+static const char *qunit_state_names[] = {
+ [QUNIT_CREATED] = "CREATED",
+ [QUNIT_IN_HASH] = "IN_HASH",
+ [QUNIT_RM_FROM_HASH] = "RM_FROM_HASH",
+ [QUNIT_FINISHED] = "FINISHED",
+};
+
struct lustre_qunit {
struct list_head lq_hash; /* Hash list in memory */
atomic_t lq_refcnt; /* Use count */
struct lustre_quota_ctxt *lq_ctxt; /* Quota context this applies to */
struct qunit_data lq_data; /* See qunit_data */
unsigned int lq_opc; /* QUOTA_DQACQ, QUOTA_DQREL */
- struct list_head lq_waiters; /* Threads waiting for this qunit */
+ cfs_waitq_t lq_waitq; /* Threads waiting for this qunit */
+ spinlock_t lq_lock; /* Protect the whole structure */
+ enum qunit_state lq_state; /* Present the status of qunit */
+ int lq_rc; /* The rc of lq_data */
};
+#define QUNIT_SET_STATE(qunit, state) \
+do { \
+ spin_lock(&qunit->lq_lock); \
+ QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), " \
+ "lq_rc(%d)\n", \
+ qunit, qunit_state_names[qunit->lq_state], \
+ qunit_state_names[state], qunit->lq_rc); \
+ qunit->lq_state = state; \
+ spin_unlock(&qunit->lq_lock); \
+} while(0)
+
+#define QUNIT_SET_STATE_AND_RC(qunit, state, rc) \
+do { \
+ spin_lock(&qunit->lq_lock); \
+ qunit->lq_rc = rc; \
+ QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), " \
+ "lq_rc(%d)\n", \
+ qunit, qunit_state_names[qunit->lq_state], \
+ qunit_state_names[state], qunit->lq_rc); \
+ qunit->lq_state = state; \
+ spin_unlock(&qunit->lq_lock); \
+} while(0)
+
+
int should_translate_quota (struct obd_import *imp)
{
ENTRY;
RETURN(NULL);
INIT_LIST_HEAD(&qunit->lq_hash);
- INIT_LIST_HEAD(&qunit->lq_waiters);
+ init_waitqueue_head(&qunit->lq_waitq);
atomic_set(&qunit->lq_refcnt, 1);
qunit->lq_ctxt = qctxt;
memcpy(&qunit->lq_data, qdata, sizeof(*qdata));
qunit->lq_opc = opc;
-
+ qunit->lq_lock = SPIN_LOCK_UNLOCKED;
+ QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0);
RETURN(qunit);
}
LASSERT(list_empty(&qunit->lq_hash));
head = qunit_hash + qunit_hashfn(qctxt, &qunit->lq_data);
list_add(&qunit->lq_hash, head);
+ QUNIT_SET_STATE(qunit, QUNIT_IN_HASH);
}
static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit)
LASSERT_SPIN_LOCKED(&qunit_hash_lock);
list_del_init(&qunit->lq_hash);
+ QUNIT_SET_STATE(qunit, QUNIT_RM_FROM_HASH);
}
-struct qunit_waiter {
- struct list_head qw_entry;
- cfs_waitq_t qw_waitq;
- int qw_rc;
-};
-
#define INC_QLIMIT(limit, count) (limit == MIN_QLIMIT) ? \
(limit = count) : (limit += count)
{
struct lustre_qunit *qunit = NULL;
struct super_block *sb = qctxt->lqc_sb;
- struct qunit_waiter *qw, *tmp;
int err = 0;
struct quota_adjust_qunit *oqaq = NULL;
int rc1 = 0;
compute_lqs_after_removing_qunit(qunit);
/* wake up all waiters */
- spin_lock(&qunit_hash_lock);
- list_for_each_entry_safe(qw, tmp, &qunit->lq_waiters, qw_entry) {
- list_del_init(&qw->qw_entry);
- qw->qw_rc = rc;
- wake_up(&qw->qw_waitq);
- }
- spin_unlock(&qunit_hash_lock);
+ QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, rc);
+ wake_up(&qunit->lq_waitq);
qunit_put(qunit);
if (rc < 0 && rc != -EDQUOT)
RETURN(rc);
}
-static int got_qunit(struct qunit_waiter *waiter)
+static int got_qunit(struct lustre_qunit *qunit)
{
- int rc = 0;
+ int rc;
ENTRY;
- spin_lock(&qunit_hash_lock);
- rc = list_empty(&waiter->qw_entry);
- spin_unlock(&qunit_hash_lock);
+
+ spin_lock(&qunit->lq_lock);
+ switch (qunit->lq_state) {
+ case QUNIT_IN_HASH:
+ case QUNIT_RM_FROM_HASH:
+ rc = 0;
+ break;
+ case QUNIT_FINISHED:
+ rc = 1;
+ break;
+ default:
+ rc = 0;
+ CERROR("invalid qunit state %d\n", qunit->lq_state);
+ }
+ spin_unlock(&qunit->lq_lock);
RETURN(rc);
}
struct qunit_data *qdata, int opc, int wait)
{
struct lustre_qunit *qunit, *empty;
- struct qunit_waiter qw;
struct l_wait_info lwi = { 0 };
struct ptlrpc_request *req;
struct dqacq_async_args *aa;
int rc = 0;
ENTRY;
- INIT_LIST_HEAD(&qw.qw_entry);
- init_waitqueue_head(&qw.qw_waitq);
- qw.qw_rc = 0;
-
if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL)
RETURN(-ENOMEM);
spin_lock(&qunit_hash_lock);
qunit = dqacq_in_flight(qctxt, qdata);
if (qunit) {
- if (wait) {
+ if (wait)
qunit_get(qunit);
- list_add_tail(&qw.qw_entry, &qunit->lq_waiters);
- }
spin_unlock(&qunit_hash_lock);
free_qunit(empty);
if (!qctxt->lqc_import) {
spin_unlock(&qctxt->lqc_lock);
QDATA_DEBUG(qdata, "lqc_import is invalid.\n");
+
spin_lock(&qunit_hash_lock);
remove_qunit_nolock(qunit);
- qunit = NULL;
spin_unlock(&qunit_hash_lock);
+
compute_lqs_after_removing_qunit(qunit);
- free_qunit(empty);
+
+ QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, -EAGAIN);
+ wake_up(&qunit->lq_waitq);
+
+ qunit_put(qunit);
RETURN(-EAGAIN);
}
imp = class_import_get(qctxt->lqc_import);
ptlrpc_req_set_repsize(req, 2, size);
class_import_put(imp);
- spin_lock(&qunit_hash_lock);
- if (wait && qunit) {
+ if (wait && qunit)
qunit_get(qunit);
- list_add_tail(&qw.qw_entry, &qunit->lq_waiters);
- }
- spin_unlock(&qunit_hash_lock);
CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
aa = (struct dqacq_async_args *)&req->rq_async_args;
wait_completion:
if (wait && qunit) {
struct qunit_data *p = &qunit->lq_data;
- QDATA_DEBUG(p, "wait for dqacq.\n");
- l_wait_event(qw.qw_waitq, got_qunit(&qw), &lwi);
+ QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
+ l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
/* rc = -EAGAIN, it means a quota req is finished;
* rc = -EDQUOT, it means out of quota
* rc = -EBUSY, it means recovery is happening
* other rc < 0, it means real errors, functions who call
* schedule_dqacq should take care of this */
- if (qw.qw_rc == 0)
+ spin_lock(&qunit->lq_lock);
+ if (qunit->lq_rc == 0)
rc = -EAGAIN;
else
- rc = qw.qw_rc;
-
- CDEBUG(D_QUOTA, "wait dqacq done. (rc:%d)\n", qw.qw_rc);
+ rc = qunit->lq_rc;
+ spin_unlock(&qunit->lq_lock);
+ CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
+ qunit, rc);
qunit_put(qunit);
}
RETURN(rc);
unsigned short type, int isblk)
{
struct lustre_qunit *qunit = NULL;
- struct qunit_waiter qw;
struct qunit_data qdata;
struct l_wait_info lwi = { 0 };
ENTRY;
- INIT_LIST_HEAD(&qw.qw_entry);
- init_waitqueue_head(&qw.qw_waitq);
- qw.qw_rc = 0;
-
qdata.qd_id = id;
qdata.qd_flags = type;
if (isblk)
spin_lock(&qunit_hash_lock);
qunit = dqacq_in_flight(qctxt, &qdata);
- if (qunit) {
+ if (qunit)
/* grab reference on this qunit to handle races with
* dqacq_completion(). Otherwise, this qunit could be freed just
* after we release the qunit_hash_lock */
qunit_get(qunit);
- list_add_tail(&qw.qw_entry, &qunit->lq_waiters);
- }
spin_unlock(&qunit_hash_lock);
if (qunit) {
- struct qunit_data *p = &qdata;
- QDATA_DEBUG(p, "wait for dqacq completion.\n");
- l_wait_event(qw.qw_waitq, got_qunit(&qw), &lwi);
- QDATA_DEBUG(p, "wait dqacq done. (rc:%d)\n", qw.qw_rc);
+ struct qunit_data *p = &qunit->lq_data;
+
+ QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
+ l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
+ CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
+ qunit, qunit->lq_rc);
qunit_put(qunit);
}
RETURN(0);
void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
{
struct lustre_qunit *qunit, *tmp;
- struct qunit_waiter *qw, *tmp2;
struct list_head tmp_list;
int i;
ENTRY;
list_for_each_entry_safe(qunit, tmp, &tmp_list, lq_hash) {
list_del_init(&qunit->lq_hash);
compute_lqs_after_removing_qunit(qunit);
+
/* wake up all waiters */
- spin_lock(&qunit_hash_lock);
- list_for_each_entry_safe(qw, tmp2, &qunit->lq_waiters,
- qw_entry) {
- list_del_init(&qw->qw_entry);
- qw->qw_rc = 0;
- wake_up(&qw->qw_waitq);
- }
- spin_unlock(&qunit_hash_lock);
+ QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, 0);
+ wake_up(&qunit->lq_waitq);
qunit_put(qunit);
}