Whamcloud - gitweb
Branch b1_6
authortianzy <tianzy>
Fri, 4 Apr 2008 00:51:07 +0000 (00:51 +0000)
committertianzy <tianzy>
Fri, 4 Apr 2008 00:51:07 +0000 (00:51 +0000)
Add a new item(lq_lock) in struct lustre_qunit which protects every single qunit
and qunit_hash_lock only protects qunit hash. It will improve parellelism.
i=johann
i=andrew

lustre/quota/quota_context.c
lustre/quota/quota_internal.h
lustre/tests/sanity-quota.sh

index 1c06ecb..11b7a91 100644 (file)
@@ -43,15 +43,61 @@ cfs_mem_cache_t *qunit_cachep = NULL;
 struct list_head qunit_hash[NR_DQHASH];
 spinlock_t qunit_hash_lock = SPIN_LOCK_UNLOCKED;
 
+/* please sync qunit_state with qunit_state_names */
+enum qunit_state {
+        QUNIT_CREATED      = 0,   /* a qunit is created */
+        QUNIT_IN_HASH      = 1,   /* a qunit is added into qunit hash, that means
+                                   * a quota req will be sent or is flying */
+        QUNIT_RM_FROM_HASH = 2,   /* a qunit is removed from qunit hash, that
+                                   * means a quota req is handled and comes
+                                   * back */
+        QUNIT_FINISHED     = 3,   /* qunit can wake up all threads waiting
+                                   * for it */
+};
+
+static const char *qunit_state_names[] = {
+        [QUNIT_CREATED]      = "CREATED",
+        [QUNIT_IN_HASH]      = "IN_HASH",
+        [QUNIT_RM_FROM_HASH] = "RM_FROM_HASH",
+        [QUNIT_FINISHED]     = "FINISHED",
+};
+
 struct lustre_qunit {
         struct list_head lq_hash;          /* Hash list in memory */
         atomic_t lq_refcnt;                /* Use count */
         struct lustre_quota_ctxt *lq_ctxt; /* Quota context this applies to */
         struct qunit_data lq_data;         /* See qunit_data */
         unsigned int lq_opc;               /* QUOTA_DQACQ, QUOTA_DQREL */
-        struct list_head lq_waiters;       /* Threads waiting for this qunit */
+        cfs_waitq_t lq_waitq;              /* Threads waiting for this qunit */
+        spinlock_t lq_lock;                /* Protect the whole structure */
+        enum qunit_state lq_state;         /* Present the status of qunit */
+        int lq_rc;                         /* The rc of lq_data */
 };
 
+#define QUNIT_SET_STATE(qunit, state)                                   \
+do {                                                                    \
+        spin_lock(&qunit->lq_lock);                                     \
+        QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), "   \
+                    "lq_rc(%d)\n",                                      \
+                    qunit, qunit_state_names[qunit->lq_state],          \
+                    qunit_state_names[state], qunit->lq_rc);            \
+        qunit->lq_state = state;                                        \
+        spin_unlock(&qunit->lq_lock);                                   \
+} while(0)
+
+#define QUNIT_SET_STATE_AND_RC(qunit, state, rc)                        \
+do {                                                                    \
+        spin_lock(&qunit->lq_lock);                                     \
+        qunit->lq_rc = rc;                                              \
+        QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), "   \
+                    "lq_rc(%d)\n",                                      \
+                    qunit, qunit_state_names[qunit->lq_state],          \
+                    qunit_state_names[state], qunit->lq_rc);            \
+        qunit->lq_state = state;                                        \
+        spin_unlock(&qunit->lq_lock);                                   \
+} while(0)
+
+
 int should_translate_quota (struct obd_import *imp)
 {
         ENTRY;
@@ -344,12 +390,13 @@ static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt,
                 RETURN(NULL);
 
         INIT_LIST_HEAD(&qunit->lq_hash);
-        INIT_LIST_HEAD(&qunit->lq_waiters);
+        init_waitqueue_head(&qunit->lq_waitq);
         atomic_set(&qunit->lq_refcnt, 1);
         qunit->lq_ctxt = qctxt;
         memcpy(&qunit->lq_data, qdata, sizeof(*qdata));
         qunit->lq_opc = opc;
-
+        qunit->lq_lock = SPIN_LOCK_UNLOCKED;
+        QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0);
         RETURN(qunit);
 }
 
