Whamcloud - gitweb
Branch b1_8
authortianzy <tianzy>
Tue, 5 May 2009 11:03:10 +0000 (11:03 +0000)
committertianzy <tianzy>
Tue, 5 May 2009 11:03:10 +0000 (11:03 +0000)
use lqs to judge which uid/gid are set quota limitaton and
which uid/gid are not, and treat them differently.
b=18574
i=johann
i=panda

lustre/include/lustre_quota.h
lustre/lvfs/lustre_quota_fmt.c
lustre/quota/lproc_quota.c
lustre/quota/quota_adjust_qunit.c
lustre/quota/quota_context.c
lustre/quota/quota_ctl.c
lustre/quota/quota_interface.c
lustre/quota/quota_internal.h
lustre/quota/quota_master.c

index 2fba4f0..82d4abe 100644 (file)
@@ -150,7 +150,12 @@ struct lustre_dquot {
 struct dquot_id {
         struct list_head        di_link;
         __u32                   di_id;
+        __u32                   di_flag;
 };
+/* set inode quota limitation on a quota uid/gid */
+#define QI_SET                (1 << 30)
+/* set block quota limitation on a quota uid/gid */
+#define QB_SET                (1 << 31)
 
 #define QFILE_CHK               1
 #define QFILE_RD_INFO           2
@@ -226,7 +231,7 @@ struct lustre_quota_ctxt {
         int           lqc_sync_blk;         /* when blk qunit reaches this value,
                                              * later write reqs from client
                                              * should be sync b=16642 */
-        spinlock_t    lqc_lock;         /* guard lqc_imp_valid now */
+        spinlock_t    lqc_lock;             /* guard lqc_imp_valid now */
         cfs_waitq_t   lqc_wait_for_qmaster; /* when mds isn't connected, threads
                                              * on osts who send the quota reqs
                                              * with wait==1 will be put here
@@ -244,7 +249,8 @@ struct lustre_quota_ctxt {
 struct lustre_qunit_size {
         struct hlist_node lqs_hash; /* the hash entry */
         unsigned int lqs_id;        /* id of user/group */
-        unsigned long lqs_flags;    /* is user/group; FULLBUF or LESSBUF */
+        unsigned long lqs_flags;    /* 31st bit is QB_SET, 30th bit is QI_SET
+                                     * other bits are same as LQUOTA_FLAGS_* */
         unsigned long lqs_iunit_sz; /* Unit size of file quota currently */
         unsigned long lqs_itune_sz; /* Trigger dqacq when available file quota
                                      * less than this value, trigger dqrel
index 66a4677..2351548 100644 (file)
@@ -1092,12 +1092,16 @@ int lustre_get_qids(struct file *fp, struct inode *inode, int type,
                                     ddquot + i*dqblk_sz, dqblk_sz))
                                 continue;
 
-                        dqid = kmalloc(sizeof(*dqid), GFP_NOFS);
-                        if (!dqid) 
+                        OBD_ALLOC_GFP(dqid, sizeof(*dqid), GFP_NOFS);
+                        if (!dqid)
                                 GOTO(out_free, rc = -ENOMEM);
 
-                        dqid->di_id = DQF_GET(ddquot + i * dqblk_sz, 
+                        dqid->di_id = DQF_GET(ddquot + i * dqblk_sz,
                                               version, dqb_id);
+                        dqid->di_flag = DQF_GET(ddquot + i * dqblk_sz, version,
+                                                dqb_ihardlimit) ? QI_SET : 0;
+                        dqid->di_flag |= DQF_GET(ddquot + i * dqblk_sz, version,
+                                                 dqb_bhardlimit) ? QB_SET : 0;
                         INIT_LIST_HEAD(&dqid->di_link);
                         list_add(&dqid->di_link, list);
                 }
index d211d65..06bd80a 100644 (file)
@@ -411,8 +411,10 @@ int lprocfs_quota_wr_type(struct file *file, const char *buffer,
                 }
         }
 
-        if (type != 0)
+        if (type != 0) {
                 auto_quota_on(obd, type - 1, obt->obt_sb, is_mds);
+                build_lqs(obd);
+        }
 
         return count;
 }
index 992efe5..72c107a 100644 (file)
@@ -145,25 +145,34 @@ struct lustre_qunit_size *quota_search_lqs(unsigned long long lqs_key,
                                            struct lustre_quota_ctxt *qctxt,
                                            int create)
 {
-        int rc = 0;
         struct lustre_qunit_size *lqs;
+        int rc = 0;
 
  search_lqs:
         lqs = lustre_hash_lookup(qctxt->lqc_lqs_hash, &lqs_key);
-        if (lqs == NULL && create) {
+        if (IS_ERR(lqs))
+                GOTO(out, rc = PTR_ERR(lqs));
+
+        if (create && lqs == NULL) {
+                /* if quota_create_lqs is successful, it will get a
+                 * ref to the lqs. The ref will be released when
+                 * qctxt_cleanup() or quota is nullified */
                 lqs = quota_create_lqs(lqs_key, qctxt);
                 if (IS_ERR(lqs))
                         rc = PTR_ERR(lqs);
-                if (rc == -EALREADY) {
-                        rc = 0;
-                        goto search_lqs;
-                }
+                if (rc == -EALREADY)
+                        GOTO(search_lqs, rc = 0);
+                /* get a reference for the caller when creating lqs
+                 * successfully */
+                if (rc == 0)
+                        lqs_getref(lqs);
         }
 
-        if (lqs)
+        if (lqs && rc == 0)
                 LQS_DEBUG(lqs, "%s\n",
                           (create == 1 ? "create lqs" : "search lqs"));
 
+ out:
         if (rc == 0) {
                 return lqs;
         } else {
@@ -182,24 +191,22 @@ int quota_adjust_slave_lqs(struct quota_adjust_qunit *oqaq,
         int rc = 0;
         ENTRY;
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_QUOTA_WITHOUT_CHANGE_QS))
-                RETURN(0);
-
         LASSERT(qctxt);
         lqs = quota_search_lqs(LQS_KEY(QAQ_IS_GRP(oqaq), oqaq->qaq_id),
-                               qctxt, 1);
-        if (lqs == NULL || IS_ERR(lqs))
+                               qctxt, 0);
+        if (lqs == NULL || IS_ERR(lqs)){
+                CDEBUG(D_ERROR, "fail to find a lqs(%s id: %u)!\n",
+                       QAQ_IS_GRP(oqaq) ? "group" : "user", oqaq->qaq_id);
                 RETURN(PTR_ERR(lqs));
+        }
 
