Whamcloud - gitweb
Branch HEAD
authortianzy <tianzy>
Tue, 23 Jun 2009 06:25:18 +0000 (06:25 +0000)
committertianzy <tianzy>
Tue, 23 Jun 2009 06:25:18 +0000 (06:25 +0000)
use lqs to judge which uid/gid are set quota limitaton and
which uid/gid are not, and treat them differently.
I found a few issues after landing the patch for 18574.(some are caused by it, some are easier to
be reproduced by it)
1. in setquota, quota slaves will set their quota limitation to 1 and then get more quota. But in
this time, it's lqs size is the default value. If we have set a small quota limitation, we will
meet -EDQUOT. So we should adjust qunit firstly in this situation.
2. filter_quota_ctl(), we should promise there is no qunit is calculated or sent between
qctxt_wait_pending_dqacq() and fsfilt_quotactl(). Now I did this via a "fake" qunit.
b=18574
i=johann
i=panda
aVS: ----------------------------------------------------------------------

12 files changed:
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_quota.h
lustre/lvfs/fsfilt_ext3.c
lustre/lvfs/lustre_quota_fmt.c
lustre/quota/lproc_quota.c
lustre/quota/quota_adjust_qunit.c
lustre/quota/quota_context.c
lustre/quota/quota_ctl.c
lustre/quota/quota_interface.c
lustre/quota/quota_internal.h
lustre/quota/quota_master.c
lustre/tests/sanity-quota.sh

index 099a552..943786e 100644 (file)
@@ -1420,7 +1420,7 @@ struct quota_adjust_qunit {
 };
 extern void lustre_swab_quota_adjust_qunit(struct quota_adjust_qunit *q);
 
 };
 extern void lustre_swab_quota_adjust_qunit(struct quota_adjust_qunit *q);
 
-/* flags in qunit_data and quota_adjust_qunit will use macroes below */
+/* flags is shared among quota structures */
 #define LQUOTA_FLAGS_GRP       1UL   /* 0 is user, 1 is group */
 #define LQUOTA_FLAGS_BLK       2UL   /* 0 is inode, 1 is block */
 #define LQUOTA_FLAGS_ADJBLK    4UL   /* adjust the block qunit size */
 #define LQUOTA_FLAGS_GRP       1UL   /* 0 is user, 1 is group */
 #define LQUOTA_FLAGS_BLK       2UL   /* 0 is inode, 1 is block */
 #define LQUOTA_FLAGS_ADJBLK    4UL   /* adjust the block qunit size */
@@ -1428,16 +1428,21 @@ extern void lustre_swab_quota_adjust_qunit(struct quota_adjust_qunit *q);
 #define LQUOTA_FLAGS_CHG_QS   16UL   /* indicate whether it has capability of
                                       * OBD_CONNECT_CHANGE_QS */
 
 #define LQUOTA_FLAGS_CHG_QS   16UL   /* indicate whether it has capability of
                                       * OBD_CONNECT_CHANGE_QS */
 
-/* the status of lqsk_flags in struct lustre_qunit_size_key */
+/* flags is specific for quota_adjust_qunit */
+#define LQUOTA_QAQ_CEATE_LQS  (1 << 31) /* when it is set, need create lqs */
+
+/* the status of lqs_flags in struct lustre_qunit_size  */
 #define LQUOTA_QUNIT_FLAGS (LQUOTA_FLAGS_GRP | LQUOTA_FLAGS_BLK)
 
 #define LQUOTA_QUNIT_FLAGS (LQUOTA_FLAGS_GRP | LQUOTA_FLAGS_BLK)
 
-#define QAQ_IS_GRP(qaq)    ((qaq)->qaq_flags & LQUOTA_FLAGS_GRP)
-#define QAQ_IS_ADJBLK(qaq) ((qaq)->qaq_flags & LQUOTA_FLAGS_ADJBLK)
-#define QAQ_IS_ADJINO(qaq) ((qaq)->qaq_flags & LQUOTA_FLAGS_ADJINO)
+#define QAQ_IS_GRP(qaq)         ((qaq)->qaq_flags & LQUOTA_FLAGS_GRP)
+#define QAQ_IS_ADJBLK(qaq)      ((qaq)->qaq_flags & LQUOTA_FLAGS_ADJBLK)
+#define QAQ_IS_ADJINO(qaq)      ((qaq)->qaq_flags & LQUOTA_FLAGS_ADJINO)
+#define QAQ_IS_CREATE_LQS(qaq)  ((qaq)->qaq_flags & LQUOTA_QAQ_CEATE_LQS)
 
 
-#define QAQ_SET_GRP(qaq)    ((qaq)->qaq_flags |= LQUOTA_FLAGS_GRP)
-#define QAQ_SET_ADJBLK(qaq) ((qaq)->qaq_flags |= LQUOTA_FLAGS_ADJBLK)
-#define QAQ_SET_ADJINO(qaq) ((qaq)->qaq_flags |= LQUOTA_FLAGS_ADJINO)
+#define QAQ_SET_GRP(qaq)        ((qaq)->qaq_flags |= LQUOTA_FLAGS_GRP)
+#define QAQ_SET_ADJBLK(qaq)     ((qaq)->qaq_flags |= LQUOTA_FLAGS_ADJBLK)
+#define QAQ_SET_ADJINO(qaq)     ((qaq)->qaq_flags |= LQUOTA_FLAGS_ADJINO)
+#define QAQ_SET_CREATE_LQS(qaq) ((qaq)->qaq_flags |= LQUOTA_QAQ_CEATE_LQS)
 
 /* inode access permission for remote user, the inode info are omitted,
  * for client knows them. */
 
 /* inode access permission for remote user, the inode info are omitted,
  * for client knows them. */
