Whamcloud - gitweb
LU-11425 quota: support quota for DoM 57/33257/8
authorHongchao Zhang <hongchao@whamcloud.com>
Sun, 14 Oct 2018 17:44:30 +0000 (13:44 -0400)
committerOleg Drokin <green@whamcloud.com>
Tue, 13 Nov 2018 06:17:34 +0000 (06:17 +0000)
Support block quota enforcement on MDT to accommodate the data
writing introduced by DoM.

This patch adds a new qsd_instance on the OSD device at MDT to
implement the quota enforcement on block data, for there is only
one type of quota (meta data or block data) to be handled in one
qsd_instance in the current quota design of Lustre, it's much more
changes to adapt the qsd_instance to support more quota type than
to use a separated qsd_instance to enfore meta data on MDT.

Change-Id: I7c57f383d338cf0070e05ecc318d42586c39371e
Signed-off-by: Hongchao Zhang <hongchao@whamcloud.com>
Reviewed-on: https://review.whamcloud.com/33257
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Wang Shilong <wshilong@ddn.com>
Reviewed-by: Mike Pershin <mpershin@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
16 files changed:
lustre/include/lustre_quota.h
lustre/include/uapi/linux/lustre/lustre_disk.h
lustre/mdt/mdt_io.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_internal.h
lustre/osd-ldiskfs/osd_quota.c
lustre/osd-zfs/osd_handler.c
lustre/osd-zfs/osd_internal.h
lustre/osd-zfs/osd_object.c
lustre/osd-zfs/osd_quota.c
lustre/quota/lquota_disk.c
lustre/quota/lquota_lib.c
lustre/quota/qsd_handler.c
lustre/quota/qsd_lib.c
lustre/tests/conf-sanity.sh
lustre/tests/sanity-quota.sh

index 95d3062..c7a71a2 100644 (file)
@@ -176,7 +176,7 @@ struct qsd_instance;
  * enforcement. Arguments are documented where each function is defined.  */
 
 struct qsd_instance *qsd_init(const struct lu_env *, char *, struct dt_device *,
-                             struct proc_dir_entry *);
+                             struct proc_dir_entry *, bool is_md);
 int qsd_prepare(const struct lu_env *, struct qsd_instance *);
 int qsd_start(const struct lu_env *, struct qsd_instance *);
 void qsd_fini(const struct lu_env *, struct qsd_instance *);
@@ -212,13 +212,13 @@ struct lquota_id_info {
        bool                     lqi_is_blk;
 };
 
-/* Since we enforce only inode quota in meta pool (MDTs), and block quota in
- * data pool (OSTs), there are at most 4 quota ids being enforced in a single
- * transaction, which is chown transaction:
+/* With the DoM, both inode quota in meta pool and block quota in data pool
+ * will be enforced at MDT, there are at most 4 quota ids being enforced in
+ * a single transaction for inode and block quota, which is chown transaction:
  * original uid and gid, new uid and gid.
  *
  * This value might need to be revised when directory quota is added.  */
-#define QUOTA_MAX_TRANSIDS    4
+#define QUOTA_MAX_TRANSIDS    8
 
 /* all qids involved in a single transaction */
 struct lquota_trans {
index 03aa1e6..9acc3ef 100644 (file)
@@ -62,6 +62,8 @@
 #define MGS_NIDTBL_DIR         "NIDTBL_VERSIONS"
 #define QMT_DIR                        "quota_master"
 #define QSD_DIR                        "quota_slave"
+#define QSD_DIR_DT             "quota_slave_dt"
+#define QSD_DIR_MD             "quota_slave_md"
 #define HSM_ACTIONS            "hsm_actions"
 #define LFSCK_DIR              "LFSCK"
 #define LFSCK_BOOKMARK         "lfsck_bookmark"
index 2d6f802..1fc93b9 100644 (file)
@@ -732,8 +732,15 @@ int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                                        oa->o_flags = OBD_FL_NO_GRPQUOTA;
                        }
 
+                       if (lnb[0].lnb_flags & OBD_BRW_OVER_PRJQUOTA) {
+                               if (oa->o_valid & OBD_MD_FLFLAGS)
+                                       oa->o_flags |= OBD_FL_NO_PRJQUOTA;
+                               else
+                                       oa->o_flags = OBD_FL_NO_PRJQUOTA;
+                       }
+
                        oa->o_valid |= OBD_MD_FLFLAGS | OBD_MD_FLUSRQUOTA |