-        /* deleting the lqs, because a user sets lfs quota 0 0 0 0  */
-        if (!oqaq->qaq_bunit_sz && !oqaq->qaq_iunit_sz && QAQ_IS_ADJBLK(oqaq) &&
-            QAQ_IS_ADJINO(oqaq)) {
-                LQS_DEBUG(lqs, "release lqs\n");
-                /* this is for quota_search_lqs */
+        if (OBD_FAIL_CHECK(OBD_FAIL_QUOTA_WITHOUT_CHANGE_QS)) {
+                lqs->lqs_bunit_sz = qctxt->lqc_bunit_sz;
+                lqs->lqs_btune_sz = qctxt->lqc_btune_sz;
+                lqs->lqs_iunit_sz = qctxt->lqc_iunit_sz;
+                lqs->lqs_itune_sz = qctxt->lqc_itune_sz;
                 lqs_putref(lqs);
-                /* kill lqs */
-                lqs_putref(lqs);
-                RETURN(rc);
+                RETURN(0);
         }
 
         lbunit = &lqs->lqs_bunit_sz;
index b8009b5..a918574 100644 (file)
@@ -276,9 +276,12 @@ check_cur_qunit(struct obd_device *obd,
                 GOTO(out, ret = 0);
 
         lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
-                               qctxt, 1);
-        if (IS_ERR(lqs))
-                GOTO (out, ret = PTR_ERR(lqs));
+                               qctxt, 0);
+        if (IS_ERR(lqs) || lqs == NULL) {
+                CDEBUG(D_ERROR, "fail to find a lqs(%s id: %u)!\n",
+                       QDATA_IS_GRP(qdata) ? "group" : "user", qdata->qd_id);
+                GOTO (out, ret = 0);
+        }
         spin_lock(&lqs->lqs_lock);
 
         if (QDATA_IS_BLK(qdata)) {
@@ -1030,8 +1033,7 @@ qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         struct qunit_data qdata[MAXQUOTAS];
         ENTRY;
 
-        CLASSERT(MAXQUOTAS < 4);
-        if (!sb_any_quota_enabled(qctxt->lqc_sb))
+        if (quota_is_set(obd, uid, gid, isblk ? QB_SET : QI_SET) == 0)
                 RETURN(0);
 
         for (i = 0; i < MAXQUOTAS; i++) {
@@ -1182,6 +1184,12 @@ static int check_lqs(struct lustre_quota_ctxt *qctxt)
         RETURN(rc);
 }
 
+
+void hash_put_lqs(void *obj, void *data)
+{
+        lqs_putref((struct lustre_qunit_size *)obj);
+}
+
 void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
 {
         struct lustre_qunit *qunit, *tmp;
@@ -1225,6 +1233,7 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
                                      cfs_time_seconds(1));
         }
 