index 838c460..5c154ad 100644 (file)
@@ -179,7 +179,12 @@ struct lustre_dquot {
 struct dquot_id {
         struct list_head        di_link;
         __u32                   di_id;
 struct dquot_id {
         struct list_head        di_link;
         __u32                   di_id;
+        __u32                   di_flag;
 };
 };
+/* set inode quota limitation on a quota uid/gid */
+#define QI_SET                (1 << 30)
+/* set block quota limitation on a quota uid/gid */
+#define QB_SET                (1 << 31)
 
 #define QFILE_CHK               1
 #define QFILE_RD_INFO           2
 
 #define QFILE_CHK               1
 #define QFILE_RD_INFO           2
@@ -315,7 +320,9 @@ struct lustre_quota_ctxt {
 struct lustre_qunit_size {
         struct hlist_node lqs_hash; /** the hash entry */
         unsigned int lqs_id;        /** id of user/group */
 struct lustre_qunit_size {
         struct hlist_node lqs_hash; /** the hash entry */
         unsigned int lqs_id;        /** id of user/group */
-        unsigned long lqs_flags;    /** is user/group; FULLBUF or LESSBUF */
+        unsigned long lqs_flags;    /** 31st bit is QB_SET, 30th bit is QI_SET
+                                     * other bits are same as LQUOTA_FLAGS_*
+                                     */
         unsigned long lqs_iunit_sz; /** Unit size of file quota currently */
         /**
          * Trigger dqacq when available file quota
         unsigned long lqs_iunit_sz; /** Unit size of file quota currently */
         /**
          * Trigger dqacq when available file quota
index dee551b..e7655e7 100644 (file)
@@ -1997,7 +1997,7 @@ static int fsfilt_ext3_quotacheck(struct super_block *sb,
                                 cqget(sb, qctxt->qckt_hash, &qctxt->qckt_list,
                                       dqid->di_id, i,
                                       qctxt->qckt_first_check[i]);
                                 cqget(sb, qctxt->qckt_hash, &qctxt->qckt_list,
                                       dqid->di_id, i,
                                       qctxt->qckt_first_check[i]);
-                        kfree(dqid);
+                        OBD_FREE_PTR(dqid);
                 }
         }
 #endif
                 }
         }
 #endif
index 83638d4..e633f15 100644 (file)
@@ -1127,11 +1127,16 @@ int lustre_get_qids(struct file *fp, struct inode *inode, int type,
                                     (char *)&ddquot[i], dqblk_sz))
                                 continue;
 
                                     (char *)&ddquot[i], dqblk_sz))
                                 continue;
 
-                        dqid = kmalloc(sizeof(*dqid), GFP_NOFS);
-                        if (!dqid) 
+                        OBD_ALLOC_GFP(dqid, sizeof(*dqid), GFP_NOFS);
+                        if (!dqid)
                                 GOTO(out_free, rc = -ENOMEM);
 
                                 GOTO(out_free, rc = -ENOMEM);
 
-                        dqid->di_id = le32_to_cpu(ddquot[i].dqb_id);
+                        dqid->di_id    = le32_to_cpu(ddquot[i].dqb_id);
+                        dqid->di_flag  = le64_to_cpu(ddquot[i].dqb_ihardlimit) ?
+                                         QI_SET : 0;
+                        dqid->di_flag |= le64_to_cpu(ddquot[i].dqb_bhardlimit) ?
+                                         QB_SET : 0;
+
                         INIT_LIST_HEAD(&dqid->di_link);
                         list_add(&dqid->di_link, list);
                 }
                         INIT_LIST_HEAD(&dqid->di_link);
                         list_add(&dqid->di_link, list);
                 }
index 58d88f7..93a2882 100644 (file)
@@ -313,8 +313,10 @@ int lprocfs_quota_wr_type(struct file *file, const char *buffer,
                 }
         }
 
                 }
         }
 
-        if (type != 0)
+        if (type != 0) {
                 auto_quota_on(obd, type - 1, obt->obt_sb, is_mds);
                 auto_quota_on(obd, type - 1, obt->obt_sb, is_mds);
+                build_lqs(obd);
+        }
 
         return count;
 }
 
         return count;
 }
index a3ef7ef..2c217c0 100644 (file)
@@ -139,25 +139,34 @@ struct lustre_qunit_size *quota_search_lqs(unsigned long long lqs_key,
                                            struct lustre_quota_ctxt *qctxt,
                                            int create)
 {
                                            struct lustre_quota_ctxt *qctxt,
                                            int create)
 {
-        int rc = 0;
         struct lustre_qunit_size *lqs;
         struct lustre_qunit_size *lqs;
+        int rc = 0;
 
  search_lqs:
         lqs = lustre_hash_lookup(qctxt->lqc_lqs_hash, &lqs_key);
 
  search_lqs:
         lqs = lustre_hash_lookup(qctxt->lqc_lqs_hash, &lqs_key);
-        if (lqs == NULL && create) {
+        if (IS_ERR(lqs))
+                GOTO(out, rc = PTR_ERR(lqs));
+
+        if (create && lqs == NULL) {
+                /* if quota_create_lqs is successful, it will get a
+                 * ref to the lqs. The ref will be released when
+                 * qctxt_cleanup() or quota is nullified */
                 lqs = quota_create_lqs(lqs_key, qctxt);
                 if (IS_ERR(lqs))
                         rc = PTR_ERR(lqs);
                 lqs = quota_create_lqs(lqs_key, qctxt);
                 if (IS_ERR(lqs))
                         rc = PTR_ERR(lqs);
-                if (rc == -EALREADY) {
-                        rc = 0;
-                        goto search_lqs;
-                }
+                if (rc == -EALREADY)
+                        GOTO(search_lqs, rc = 0);
+                /* get a reference for the caller when creating lqs
+                 * successfully */
+                if (rc == 0)
+                        lqs_getref(lqs);
         }
 
         }
 
-        if (lqs)
+        if (lqs && rc == 0)
                 LQS_DEBUG(lqs, "%s\n",
                           (create == 1 ? "create lqs" : "search lqs"));
 
                 LQS_DEBUG(lqs, "%s\n",
                           (create == 1 ? "create lqs" : "search lqs"));
 
+ out:
         if (rc == 0) {
                 return lqs;
         } else {
         if (rc == 0) {
                 return lqs;
         } else {
@@ -170,98 +179,75 @@ int quota_adjust_slave_lqs(struct quota_adjust_qunit *oqaq,
                            struct lustre_quota_ctxt *qctxt)
 {
         struct lustre_qunit_size *lqs = NULL;
                            struct lustre_quota_ctxt *qctxt)
 {
         struct lustre_qunit_size *lqs = NULL;
-        unsigned long *lbunit, *liunit, *lbtune, *litune;
-        signed long b_tmp = 0, i_tmp = 0;
-        cfs_time_t time_limit = 0;
-        int rc = 0;
+        unsigned long *unit, *tune;
+        signed long tmp = 0;
+        cfs_time_t time_limit = 0, *shrink;
+        int i, rc = 0;
         ENTRY;
 
         LASSERT(qctxt);
         lqs = quota_search_lqs(LQS_KEY(QAQ_IS_GRP(oqaq), oqaq->qaq_id),
         ENTRY;
 
         LASSERT(qctxt);
         lqs = quota_search_lqs(LQS_KEY(QAQ_IS_GRP(oqaq), oqaq->qaq_id),
-                               qctxt, 1);
-        if (lqs == NULL || IS_ERR(lqs))
+                               qctxt, QAQ_IS_CREATE_LQS(oqaq) ? 1 : 0);
+        if (lqs == NULL || IS_ERR(lqs)){
+                CDEBUG(D_ERROR, "fail to find a lqs(%s id: %u)!\n",
+                       QAQ_IS_GRP(oqaq) ? "group" : "user", oqaq->qaq_id);
                 RETURN(PTR_ERR(lqs));
                 RETURN(PTR_ERR(lqs));
-
-        /* deleting the lqs, because a user sets lfs quota 0 0 0 0  */
-        if (!oqaq->qaq_bunit_sz && !oqaq->qaq_iunit_sz && QAQ_IS_ADJBLK(oqaq) &&
-            QAQ_IS_ADJINO(oqaq)) {
-                LQS_DEBUG(lqs, "release lqs\n");
-                /* this is for quota_search_lqs */
-                lqs_putref(lqs);
-                /* kill lqs */
-                lqs_putref(lqs);
-                RETURN(rc);
         }
 
         }
 
-        lbunit = &lqs->lqs_bunit_sz;
-        liunit = &lqs->lqs_iunit_sz;
-        lbtune = &lqs->lqs_btune_sz;
-        litune = &lqs->lqs_itune_sz;
-
-        CDEBUG(D_QUOTA, "before: bunit: %lu, iunit: %lu.\n", *lbunit, *liunit);
+        CDEBUG(D_QUOTA, "before: bunit: %lu, iunit: %lu.\n",
+               lqs->lqs_bunit_sz, lqs->lqs_iunit_sz);
         spin_lock(&lqs->lqs_lock);
         spin_lock(&lqs->lqs_lock);
-        /* adjust the slave's block qunit size */
-        if (QAQ_IS_ADJBLK(oqaq)) {
-                cfs_duration_t sec = cfs_time_seconds(qctxt->lqc_switch_seconds);
-
-                b_tmp = *lbunit - oqaq->qaq_bunit_sz;
-
-                if (qctxt->lqc_handler && b_tmp > 0)
-                        lqs->lqs_last_bshrink = cfs_time_current();
-
-                if (qctxt->lqc_handler && b_tmp < 0) {
-                        time_limit = cfs_time_add(lqs->lqs_last_bshrink, sec);
-                        if (!lqs->lqs_last_bshrink ||
-                            cfs_time_after(cfs_time_current(), time_limit)) {
-                                *lbunit = oqaq->qaq_bunit_sz;
-                                *lbtune = (*lbunit) / 2;
-                        } else {
-                                b_tmp = 0;
-                        }
-                } else {
-                        *lbunit = oqaq->qaq_bunit_sz;
-                        *lbtune = (*lbunit) / 2;
+        for (i = 0; i < 2; i++) {
+                if (i == 0 && !QAQ_IS_ADJBLK(oqaq))
+                        continue;
+
+                if (i == 1 && !QAQ_IS_ADJINO(oqaq))
+                        continue;
+
+                tmp = i ? (lqs->lqs_iunit_sz - oqaq->qaq_iunit_sz) :
+                          (lqs->lqs_bunit_sz - oqaq->qaq_bunit_sz);
+                shrink = i ? &lqs->lqs_last_ishrink :
+                             &lqs->lqs_last_bshrink;
+                time_limit = cfs_time_add(i ? lqs->lqs_last_ishrink :
+                                              lqs->lqs_last_bshrink,
+                                   cfs_time_seconds(qctxt->lqc_switch_seconds));
+                unit = i ? &lqs->lqs_iunit_sz : &lqs->lqs_bunit_sz;
+                tune = i ? &lqs->lqs_itune_sz : &lqs->lqs_btune_sz;
+
+                /* quota master shrinks */
+                if (qctxt->lqc_handler && tmp > 0)
+                        *shrink = cfs_time_current();
+
+                /* quota master enlarges */
+                if (qctxt->lqc_handler && tmp < 0) {
+                        /* in case of ping-pong effect, don't enlarge lqs
+                         * in a short time */
+                        if (*shrink &&
+                            cfs_time_before(cfs_time_current(), time_limit))
+                                tmp = 0;
                 }
                 }
-        }
 
 
-        /* adjust the slave's file qunit size */
-        if (QAQ_IS_ADJINO(oqaq)) {
-                i_tmp = *liunit - oqaq->qaq_iunit_sz;
-
-                if (qctxt->lqc_handler && i_tmp > 0)
-                        lqs->lqs_last_ishrink  = cfs_time_current();
-
-                if (qctxt->lqc_handler && i_tmp < 0) {
-                        time_limit = cfs_time_add(lqs->lqs_last_ishrink,
-                                                  cfs_time_seconds(qctxt->
-                                                  lqc_switch_seconds));
-                        if (!lqs->lqs_last_ishrink ||
-                            cfs_time_after(cfs_time_current(), time_limit)) {
-                                *liunit = oqaq->qaq_iunit_sz;
-                                *litune = (*liunit) / 2;
-                        } else {
-                                i_tmp = 0;
-                        }
-                } else {
-                        *liunit = oqaq->qaq_iunit_sz;
-                        *litune = (*liunit) / 2;
+                /* when setquota, don't enlarge lqs b=18616 */
+                if (QAQ_IS_CREATE_LQS(oqaq) && tmp < 0)
+                        tmp = 0;
+
+                if (tmp != 0) {
+                        *unit = i ? oqaq->qaq_iunit_sz : oqaq->qaq_bunit_sz;
+                        *tune = (*unit) / 2;
                 }
                 }
+
+
+                if (tmp > 0)
+                        rc |= i ? LQS_INO_DECREASE : LQS_BLK_DECREASE;
+                if (tmp < 0)
+                        rc |= i ? LQS_INO_INCREASE : LQS_BLK_INCREASE;
         }
         spin_unlock(&lqs->lqs_lock);
         }
         spin_unlock(&lqs->lqs_lock);
-        CDEBUG(D_QUOTA, "after: bunit: %lu, iunit: %lu.\n", *lbunit, *liunit);
+        CDEBUG(D_QUOTA, "after: bunit: %lu, iunit: %lu.\n",
+               lqs->lqs_bunit_sz, lqs->lqs_iunit_sz);
 
         lqs_putref(lqs);
 
 
         lqs_putref(lqs);
 
-        if (b_tmp > 0)
-                rc |= LQS_BLK_DECREASE;
-        else if (b_tmp < 0)
-                rc |= LQS_BLK_INCREASE;
-
-        if (i_tmp > 0)
-                rc |= LQS_INO_DECREASE;
-        else if (i_tmp < 0)
-                rc |= LQS_INO_INCREASE;
-
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
index c96cbc2..847851d 100644 (file)
@@ -141,7 +141,6 @@ do {                                                                    \
         spin_unlock(&qunit->lq_lock);                                   \
 } while(0)
 
         spin_unlock(&qunit->lq_lock);                                   \
 } while(0)
 
-
 int should_translate_quota (struct obd_import *imp)
 {
         ENTRY;
 int should_translate_quota (struct obd_import *imp)
 {
         ENTRY;
@@ -289,9 +288,12 @@ check_cur_qunit(struct obd_device *obd,
                 GOTO(out, ret = 0);
 
         lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
                 GOTO(out, ret = 0);
 
         lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
-                               qctxt, 1);
-        if (IS_ERR(lqs))
-                GOTO (out, ret = PTR_ERR(lqs));
+                               qctxt, 0);
+        if (IS_ERR(lqs) || lqs == NULL) {
+                CDEBUG(D_ERROR, "fail to find a lqs(%s id: %u)!\n",
+                       QDATA_IS_GRP(qdata) ? "group" : "user", qdata->qd_id);
+                GOTO (out, ret = 0);
+        }
         spin_lock(&lqs->lqs_lock);
 
         if (QDATA_IS_BLK(qdata)) {
         spin_lock(&lqs->lqs_lock);
 
         if (QDATA_IS_BLK(qdata)) {
@@ -509,6 +511,70 @@ static void remove_qunit_nolock(struct lustre_qunit *qunit)
         qunit_put(qunit);
 }
 
         qunit_put(qunit);
 }
 
+void* quota_barrier(struct lustre_quota_ctxt *qctxt,
+                    struct obd_quotactl *oqctl, int isblk)
+{
+        struct lustre_qunit *qunit, *find_qunit;
+        int cycle = 1;
+
+        OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit));
+        if (qunit == NULL) {
+                CERROR("locating qunit failed.(id=%u isblk=%d %s)\n",
+                       oqctl->qc_id, isblk, oqctl->qc_type ? "grp" : "usr");
+                qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id,
+                                         oqctl->qc_type, isblk);
+                return NULL;
+        }
+
+        INIT_LIST_HEAD(&qunit->lq_hash);
+        qunit->lq_lock = SPIN_LOCK_UNLOCKED;
+        init_waitqueue_head(&qunit->lq_waitq);
+        atomic_set(&qunit->lq_refcnt, 1);
+        qunit->lq_ctxt = qctxt;
+        qunit->lq_data.qd_id = oqctl->qc_id;
+        qunit->lq_data.qd_flags =  oqctl->qc_type;
+        if (isblk)
+                QDATA_SET_BLK(&qunit->lq_data);
+        QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0);
+        /* it means it is only an invalid qunit for barrier */
+        qunit->lq_opc = QUOTA_LAST_OPC;
+
+        while (1) {
+                spin_lock(&qunit_hash_lock);
+                find_qunit = dqacq_in_flight(qctxt, &qunit->lq_data);
+                if (find_qunit) {
+                        spin_unlock(&qunit_hash_lock);
+                        qunit_put(find_qunit);
+                        qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id,
+                                                 oqctl->qc_type, isblk);
+                        CDEBUG(D_QUOTA, "cycle=%d\n", cycle++);
+                        continue;
+                }
+                break;
+        }
+        insert_qunit_nolock(qctxt, qunit);
+        spin_unlock(&qunit_hash_lock);
+        return qunit;
+}
+
+void quota_unbarrier(void *handle)
+{
+        struct lustre_qunit *qunit = (struct lustre_qunit *)handle;
+
+        if (qunit == NULL) {
+                CERROR("handle is NULL\n");
+                return;
+        }
+
+        LASSERT(qunit->lq_opc == QUOTA_LAST_OPC);
+        spin_lock(&qunit_hash_lock);
+        remove_qunit_nolock(qunit);
+        spin_unlock(&qunit_hash_lock);
+        QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, QUOTA_REQ_RETURNED);
+        wake_up(&qunit->lq_waitq);
+        qunit_put(qunit);
+}
+
 #define INC_QLIMIT(limit, count) (limit == MIN_QLIMIT) ? \
                                  (limit = count) : (limit += count)
 
 #define INC_QLIMIT(limit, count) (limit == MIN_QLIMIT) ? \
                                  (limit = count) : (limit += count)
 