@@ -378,6 +425,7 @@ insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit)
         LASSERT(list_empty(&qunit->lq_hash));
         head = qunit_hash + qunit_hashfn(qctxt, &qunit->lq_data);
         list_add(&qunit->lq_hash, head);
+        QUNIT_SET_STATE(qunit, QUNIT_IN_HASH);
 }
 
 static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit)
@@ -406,14 +454,9 @@ static void remove_qunit_nolock(struct lustre_qunit *qunit)
         LASSERT_SPIN_LOCKED(&qunit_hash_lock);
 
         list_del_init(&qunit->lq_hash);
+        QUNIT_SET_STATE(qunit, QUNIT_RM_FROM_HASH);
 }
 
-struct qunit_waiter {
-        struct list_head qw_entry;
-        cfs_waitq_t      qw_waitq;
-        int qw_rc;
-};
-
 #define INC_QLIMIT(limit, count) (limit == MIN_QLIMIT) ? \
                                  (limit = count) : (limit += count)
 
@@ -470,7 +513,6 @@ dqacq_completion(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
 {
         struct lustre_qunit *qunit = NULL;
         struct super_block *sb = qctxt->lqc_sb;
-        struct qunit_waiter *qw, *tmp;
         int err = 0;
         struct quota_adjust_qunit *oqaq = NULL;
         int rc1 = 0;
@@ -567,13 +609,8 @@ out:
         compute_lqs_after_removing_qunit(qunit);
 
         /* wake up all waiters */
-        spin_lock(&qunit_hash_lock);
-        list_for_each_entry_safe(qw, tmp, &qunit->lq_waiters, qw_entry) {
-                list_del_init(&qw->qw_entry);
-                qw->qw_rc = rc;
-                wake_up(&qw->qw_waitq);
-        }
-        spin_unlock(&qunit_hash_lock);
+        QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, rc);
+        wake_up(&qunit->lq_waitq);
 
         qunit_put(qunit);
         if (rc < 0 && rc != -EDQUOT)
@@ -687,13 +724,25 @@ exit:
         RETURN(rc);
 }
 
-static int got_qunit(struct qunit_waiter *waiter)
+static int got_qunit(struct lustre_qunit *qunit)
 {
-        int rc = 0;
+        int rc;
         ENTRY;
-        spin_lock(&qunit_hash_lock);
-        rc = list_empty(&waiter->qw_entry);
-        spin_unlock(&qunit_hash_lock);
+
+        spin_lock(&qunit->lq_lock);
+        switch (qunit->lq_state) {
+        case QUNIT_IN_HASH:
+        case QUNIT_RM_FROM_HASH:
+                rc = 0;
+                break;
+        case QUNIT_FINISHED:
+                rc = 1;
+                break;
+        default:
+                rc = 0;
+                CERROR("invalid qunit state %d\n", qunit->lq_state);
+        }
+        spin_unlock(&qunit->lq_lock);
         RETURN(rc);
 }
 
@@ -702,7 +751,6 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                struct qunit_data *qdata, int opc, int wait)
 {
         struct lustre_qunit *qunit, *empty;
-        struct qunit_waiter qw;
         struct l_wait_info lwi = { 0 };
         struct ptlrpc_request *req;
         struct dqacq_async_args *aa;
@@ -713,20 +761,14 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         int rc = 0;
         ENTRY;
 
-        INIT_LIST_HEAD(&qw.qw_entry);
-        init_waitqueue_head(&qw.qw_waitq);
-        qw.qw_rc = 0;
-
         if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL)
                 RETURN(-ENOMEM);
 
         spin_lock(&qunit_hash_lock);
         qunit = dqacq_in_flight(qctxt, qdata);
         if (qunit) {
-                if (wait) {
+                if (wait)
                         qunit_get(qunit);
-                        list_add_tail(&qw.qw_entry, &qunit->lq_waiters);
-                }
                 spin_unlock(&qunit_hash_lock);
                 free_qunit(empty);
 
@@ -767,12 +809,17 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         if (!qctxt->lqc_import) {
                 spin_unlock(&qctxt->lqc_lock);
                 QDATA_DEBUG(qdata, "lqc_import is invalid.\n");
+
                 spin_lock(&qunit_hash_lock);
                 remove_qunit_nolock(qunit);
-                qunit = NULL;
                 spin_unlock(&qunit_hash_lock);
+
                 compute_lqs_after_removing_qunit(qunit);
-                free_qunit(empty);
+
+                QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, -EAGAIN);
+                wake_up(&qunit->lq_waitq);
+
+                qunit_put(qunit);
                 RETURN(-EAGAIN);
         }
         imp = class_import_get(qctxt->lqc_import);