+        lustre_hash_for_each_safe(qctxt->lqc_lqs_hash, hash_put_lqs, NULL);
         l_wait_event(qctxt->lqc_lqs_waitq, check_lqs(qctxt), &lwi);
         lustre_hash_exit(qctxt->lqc_lqs_hash);
 
@@ -1316,7 +1325,7 @@ static int qslave_recovery_main(void *arg)
                                        "qslave recovery failed! (id:%d type:%d "
                                        " rc:%d)\n", dqid->di_id, type, rc);
 free:
-                        kfree(dqid);
+                        OBD_FREE_PTR(dqid);
                 }
         }
 
index 9fa1604..1da7765 100644 (file)
 
 #ifdef HAVE_QUOTA_SUPPORT
 #ifdef __KERNEL__
+
+/* When quotaon, build a lqs for every uid/gid who has been set limitation
+ * for quota. After quota_search_lqs, it will hold one ref for the lqs.
+ * It will be released when qctxt_cleanup() is executed b=18574 */
+void build_lqs(struct obd_device *obd)
+{
+        struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+        struct list_head id_list;
+        int i, rc;
+
+        INIT_LIST_HEAD(&id_list);
+        for (i = 0; i < MAXQUOTAS; i++) {
+                struct dquot_id *dqid, *tmp;
+
+#ifndef KERNEL_SUPPORTS_QUOTA_READ
+                rc = fsfilt_qids(obd, sb_dqopt(qctxt->lqc_sb)->files[i], NULL,
+                                 i, &id_list);
+#else
+                rc = fsfilt_qids(obd, NULL, sb_dqopt(qctxt->lqc_sb)->files[i],
+                                 i, &id_list);
+#endif
+                if (rc) {
+                        CDEBUG(D_ERROR, "fail to get %s qids!\n",
+                               i ? "group" : "user");
+                        continue;
+                }
+
+                list_for_each_entry_safe(dqid, tmp, &id_list,
+                                         di_link) {
+                        struct lustre_qunit_size *lqs;
+
+                        list_del_init(&dqid->di_link);
+                        lqs = quota_search_lqs(LQS_KEY(i, dqid->di_id),
+                                               qctxt, 1);
+                        if (lqs && !IS_ERR(lqs)) {
+                                lqs->lqs_flags |= dqid->di_flag;
+                                lqs_putref(lqs);
+                        } else {
+                                CDEBUG(D_ERROR, "fail to create a lqs"
+                                       "(%s id: %u)!\n", i ? "group" : "user",
+                                       dqid->di_id);
+                        }
+
+                        OBD_FREE_PTR(dqid);
+                }
+        }
+}
+
 int mds_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl)
 {
         struct obd_device *obd = exp->exp_obd;
@@ -80,6 +128,8 @@ int mds_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl)
         case Q_QUOTAON:
                 oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */
                 rc = mds_quota_on(obd, oqctl);
+                /* when quotaon, create lqs for every quota uid/gid b=18574 */
+                build_lqs(obd);
                 break;
         case Q_QUOTAOFF:
                 oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */
@@ -131,6 +181,7 @@ int filter_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl)
         struct obd_device_target *obt = &obd->u.obt;
         struct lvfs_run_ctxt saved;
         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+        struct lustre_qunit_size *lqs;
         struct timeval work_start;
         struct timeval work_end;
         long timediff;
@@ -179,6 +230,10 @@ int filter_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl)
                                 obt->obt_qctxt.lqc_flags &= ~UGQUOTA2LQC(oqctl->qc_type);
                         atomic_inc(&obt->obt_quotachecking);
                 }
+
+                /* when quotaon, create lqs for every quota uid/gid b=18574 */
+                if (oqctl->qc_cmd == Q_QUOTAON)
+                        build_lqs(obd);
                 break;
         case Q_SETQUOTA:
                 /* currently, it is only used for nullifying the quota */