@@ -1038,8 +1104,7 @@ qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         struct qunit_data qdata[MAXQUOTAS];
         ENTRY;
 
         struct qunit_data qdata[MAXQUOTAS];
         ENTRY;
 
-        CLASSERT(MAXQUOTAS < 4);
-        if (!sb_any_quota_enabled(qctxt->lqc_sb))
+        if (quota_is_set(obd, id, isblk ? QB_SET : QI_SET) == 0)
                 RETURN(0);
 
         for (i = 0; i < MAXQUOTAS; i++) {
                 RETURN(0);
 
         for (i = 0; i < MAXQUOTAS; i++) {
@@ -1184,6 +1249,12 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
+
+void hash_put_lqs(void *obj, void *data)
+{
+        lqs_putref((struct lustre_qunit_size *)obj);
+}
+
 void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
 {
         struct lustre_qunit *qunit, *tmp;
 void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
 {
         struct lustre_qunit *qunit, *tmp;
@@ -1219,6 +1290,7 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
                 qunit_put(qunit);
         }
 
                 qunit_put(qunit);
         }
 
+        lustre_hash_for_each_safe(qctxt->lqc_lqs_hash, hash_put_lqs, NULL);
         down_write(&obt->obt_rwsem);
         lustre_hash_exit(qctxt->lqc_lqs_hash);
         qctxt->lqc_lqs_hash = NULL;
         down_write(&obt->obt_rwsem);
         lustre_hash_exit(qctxt->lqc_lqs_hash);
         qctxt->lqc_lqs_hash = NULL;
@@ -1329,7 +1401,7 @@ static int qslave_recovery_main(void *arg)
                                        "qslave recovery failed! (id:%d type:%d "
                                        " rc:%d)\n", dqid->di_id, type, rc);
 free:
                                        "qslave recovery failed! (id:%d type:%d "
                                        " rc:%d)\n", dqid->di_id, type, rc);
 free:
-                        kfree(dqid);
+                        OBD_FREE_PTR(dqid);
                 }
         }
 
                 }
         }
 
index 6af3712..8a982c1 100644 (file)
 
 #ifdef HAVE_QUOTA_SUPPORT
 #ifdef __KERNEL__
 
 #ifdef HAVE_QUOTA_SUPPORT
 #ifdef __KERNEL__
+
+/* When quotaon, build a lqs for every uid/gid who has been set limitation
+ * for quota. After quota_search_lqs, it will hold one ref for the lqs.
+ * It will be released when qctxt_cleanup() is executed b=18574 */
+void build_lqs(struct obd_device *obd)
+{
+        struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+        struct list_head id_list;
+        int i, rc;
+
+        INIT_LIST_HEAD(&id_list);
+        for (i = 0; i < MAXQUOTAS; i++) {
+                struct dquot_id *dqid, *tmp;
+
+#ifndef KERNEL_SUPPORTS_QUOTA_READ
+                rc = fsfilt_qids(obd, sb_dqopt(qctxt->lqc_sb)->files[i], NULL,
+                                 i, &id_list);
+#else
+                rc = fsfilt_qids(obd, NULL, sb_dqopt(qctxt->lqc_sb)->files[i],
+                                 i, &id_list);
+#endif
+                if (rc) {
+                        CDEBUG(D_ERROR, "fail to get %s qids!\n",
+                               i ? "group" : "user");
+                        continue;
+                }
+
+                list_for_each_entry_safe(dqid, tmp, &id_list,
+                                         di_link) {
+                        struct lustre_qunit_size *lqs;
+
+                        list_del_init(&dqid->di_link);
+                        lqs = quota_search_lqs(LQS_KEY(i, dqid->di_id),
+                                               qctxt, 1);
+                        if (lqs && !IS_ERR(lqs)) {
+                                lqs->lqs_flags |= dqid->di_flag;
+                                lqs_putref(lqs);
+                        } else {
+                                CDEBUG(D_ERROR, "fail to create a lqs"
+                                       "(%s id: %u)!\n", i ? "group" : "user",
+                                       dqid->di_id);
+                        }
+
+                        OBD_FREE_PTR(dqid);
+                }
+        }
+}
+
 int mds_quota_ctl(struct obd_device *obd, struct obd_export *unused,
                   struct obd_quotactl *oqctl)
 {
 int mds_quota_ctl(struct obd_device *obd, struct obd_export *unused,
                   struct obd_quotactl *oqctl)
 {
@@ -80,6 +128,8 @@ int mds_quota_ctl(struct obd_device *obd, struct obd_export *unused,
         case Q_QUOTAON:
                 oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */
                 rc = mds_quota_on(obd, oqctl);
         case Q_QUOTAON:
                 oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */
                 rc = mds_quota_on(obd, oqctl);
+                /* when quotaon, create lqs for every quota uid/gid b=18574 */
+                build_lqs(obd);
                 break;
         case Q_QUOTAOFF:
                 oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */
                 break;
         case Q_QUOTAOFF:
                 oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */
@@ -132,6 +182,8 @@ int filter_quota_ctl(struct obd_device *unused, struct obd_export *exp,
         struct obd_device_target *obt = &obd->u.obt;
         struct lvfs_run_ctxt saved;
         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
         struct obd_device_target *obt = &obd->u.obt;
         struct lvfs_run_ctxt saved;
         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+        struct lustre_qunit_size *lqs;
+        void *handle = NULL;
         struct timeval work_start;
         struct timeval work_end;
         long timediff;
         struct timeval work_start;
         struct timeval work_end;
         long timediff;
@@ -161,17 +213,18 @@ int filter_quota_ctl(struct obd_device *unused, struct obd_export *exp,
         case Q_GETQUOTA:
                 /* In recovery scenario, this pending dqacq/dqrel might have
                  * been processed by master successfully before it's dquot
         case Q_GETQUOTA:
                 /* In recovery scenario, this pending dqacq/dqrel might have
                  * been processed by master successfully before it's dquot
-                 * on master enter recovery mode. We must wait for this 
+                 * on master enter recovery mode. We must wait for this
                  * dqacq/dqrel done then return the correct limits to master */
                 if (oqctl->qc_stat == QUOTA_RECOVERING)
                  * dqacq/dqrel done then return the correct limits to master */
                 if (oqctl->qc_stat == QUOTA_RECOVERING)
-                        qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt,
-                                                 oqctl->qc_id, oqctl->qc_type,
-                                                 1);
+                        handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1);
 
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = fsfilt_quotactl(obd, obt->obt_sb, oqctl);
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
 
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = fsfilt_quotactl(obd, obt->obt_sb, oqctl);
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
+                if (oqctl->qc_stat == QUOTA_RECOVERING)
+                        quota_unbarrier(handle);
+
                 if (oqctl->qc_cmd == Q_QUOTAON || oqctl->qc_cmd == Q_QUOTAOFF ||
                     oqctl->qc_cmd == Q_FINVALIDATE) {
                         if (!rc && oqctl->qc_cmd == Q_QUOTAON)
                 if (oqctl->qc_cmd == Q_QUOTAON || oqctl->qc_cmd == Q_QUOTAOFF ||
                     oqctl->qc_cmd == Q_FINVALIDATE) {
                         if (!rc && oqctl->qc_cmd == Q_QUOTAON)
@@ -180,11 +233,14 @@ int filter_quota_ctl(struct obd_device *unused, struct obd_export *exp,
                                 obt->obt_qctxt.lqc_flags &= ~UGQUOTA2LQC(oqctl->qc_type);
                         atomic_inc(&obt->obt_quotachecking);
                 }
                                 obt->obt_qctxt.lqc_flags &= ~UGQUOTA2LQC(oqctl->qc_type);
                         atomic_inc(&obt->obt_quotachecking);
                 }
+
+                /* when quotaon, create lqs for every quota uid/gid b=18574 */
+                if (oqctl->qc_cmd == Q_QUOTAON)
+                        build_lqs(obd);
                 break;
         case Q_SETQUOTA:
                 /* currently, it is only used for nullifying the quota */
                 break;
         case Q_SETQUOTA:
                 /* currently, it is only used for nullifying the quota */
-                qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt,
-                                         oqctl->qc_id, oqctl->qc_type, 1);
+                handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1);
 
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
 
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
@@ -195,6 +251,17 @@ int filter_quota_ctl(struct obd_device *unused, struct obd_export *exp,
                         oqctl->qc_cmd = Q_SETQUOTA;
                 }
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                         oqctl->qc_cmd = Q_SETQUOTA;
                 }
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+                quota_unbarrier(handle);
+
+                lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id),
+                                       qctxt, 0);
+                if (lqs == NULL || IS_ERR(lqs)){
+                        CDEBUG(D_ERROR, "fail to create lqs when setquota\n");
+                } else {
+                        lqs->lqs_flags &= ~QB_SET;
+                        lqs_putref(lqs);
+                }
+
                 break;
         case Q_INITQUOTA:
                 {
                 break;
         case Q_INITQUOTA:
                 {
@@ -204,15 +271,13 @@ int filter_quota_ctl(struct obd_device *unused, struct obd_export *exp,
                 LASSERT(oqctl->qc_dqblk.dqb_valid == QIF_BLIMITS);
                 LASSERT(oqctl->qc_dqblk.dqb_bsoftlimit == 0);
 
                 LASSERT(oqctl->qc_dqblk.dqb_valid == QIF_BLIMITS);
                 LASSERT(oqctl->qc_dqblk.dqb_bsoftlimit == 0);
 
-                /* There might be a pending dqacq/dqrel (which is going to
-                 * clear stale limits on slave). we should wait for it's
-                 * completion then initialize limits */
-                qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt,
-                                         oqctl->qc_id, oqctl->qc_type, 1);
-
                 if (!oqctl->qc_dqblk.dqb_bhardlimit)
                         goto adjust;
 
                 if (!oqctl->qc_dqblk.dqb_bhardlimit)
                         goto adjust;
 
+               /* There might be a pending dqacq/dqrel (which is going to
+                 * clear stale limits on slave). we should wait for it's
+                 * completion then initialize limits */
+                handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1);
                 LASSERT(oqctl->qc_dqblk.dqb_bhardlimit == MIN_QLIMIT);
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
                 LASSERT(oqctl->qc_dqblk.dqb_bhardlimit == MIN_QLIMIT);
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
@@ -225,10 +290,21 @@ int filter_quota_ctl(struct obd_device *unused, struct obd_export *exp,
                         oqctl->qc_cmd = Q_INITQUOTA;
                 }
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                         oqctl->qc_cmd = Q_INITQUOTA;
                 }
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+                quota_unbarrier(handle);
 
                 if (rc)
                         RETURN(rc);
 adjust:
 
                 if (rc)
                         RETURN(rc);
 adjust:
+                lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id),
+                                       qctxt, 1);
+                if (lqs == NULL || IS_ERR(lqs)){
+                        CDEBUG(D_ERROR, "fail to create lqs when setquota\n");
+                        break;
+                } else {
+                        lqs->lqs_flags |= QB_SET;
+                        lqs_putref(lqs);
+                }
+
                 /* Trigger qunit pre-acquire */
                 if (oqctl->qc_type == USRQUOTA)
                         id[USRQUOTA] = oqctl->qc_id;
                 /* Trigger qunit pre-acquire */
                 if (oqctl->qc_type == USRQUOTA)
                         id[USRQUOTA] = oqctl->qc_id;
index 20471a1..e6d376f 100644 (file)
@@ -264,10 +264,6 @@ static int quota_check_common(struct obd_device *obd, const unsigned int id[],
         int rc = 0, rc2[2] = { 0, 0 };
         ENTRY;
 
         int rc = 0, rc2[2] = { 0, 0 };
         ENTRY;
 
-        CLASSERT(MAXQUOTAS < 4);
-        if (!sb_any_quota_enabled(qctxt->lqc_sb))
-                RETURN(rc);
-
         spin_lock(&qctxt->lqc_lock);
         if (!qctxt->lqc_valid){
                 spin_unlock(&qctxt->lqc_lock);
         spin_lock(&qctxt->lqc_lock);
         if (!qctxt->lqc_valid){
                 spin_unlock(&qctxt->lqc_lock);
@@ -379,6 +375,27 @@ static int quota_check_common(struct obd_device *obd, const unsigned int id[],
                 RETURN(rc);
 }
 
                 RETURN(rc);
 }
 
+int quota_is_set(struct obd_device *obd, const unsigned int id[], int flag)
+{
+        struct lustre_qunit_size *lqs;
+        int i, q_set = 0;
+
+        if (!sb_any_quota_enabled(obd->u.obt.obt_qctxt.lqc_sb))
+                RETURN(0);
+
+        for (i = 0; i < MAXQUOTAS; i++) {
+                lqs = quota_search_lqs(LQS_KEY(i, id[i]),
+                                       &obd->u.obt.obt_qctxt, 0);
+                if (lqs && !IS_ERR(lqs)) {
+                        if (lqs->lqs_flags & flag)
+                                q_set = 1;
+                        lqs_putref(lqs);
+                }
+        }
+
+        return q_set;
+}
+
 static int quota_chk_acq_common(struct obd_device *obd, const unsigned int id[],
                                 int pending[], int count, quota_acquire acquire,
                                 struct obd_trans_info *oti, int isblk,
 static int quota_chk_acq_common(struct obd_device *obd, const unsigned int id[],
                                 int pending[], int count, quota_acquire acquire,
                                 struct obd_trans_info *oti, int isblk,
@@ -392,6 +409,9 @@ static int quota_chk_acq_common(struct obd_device *obd, const unsigned int id[],
         int rc = 0, cycle = 0, count_err = 1;
         ENTRY;
 
         int rc = 0, cycle = 0, count_err = 1;
         ENTRY;
 
+        if (!quota_is_set(obd, id, isblk ? QB_SET : QI_SET))
+                RETURN(0);
+
         CDEBUG(D_QUOTA, "check quota for %s\n", obd->obd_name);
         pending[USRQUOTA] = pending[GRPQUOTA] = 0;
         /* Unfortunately, if quota master is too busy to handle the
         CDEBUG(D_QUOTA, "check quota for %s\n", obd->obd_name);
         pending[USRQUOTA] = pending[GRPQUOTA] = 0;
         /* Unfortunately, if quota master is too busy to handle the
index e7a633d..692e562 100644 (file)
@@ -114,6 +114,9 @@ int compute_remquota(struct obd_device *obd,
                      int isblk);
 int check_qm(struct lustre_quota_ctxt *qctxt);
 void dqacq_interrupt(struct lustre_quota_ctxt *qctxt);
                      int isblk);
 int check_qm(struct lustre_quota_ctxt *qctxt);
 void dqacq_interrupt(struct lustre_quota_ctxt *qctxt);
+void* quota_barrier(struct lustre_quota_ctxt *qctxt,
+                    struct obd_quotactl *oqctl, int isblk);
+void quota_unbarrier(void *handle);
 /* quota_master.c */
 int lustre_dquot_init(void);
 void lustre_dquot_exit(void);
 /* quota_master.c */
 int lustre_dquot_init(void);
 void lustre_dquot_exit(void);
@@ -155,6 +158,7 @@ int target_quota_check(struct obd_device *obd, struct obd_export *exp,
 int quota_adjust_slave_lqs(struct quota_adjust_qunit *oqaq, struct
                           lustre_quota_ctxt *qctxt);
 #ifdef __KERNEL__
 int quota_adjust_slave_lqs(struct quota_adjust_qunit *oqaq, struct
                           lustre_quota_ctxt *qctxt);
 #ifdef __KERNEL__
+int quota_is_set(struct obd_device *obd, const unsigned int id[], int flag);
 struct lustre_qunit_size *quota_search_lqs(unsigned long long lqs_key,
                                            struct lustre_quota_ctxt *qctxt,
                                            int create);
 struct lustre_qunit_size *quota_search_lqs(unsigned long long lqs_key,
                                            struct lustre_quota_ctxt *qctxt,
                                            int create);
@@ -171,6 +175,7 @@ int filter_quota_adjust_qunit(struct obd_export *exp,
                               struct lustre_quota_ctxt *qctxt);
 int lquota_proc_setup(struct obd_device *obd, int is_master);
 int lquota_proc_cleanup(struct lustre_quota_ctxt *qctxt);
                               struct lustre_quota_ctxt *qctxt);
 int lquota_proc_setup(struct obd_device *obd, int is_master);
 int lquota_proc_cleanup(struct lustre_quota_ctxt *qctxt);
+void build_lqs(struct obd_device *obd);
 
 extern cfs_proc_dir_entry_t *lquota_type_proc_dir;
 #endif
 
 extern cfs_proc_dir_entry_t *lquota_type_proc_dir;
 #endif
index 7753bff..3c9f1c9 100644 (file)
@@ -1069,28 +1069,20 @@ int dquot_create_oqaq(struct lustre_quota_ctxt *qctxt,
 
         }
 
 
         }
 
-        if (!dquot->dq_dqb.dqb_bhardlimit && !dquot->dq_dqb.dqb_bsoftlimit &&
-            !dquot->dq_dqb.dqb_ihardlimit && !dquot->dq_dqb.dqb_isoftlimit) {
-                oqaq->qaq_bunit_sz = 0;
-                oqaq->qaq_iunit_sz = 0;
-                QAQ_SET_ADJBLK(oqaq);
-                QAQ_SET_ADJINO(oqaq);
-        }
-
         QAQ_DEBUG(oqaq, "the oqaq computed\n");
 
         RETURN(rc);
 }
 
 static int mds_init_slave_ilimits(struct obd_device *obd,
         QAQ_DEBUG(oqaq, "the oqaq computed\n");
 
         RETURN(rc);
 }
 
 static int mds_init_slave_ilimits(struct obd_device *obd,
-                                  struct obd_quotactl *oqctl, int set,
-                                  struct quota_adjust_qunit *oqaq)
+                                  struct obd_quotactl *oqctl, int set)
 {
         /* XXX: for file limits only adjust local now */
         struct obd_device_target *obt = &obd->u.obt;
         struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt;
         unsigned int id[MAXQUOTAS] = { 0, 0 };
         struct obd_quotactl *ioqc = NULL;
 {
         /* XXX: for file limits only adjust local now */
         struct obd_device_target *obt = &obd->u.obt;
         struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt;
         unsigned int id[MAXQUOTAS] = { 0, 0 };
         struct obd_quotactl *ioqc = NULL;
+        struct lustre_qunit_size *lqs;
         int flag;
         int rc;
         ENTRY;
         int flag;
         int rc;
         ENTRY;
@@ -1112,12 +1104,21 @@ static int mds_init_slave_ilimits(struct obd_device *obd,
         ioqc->qc_dqblk.dqb_valid = QIF_ILIMITS;
         ioqc->qc_dqblk.dqb_ihardlimit = flag ? MIN_QLIMIT : 0;
 
         ioqc->qc_dqblk.dqb_valid = QIF_ILIMITS;
         ioqc->qc_dqblk.dqb_ihardlimit = flag ? MIN_QLIMIT : 0;
 
-        if (QAQ_IS_ADJINO(oqaq)) {
-                /* adjust the mds slave's inode qunit size */
-                rc = quota_adjust_slave_lqs(oqaq, qctxt);
-                if (rc < 0)
-                        CDEBUG(D_ERROR, "adjust mds slave's inode qunit size \
-                               failed! (rc:%d)\n", rc);
+        /* build lqs for mds */
+        lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id),
+                               qctxt, flag ? 1 : 0);
+        if (lqs && !IS_ERR(lqs)) {
+                if (flag)
+                        lqs->lqs_flags |= QI_SET;
+                else
+                        lqs->lqs_flags &= ~QI_SET;
+                lqs_putref(lqs);
+        } else {
+                CERROR("fail to %s lqs for inode(%s id: %u)!\n",
+                       flag ? "create" : "search",
+                       oqctl->qc_type ? "group" : "user",
+                       oqctl->qc_id);
+                GOTO(out, rc = PTR_ERR(lqs));
         }
 
         /* set local limit to MIN_QLIMIT */
         }
 
         /* set local limit to MIN_QLIMIT */
@@ -1150,15 +1151,15 @@ out:
 }
 
 static int mds_init_slave_blimits(struct obd_device *obd,
 }
 
 static int mds_init_slave_blimits(struct obd_device *obd,
-                                  struct obd_quotactl *oqctl, int set,
-                                  struct quota_adjust_qunit *oqaq)
+                                  struct obd_quotactl *oqctl, int set)
 {
         struct obd_device_target *obt = &obd->u.obt;
         struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt;
         struct mds_obd *mds = &obd->u.mds;
         struct obd_quotactl *ioqc;
 {
         struct obd_device_target *obt = &obd->u.obt;
         struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt;
         struct mds_obd *mds = &obd->u.mds;
         struct obd_quotactl *ioqc;
+        struct lustre_qunit_size *lqs;
         unsigned int id[MAXQUOTAS] = { 0, 0 };
         unsigned int id[MAXQUOTAS] = { 0, 0 };
-        int rc, rc1 = 0;
+        int rc;
         int flag;
         ENTRY;
 
         int flag;
         ENTRY;
 
@@ -1178,12 +1179,22 @@ static int mds_init_slave_blimits(struct obd_device *obd,
         ioqc->qc_type = oqctl->qc_type;
         ioqc->qc_dqblk.dqb_valid = QIF_BLIMITS;
         ioqc->qc_dqblk.dqb_bhardlimit = flag ? MIN_QLIMIT : 0;
         ioqc->qc_type = oqctl->qc_type;
         ioqc->qc_dqblk.dqb_valid = QIF_BLIMITS;
         ioqc->qc_dqblk.dqb_bhardlimit = flag ? MIN_QLIMIT : 0;
-        if (QAQ_IS_ADJBLK(oqaq)) {
-                /* adjust the mds slave's block qunit size */
-                rc1 = quota_adjust_slave_lqs(oqaq, qctxt);
-                if (rc1 < 0)
-                        CERROR("adjust mds slave's block qunit size failed!"
-                               "(rc:%d)\n", rc1);
+
+        /* build lqs for mds */
+        lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id),
+                               qctxt, flag ? 1 : 0);
+        if (lqs && !IS_ERR(lqs)) {
+                if (flag)
+                        lqs->lqs_flags |= QB_SET;
+                else
+                        lqs->lqs_flags &= ~QB_SET;
+                lqs_putref(lqs);
+        } else {
+                CERROR("fail to %s lqs for block(%s id: %u)!\n",
+                       flag ? "create" : "search",
+                       oqctl->qc_type ? "group" : "user",
+                       oqctl->qc_id);
+                GOTO(out, rc = PTR_ERR(lqs));
         }
 
         rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, ioqc);
         }
 
         rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, ioqc);
