Whamcloud - gitweb
LU-13881 pcc: comparator support for PCC rules 85/39585/20
authorQian Yingjin <qian@ddn.com>
Thu, 6 Aug 2020 08:29:21 +0000 (16:29 +0800)
committerOleg Drokin <green@whamcloud.com>
Wed, 5 Jun 2024 04:48:06 +0000 (04:48 +0000)
There are increasing requirements for PCC rules to add comparator
support:
- File data larger or smaller than certain threshold should not
  auto cache in PCC (i.e. larger than the capacity of PCC backend
  on a client).
- Users can specify a range of UID/GID/ProjID for auto caching on
  PCC when define a rule;

In addition to the original equal (=) operator, this patch also
adds greater than (>) and less than (<) comparison operators.

The following rule expressions are supported:
- "projid={100}&size>{1M}&size<{500G}"
- "projid>{100}&projid<{110}"
- "uid<{1500}&uid>{1000}"

EX-2872 pcc: mtime rule for 'lctl pcc add'

Add an "mtime>N" rule to allow skipping files for PCC-RO auto-attach
if they were created or modified more than N seconds ago.  Otherwise,
it may be that files are added to the PCC cache before they finished
writing, or if they will be modified again quickly after creation.
Was-Change-Id: Ibb99bff5b483717ae6e5b83f82f1bcd86c3ebbe5

This patch disabled sanity-pcc/test_33 on rhel9.3 kernel until the
inconsistent LSOM problem is solved.

EX-bug-id: EX-2872
Signed-off-by: Qian Yingjin <qian@ddn.com>
Change-Id: I9f024eb6903f5652ba3cf04fa289456803493b2c
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/39585
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Li Xi <lixi@ddn.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/doc/lctl-pcc.8
lustre/llite/namei.c
lustre/llite/pcc.c
lustre/llite/pcc.h
lustre/obdclass/lprocfs_status.c
lustre/tests/sanity-pcc.sh

index fc75846..fd097a6 100644 (file)
@@ -27,6 +27,15 @@ example rule means that new files are only auto cached if the project ID is 500
 and the suffix of the file name is "h5". "rwid" represents the read-write
 attach id (2) which value is same as the archive ID of the copytool agent
 running on this PCC node.
+In addition to the equal (=) operator, it also supports greater than (>) and
+less than (<) comparison operators.
+The following rule expressions are supported:
+- "projid={100}&size>{1M}&size<{500G}"
+- "projid>{100}&projid<{110}"
+- "uid<{1500}&uid>{1000}"
+Currently each PCC backend only has one rule which is configed when setup PCC
+backend on a client. If a user wants to change the rule, the PCC backend needs
+to be removed first and then added back with a new rule.
 .TP
 .B lctl pcc del [\fB--keep\fR|\fB-k\fR] <\fImntpath\fR> <\fIpccpath\fR>
 Delete a PCC backend specified by path
index 8074eb6..35a2d11 100644 (file)
@@ -1223,6 +1223,8 @@ static int ll_atomic_open(struct inode *dir, struct dentry *dentry,
                        item.pm_gid = from_kgid(&init_user_ns, current_gid());
                        item.pm_projid = ll_i2info(dir)->lli_projid;
                        item.pm_name = &dentry->d_name;
+                       item.pm_size = 0;
+                       item.pm_mtime = ktime_get_seconds();
                        dataset = pcc_dataset_match_get(&sbi->ll_pcc_super,
                                                        LU_PCC_READWRITE,
                                                        &item);
index 99db162..6a95b77 100644 (file)
@@ -130,27 +130,42 @@ int pcc_super_init(struct pcc_super *super)
 }
 
 /* Rule based auto caching */