@@ -809,12 +856,8 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         ptlrpc_req_set_repsize(req, 2, size);
         class_import_put(imp);
 
-        spin_lock(&qunit_hash_lock);
-        if (wait && qunit) {
+        if (wait && qunit) 
                 qunit_get(qunit);
-                list_add_tail(&qw.qw_entry, &qunit->lq_waiters);
-        }
-        spin_unlock(&qunit_hash_lock);
 
         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
         aa = (struct dqacq_async_args *)&req->rq_async_args;
@@ -829,20 +872,22 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
 wait_completion:
         if (wait && qunit) {
                 struct qunit_data *p = &qunit->lq_data;
-                QDATA_DEBUG(p, "wait for dqacq.\n");
 
-                l_wait_event(qw.qw_waitq, got_qunit(&qw), &lwi);
+                QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
+                l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
                 /* rc = -EAGAIN, it means a quota req is finished;
                  * rc = -EDQUOT, it means out of quota
                  * rc = -EBUSY, it means recovery is happening
                  * other rc < 0, it means real errors, functions who call
                  * schedule_dqacq should take care of this */
-                if (qw.qw_rc == 0)
+                spin_lock(&qunit->lq_lock);
+                if (qunit->lq_rc == 0)
                         rc = -EAGAIN;
                 else
-                        rc = qw.qw_rc;
-
-                CDEBUG(D_QUOTA, "wait dqacq done. (rc:%d)\n", qw.qw_rc);
+                        rc = qunit->lq_rc;
+                spin_unlock(&qunit->lq_lock);
+                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
+                       qunit, rc);
                 qunit_put(qunit);
         }
         RETURN(rc);
@@ -892,15 +937,10 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
                          unsigned short type, int isblk)
 {
         struct lustre_qunit *qunit = NULL;
-        struct qunit_waiter qw;
         struct qunit_data qdata;
         struct l_wait_info lwi = { 0 };
         ENTRY;
 
-        INIT_LIST_HEAD(&qw.qw_entry);
-        init_waitqueue_head(&qw.qw_waitq);
-        qw.qw_rc = 0;
-
         qdata.qd_id = id;
         qdata.qd_flags = type;
         if (isblk)
@@ -909,20 +949,20 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
 
         spin_lock(&qunit_hash_lock);
         qunit = dqacq_in_flight(qctxt, &qdata);
-        if (qunit) {
+        if (qunit)
                 /* grab reference on this qunit to handle races with
                  * dqacq_completion(). Otherwise, this qunit could be freed just
                  * after we release the qunit_hash_lock */
                 qunit_get(qunit);
-                list_add_tail(&qw.qw_entry, &qunit->lq_waiters);
-        }
         spin_unlock(&qunit_hash_lock);
 
         if (qunit) {
-                struct qunit_data *p = &qdata;
-                QDATA_DEBUG(p, "wait for dqacq completion.\n");
-                l_wait_event(qw.qw_waitq, got_qunit(&qw), &lwi);
-                QDATA_DEBUG(p, "wait dqacq done. (rc:%d)\n", qw.qw_rc);
+                struct qunit_data *p = &qunit->lq_data;
+
+                QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
+                l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
+                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
+                       qunit, qunit->lq_rc);
                 qunit_put(qunit);
         }
         RETURN(0);