@@ -1209,18 +1220,32 @@ static int mds_init_slave_blimits(struct obd_device *obd,
                 GOTO(out, rc);
         }
 
                 GOTO(out, rc);
         }
 
-        /* adjust all slave's qunit size when setting quota
-         * this is will create a lqs for every ost, which will present
-         * certain uid/gid is set quota or not */
-        QAQ_SET_ADJBLK(oqaq);
-        rc = obd_quota_adjust_qunit(mds->mds_osc_exp, oqaq, qctxt);
-
         EXIT;
 out:
         OBD_FREE_PTR(ioqc);
         return rc;
 }
 
         EXIT;
 out:
         OBD_FREE_PTR(ioqc);
         return rc;
 }
 
+static void adjust_lqs(struct obd_device *obd, struct quota_adjust_qunit *qaq)
+{
+        struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+        int rc = 0;
+
+        QAQ_SET_CREATE_LQS(qaq);
+        /* adjust local lqs */
+        rc = quota_adjust_slave_lqs(qaq, qctxt);
+        if (rc < 0)
+                CERROR("adjust master's qunit size failed!(rc=%d)\n", rc);
+
+        /* adjust remote lqs */
+        if (QAQ_IS_ADJBLK(qaq)) {
+                rc = obd_quota_adjust_qunit(obd->u.mds.mds_osc_exp, qaq, qctxt);
+                if (rc < 0)
+                        CERROR("adjust slaves' qunit size failed!(rc=%d)\n", rc);
+
+        }
+}
+
 int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl)
 {
         struct mds_obd *mds = &obd->u.mds;
 int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl)
 {
         struct mds_obd *mds = &obd->u.mds;
@@ -1334,13 +1359,16 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl)
         }
 
         up(&mds->mds_qonoff_sem);
         }
 
         up(&mds->mds_qonoff_sem);