-static void pcc_id_list_free(struct list_head *id_list)
+static void pcc_id_list_free(struct pcc_expression *expr)
 {
        struct pcc_match_id *id, *n;
 
-       list_for_each_entry_safe(id, n, id_list, pmi_linkage) {
-               list_del_init(&id->pmi_linkage);
-               OBD_FREE_PTR(id);
+       if (expr->pe_opc == PCC_FIELD_OP_EQ) {
+               list_for_each_entry_safe(id, n, &expr->pe_cond, pmi_linkage) {
+                       list_del_init(&id->pmi_linkage);
+                       OBD_FREE_PTR(id);
+               }
        }
 }
 
-static void pcc_fname_list_free(struct list_head *fname_list)
+static void pcc_fname_list_free(struct pcc_expression *expr)
 {
        struct pcc_match_fname *fname, *n;
 
-       list_for_each_entry_safe(fname, n, fname_list, pmf_linkage) {
+       LASSERT(expr->pe_opc == PCC_FIELD_OP_EQ);
+       list_for_each_entry_safe(fname, n, &expr->pe_cond, pmf_linkage) {
                OBD_FREE(fname->pmf_name, strlen(fname->pmf_name) + 1);
                list_del_init(&fname->pmf_linkage);
                OBD_FREE_PTR(fname);
        }
 }
 
+static void pcc_size_list_free(struct pcc_expression *expr)
+{
+       struct pcc_match_size *sz, *n;
+
+       if (expr->pe_opc == PCC_FIELD_OP_EQ) {
+               list_for_each_entry_safe(sz, n, &expr->pe_cond, pms_linkage) {
+                       list_del_init(&sz->pms_linkage);
+                       OBD_FREE_PTR(sz);
+               }
+       }
+}
+
 static void pcc_expression_free(struct pcc_expression *expr)
 {
        LASSERT(expr->pe_field >= PCC_FIELD_UID &&
@@ -159,10 +174,15 @@ static void pcc_expression_free(struct pcc_expression *expr)
        case PCC_FIELD_UID:
        case PCC_FIELD_GID:
        case PCC_FIELD_PROJID:
-               pcc_id_list_free(&expr->pe_cond);
+               pcc_id_list_free(expr);
                break;
        case PCC_FIELD_FNAME:
-               pcc_fname_list_free(&expr->pe_cond);
+               pcc_fname_list_free(expr);
+               break;
+       case PCC_FIELD_SIZE:
+               pcc_size_list_free(expr);
+               break;
+       case PCC_FIELD_MTIME:
                break;
        default:
                LBUG();
@@ -207,7 +227,9 @@ static void pcc_cmd_fini(struct pcc_cmd *cmd)
 
 #define PCC_DISJUNCTION_DELIM  (",")
 #define PCC_CONJUNCTION_DELIM  ("&")
-#define PCC_EXPRESSION_DELIM   ("=")
+#define PCC_EXPRESSION_DELIM_EQ        ("=")
+#define PCC_EXPRESSION_DELIM_LT        ("<")
+#define PCC_EXPRESSION_DELIM_GT        (">")
 
 static int
 pcc_fname_list_add(char *id, struct list_head *fname_list)
@@ -230,39 +252,32 @@ pcc_fname_list_add(char *id, struct list_head *fname_list)
 }
 
 static int
-pcc_fname_list_parse(char *str, struct list_head *fname_list)
+pcc_fname_list_parse(char *str, struct pcc_expression *expr)
 {
        int rc = 0;
 
        ENTRY;
 
-       INIT_LIST_HEAD(fname_list);
        while (rc == 0 && str) {
                char *fname = strsep(&str, " ");
 
                if (*fname)
-                       rc = pcc_fname_list_add(fname, fname_list);
+                       rc = pcc_fname_list_add(fname, &expr->pe_cond);
        }
-       if (list_empty(fname_list))
+       if (list_empty(&expr->pe_cond))
                rc = -EINVAL;
        if (rc)
-               pcc_fname_list_free(fname_list);
+               pcc_fname_list_free(expr);
        RETURN(rc);
 }
 
 static int
-pcc_id_list_parse(char *str, struct list_head *id_list,
-                 enum pcc_field type)
+pcc_id_list_parse(char *str, struct pcc_expression *expr)
 {
        int rc = 0;
 
        ENTRY;
 
-       if (type != PCC_FIELD_UID && type != PCC_FIELD_GID &&
-           type != PCC_FIELD_PROJID)
-               RETURN(-EINVAL);
-
-       INIT_LIST_HEAD(id_list);
        while (str) {
                char *num;
                struct pcc_match_id *id;
@@ -280,20 +295,185 @@ pcc_id_list_parse(char *str, struct list_head *id_list,
                        GOTO(out, rc = -ENOMEM);
 
                id->pmi_id = id_val;
-               list_add_tail(&id->pmi_linkage, id_list);
+               list_add_tail(&id->pmi_linkage, &expr->pe_cond);
        }
-       if (list_empty(id_list))
+       if (list_empty(&expr->pe_cond))
                rc = -EINVAL;
 out:
        if (rc)
-               pcc_id_list_free(id_list);
+               pcc_id_list_free(expr);
+       RETURN(rc);
+}
+
+static int
+pcc_expr_id_parse(char *str, struct pcc_expression *expr)
+{
+       int rc;
+
+       ENTRY;
+
+       if (expr->pe_field != PCC_FIELD_UID &&
+           expr->pe_field != PCC_FIELD_GID &&
+           expr->pe_field != PCC_FIELD_PROJID)
+               RETURN(-EINVAL);
+
+       if (expr->pe_opc >= PCC_FIELD_OP_MAX)
+               RETURN(-EINVAL);
+
+       if (expr->pe_opc == PCC_FIELD_OP_EQ)
+               rc = pcc_id_list_parse(str, expr);
+       else {
+               unsigned long id;
+
+               rc = kstrtoul(str, 10, &id);
+               if (rc != 0)
+                       RETURN(-EINVAL);
+
+               if (id <= 0 || id >= (u32)~0U)
+                       RETURN(-EINVAL);
+
+               expr->pe_id = id;
+       }
+
        RETURN(rc);
 }
 
 static int
+pcc_size_list_parse(char *str, struct pcc_expression *expr)
+{
+       int rc = 0;
+
+       ENTRY;
+
+       while (rc == 0 && str) {
+               char *sz_str;
+               struct pcc_match_size *sz;
+               __u64 sz_val;
+
+               sz_str = strsep(&str, " ");
+               if (!*sz_str)
+                       continue;
+
+               rc = sysfs_memparse(sz_str, strlen(sz_str), &sz_val, "MiB");
+               if (rc < 0)
+                       GOTO(out, rc);
+
+               OBD_ALLOC_PTR(sz);
+               if (sz == NULL)
+                       GOTO(out, rc = -ENOMEM);
+
+               sz->pms_size = sz_val;
+               list_add_tail(&sz->pms_linkage, &expr->pe_cond);
+       }
+       if (list_empty(&expr->pe_cond))
+               rc = -EINVAL;
+out:
+       if (rc)
+               pcc_id_list_free(expr);
+       RETURN(rc);
+}
+
+static int
+pcc_expr_size_parse(char *str, struct pcc_expression *expr)
+{
+       if (expr->pe_opc == PCC_FIELD_OP_EQ)
+               return pcc_size_list_parse(str, expr);
+       else
+               return sysfs_memparse(str, strlen(str), &expr->pe_size, "MiB");
+}
+
+/*
+ * Parse relative file age timestamp, allowing suffix for ease of use:
+ * s = seconds, m = minutes, h = hours, d = days, w = weeks, y = years
+ */
+static int pcc_expr_time_parse(char *str, struct pcc_expression *expr)
+{
+       unsigned long mtime;
+       int len = strlen(str);
+       unsigned int mult = 1;
+       char buf[10];
+       int rc;
+
+       if (expr->pe_opc == PCC_FIELD_OP_EQ)
+               return -EOPNOTSUPP;
+
+       /* 1B seconds is enough, and avoids the need for overflow checking */
+       if (len > 10)
+               return -EOVERFLOW;
+
+       strncpy(buf, str, sizeof(buf));
+       rc = strspn(buf, "0123456789");
+       if (rc < len) {
+               switch (str[rc]) {
+               case 'y':
+                       mult *= 52;
+                       fallthrough;
+               case 'w':
+                       mult *= 7;
+                       fallthrough;
+               case 'd':
+                       mult *= 24;
+                       fallthrough;
+               case 'h':
+                       mult *= 60;
+                       fallthrough;
+               case 'm':
+                       mult *= 60;
+                       fallthrough;
+               case 's':
+                       break;
+               default:
+                       return -EINVAL;
+               }
+               buf[rc] = '\0';
+       }
+       rc = kstrtoul(buf, 10, &mtime);
+       if (!rc)
+               expr->pe_mtime = mtime * mult;
+
+       return rc;
+}
+
+static inline char *
+pcc_get_opcode_delim(enum pcc_field_op opc)
+{
+       switch (opc) {
+       case PCC_FIELD_OP_EQ:
+               return PCC_EXPRESSION_DELIM_EQ;
+       case PCC_FIELD_OP_LT:
+               return PCC_EXPRESSION_DELIM_LT;
+       case PCC_FIELD_OP_GT:
+               return PCC_EXPRESSION_DELIM_GT;
+       default:
+               LBUG();
+       }
+}
+
+static enum pcc_field_op
+pcc_get_field_opcode(char **src, char **field)
+{
+       int i;
+
+       ENTRY;
+
+       for (i = PCC_FIELD_OP_EQ; i < PCC_FIELD_OP_MAX; i++) {
+               char *tmp = *src;
+
+               *field = strim(strsep(&tmp, pcc_get_opcode_delim(i)));
+               if (**field && tmp) {
+                       *src = tmp;
+                       RETURN(i);
+               }
+       }
+
+       RETURN(PCC_FIELD_OP_INV);
+}
+
+static int
 pcc_expression_parse(char *str, struct list_head *cond_list)
 {
        struct pcc_expression *expr;
+       enum pcc_field_op opc;
        char *field;
        int len;
        int rc = 0;
@@ -302,8 +482,8 @@ pcc_expression_parse(char *str, struct list_head *cond_list)
        if (expr == NULL)
                return -ENOMEM;
 
-       field = strim(strsep(&str, PCC_EXPRESSION_DELIM));
-       if (!*field || !str)
+       opc = pcc_get_field_opcode(&str, &field);
+       if (opc == PCC_FIELD_OP_INV)
                /* No LHS or no '=' */
                GOTO(out, rc = -EINVAL);
        str = skip_spaces(str);
@@ -315,31 +495,34 @@ pcc_expression_parse(char *str, struct list_head *cond_list)
        str[len - 1] = '\0';
        str += 1;
 
+       expr->pe_opc = opc;
+       INIT_LIST_HEAD(&expr->pe_cond);
        if (strcmp(field, "uid") == 0) {
-               if (pcc_id_list_parse(str,
-                                     &expr->pe_cond,
-                                     PCC_FIELD_UID) < 0)
-                       GOTO(out, rc = -EINVAL);
                expr->pe_field = PCC_FIELD_UID;
+               rc = pcc_expr_id_parse(str, expr);
        } else if (strcmp(field, "gid") == 0) {
-               if (pcc_id_list_parse(str,
-                                     &expr->pe_cond,
-                                     PCC_FIELD_GID) < 0)
-                       GOTO(out, rc = -EINVAL);
                expr->pe_field = PCC_FIELD_GID;
+               rc = pcc_expr_id_parse(str, expr);
        } else if (strcmp(field, "projid") == 0) {
-               if (pcc_id_list_parse(str,
-                                     &expr->pe_cond,
-                                     PCC_FIELD_PROJID) < 0)
-                       GOTO(out, rc = -EINVAL);
                expr->pe_field = PCC_FIELD_PROJID;
-       } else if (strcmp(field, "fname") == 0) {
-               if (pcc_fname_list_parse(str, &expr->pe_cond) < 0)
+               rc = pcc_expr_id_parse(str, expr);
+       } else if (strcmp(field, "size") == 0) {
+               expr->pe_field = PCC_FIELD_SIZE;
+               rc = pcc_expr_size_parse(str, expr);
+       } else if (strcmp(field, "mtime") == 0) {
+               expr->pe_field = PCC_FIELD_MTIME;
+               rc = pcc_expr_time_parse(str, expr);
+       } else if (strcmp(field, "fname") == 0 ||
+                  strcmp(field, "filename") == 0) {
+               if (opc != PCC_FIELD_OP_EQ)
                        GOTO(out, rc = -EINVAL);
                expr->pe_field = PCC_FIELD_FNAME;
+               rc = pcc_fname_list_parse(str, expr);
        } else {
                GOTO(out, rc = -EINVAL);
        }
+       if (rc < 0)
+               GOTO(out, rc);
 
        list_add_tail(&expr->pe_linkage, cond_list);
        return 0;
@@ -601,15 +784,68 @@ pcc_fname_list_match(struct list_head *fname_list, const char *name)
 }
 
 static int
+pcc_expr_id_match(struct pcc_expression *expr, __u32 id)
+{
+       switch (expr->pe_opc) {
+       case PCC_FIELD_OP_EQ:
+               return pcc_id_list_match(&expr->pe_cond, id);
+       case PCC_FIELD_OP_LT:
+               return id < expr->pe_id;
+       case PCC_FIELD_OP_GT:
+               return id > expr->pe_id;
+       default:
+               return 0;
+       }
+}
+
+static int
+pcc_size_list_match(struct list_head *id_list, __u64 sz_val)
+{
+       struct pcc_match_size *sz;
+
+       list_for_each_entry(sz, id_list, pms_linkage) {
+               if (sz->pms_size == sz_val)
+                       return 1;
+       }
+       return 0;
+}
+
+static int
+pcc_expr_size_match(struct pcc_expression *expr, __u64 sz)
+{
+       switch (expr->pe_opc) {
+       case PCC_FIELD_OP_EQ:
+               return pcc_size_list_match(&expr->pe_cond, sz);
+       case PCC_FIELD_OP_LT:
+               return sz < expr->pe_size;
+       case PCC_FIELD_OP_GT:
+               return sz > expr->pe_size;
+       default:
+               return 0;
+       }
+}
+
+static inline int
+pcc_expr_time_match(struct pcc_expression *expr, __u64 time)
+{
+       /* pe_mtime and pe_size are both __u64 in the same union */
+       return pcc_expr_size_match(expr, ktime_get_real_seconds() - time);
+}
+
+static int
 pcc_expression_match(struct pcc_expression *expr, struct pcc_matcher *matcher)
 {
        switch (expr->pe_field) {
        case PCC_FIELD_UID:
-               return pcc_id_list_match(&expr->pe_cond, matcher->pm_uid);
+               return pcc_expr_id_match(expr, matcher->pm_uid);
        case PCC_FIELD_GID:
-               return pcc_id_list_match(&expr->pe_cond, matcher->pm_gid);
+               return pcc_expr_id_match(expr, matcher->pm_gid);
        case PCC_FIELD_PROJID:
-               return pcc_id_list_match(&expr->pe_cond, matcher->pm_projid);
+               return pcc_expr_id_match(expr, matcher->pm_projid);
+       case PCC_FIELD_SIZE:
+               return pcc_expr_size_match(expr, matcher->pm_size);
+       case PCC_FIELD_MTIME:
+               return pcc_expr_time_match(expr, matcher->pm_mtime);
        case PCC_FIELD_FNAME:
                return pcc_fname_list_match(&expr->pe_cond,
                                            matcher->pm_name->name);
@@ -1451,6 +1687,8 @@ static int pcc_try_readonly_open_attach(struct inode *inode, struct file *file,
        item.pm_gid = from_kgid(&init_user_ns, current_gid());
        item.pm_projid = ll_i2info(inode)->lli_projid;
        item.pm_name = &dentry->d_name;
+       item.pm_size = ll_i2info(inode)->lli_lazysize;
+       item.pm_mtime = inode->i_mtime.tv_sec;
        dataset = pcc_dataset_match_get(&ll_i2sbi(inode)->ll_pcc_super,
                                        LU_PCC_READONLY, &item);
        if (dataset == NULL)
index 8ab0f44..b059004 100644 (file)
@@ -48,6 +48,12 @@ struct pcc_match_id {
        struct list_head        pmi_linkage;
 };
 
+/* Lazy file size */
+struct pcc_match_size {
+       __u64                   pms_size;
+       struct list_head        pms_linkage;
+};
+
 /* wildcard file name */
 struct pcc_match_fname {
        char                    *pmf_name;
@@ -59,13 +65,29 @@ enum pcc_field {
        PCC_FIELD_GID,
        PCC_FIELD_PROJID,
        PCC_FIELD_FNAME,
+       PCC_FIELD_SIZE,
+       PCC_FIELD_MTIME,
        PCC_FIELD_MAX
 };
 
+enum pcc_field_op {
+       PCC_FIELD_OP_EQ         = 0,
+       PCC_FIELD_OP_LT         = 1,
+       PCC_FIELD_OP_GT         = 2,
+       PCC_FIELD_OP_MAX        = 3,
+       PCC_FIELD_OP_INV        = PCC_FIELD_MAX,
+};
+
 struct pcc_expression {
-       enum pcc_field          pe_field;
-       struct list_head        pe_cond;
        struct list_head        pe_linkage;
+       enum pcc_field          pe_field;
+       enum pcc_field_op       pe_opc;
+       union {
+               struct list_head        pe_cond;
+               __u64                   pe_size;  /* file size in bytes */
+               __u64                   pe_mtime; /* relative age in seconds */
+               __u32                   pe_id;    /* UID/GID/PROJID */
+       };
 };
 
 struct pcc_conjunction {
@@ -87,6 +109,8 @@ struct pcc_matcher {
        __u32            pm_uid;
        __u32            pm_gid;
        __u32            pm_projid;
+       __u64            pm_size;
+       __u64            pm_mtime;
        struct qstr     *pm_name;
 };
 
index 5437c9c..1a5564b 100644 (file)
@@ -1882,7 +1882,9 @@ int sysfs_memparse(const char *buffer, size_t count, u64 *val,
        char tmp_buf[23];
        int rc;
 
-       count = strlen(buffer);
+       if (count > strlen(buffer))
+               count = strlen(buffer);
+
        while (count > 0 && isspace(buffer[count - 1]))
                count--;
 
index c184f5d..1425cfc 100755 (executable)
@@ -59,6 +59,7 @@ if [[ -r /etc/redhat-release ]]; then
        if (( $(version_code $rhel_version) >= $(version_code 9.3.0) )); then
                always_except EX-8739 6 7a 7b 23    # PCC-RW
                always_except LU-17289 102          # fio io_uring
+               always_except LU-17781 33           # inconsistent LSOM
        fi
 fi
 
@@ -193,7 +194,7 @@ setup_pcc_mapping() {
 
        [ -z "$param" ] && param="projid={100}\ rwid=$HSM_ARCHIVE_NUMBER"
        stack_trap "cleanup_pcc_mapping $facet" EXIT
-       do_facet $facet $LCTL pcc add $MOUNT $hsm_root -p $param ||
+       do_facet $facet $LCTL pcc add $MOUNT $hsm_root -p "$param" ||
                error "Setup PCC backend $hsm_root on $MOUNT failed"
 }
 
@@ -2788,6 +2789,127 @@ test_32() {
 }
 run_test 32 "Test for RO-PCC when PCC copy is deleted"
 
+test_33() {
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/myfile.doc
+       local file2=$DIR2/myfile.doc
+
+       $LCTL get_param -n mdc.*.connect_flags | grep -q pcc_ro ||
+               skip "Server does not support PCC-RO"
+
+       stack_trap "restore_opencache" EXIT
+       disable_opencache
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+
+       setup_pcc_mapping $SINGLEAGT \
+               "fname={*.doc}\&size\<{1M}\ roid=$HSM_ARCHIVE_NUMBER\ pccro=1"
+       do_facet $SINGLEAGT $LCTL pcc list $MOUNT
+       touch $file || error "touch $file failed"
+       $TRUNCATE $file $((1048576 * 2)) || error "Truncate $file failed"
+       check_lpcc_state $file "none"
+       do_facet $SINGLEAGT $LFS pcc state $file
+       $TRUNCATE $file $((1048576 / 2)) || error "Truncate $file failed"
+       do_facet $SINGLEAGT $LFS pcc state $file
+       check_lpcc_state $file "readonly"
+       cleanup_pcc_mapping
+
+       setup_pcc_mapping $SINGLEAGT \
+               "fname={*.doc}\&size\<{5M}\&size\>{3M}\ roid=5\ pccro=1"
+       do_facet $SINGLEAGT $LCTL pcc list $MOUNT
+       check_lpcc_state $file "none"
+       $TRUNCATE $file2 $((1048576 * 6)) || error "Truncate $file2 failed"
+       check_lpcc_state $file "none"
+       $TRUNCATE $file2 $((1048576 * 4)) || error "Truncate $file2 failed"
+       check_lpcc_state $file "readonly"
+       cleanup_pcc_mapping
+
+       setup_pcc_mapping $SINGLEAGT \
+               "fname={*.doc}\&size={5M\ 3M}\ roid=5\ pccro=1"
+       do_facet $SINGLEAGT $LCTL pcc list $MOUNT
+       do_facet $SINGLEAGT $MULTIOP $file oc ||
+               error "failed to readonly open $file"
+       check_lpcc_state $file "none"
+       $TRUNCATE $file $((1048576 * 5)) || error "Truncate $file failed"
+       do_facet $SINGLEAGT $MULTIOP $file oc ||
+               error "failed to readonly open $file"
+       check_lpcc_state $file "readonly"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "failed to detach $file"
+       $TRUNCATE $file $((1048576 * 4)) || error "Truncate $file failed"
+       do_facet $SINGLEAGT $MULTIOP $file oc ||
+               error "failed to readonly open $file"
+       check_lpcc_state $file "none"
+       $TRUNCATE $file $((1048576 * 3)) || error "Truncate $file failed"
+       do_facet $SINGLEAGT $MULTIOP $file oc ||
+               error "failed to readonly open $file"
+       check_lpcc_state $file "readonly"
+       cleanup_pcc_mapping
+}
+run_test 33 "Cache rule with comparator (>, =, <) for file size"
+
+test_34() {
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
+
+       $LCTL get_param -n mdc.*.connect_flags | grep -q pcc_ro ||
+               skip "Server does not support PCC-RO"
+
+       ! is_project_quota_supported &&
+               skip "project quota is not supported"
+
+       enable_project_quota
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+
+       setup_pcc_mapping $SINGLEAGT \
+               "projid\>{100}\ roid=5\ ropcc=1"
+       do_facet $SINGLEAGT $LCTL pcc list $MOUNT
+       do_facet $SINGLEAGT "echo -n QQQQQ > $file" ||
+               error "failed to write $file"
+       check_lpcc_state $file "none"
+       $LFS project -p 99 $file || error "failed to set project for $file"
+       $LFS project -d $file
+       check_lpcc_state $file "none"
+       $LFS project -p 101 $file || error "failed to set project for $file"
+       $LFS project -d $file
+       check_lpcc_state $file "readonly"
+       cleanup_pcc_mapping
+
+       setup_pcc_mapping $SINGLEAGT \
+               "projid\<{100}\ roid=5\ ropcc=1"
+       do_facet $SINGLEAGT $LCTL pcc list $MOUNT
+       check_lpcc_state $file "none"
+       $LFS project -p 102 $file || error "failed to set project for $file"
+       $LFS project -d $file
+       check_lpcc_state $file "none"
+       $LFS project -p 99 $file || error "failed to set project for $file"
+       $LFS project -d $file
+       check_lpcc_state $file "readonly"
+       cleanup_pcc_mapping
+
+       setup_pcc_mapping $SINGLEAGT \
+               "projid\<{120}\&projid\>{110}\ roid=5\ ropcc=1"
+       do_facet $SINGLEAGT $LCTL pcc list $MOUNT
+       check_lpcc_state $file "none"
+       $LFS project -p 105 $file || error "failed to set project for $file"
+       $LFS project -d $file
+       check_lpcc_state $file "none"
+       $LFS project -p 121 $file || error "failed to set project for $file"
+       $LFS project -d $file
+       check_lpcc_state $file "none"
+       $LFS project -p 115 $file || error "failed to set project for $file"
+       $LFS project -d $file
+       check_lpcc_state $file "readonly"
+       cleanup_pcc_mapping
+}
+run_test 34 "Cache rule with comparator (>, <) for Project ID range"
+
 test_36_base() {
        local loopfile="$TMP/$tfile"
        local mntpt="/mnt/pcc.$tdir"
@@ -2831,6 +2953,34 @@ test_36b() {
 }
 run_test 36b "Stale RO-PCC copy should be deleted after remove the PCC backend"
 
+test_41() {
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
+
+       $LCTL get_param -n mdc.*.connect_flags | grep -q pcc_ro ||
+               skip "Server does not support PCC-RO"
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       do_facet $SINGLEAGT mkdir $hsm_root || error "mkdir $hsm_root failed"
+       setup_pcc_mapping $SINGLEAGT \
+               "mtime\>{1m}\ roid=$HSM_ARCHIVE_NUMBER\ ropcc=1"
+       do_facet $SINGLEAGT $LCTL pcc list $MOUNT
+
+       echo "pcc_ro_data" > $file || error "echo $file failed"
+       do_facet $SINGLEAGT cat $file || error "cat $file failed"
+       check_lpcc_state $file "none"
+
+       local mtime=$(date -d "2min ago" +%s)
+
+       do_facet $SINGLEAGT touch -m -d @$mtime $file ||
+               error "failed to change mtime for $file $mtime"
+       do_facet $SINGLEAGT cat $file || error "cat $file failed"
+       check_lpcc_state $file "readonly"
+}
+run_test 41 "Test mtime rule for PCC-RO open attach with O_RDONLY mode"
+
 #test 101: containers and PCC
 #LU-15170: Test mount namespaces with PCC
 #This tests the cases where the PCC mount is not present in the container by