Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / quota / quota_ctl.c
index 6af3712..8a982c1 100644 (file)
 
 #ifdef HAVE_QUOTA_SUPPORT
 #ifdef __KERNEL__
+
+/* When quotaon, build a lqs for every uid/gid who has been set limitation
+ * for quota. After quota_search_lqs, it will hold one ref for the lqs.
+ * It will be released when qctxt_cleanup() is executed b=18574 */
+void build_lqs(struct obd_device *obd)
+{
+        struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+        struct list_head id_list;
+        int i, rc;
+
+        INIT_LIST_HEAD(&id_list);
+        for (i = 0; i < MAXQUOTAS; i++) {
+                struct dquot_id *dqid, *tmp;
+
+#ifndef KERNEL_SUPPORTS_QUOTA_READ
+                rc = fsfilt_qids(obd, sb_dqopt(qctxt->lqc_sb)->files[i], NULL,
+                                 i, &id_list);
+#else
+                rc = fsfilt_qids(obd, NULL, sb_dqopt(qctxt->lqc_sb)->files[i],
+                                 i, &id_list);
+#endif
+                if (rc) {
+                        CDEBUG(D_ERROR, "fail to get %s qids!\n",
+                               i ? "group" : "user");
+                        continue;
+                }
+
+                list_for_each_entry_safe(dqid, tmp, &id_list,
+                                         di_link) {
+                        struct lustre_qunit_size *lqs;
+
+                        list_del_init(&dqid->di_link);
+                        lqs = quota_search_lqs(LQS_KEY(i, dqid->di_id),
+                                               qctxt, 1);
+                        if (lqs && !IS_ERR(lqs)) {
+                                lqs->lqs_flags |= dqid->di_flag;
+                                lqs_putref(lqs);
+                        } else {
+                                CDEBUG(D_ERROR, "fail to create a lqs"
+                                       "(%s id: %u)!\n", i ? "group" : "user",
+                                       dqid->di_id);
+                        }
+
+                        OBD_FREE_PTR(dqid);
+                }
+        }
+}
+
 int mds_quota_ctl(struct obd_device *obd, struct obd_export *unused,
                   struct obd_quotactl *oqctl)
 {
@@ -80,6 +128,8 @@ int mds_quota_ctl(struct obd_device *obd, struct obd_export *unused,
         case Q_QUOTAON:
                 oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */
                 rc = mds_quota_on(obd, oqctl);
+                /* when quotaon, create lqs for every quota uid/gid b=18574 */
+                build_lqs(obd);
                 break;
         case Q_QUOTAOFF:
                 oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */
@@ -132,6 +182,8 @@ int filter_quota_ctl(struct obd_device *unused, struct obd_export *exp,
         struct obd_device_target *obt = &obd->u.obt;
         struct lvfs_run_ctxt saved;
         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+        struct lustre_qunit_size *lqs;
+        void *handle = NULL;
         struct timeval work_start;
         struct timeval work_end;
         long timediff;
@@ -161,17 +213,18 @@ int filter_quota_ctl(struct obd_device *unused, struct obd_export *exp,
         case Q_GETQUOTA:
                 /* In recovery scenario, this pending dqacq/dqrel might have
                  * been processed by master successfully before it's dquot
-                 * on master enter recovery mode. We must wait for this 
+                 * on master enter recovery mode. We must wait for this
                  * dqacq/dqrel done then return the correct limits to master */
                 if (oqctl->qc_stat == QUOTA_RECOVERING)
-                        qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt,
-                                                 oqctl->qc_id, oqctl->qc_type,
-                                                 1);
+                        handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1);
 
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = fsfilt_quotactl(obd, obt->obt_sb, oqctl);
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
+                if (oqctl->qc_stat == QUOTA_RECOVERING)
+                        quota_unbarrier(handle);
+
                 if (oqctl->qc_cmd == Q_QUOTAON || oqctl->qc_cmd == Q_QUOTAOFF ||
                     oqctl->qc_cmd == Q_FINVALIDATE) {
                         if (!rc && oqctl->qc_cmd == Q_QUOTAON)
@@ -180,11 +233,14 @@ int filter_quota_ctl(struct obd_device *unused, struct obd_export *exp,
                                 obt->obt_qctxt.lqc_flags &= ~UGQUOTA2LQC(oqctl->qc_type);
                         atomic_inc(&obt->obt_quotachecking);
                 }
+
+                /* when quotaon, create lqs for every quota uid/gid b=18574 */
+                if (oqctl->qc_cmd == Q_QUOTAON)
+                        build_lqs(obd);
                 break;
         case Q_SETQUOTA:
                 /* currently, it is only used for nullifying the quota */
-                qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt,
-                                         oqctl->qc_id, oqctl->qc_type, 1);
+                handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1);
 
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
@@ -195,6 +251,17 @@ int filter_quota_ctl(struct obd_device *unused, struct obd_export *exp,
                         oqctl->qc_cmd = Q_SETQUOTA;
                 }
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+                quota_unbarrier(handle);
+
+                lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id),
+                                       qctxt, 0);
+                if (lqs == NULL || IS_ERR(lqs)){
+                        CDEBUG(D_ERROR, "fail to create lqs when setquota\n");
+                } else {
+                        lqs->lqs_flags &= ~QB_SET;
+                        lqs_putref(lqs);
+                }
+
                 break;
         case Q_INITQUOTA:
                 {
@@ -204,15 +271,13 @@ int filter_quota_ctl(struct obd_device *unused, struct obd_export *exp,
                 LASSERT(oqctl->qc_dqblk.dqb_valid == QIF_BLIMITS);
                 LASSERT(oqctl->qc_dqblk.dqb_bsoftlimit == 0);
 
-                /* There might be a pending dqacq/dqrel (which is going to
-                 * clear stale limits on slave). we should wait for it's
-                 * completion then initialize limits */
-                qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt,
-                                         oqctl->qc_id, oqctl->qc_type, 1);
-
                 if (!oqctl->qc_dqblk.dqb_bhardlimit)
                         goto adjust;
 
+               /* There might be a pending dqacq/dqrel (which is going to
+                 * clear stale limits on slave). we should wait for it's
+                 * completion then initialize limits */
+                handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1);
                 LASSERT(oqctl->qc_dqblk.dqb_bhardlimit == MIN_QLIMIT);
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
@@ -225,10 +290,21 @@ int filter_quota_ctl(struct obd_device *unused, struct obd_export *exp,
                         oqctl->qc_cmd = Q_INITQUOTA;
                 }
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+                quota_unbarrier(handle);
 
                 if (rc)
                         RETURN(rc);
 adjust:
+                lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id),
+                                       qctxt, 1);
+                if (lqs == NULL || IS_ERR(lqs)){
+                        CDEBUG(D_ERROR, "fail to create lqs when setquota\n");
+                        break;
+                } else {
+                        lqs->lqs_flags |= QB_SET;
+                        lqs_putref(lqs);
+                }
+
                 /* Trigger qunit pre-acquire */
                 if (oqctl->qc_type == USRQUOTA)
                         id[USRQUOTA] = oqctl->qc_id;