@@ -194,6 +249,16 @@ int filter_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl)
                         oqctl->qc_cmd = Q_SETQUOTA;
                 }
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+
+                lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id),
+                                       qctxt, 0);
+                if (lqs == NULL || IS_ERR(lqs)){
+                        CDEBUG(D_ERROR, "fail to create lqs when setquota\n");
+                } else {
+                        lqs->lqs_flags &= ~QB_SET;
+                        lqs_putref(lqs);
+                }
+
                 break;
         case Q_INITQUOTA:
                 {
@@ -228,6 +293,22 @@ int filter_quota_ctl(struct obd_export *exp, struct obd_quotactl *oqctl)
                 if (rc)
                         RETURN(rc);
 adjust:
+                lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id),
+                                       qctxt, 1);
+                if (lqs == NULL || IS_ERR(lqs)){
+                        CDEBUG(D_ERROR, "fail to create lqs when setquota\n");
+                        break;
+                } else {
+                        lqs->lqs_flags |= QB_SET;
+                        if (OBD_FAIL_CHECK(OBD_FAIL_QUOTA_WITHOUT_CHANGE_QS)) {
+                                lqs->lqs_bunit_sz = qctxt->lqc_bunit_sz;
+                                lqs->lqs_btune_sz = qctxt->lqc_btune_sz;
+                                lqs->lqs_iunit_sz = qctxt->lqc_iunit_sz;
+                                lqs->lqs_itune_sz = qctxt->lqc_itune_sz;
+                        }
+                        lqs_putref(lqs);
+                }
+
                 /* Trigger qunit pre-acquire */
                 if (oqctl->qc_type == USRQUOTA)
                         uid = oqctl->qc_id;