+
+        adjust_lqs(obd, oqaq);
+
         orig_set = ihardlimit || isoftlimit;
         now_set  = dqblk->dqb_ihardlimit || dqblk->dqb_isoftlimit;
         if (dqblk->dqb_valid & QIF_ILIMITS && orig_set != now_set) {
                 down(&dquot->dq_sem);
                 dquot->dq_dqb.dqb_curinodes = 0;
                 up(&dquot->dq_sem);
         orig_set = ihardlimit || isoftlimit;
         now_set  = dqblk->dqb_ihardlimit || dqblk->dqb_isoftlimit;
         if (dqblk->dqb_valid & QIF_ILIMITS && orig_set != now_set) {
                 down(&dquot->dq_sem);
                 dquot->dq_dqb.dqb_curinodes = 0;
                 up(&dquot->dq_sem);
-                rc = mds_init_slave_ilimits(obd, oqctl, orig_set, oqaq);
+                rc = mds_init_slave_ilimits(obd, oqctl, orig_set);
                 if (rc) {
                         CERROR("init slave ilimits failed! (rc:%d)\n", rc);
                         goto revoke_out;
                 if (rc) {
                         CERROR("init slave ilimits failed! (rc:%d)\n", rc);
                         goto revoke_out;
@@ -1353,7 +1381,7 @@ int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl)
                 down(&dquot->dq_sem);
                 dquot->dq_dqb.dqb_curspace = 0;
                 up(&dquot->dq_sem);
                 down(&dquot->dq_sem);
                 dquot->dq_dqb.dqb_curspace = 0;
                 up(&dquot->dq_sem);
-                rc = mds_init_slave_blimits(obd, oqctl, orig_set, oqaq);
+                rc = mds_init_slave_blimits(obd, oqctl, orig_set);
                 if (rc) {
                         CERROR("init slave blimits failed! (rc:%d)\n", rc);
                         goto revoke_out;
                 if (rc) {
                         CERROR("init slave blimits failed! (rc:%d)\n", rc);
                         goto revoke_out;
@@ -1613,7 +1641,7 @@ static int qmaster_recovery_main(void *arg)
                                 CERROR("qmaster recovery failed! (id:%d type:%d"
                                        " rc:%d)\n", dqid->di_id, type, rc);
 free:
                                 CERROR("qmaster recovery failed! (id:%d type:%d"
                                        " rc:%d)\n", dqid->di_id, type, rc);
 free:
-                        kfree(dqid);
+                        OBD_FREE_PTR(dqid);
                 }
         }
         class_decref(mds->mds_osc_obd, "qmaster_recovd_lov", mds->mds_osc_obd);
                 }
         }
         class_decref(mds->mds_osc_obd, "qmaster_recovd_lov", mds->mds_osc_obd);
index c018bd0..a56c221 100644 (file)
@@ -2050,6 +2050,54 @@ test_27() {
 }
 run_test_with_stat 27 "lfs quota/setquota should handle wrong arguments (19612) ================="
 
 }
 run_test_with_stat 27 "lfs quota/setquota should handle wrong arguments (19612) ================="
 