@@ -974,7 +1014,6 @@ qctxt_init(struct lustre_quota_ctxt *qctxt, struct super_block *sb,
 void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
 {
         struct lustre_qunit *qunit, *tmp;
-        struct qunit_waiter *qw, *tmp2;
         struct list_head tmp_list;
         int i;
         ENTRY;
@@ -995,15 +1034,10 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
         list_for_each_entry_safe(qunit, tmp, &tmp_list, lq_hash) {
                 list_del_init(&qunit->lq_hash);
                 compute_lqs_after_removing_qunit(qunit);
+
                 /* wake up all waiters */
-                spin_lock(&qunit_hash_lock);
-                list_for_each_entry_safe(qw, tmp2, &qunit->lq_waiters,
-                                         qw_entry) {
-                        list_del_init(&qw->qw_entry);
-                        qw->qw_rc = 0;
-                        wake_up(&qw->qw_waitq);
-                }
-                spin_unlock(&qunit_hash_lock);
+                QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, 0);
+                wake_up(&qunit->lq_waitq);
                 qunit_put(qunit);
         }
 
index 1c957f1..d269c0b 100644 (file)
@@ -48,7 +48,9 @@
         CDEBUG(D_QUOTA, "id(%u) flag(%u) type(%c) isblk(%c) count("LPU64") "  \
                "qd_qunit("LPU64"): " fmt, qd->qd_id, qd->qd_flags,            \
                QDATA_IS_GRP(qd) ? 'g' : 'u', QDATA_IS_BLK(qd) ? 'b': 'i',     \
-               qd->qd_count, qd->qd_qunit, ## arg);
+               qd->qd_count,                                                  \
+               (QDATA_IS_ADJBLK(qd) | QDATA_IS_ADJINO(qd)) ? qd->qd_qunit : 0,\
+               ## arg);
 
 #define QAQ_DEBUG(qaq, fmt, arg...)                                           \
         CDEBUG(D_QUOTA, "id(%u) flag(%u) type(%c) bunit("LPU64") "            \
index 54e44fd..c4d6b81 100644 (file)
@@ -191,7 +191,7 @@ test_1_sub() {
         rm -f $TESTFILE
        sync; sleep 1; sync;
        OST0_UUID=`do_facet ost1 $LCTL dl | grep -m1 obdfilter | awk '{print $((NF-1))}'`
-       OST0_QUOTA_USED=`do_facet ost1 $LFS quota -o $OST0_UUID -u $TSTUSR $DIR | awk '/^.*[[:digit:]+][[:space:]+]/ { print $1 }'`
+       OST0_QUOTA_USED=`$LFS quota -o $OST0_UUID -u $TSTUSR $DIR | awk '/^.*[[:digit:]+][[:space:]+]/ { print $1 }'`
        echo $OST0_QUOTA_USED
        [ $OST0_QUOTA_USED -ne 0 ] && \
            ($SHOW_QUOTA_USER; error "quota deleted isn't released")
@@ -222,7 +222,7 @@ test_1_sub() {
         rm -f $TESTFILE
        sync; sleep 1; sync;
        OST0_UUID=`do_facet ost1 $LCTL dl | grep -m1 obdfilter | awk '{print $((NF-1))}'`
-       OST0_QUOTA_USED=`do_facet ost1 $LFS quota -o $OST0_UUID -g $TSTUSR $DIR | awk '/^.*[[:digit:]+][[:space:]+]/ { print $1 }'`
+       OST0_QUOTA_USED=`$LFS quota -o $OST0_UUID -g $TSTUSR $DIR | awk '/^.*[[:digit:]+][[:space:]+]/ { print $1 }'`
        echo $OST0_QUOTA_USED
        [ $OST0_QUOTA_USED -ne 0 ] && \
            ($SHOW_QUOTA_USER; error "quota deleted isn't released")
@@ -276,7 +276,7 @@ test_2_sub() {
        sync; sleep 1; sync;
 
        MDS_UUID=`do_facet mds $LCTL dl | grep -m1 mds | awk '{print $((NF-1))}'`
-       MDS_QUOTA_USED=`do_facet mds $LFS quota -o $MDS_UUID -u $TSTUSR $DIR | awk '/^.*[[:digit:]+][[:space:]+]/ { print $4 }'`
+       MDS_QUOTA_USED=`$LFS quota -o $MDS_UUID -u $TSTUSR $DIR | awk '/^.*[[:digit:]+][[:space:]+]/ { print $4 }'`
        echo $MDS_QUOTA_USED
        [ $MDS_QUOTA_USED -ne 0 ] && \
            ($SHOW_QUOTA_USER; error "quota deleted isn't released")
@@ -304,7 +304,7 @@ test_2_sub() {
        sync; sleep 1; sync;
 
        MDS_UUID=`do_facet mds $LCTL dl | grep -m1 mds | awk '{print $((NF-1))}'`
-       MDS_QUOTA_USED=`do_facet mds $LFS quota -o $MDS_UUID -g $TSTUSR $DIR | awk '/^.*[[:digit:]+][[:space:]+]/ { print $4 }'`
+       MDS_QUOTA_USED=`$LFS quota -o $MDS_UUID -g $TSTUSR $DIR | awk '/^.*[[:digit:]+][[:space:]+]/ { print $4 }'`
        echo $MDS_QUOTA_USED
        [ $MDS_QUOTA_USED -ne 0 ] && \
            ($SHOW_QUOTA_USER; error "quota deleted isn't released")