index 5e94307..4303f57 100644 (file)
@@ -260,10 +260,6 @@ static int quota_check_common(struct obd_device *obd, unsigned int uid,
         int rc = 0, rc2[2] = { 0, 0 };
         ENTRY;
 
-        CLASSERT(MAXQUOTAS < 4);
-        if (!sb_any_quota_enabled(qctxt->lqc_sb))
-                RETURN(rc);
-
         spin_lock(&qctxt->lqc_lock);
         if (!qctxt->lqc_valid){
                 spin_unlock(&qctxt->lqc_lock);
@@ -469,13 +465,38 @@ static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
         RETURN(rc);
 }
 
+int quota_is_set(struct obd_device *obd, unsigned int uid,
+                 unsigned int gid, int flag)
+{
+        struct lustre_qunit_size *lqs;
+        __u32 id[MAXQUOTAS] = { uid, gid };
+        int i, q_set = 0;
+
+        if (!sb_any_quota_enabled(obd->u.obt.obt_qctxt.lqc_sb))
+                RETURN(0);
+
+        for (i = 0; i < MAXQUOTAS; i++) {
+                lqs = quota_search_lqs(LQS_KEY(i, id[i]),
+                                       &obd->u.obt.obt_qctxt, 0);
+                if (lqs && !IS_ERR(lqs)) {
+                        if (lqs->lqs_flags & flag)
+                                q_set = 1;
+                        lqs_putref(lqs);
+                }
+        }
+
+        return q_set;
+}
+
 static int filter_quota_check(struct obd_device *obd, unsigned int uid,
                               unsigned int gid, int npage, int *pending,
                               quota_acquire acquire, struct obd_trans_info *oti,
                               struct inode *inode, int frags)
 {
-        return quota_chk_acq_common(obd, uid, gid, npage, pending, LQUOTA_FLAGS_BLK,
-                                    acquire, oti, inode, frags);
+        return quota_is_set(obd, uid, gid, QB_SET) ?
+                quota_chk_acq_common(obd, uid, gid, npage, pending,
+                                     LQUOTA_FLAGS_BLK, acquire, oti, inode,
+                                     frags) : 0;
 }
 
 /* when a block_write or inode_create rpc is finished, adjust the record for
@@ -619,8 +640,9 @@ static int mds_quota_check(struct obd_device *obd, unsigned int uid,
                            quota_acquire acquire, struct obd_trans_info *oti,
                            struct inode *inode, int frags)
 {
-        return quota_chk_acq_common(obd, uid, gid, inodes, pending, 0,
-                                    acquire, oti, inode, frags);
+        return quota_is_set(obd, uid, gid, QI_SET) ?
+                quota_chk_acq_common(obd, uid, gid, inodes, pending, 0,
+                                     acquire, oti, inode, frags) : 0;
 }
 
 static int mds_quota_acquire(struct obd_device *obd, unsigned int uid,
index 6be30e4..e9825a5 100644 (file)
@@ -154,6 +154,8 @@ int target_quota_check(struct obd_export *exp, struct obd_quotactl *oqctl);
 int quota_adjust_slave_lqs(struct quota_adjust_qunit *oqaq, struct
                           lustre_quota_ctxt *qctxt);
 #ifdef __KERNEL__
+int quota_is_set(struct obd_device *obd, unsigned int uid,
+                 unsigned int gid, int flag);
 struct lustre_qunit_size *quota_search_lqs(unsigned long long lqs_key,
                                            struct lustre_quota_ctxt *qctxt,
                                            int create);
@@ -170,6 +172,7 @@ int filter_quota_adjust_qunit(struct obd_export *exp,
                               struct lustre_quota_ctxt *qctxt);
 int lquota_proc_setup(struct obd_device *obd, int is_master);
 int lquota_proc_cleanup(struct lustre_quota_ctxt *qctxt);
+void build_lqs(struct obd_device *obd);
 
 extern cfs_proc_dir_entry_t *lquota_type_proc_dir;
 #endif
index 5249a87..b142426 100644 (file)
@@ -1089,14 +1089,6 @@ int dquot_create_oqaq(struct lustre_quota_ctxt *qctxt,
 
         }
 
-        if (!dquot->dq_dqb.dqb_bhardlimit && !dquot->dq_dqb.dqb_bsoftlimit &&
-            !dquot->dq_dqb.dqb_ihardlimit && !dquot->dq_dqb.dqb_isoftlimit) {
-                oqaq->qaq_bunit_sz = 0;
-                oqaq->qaq_iunit_sz = 0;
-                QAQ_SET_ADJBLK(oqaq);
-                QAQ_SET_ADJINO(oqaq);
-        }
-
         QAQ_DEBUG(oqaq, "the oqaq computed\n");
 
         RETURN(rc);
@@ -1111,6 +1103,7 @@ static int mds_init_slave_ilimits(struct obd_device *obd,
         struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt;
         unsigned int uid = 0, gid = 0;
         struct obd_quotactl *ioqc = NULL;
+        struct lustre_qunit_size *lqs;
         int flag;
         int rc;
         ENTRY;
@@ -1132,6 +1125,23 @@ static int mds_init_slave_ilimits(struct obd_device *obd,
         ioqc->qc_dqblk.dqb_valid = QIF_ILIMITS;
         ioqc->qc_dqblk.dqb_ihardlimit = flag ? MIN_QLIMIT : 0;
 
+        /* build lqs for mds */
+        lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id),
+                               qctxt, flag ? 1 : 0);
+        if (lqs && !IS_ERR(lqs)) {
+                if (flag)
+                        lqs->lqs_flags |= QI_SET;
+                else
+                        lqs->lqs_flags &= ~QI_SET;
+                lqs_putref(lqs);
+        } else {
+                CERROR("fail to %s lqs for inode(%s id: %u)!\n",
+                       flag ? "create" : "search",
+                       oqctl->qc_type ? "group" : "user",
+                       oqctl->qc_id);
+                GOTO(out, rc = PTR_ERR(lqs));
+        }
+
         if (QAQ_IS_ADJINO(oqaq)) {
                 /* adjust the mds slave's inode qunit size */
                 rc = quota_adjust_slave_lqs(oqaq, qctxt);
@@ -1178,6 +1188,7 @@ static int mds_init_slave_blimits(struct obd_device *obd,
         struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt;
         struct mds_obd *mds = &obd->u.mds;
         struct obd_quotactl *ioqc;
+        struct lustre_qunit_size *lqs;
         unsigned int uid = 0, gid = 0;
         int rc, rc1 = 0;
         int flag;
@@ -1199,6 +1210,24 @@ static int mds_init_slave_blimits(struct obd_device *obd,
         ioqc->qc_type = oqctl->qc_type;
         ioqc->qc_dqblk.dqb_valid = QIF_BLIMITS;
         ioqc->qc_dqblk.dqb_bhardlimit = flag ? MIN_QLIMIT : 0;
+
+        /* build lqs for mds */
+        lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id),
+                               qctxt, flag ? 1 : 0);
+        if (lqs && !IS_ERR(lqs)) {
+                if (flag)
+                        lqs->lqs_flags |= QB_SET;
+                else
+                        lqs->lqs_flags &= ~QB_SET;
+                lqs_putref(lqs);
+        } else {
+                CERROR("fail to %s lqs for block(%s id: %u)!\n",
+                       flag ? "create" : "search",
+                       oqctl->qc_type ? "group" : "user",
+                       oqctl->qc_id);
+                GOTO(out, rc = PTR_ERR(lqs));
+        }
+
         if (QAQ_IS_ADJBLK(oqaq)) {
                 /* adjust the mds slave's block qunit size */
                 rc1 = quota_adjust_slave_lqs(oqaq, qctxt);
@@ -1621,7 +1650,7 @@ static int qmaster_recovery_main(void *arg)
                                 CERROR("qmaster recovery failed! (id:%d type:%d"
                                        " rc:%d)\n", dqid->di_id, type, rc);
 free:
-                        kfree(dqid);
+                        OBD_FREE_PTR(dqid);
                 }
         }
         class_decref(obd);