+test_28() {
+        BLK_LIMIT=$((100 * 1024 * 1024)) # 100G
+        echo "Step 1: set enough high limit for user [$TSTUSR:$BLK_LIMIT]"
+        $LFS setquota -u $TSTUSR -b 0 -B $BLK_LIMIT -i 0 -I 0 $DIR
+        $SHOW_QUOTA_USER
+
+        echo "Step 2: reset system ..."
+        cleanup_and_setup_lustre
+        test_0
+
+        echo "Step 3: change qunit for user [$TSTUSR:512:1024]"
+        set_blk_tunesz 512
+        set_blk_unitsz 1024
+
+        wait_delete_completed
+
+        #define OBD_FAIL_QUOTA_RET_QDATA | OBD_FAIL_ONCE
+        lustre_fail ost 0x80000A02
+
+        TESTFILE="$DIR/$tdir/$tfile"
+        mkdir -p $DIR/$tdir
+
+        BLK_LIMIT=$((100 * 1024)) # 100M
+        echo "Step 4: set enough high limit for user [$TSTUSR:$BLK_LIMIT]"
+        $LFS setquota -u $TSTUSR -b 0 -B $BLK_LIMIT -i 0 -I 0 $DIR
+        $SHOW_QUOTA_USER
+
+        touch $TESTFILE
+        chown $TSTUSR.$TSTUSR $TESTFILE
+
+        echo "Step 5: write the test file1 [10M] ..."
+        $RUNAS dd if=/dev/zero of=$TESTFILE  bs=$BLK_SZ count=$(( 10 * 1024 )) \
+           || quota_error a $TSTUSR "write 10M file failure"
+        $SHOW_QUOTA_USER
+
+        rm -f $TESTFILE
+        sync; sleep 3; sync;
+
+        # make qd_count 64 bit
+        lustre_fail ost 0
+
+        set_blk_unitsz $((128 * 1024))
+        set_blk_tunesz $((128 * 1024 / 2))
+
+        resetquota -u $TSTUSR
+}
+run_test_with_stat 28 "test for consistency for qunit when setquota (18574) ==========="
+
 # turn off quota
 quota_fini()
 {
 # turn off quota
 quota_fini()
 {