From: tianzy Date: Fri, 4 Apr 2008 00:51:07 +0000 (+0000) Subject: Branch b1_6 X-Git-Tag: v1_8_0_110~634 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=204836ffac1b254d5013f7bb1887414e32b8ab2a;p=fs%2Flustre-release.git Branch b1_6 Add a new item(lq_lock) in struct lustre_qunit which protects every single qunit and qunit_hash_lock only protects qunit hash. It will improve parellelism. i=johann i=andrew --- diff --git a/lustre/quota/quota_context.c b/lustre/quota/quota_context.c index 1c06ecb..11b7a91 100644 --- a/lustre/quota/quota_context.c +++ b/lustre/quota/quota_context.c @@ -43,15 +43,61 @@ cfs_mem_cache_t *qunit_cachep = NULL; struct list_head qunit_hash[NR_DQHASH]; spinlock_t qunit_hash_lock = SPIN_LOCK_UNLOCKED; +/* please sync qunit_state with qunit_state_names */ +enum qunit_state { + QUNIT_CREATED = 0, /* a qunit is created */ + QUNIT_IN_HASH = 1, /* a qunit is added into qunit hash, that means + * a quota req will be sent or is flying */ + QUNIT_RM_FROM_HASH = 2, /* a qunit is removed from qunit hash, that + * means a quota req is handled and comes + * back */ + QUNIT_FINISHED = 3, /* qunit can wake up all threads waiting + * for it */ +}; + +static const char *qunit_state_names[] = { + [QUNIT_CREATED] = "CREATED", + [QUNIT_IN_HASH] = "IN_HASH", + [QUNIT_RM_FROM_HASH] = "RM_FROM_HASH", + [QUNIT_FINISHED] = "FINISHED", +}; + struct lustre_qunit { struct list_head lq_hash; /* Hash list in memory */ atomic_t lq_refcnt; /* Use count */ struct lustre_quota_ctxt *lq_ctxt; /* Quota context this applies to */ struct qunit_data lq_data; /* See qunit_data */ unsigned int lq_opc; /* QUOTA_DQACQ, QUOTA_DQREL */ - struct list_head lq_waiters; /* Threads waiting for this qunit */ + cfs_waitq_t lq_waitq; /* Threads waiting for this qunit */ + spinlock_t lq_lock; /* Protect the whole structure */ + enum qunit_state lq_state; /* Present the status of qunit */ + int lq_rc; /* The rc of lq_data */ }; +#define QUNIT_SET_STATE(qunit, state) \ +do { \ + spin_lock(&qunit->lq_lock); \ + QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), " \ + "lq_rc(%d)\n", \ + qunit, qunit_state_names[qunit->lq_state], \ + qunit_state_names[state], qunit->lq_rc); \ + qunit->lq_state = state; \ + spin_unlock(&qunit->lq_lock); \ +} while(0) + +#define QUNIT_SET_STATE_AND_RC(qunit, state, rc) \ +do { \ + spin_lock(&qunit->lq_lock); \ + qunit->lq_rc = rc; \ + QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), " \ + "lq_rc(%d)\n", \ + qunit, qunit_state_names[qunit->lq_state], \ + qunit_state_names[state], qunit->lq_rc); \ + qunit->lq_state = state; \ + spin_unlock(&qunit->lq_lock); \ +} while(0) + + int should_translate_quota (struct obd_import *imp) { ENTRY; @@ -344,12 +390,13 @@ static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt, RETURN(NULL); INIT_LIST_HEAD(&qunit->lq_hash); - INIT_LIST_HEAD(&qunit->lq_waiters); + init_waitqueue_head(&qunit->lq_waitq); atomic_set(&qunit->lq_refcnt, 1); qunit->lq_ctxt = qctxt; memcpy(&qunit->lq_data, qdata, sizeof(*qdata)); qunit->lq_opc = opc; - + qunit->lq_lock = SPIN_LOCK_UNLOCKED; + QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0); RETURN(qunit); } @@ -378,6 +425,7 @@ insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit) LASSERT(list_empty(&qunit->lq_hash)); head = qunit_hash + qunit_hashfn(qctxt, &qunit->lq_data); list_add(&qunit->lq_hash, head); + QUNIT_SET_STATE(qunit, QUNIT_IN_HASH); } static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit) @@ -406,14 +454,9 @@ static void remove_qunit_nolock(struct lustre_qunit *qunit) LASSERT_SPIN_LOCKED(&qunit_hash_lock); list_del_init(&qunit->lq_hash); + QUNIT_SET_STATE(qunit, QUNIT_RM_FROM_HASH); } -struct qunit_waiter { - struct list_head qw_entry; - cfs_waitq_t qw_waitq; - int qw_rc; -}; - #define INC_QLIMIT(limit, count) (limit == MIN_QLIMIT) ? \ (limit = count) : (limit += count) @@ -470,7 +513,6 @@ dqacq_completion(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, { struct lustre_qunit *qunit = NULL; struct super_block *sb = qctxt->lqc_sb; - struct qunit_waiter *qw, *tmp; int err = 0; struct quota_adjust_qunit *oqaq = NULL; int rc1 = 0; @@ -567,13 +609,8 @@ out: compute_lqs_after_removing_qunit(qunit); /* wake up all waiters */ - spin_lock(&qunit_hash_lock); - list_for_each_entry_safe(qw, tmp, &qunit->lq_waiters, qw_entry) { - list_del_init(&qw->qw_entry); - qw->qw_rc = rc; - wake_up(&qw->qw_waitq); - } - spin_unlock(&qunit_hash_lock); + QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, rc); + wake_up(&qunit->lq_waitq); qunit_put(qunit); if (rc < 0 && rc != -EDQUOT) @@ -687,13 +724,25 @@ exit: RETURN(rc); } -static int got_qunit(struct qunit_waiter *waiter) +static int got_qunit(struct lustre_qunit *qunit) { - int rc = 0; + int rc; ENTRY; - spin_lock(&qunit_hash_lock); - rc = list_empty(&waiter->qw_entry); - spin_unlock(&qunit_hash_lock); + + spin_lock(&qunit->lq_lock); + switch (qunit->lq_state) { + case QUNIT_IN_HASH: + case QUNIT_RM_FROM_HASH: + rc = 0; + break; + case QUNIT_FINISHED: + rc = 1; + break; + default: + rc = 0; + CERROR("invalid qunit state %d\n", qunit->lq_state); + } + spin_unlock(&qunit->lq_lock); RETURN(rc); } @@ -702,7 +751,6 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata, int opc, int wait) { struct lustre_qunit *qunit, *empty; - struct qunit_waiter qw; struct l_wait_info lwi = { 0 }; struct ptlrpc_request *req; struct dqacq_async_args *aa; @@ -713,20 +761,14 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, int rc = 0; ENTRY; - INIT_LIST_HEAD(&qw.qw_entry); - init_waitqueue_head(&qw.qw_waitq); - qw.qw_rc = 0; - if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL) RETURN(-ENOMEM); spin_lock(&qunit_hash_lock); qunit = dqacq_in_flight(qctxt, qdata); if (qunit) { - if (wait) { + if (wait) qunit_get(qunit); - list_add_tail(&qw.qw_entry, &qunit->lq_waiters); - } spin_unlock(&qunit_hash_lock); free_qunit(empty); @@ -767,12 +809,17 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, if (!qctxt->lqc_import) { spin_unlock(&qctxt->lqc_lock); QDATA_DEBUG(qdata, "lqc_import is invalid.\n"); + spin_lock(&qunit_hash_lock); remove_qunit_nolock(qunit); - qunit = NULL; spin_unlock(&qunit_hash_lock); + compute_lqs_after_removing_qunit(qunit); - free_qunit(empty); + + QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, -EAGAIN); + wake_up(&qunit->lq_waitq); + + qunit_put(qunit); RETURN(-EAGAIN); } imp = class_import_get(qctxt->lqc_import); @@ -809,12 +856,8 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, ptlrpc_req_set_repsize(req, 2, size); class_import_put(imp); - spin_lock(&qunit_hash_lock); - if (wait && qunit) { + if (wait && qunit) qunit_get(qunit); - list_add_tail(&qw.qw_entry, &qunit->lq_waiters); - } - spin_unlock(&qunit_hash_lock); CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); aa = (struct dqacq_async_args *)&req->rq_async_args; @@ -829,20 +872,22 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, wait_completion: if (wait && qunit) { struct qunit_data *p = &qunit->lq_data; - QDATA_DEBUG(p, "wait for dqacq.\n"); - l_wait_event(qw.qw_waitq, got_qunit(&qw), &lwi); + QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit); + l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi); /* rc = -EAGAIN, it means a quota req is finished; * rc = -EDQUOT, it means out of quota * rc = -EBUSY, it means recovery is happening * other rc < 0, it means real errors, functions who call * schedule_dqacq should take care of this */ - if (qw.qw_rc == 0) + spin_lock(&qunit->lq_lock); + if (qunit->lq_rc == 0) rc = -EAGAIN; else - rc = qw.qw_rc; - - CDEBUG(D_QUOTA, "wait dqacq done. (rc:%d)\n", qw.qw_rc); + rc = qunit->lq_rc; + spin_unlock(&qunit->lq_lock); + CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n", + qunit, rc); qunit_put(qunit); } RETURN(rc); @@ -892,15 +937,10 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id, unsigned short type, int isblk) { struct lustre_qunit *qunit = NULL; - struct qunit_waiter qw; struct qunit_data qdata; struct l_wait_info lwi = { 0 }; ENTRY; - INIT_LIST_HEAD(&qw.qw_entry); - init_waitqueue_head(&qw.qw_waitq); - qw.qw_rc = 0; - qdata.qd_id = id; qdata.qd_flags = type; if (isblk) @@ -909,20 +949,20 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id, spin_lock(&qunit_hash_lock); qunit = dqacq_in_flight(qctxt, &qdata); - if (qunit) { + if (qunit) /* grab reference on this qunit to handle races with * dqacq_completion(). Otherwise, this qunit could be freed just * after we release the qunit_hash_lock */ qunit_get(qunit); - list_add_tail(&qw.qw_entry, &qunit->lq_waiters); - } spin_unlock(&qunit_hash_lock); if (qunit) { - struct qunit_data *p = &qdata; - QDATA_DEBUG(p, "wait for dqacq completion.\n"); - l_wait_event(qw.qw_waitq, got_qunit(&qw), &lwi); - QDATA_DEBUG(p, "wait dqacq done. (rc:%d)\n", qw.qw_rc); + struct qunit_data *p = &qunit->lq_data; + + QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit); + l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi); + CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n", + qunit, qunit->lq_rc); qunit_put(qunit); } RETURN(0); @@ -974,7 +1014,6 @@ qctxt_init(struct lustre_quota_ctxt *qctxt, struct super_block *sb, void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) { struct lustre_qunit *qunit, *tmp; - struct qunit_waiter *qw, *tmp2; struct list_head tmp_list; int i; ENTRY; @@ -995,15 +1034,10 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) list_for_each_entry_safe(qunit, tmp, &tmp_list, lq_hash) { list_del_init(&qunit->lq_hash); compute_lqs_after_removing_qunit(qunit); + /* wake up all waiters */ - spin_lock(&qunit_hash_lock); - list_for_each_entry_safe(qw, tmp2, &qunit->lq_waiters, - qw_entry) { - list_del_init(&qw->qw_entry); - qw->qw_rc = 0; - wake_up(&qw->qw_waitq); - } - spin_unlock(&qunit_hash_lock); + QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, 0); + wake_up(&qunit->lq_waitq); qunit_put(qunit); } diff --git a/lustre/quota/quota_internal.h b/lustre/quota/quota_internal.h index 1c957f1..d269c0b 100644 --- a/lustre/quota/quota_internal.h +++ b/lustre/quota/quota_internal.h @@ -48,7 +48,9 @@ CDEBUG(D_QUOTA, "id(%u) flag(%u) type(%c) isblk(%c) count("LPU64") " \ "qd_qunit("LPU64"): " fmt, qd->qd_id, qd->qd_flags, \ QDATA_IS_GRP(qd) ? 'g' : 'u', QDATA_IS_BLK(qd) ? 'b': 'i', \ - qd->qd_count, qd->qd_qunit, ## arg); + qd->qd_count, \ + (QDATA_IS_ADJBLK(qd) | QDATA_IS_ADJINO(qd)) ? qd->qd_qunit : 0,\ + ## arg); #define QAQ_DEBUG(qaq, fmt, arg...) \ CDEBUG(D_QUOTA, "id(%u) flag(%u) type(%c) bunit("LPU64") " \ diff --git a/lustre/tests/sanity-quota.sh b/lustre/tests/sanity-quota.sh index 54e44fd..c4d6b81 100644 --- a/lustre/tests/sanity-quota.sh +++ b/lustre/tests/sanity-quota.sh @@ -191,7 +191,7 @@ test_1_sub() { rm -f $TESTFILE sync; sleep 1; sync; OST0_UUID=`do_facet ost1 $LCTL dl | grep -m1 obdfilter | awk '{print $((NF-1))}'` - OST0_QUOTA_USED=`do_facet ost1 $LFS quota -o $OST0_UUID -u $TSTUSR $DIR | awk '/^.*[[:digit:]+][[:space:]+]/ { print $1 }'` + OST0_QUOTA_USED=`$LFS quota -o $OST0_UUID -u $TSTUSR $DIR | awk '/^.*[[:digit:]+][[:space:]+]/ { print $1 }'` echo $OST0_QUOTA_USED [ $OST0_QUOTA_USED -ne 0 ] && \ ($SHOW_QUOTA_USER; error "quota deleted isn't released") @@ -222,7 +222,7 @@ test_1_sub() { rm -f $TESTFILE sync; sleep 1; sync; OST0_UUID=`do_facet ost1 $LCTL dl | grep -m1 obdfilter | awk '{print $((NF-1))}'` - OST0_QUOTA_USED=`do_facet ost1 $LFS quota -o $OST0_UUID -g $TSTUSR $DIR | awk '/^.*[[:digit:]+][[:space:]+]/ { print $1 }'` + OST0_QUOTA_USED=`$LFS quota -o $OST0_UUID -g $TSTUSR $DIR | awk '/^.*[[:digit:]+][[:space:]+]/ { print $1 }'` echo $OST0_QUOTA_USED [ $OST0_QUOTA_USED -ne 0 ] && \ ($SHOW_QUOTA_USER; error "quota deleted isn't released") @@ -276,7 +276,7 @@ test_2_sub() { sync; sleep 1; sync; MDS_UUID=`do_facet mds $LCTL dl | grep -m1 mds | awk '{print $((NF-1))}'` - MDS_QUOTA_USED=`do_facet mds $LFS quota -o $MDS_UUID -u $TSTUSR $DIR | awk '/^.*[[:digit:]+][[:space:]+]/ { print $4 }'` + MDS_QUOTA_USED=`$LFS quota -o $MDS_UUID -u $TSTUSR $DIR | awk '/^.*[[:digit:]+][[:space:]+]/ { print $4 }'` echo $MDS_QUOTA_USED [ $MDS_QUOTA_USED -ne 0 ] && \ ($SHOW_QUOTA_USER; error "quota deleted isn't released") @@ -304,7 +304,7 @@ test_2_sub() { sync; sleep 1; sync; MDS_UUID=`do_facet mds $LCTL dl | grep -m1 mds | awk '{print $((NF-1))}'` - MDS_QUOTA_USED=`do_facet mds $LFS quota -o $MDS_UUID -g $TSTUSR $DIR | awk '/^.*[[:digit:]+][[:space:]+]/ { print $4 }'` + MDS_QUOTA_USED=`$LFS quota -o $MDS_UUID -g $TSTUSR $DIR | awk '/^.*[[:digit:]+][[:space:]+]/ { print $4 }'` echo $MDS_QUOTA_USED [ $MDS_QUOTA_USED -ne 0 ] && \ ($SHOW_QUOTA_USER; error "quota deleted isn't released")