-                                      OBD_MD_FLGRPQUOTA;
+                                      OBD_MD_FLGRPQUOTA | OBD_MD_FLPRJQUOTA;
                }
        } else if (cmd == OBD_BRW_READ) {
                /* If oa != NULL then mdt_preprw_read updated the inode
index 12b1d32..f0cda5f 100644 (file)
@@ -1968,7 +1968,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
        struct osd_thandle *oh;
        struct osd_iobuf *iobuf = &oti->oti_iobuf;
        struct osd_device *osd = osd_dt_dev(th->th_dev);
-       struct qsd_instance *qsd = osd->od_quota_slave;
+       struct qsd_instance *qsd = osd_def_qsd(osd);
        struct lquota_trans *qtrans;
        struct list_head truncates = LIST_HEAD_INIT(truncates);
        int rc = 0, remove_agents = 0;
@@ -2093,9 +2093,9 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
 
        osd_index_fini(obj);
        if (inode != NULL) {
-               struct qsd_instance     *qsd = osd_obj2dev(obj)->od_quota_slave;
-               qid_t                    uid = i_uid_read(inode);
-               qid_t                    gid = i_gid_read(inode);
+               struct qsd_instance *qsd = osd_def_qsd(osd_obj2dev(obj));
+               qid_t                uid = i_uid_read(inode);
+               qid_t                gid = i_gid_read(inode);
 
                obj->oo_inode = NULL;
                iput(inode);
@@ -7447,10 +7447,17 @@ static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
        ENTRY;
 
        /* shutdown quota slave instance associated with the device */
-       if (o->od_quota_slave != NULL) {
-               struct qsd_instance *qsd = o->od_quota_slave;
+       if (o->od_quota_slave_md != NULL) {
+               struct qsd_instance *qsd = o->od_quota_slave_md;
 
-               o->od_quota_slave = NULL;
+               o->od_quota_slave_md = NULL;
+               qsd_fini(env, qsd);
+       }
+
+       if (o->od_quota_slave_dt != NULL) {
+               struct qsd_instance *qsd = o->od_quota_slave_dt;
+
+               o->od_quota_slave_dt = NULL;
                qsd_fini(env, qsd);
        }
 
@@ -7774,11 +7781,29 @@ static int osd_device_init0(const struct lu_env *env,
        LASSERT(l->ld_site->ls_linkage.prev != NULL);
 
        /* initialize quota slave instance */
-       o->od_quota_slave = qsd_init(env, o->od_svname, &o->od_dt_dev,
-                                    o->od_proc_entry);
-       if (IS_ERR(o->od_quota_slave)) {
-               rc = PTR_ERR(o->od_quota_slave);
-               o->od_quota_slave = NULL;
+       /* currently it's no need to prepare qsd_instance_md for OST */
+       if (!o->od_is_ost) {
+               o->od_quota_slave_md = qsd_init(env, o->od_svname,
+                                               &o->od_dt_dev,
+                                               o->od_proc_entry, true);
+               if (IS_ERR(o->od_quota_slave_md)) {
+                       rc = PTR_ERR(o->od_quota_slave_md);
+                       o->od_quota_slave_md = NULL;
+                       GOTO(out_procfs, rc);
+               }
+       }
+
+       o->od_quota_slave_dt = qsd_init(env, o->od_svname, &o->od_dt_dev,
+                                       o->od_proc_entry, false);
+
+       if (IS_ERR(o->od_quota_slave_dt)) {
+               if (o->od_quota_slave_md != NULL) {
+                       qsd_fini(env, o->od_quota_slave_md);
+                       o->od_quota_slave_md = NULL;
+               }
+
+               rc = PTR_ERR(o->od_quota_slave_dt);
+               o->od_quota_slave_dt = NULL;
                GOTO(out_procfs, rc);
        }
 
@@ -7901,14 +7926,17 @@ static int osd_recovery_complete(const struct lu_env *env,
 
        ENTRY;
 
-       if (osd->od_quota_slave == NULL)
+       if (osd->od_quota_slave_md == NULL && osd->od_quota_slave_dt == NULL)
                RETURN(0);
 
        /*
         * start qsd instance on recovery completion, this notifies the quota
         * slave code that we are about to process new requests now
         */
-       rc = qsd_start(env, osd->od_quota_slave);
+       rc = qsd_start(env, osd->od_quota_slave_dt);
+       if (rc == 0 && osd->od_quota_slave_md != NULL)
+               rc = qsd_start(env, osd->od_quota_slave_md);
+
        RETURN(rc);
 }
 
@@ -7976,13 +8004,21 @@ static int osd_prepare(const struct lu_env *env, struct lu_device *pdev,
 
        ENTRY;
 
-       if (osd->od_quota_slave != NULL) {
-               /* set up quota slave objects */
-               result = qsd_prepare(env, osd->od_quota_slave);
+       if (osd->od_quota_slave_md != NULL) {
+               /* set up quota slave objects for inode */
+               result = qsd_prepare(env, osd->od_quota_slave_md);
+               if (result != 0)
+                       RETURN(result);
+       }
+
+       if (osd->od_quota_slave_dt != NULL) {
+               /* set up quota slave objects for block */
+               result = qsd_prepare(env, osd->od_quota_slave_dt);
                if (result != 0)
                        RETURN(result);
        }
 
+
        if (lsd->lsd_feature_incompat & OBD_COMPAT_OST) {
 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 52, 0)
                if (lsd->lsd_feature_rocompat & OBD_ROCOMPAT_IDX_IN_IDIF) {
index 4cf0191..9afd83b 100644 (file)
@@ -300,8 +300,11 @@ struct osd_device {
        char                      od_svname[MAX_OBD_NAME];
        char                      od_mntdev[MAX_OBD_NAME];
 
-       /* quota slave instance */
-       struct qsd_instance      *od_quota_slave;
+       /* quota slave instance for inode */
+       struct qsd_instance      *od_quota_slave_md;
+
+       /* quota slave instance for block */
+       struct qsd_instance      *od_quota_slave_dt;
 
        /* osd seq instance */
        struct lu_client_seq    *od_cl_seq;
@@ -327,6 +330,14 @@ struct osd_device {
        enum osd_t10_type        od_t10_type;
 };
 
+static inline struct qsd_instance *osd_def_qsd(struct osd_device *osd)
+{
+       if (osd->od_is_ost)
+               return osd->od_quota_slave_dt;
+       else
+               return osd->od_quota_slave_md;
+}
+
 enum osd_full_scrub_ratio {
        /* Trigger OI scrub to scan the whole device directly. */
        OFSR_DIRECTLY   = 0,
@@ -384,11 +395,15 @@ struct osd_thandle {
        /* Link to the device, for debugging. */
        struct lu_ref_link      ot_dev_link;
        unsigned int            ot_credits;
+
+       /* quota IDs related to the transaction */
        unsigned short          ot_id_cnt;
-       __u8                    ot_id_types[OSD_MAX_UGID_CNT];
-       unsigned int            ot_remove_agents:1;
+       __u8                    ot_id_res[OSD_MAX_UGID_CNT];
+       __u8                    ot_id_types[OSD_MAX_UGID_CNT];
        uid_t                   ot_id_array[OSD_MAX_UGID_CNT];
        struct lquota_trans    *ot_quota_trans;
+
+       unsigned int            ot_remove_agents:1;
 #if OSD_THANDLE_STATS
         /** time when this handle was allocated */
        ktime_t oth_alloced;
index 34d2a74..b7edc53 100644 (file)
@@ -555,6 +555,7 @@ int osd_declare_qid(const struct lu_env *env, struct osd_thandle *oh,
        struct qsd_instance *qsd;
        struct inode *inode = NULL;
        int i, rc = 0, crd;
+       __u8 res = qi->lqi_is_blk ? LQUOTA_RES_DT : LQUOTA_RES_MD;
        bool found = false;
 
        ENTRY;
@@ -566,10 +567,14 @@ int osd_declare_qid(const struct lu_env *env, struct osd_thandle *oh,
        dev = osd_dt_dev(oh->ot_super.th_dev);
        LASSERT(dev != NULL);
 
-       qsd = dev->od_quota_slave;
+       if (res == LQUOTA_RES_DT)
+               qsd = dev->od_quota_slave_dt;
+       else
+               qsd = dev->od_quota_slave_md;
 
        for (i = 0; i < oh->ot_id_cnt; i++) {
                if (oh->ot_id_array[i] == qi->lqi_id.qid_uid &&
+                   oh->ot_id_res[i] == res &&
                    oh->ot_id_types[i] == qi->lqi_type) {
                        found = true;
                        break;
@@ -617,6 +622,7 @@ int osd_declare_qid(const struct lu_env *env, struct osd_thandle *oh,
 
                oh->ot_id_array[i] = qi->lqi_id.qid_uid;
                oh->ot_id_types[i] = qi->lqi_type;
+               oh->ot_id_res[i] = res;
                oh->ot_id_cnt++;
        }
 
@@ -698,9 +704,11 @@ int osd_declare_inode_qid(const struct lu_env *env, qid_t uid, qid_t gid,
        qi->lqi_type = PRJQUOTA;
        rcp = osd_declare_qid(env, oh, qi, obj, true, flags);
 
-       if (force && (rcp == -EDQUOT || rcp == -EINPROGRESS))
+       if (force && (rcp == -EDQUOT || rcp == -EINPROGRESS)) {
+               CDEBUG(D_ERROR, "force to ignore quota flags =%d\n", *flags);
                /* as before, ignore EDQUOT & EINPROGRESS for root */
                rcp = 0;
+       }
 #endif
 
        RETURN(rcu ? rcu : (rcg ? rcg : rcp));
index 4792b89..dec2abf 100644 (file)
@@ -164,7 +164,9 @@ static void osd_trans_commit_cb(void *cb_data, int error)
         * should be released. Quota space won't be adjusted at this point since
         * we can't provide a suitable environment. It will be performed
         * asynchronously by a lquota thread. */
-       qsd_op_end(NULL, osd->od_quota_slave, &oh->ot_quota_trans);
+       qsd_op_end(NULL, osd->od_quota_slave_dt, &oh->ot_quota_trans);
+       if (osd->od_quota_slave_md != NULL)
+               qsd_op_end(NULL, osd->od_quota_slave_md, &oh->ot_quota_trans);
 
        lu_device_put(lud);
        th->th_dev = NULL;
@@ -303,7 +305,10 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
                osd_unlinked_list_emptify(env, osd, &unlinked, false);
                /* there won't be any commit, release reserved quota space now,
                 * if any */
-               qsd_op_end(env, osd->od_quota_slave, &oh->ot_quota_trans);
+               qsd_op_end(env, osd->od_quota_slave_dt, &oh->ot_quota_trans);
+               if (osd->od_quota_slave_md != NULL)
+                       qsd_op_end(env, osd->od_quota_slave_md,
+                                  &oh->ot_quota_trans);
                OBD_FREE_PTR(oh);
                RETURN(0);
        }
@@ -768,14 +773,21 @@ static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
        ENTRY;
 
        /* shutdown quota slave instance associated with the device */
-       if (o->od_quota_slave != NULL) {
+       if (o->od_quota_slave_md != NULL) {
                /* complete all in-flight callbacks */
                osd_sync(env, &o->od_dt_dev);
                txg_wait_callbacks(spa_get_dsl(dmu_objset_spa(o->od_os)));
-               qsd_fini(env, o->od_quota_slave);
-               o->od_quota_slave = NULL;
+               qsd_fini(env, o->od_quota_slave_md);
+               o->od_quota_slave_md = NULL;
        }
 
+       if (o->od_quota_slave_dt != NULL) {
+               /* complete all in-flight callbacks */
+               osd_sync(env, &o->od_dt_dev);
+               txg_wait_callbacks(spa_get_dsl(dmu_objset_spa(o->od_os)));
+               qsd_fini(env, o->od_quota_slave_dt);
+               o->od_quota_slave_dt = NULL;
+       }
        osd_fid_fini(env, o);
 
        RETURN(0);
@@ -1151,12 +1163,29 @@ static int osd_mount(const struct lu_env *env,
        if (rc)
                GOTO(err, rc);
 
-       /* initialize quota slave instance */
-       o->od_quota_slave = qsd_init(env, o->od_svname, &o->od_dt_dev,
-                                    o->od_proc_entry);
-       if (IS_ERR(o->od_quota_slave)) {
-               rc = PTR_ERR(o->od_quota_slave);
-               o->od_quota_slave = NULL;
+       /* currently it's no need to prepare qsd_instance_md for OST */
+       if (!o->od_is_ost) {
+               o->od_quota_slave_md = qsd_init(env, o->od_svname,
+                                               &o->od_dt_dev,
+                                               o->od_proc_entry, true);
+               if (IS_ERR(o->od_quota_slave_md)) {
+                       rc = PTR_ERR(o->od_quota_slave_md);
+                       o->od_quota_slave_md = NULL;
+                       GOTO(err, rc);
+               }
+       }
+
+       o->od_quota_slave_dt = qsd_init(env, o->od_svname, &o->od_dt_dev,
+                                    o->od_proc_entry, false);
+
+       if (IS_ERR(o->od_quota_slave_dt)) {
+               if (o->od_quota_slave_md != NULL) {
+                       qsd_fini(env, o->od_quota_slave_md);
+                       o->od_quota_slave_md = NULL;
+               }
+
+               rc = PTR_ERR(o->od_quota_slave_dt);
+               o->od_quota_slave_dt = NULL;
                GOTO(err, rc);
        }
 
@@ -1405,12 +1434,14 @@ static int osd_recovery_complete(const struct lu_env *env, struct lu_device *d)
        int                      rc = 0;
        ENTRY;
 
-       if (osd->od_quota_slave == NULL)
+       if (osd->od_quota_slave_md == NULL && osd->od_quota_slave_dt == NULL)
                RETURN(0);
 
        /* start qsd instance on recovery completion, this notifies the quota
         * slave code that we are about to process new requests now */
-       rc = qsd_start(env, osd->od_quota_slave);
+       rc = qsd_start(env, osd->od_quota_slave_dt);
+       if (rc == 0 && osd->od_quota_slave_md != NULL)
+               rc = qsd_start(env, osd->od_quota_slave_md);
        RETURN(rc);
 }
 
@@ -1500,9 +1531,16 @@ static int osd_prepare(const struct lu_env *env, struct lu_device *pdev,
        int                      rc = 0;
        ENTRY;
 
-       if (osd->od_quota_slave != NULL) {
+       if (osd->od_quota_slave_md != NULL) {
+               /* set up quota slave objects */
+               rc = qsd_prepare(env, osd->od_quota_slave_md);
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
+       if (osd->od_quota_slave_dt != NULL) {
                /* set up quota slave objects */
-               rc = qsd_prepare(env, osd->od_quota_slave);
+               rc = qsd_prepare(env, osd->od_quota_slave_dt);
                if (rc != 0)
                        RETURN(rc);
        }
index 20656b4..b1bec9e 100644 (file)
@@ -364,8 +364,11 @@ struct osd_device {
        dnode_t                 *od_projectused_dn;
 #endif
 
-       /* quota slave instance */
-       struct qsd_instance     *od_quota_slave;
+       /* quota slave instance for inode */
+       struct qsd_instance     *od_quota_slave_md;
+
+       /* quota slave instance for block */
+       struct qsd_instance     *od_quota_slave_dt;
 
        struct brw_stats        od_brw_stats;
        atomic_t                od_r_in_flight;
@@ -393,6 +396,14 @@ struct osd_device {
        unsigned long long       od_readcache_max_filesize;
 };
 
+static inline struct qsd_instance *osd_def_qsd(struct osd_device *osd)
+{
+       if (osd->od_is_ost)
+               return osd->od_quota_slave_dt;
+       else
+               return osd->od_quota_slave_md;
+}
+
 enum osd_destroy_type {
        OSD_DESTROY_NONE = 0,
        OSD_DESTROY_SYNC = 1,
index dd42a45..a04a29b 100644 (file)
@@ -1158,7 +1158,7 @@ static int osd_declare_attr_set(const struct lu_env *env,
        if (attr && attr->la_valid & LA_UID) {
                /* quota enforcement for user */
                if (attr->la_uid != obj->oo_attr.la_uid) {
-                       rc = qsd_transfer(env, osd->od_quota_slave,
+                       rc = qsd_transfer(env, osd_def_qsd(osd),
                                          &oh->ot_quota_trans, USRQUOTA,
                                          obj->oo_attr.la_uid, attr->la_uid,
                                          bspace, &info->oti_qi, true);
@@ -1169,7 +1169,7 @@ static int osd_declare_attr_set(const struct lu_env *env,
        if (attr && attr->la_valid & LA_GID) {
                /* quota enforcement for group */
                if (attr->la_gid != obj->oo_attr.la_gid) {
-                       rc = qsd_transfer(env, osd->od_quota_slave,
+                       rc = qsd_transfer(env, osd_def_qsd(osd),
                                          &oh->ot_quota_trans, GRPQUOTA,
                                          obj->oo_attr.la_gid, attr->la_gid,
                                          bspace, &info->oti_qi,
@@ -1201,7 +1201,7 @@ static int osd_declare_attr_set(const struct lu_env *env,
 
                /* quota enforcement for project */
                if (attr->la_projid != obj->oo_attr.la_projid) {
-                       rc = qsd_transfer(env, osd->od_quota_slave,
+                       rc = qsd_transfer(env, osd_def_qsd(osd),
                                          &oh->ot_quota_trans, PRJQUOTA,
                                          obj->oo_attr.la_projid,
                                          attr->la_projid, bspace,
index 0146a7f..180d66d 100644 (file)
@@ -520,13 +520,20 @@ int osd_declare_quota(const struct lu_env *env, struct osd_device *osd,
 {
        struct osd_thread_info *info = osd_oti_get(env);
        struct lquota_id_info *qi = &info->oti_qi;
-       struct qsd_instance *qsd = osd->od_quota_slave;
+       struct qsd_instance *qsd = NULL;
        int rcu, rcg, rcp = 0; /* user & group & project rc */
        struct thandle *th = &oh->ot_super;
        bool force = !!(osd_qid_declare_flags & OSD_QID_FORCE) ||
                        th->th_ignore_quota;
        ENTRY;
 
+       if (osd_qid_declare_flags & OSD_QID_INODE)
+               qsd = osd->od_quota_slave_md;
+       else if (osd_qid_declare_flags & OSD_QID_BLK)
+               qsd = osd->od_quota_slave_dt;
+       else
+               RETURN(0);
+
        if (unlikely(qsd == NULL))
                /* quota slave instance hasn't been allocated yet */
                RETURN(0);
index c823873..77e69c8 100644 (file)
@@ -50,6 +50,7 @@
 
 #define DEBUG_SUBSYSTEM S_LQUOTA
 
+#include <obd_class.h>
 #include "lquota_internal.h"
 
 #define LQUOTA_MODE (S_IFREG | S_IRUGO | S_IWUSR)
@@ -381,7 +382,7 @@ struct dt_object *lquota_disk_slv_find_create(const struct lu_env *env,
 {
        struct lquota_thread_info       *qti = lquota_info(env);
        struct dt_object                *slv_idx;
-       int                              rc;
+       int                              rc, type;
        ENTRY;
 
        LASSERT(uuid != NULL);
@@ -394,19 +395,29 @@ struct dt_object *lquota_disk_slv_find_create(const struct lu_env *env,
        if (rc)
                RETURN(ERR_PTR(rc));
 
+       if (lu_device_is_md(dev->dd_lu_dev.ld_site->ls_top_dev))
+               type = LDD_F_SV_TYPE_MDT;
+       else
+               type = LDD_F_SV_TYPE_OST;
+
        /* Slave indexes uses the FID_SEQ_QUOTA sequence since they can be read
         * through the network */
        qti->qti_fid.f_seq = FID_SEQ_QUOTA;
        qti->qti_fid.f_ver = 0;
        if (local) {
-               int type;
+               int pool_type, qtype;
 
-               rc = lquota_extract_fid(glb_fid, NULL, NULL, &type);
+               rc = lquota_extract_fid(glb_fid, NULL, &pool_type, &qtype);
                if (rc)
                        RETURN(ERR_PTR(rc));
 
                /* use predefined fid in the reserved oid list */
-               qti->qti_fid.f_oid = qtype2slv_oid(type);
+               if ((type == LDD_F_SV_TYPE_MDT && pool_type == LQUOTA_RES_MD) ||
+                   (type == LDD_F_SV_TYPE_OST && pool_type == LQUOTA_RES_DT))
+                       qti->qti_fid.f_oid = qtype2slv_oid(qtype);
+               else
+                       qti->qti_fid.f_oid = pool_type << 16 |
+                                                       qtype2slv_oid(qtype);
 
                slv_idx = local_index_find_or_create_with_fid(env, dev,
                                                              &qti->qti_fid,
index ad82fd1..093dc38 100644 (file)
@@ -117,17 +117,26 @@ struct dt_object *acct_obj_lookup(const struct lu_env *env,
  *
  * \param env - is the environment passed by the caller
  * \param dev - is the dt_device storing the slave index object
+ * \param pool - is the pool type, either LQUOTA_RES_MD or LQUOTA_RES_DT
  * \param type - is the quota type, either USRQUOTA or GRPQUOTA
  */
 static struct dt_object *quota_obj_lookup(const struct lu_env *env,
-                                         struct dt_device *dev, int type)
+                                         struct dt_device *dev, int pool,
+                                         int type)
 {
        struct lquota_thread_info       *qti = lquota_info(env);
        struct dt_object                *obj = NULL;
+       int                              is_md;
        ENTRY;
 
+       is_md = lu_device_is_md(dev->dd_lu_dev.ld_site->ls_top_dev);
+       if ((is_md && pool == LQUOTA_RES_MD) ||
+           (!is_md && pool == LQUOTA_RES_DT))
+               qti->qti_fid.f_oid = qtype2slv_oid(type);
+       else
+               qti->qti_fid.f_oid = pool << 16 | qtype2slv_oid(type);
+
        qti->qti_fid.f_seq = FID_SEQ_QUOTA;
-       qti->qti_fid.f_oid = qtype2slv_oid(type);
        qti->qti_fid.f_ver = 0;
 
        /* lookup the quota object */
@@ -173,7 +182,7 @@ int lquotactl_slv(const struct lu_env *env, struct dt_device *dev,
 {
        struct lquota_thread_info       *qti = lquota_info(env);
        __u64                            key;
-       struct dt_object                *obj;
+       struct dt_object                *obj, *obj_aux = NULL;
        struct obd_dqblk                *dqblk = &oqctl->qc_dqblk;
        int                              rc;
        ENTRY;
@@ -215,7 +224,11 @@ int lquotactl_slv(const struct lu_env *env, struct dt_device *dev,
 
        /* Step 2: collect enforcement information */
 
-       obj = quota_obj_lookup(env, dev, oqctl->qc_type);
+       if (lu_device_is_md(dev->dd_lu_dev.ld_site->ls_top_dev))
+               obj = quota_obj_lookup(env, dev, LQUOTA_RES_MD, oqctl->qc_type);
+       else
+               obj = quota_obj_lookup(env, dev, LQUOTA_RES_DT, oqctl->qc_type);
+
        if (IS_ERR(obj))
                RETURN(0);
        if (obj->do_index_ops == NULL)
@@ -231,6 +244,24 @@ int lquotactl_slv(const struct lu_env *env, struct dt_device *dev,
        if (lu_device_is_md(dev->dd_lu_dev.ld_site->ls_top_dev)) {
                dqblk->dqb_ihardlimit = qti->qti_slv_rec.qsr_granted;
                dqblk->dqb_bhardlimit = 0;
+
+               obj_aux = quota_obj_lookup(env, dev, LQUOTA_RES_DT,
+                                          oqctl->qc_type);
+               if (IS_ERR(obj_aux)) {
+                       obj_aux = NULL;
+                       GOTO(out, rc = 0);
+               }
+
+               if (obj_aux->do_index_ops == NULL)
+                       GOTO(out, rc = 0);
+
+               memset(&qti->qti_slv_rec, 0, sizeof(qti->qti_slv_rec));
+               rc = dt_lookup(env, obj_aux, (struct dt_rec *)&qti->qti_slv_rec,
+                              (struct dt_key *)&key);
+               if (rc < 0 && rc != -ENOENT)
+                       GOTO(out, rc = 0);
+
+               dqblk->dqb_bhardlimit = qti->qti_slv_rec.qsr_granted;
        } else {
                dqblk->dqb_ihardlimit = 0;
                dqblk->dqb_bhardlimit = qti->qti_slv_rec.qsr_granted;
@@ -240,6 +271,8 @@ int lquotactl_slv(const struct lu_env *env, struct dt_device *dev,
        GOTO(out, rc = 0);
 out:
        dt_object_put(env, obj);
+       if (obj_aux != NULL)
+               dt_object_put(env, obj_aux);
        return rc;
 }
 EXPORT_SYMBOL(lquotactl_slv);
index 577e3cf..eb79812 100644 (file)
@@ -810,7 +810,7 @@ out_flags:
 static inline bool qid_equal(struct lquota_id_info *q1,
                             struct lquota_id_info *q2)
 {
-       if (q1->lqi_type != q2->lqi_type)
+       if (q1->lqi_is_blk != q2->lqi_is_blk || q1->lqi_type != q2->lqi_type)
                return false;
        return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
 }
@@ -874,8 +874,6 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
        for (i = 0; i < trans->lqt_id_cnt; i++) {
                if (qid_equal(qi, &trans->lqt_ids[i])) {
                        found = true;
-                       /* make sure we are not mixing inodes & blocks */
-                       LASSERT(trans->lqt_ids[i].lqi_is_blk == qi->lqi_is_blk);
                        break;
                }
        }
index 5eb5a72..294538b 100644 (file)
@@ -641,7 +641,7 @@ EXPORT_SYMBOL(qsd_fini);
  */
 struct qsd_instance *qsd_init(const struct lu_env *env, char *svname,
                              struct dt_device *dev,
-                             struct proc_dir_entry *osd_proc)
+                             struct proc_dir_entry *osd_proc, bool is_md)
 {
        struct qsd_thread_info  *qti = qsd_info(env);
        struct qsd_instance     *qsd;
@@ -668,6 +668,7 @@ struct qsd_instance *qsd_init(const struct lu_env *env, char *svname,
        INIT_LIST_HEAD(&qsd->qsd_adjust_list);
        qsd->qsd_prepared = false;
        qsd->qsd_started = false;
+       qsd->qsd_is_md = is_md;
 
        /* copy service name */
        if (strlcpy(qsd->qsd_svname, svname, sizeof(qsd->qsd_svname))
@@ -704,8 +705,18 @@ struct qsd_instance *qsd_init(const struct lu_env *env, char *svname,
        mutex_unlock(&qsd->qsd_fsinfo->qfs_mutex);
 
        /* register procfs directory */
-       qsd->qsd_proc = lprocfs_register(QSD_DIR, osd_proc,
-                                        lprocfs_quota_qsd_vars, qsd);
+       if (qsd->qsd_is_md)
+               qsd->qsd_proc = lprocfs_register(QSD_DIR_MD, osd_proc,
+                                                lprocfs_quota_qsd_vars, qsd);
+       else
+               qsd->qsd_proc = lprocfs_register(QSD_DIR_DT, osd_proc,
+                                                lprocfs_quota_qsd_vars, qsd);
+
+       if (type == LDD_F_SV_TYPE_MDT && qsd->qsd_is_md)
+               lprocfs_add_symlink(QSD_DIR, osd_proc, "./%s", QSD_DIR_MD);
+       else if (type == LDD_F_SV_TYPE_OST && !qsd->qsd_is_md)
+               lprocfs_add_symlink(QSD_DIR, osd_proc, "./%s", QSD_DIR_DT);
+
        if (IS_ERR(qsd->qsd_proc)) {
                rc = PTR_ERR(qsd->qsd_proc);
                qsd->qsd_proc = NULL;
@@ -758,12 +769,10 @@ int qsd_prepare(const struct lu_env *env, struct qsd_instance *qsd)
 
        /* Record whether this qsd instance is managing quota enforcement for a
         * MDT (i.e. inode quota) or OST (block quota) */
-       if (lu_device_is_md(qsd->qsd_dev->dd_lu_dev.ld_site->ls_top_dev)) {
-               qsd->qsd_is_md = true;
+       if (qsd->qsd_is_md)
                qsd->qsd_sync_threshold = LQUOTA_LEAST_QUNIT(LQUOTA_RES_MD);
-       } else {
+       else
                qsd->qsd_sync_threshold = LQUOTA_LEAST_QUNIT(LQUOTA_RES_DT);
-       }
 
        /* look-up on-disk directory for the quota slave */
        qsd->qsd_root = lquota_disk_dir_find_create(env, qsd->qsd_dev, NULL,
index 49403bc..fa8b6bf 100644 (file)
@@ -7835,6 +7835,7 @@ test_108b() {
                $rcmd mount -t ldiskfs -o loop $tmp/images/$facet \
                        $tmp/mnt/$facet ||
                        error "failed to local mount $facet"
+
                $rcmd tar jxf $LUSTRE/tests/zfs_${facet}_2_11.tar.bz2 \
                        --xattrs --xattrs-include="*.*" \
                        -C $tmp/mnt/$facet/ > /dev/null 2>&1 ||
index 4ecaa5e..174ccef 100755 (executable)
@@ -3320,6 +3320,118 @@ test_62() {
 }
 run_test 62 "Project inherit should be only changed by root"
 
+test_dom() {
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.11.55) ] &&
+               skip "Not supported before 2.11.55" && return
+
+       local qtype=$1
+       local qid=$TSTUSR
+       local dd_failed=false
+       local tdir_dom=${tdir}_dom
+       local LIMIT=20480 #20M
+
+       [ $qtype == "p" ] && ! is_project_quota_supported &&
+               echo "Project quota is not supported" && return 0
+
+       [ $qtype == "p" ] && qid=$TSTPRJID
+
+       setup_quota_test || error "setup quota failed with $?"
+       trap cleanup_quota_test EXIT
+
+       quota_init
+
+       # enable mdt/ost quota
+       set_mdt_qtype $QTYPE || error "enable mdt quota failed"
+       set_ost_qtype $QTYPE || error "enable ost quota failed"
+
+       # make sure the system is clean
+       local USED=$(getquota -$qtype $qid global curspace)
+       [ $USED -ne 0 ] && error "Used space for $qid isn't 0."
+
+       chown $TSTUSR.$TSTUSR $DIR/$tdir || error "chown $tdir failed"
+
+       mkdir $DIR/$tdir_dom || error "mkdir $tdir_dom failed"
+       $SETSTRIPE -E 1M -L mdt $DIR/$tdir_dom ||
+               error "setstripe $tdir_dom failed"
+       chown $TSTUSR.$TSTUSR $DIR/$tdir_dom || error "chown $tdir_dom failed"
+
+       [ $qtype == "p" ] && {
+               change_project -sp $TSTPRJID $DIR/$tdir
+               change_project -sp $TSTPRJID $DIR/$tdir_dom
+       }
+
+       $LFS setquota -$qtype $qid -b $LIMIT -B $LIMIT $DIR ||
+               error "set $qid quota failed"
+
+       for ((i = 0; i < $((LIMIT/2048)); i++)); do
+               $RUNAS $DD of=$DIR/$tdir_dom/$tfile-$i count=1 oflag=sync ||
+                                                               dd_failed=true
+       done
+
+       $dd_failed && quota_error $qtype $qid "write failed, expect succeed"
+
+       for ((i = $((LIMIT/2048)); i < $((LIMIT/1024 + 10)); i++)); do
+               $RUNAS $DD of=$DIR/$tdir_dom/$tfile-$i count=1 oflag=sync ||
+                                                               dd_failed=true
+       done
+
+       $dd_failed || quota_error $qtype $qid "write succeed, expect EDQUOT"
+
+       rm -f $DIR/$tdir_dom/*
+
+       # flush cache, ensure noquota flag is set on client
+       cancel_lru_locks osc
+       cancel_lru_locks mdc
+       sync; sync_all_data || true
+
+       dd_failed=false
+
+       $RUNAS $DD of=$DIR/$tdir/file count=$((LIMIT/2048)) oflag=sync ||
+               quota_error $qtype $qid "write failed, expect succeed"
+
+       for ((i = 0; i < $((LIMIT/2048 + 10)); i++)); do
+               $RUNAS $DD of=$DIR/$tdir_dom/$tfile-$i count=1 oflag=sync ||
+                                                               dd_failed=true
+       done
+
+       $dd_failed || quota_error $qtype $TSTID "write succeed, expect EDQUOT"
+
+       rm -f $DIR/$tdir/*
+       rm -f $DIR/$tdir_dom/*
+
+       # flush cache, ensure noquota flag is set on client
+       cancel_lru_locks osc
+       cancel_lru_locks mdc
+       sync; sync_all_data || true
+
+       dd_failed=false
+
+       for ((i = 0; i < $((LIMIT/2048)); i++)); do
+               $RUNAS $DD of=$DIR/$tdir_dom/$tfile-$i count=1 oflag=sync ||
+                                                               dd_failed=true
+       done
+
+       $dd_failed && quota_error $qtype $qid "write failed, expect succeed"
+
+       $RUNAS $DD of=$DIR/$tdir/file count=$((LIMIT/2048 + 10)) oflag=sync &&
+               quota_error $qtype $qid "write succeed, expect EDQUOT"
+
+       rm -f $DIR/$tdir/*
+       rm -fr $DIR/$tdir_dom
+
+       $LFS setquota -u $TSTUSR -b 0 -B 0 -i 0 -I 0 $DIR ||
+               error "reset usr quota failed"
+
+       cleanup_quota_test
+}
+
+test_63() {
+       test_dom "u"
+       test_dom "g"
+       test_dom "p"
+}
+run_test 63 "quota on DoM tests"
+
 quota_fini()
 {
        do_nodes $(comma_list $(nodes_list)) "lctl set_param debug=-quota"