Whamcloud - gitweb
LU-10918 llite: Rule based auto PCC caching when create files 51/34751/15
authorQian Yingjin <qian@ddn.com>
Wed, 24 Apr 2019 09:51:25 +0000 (17:51 +0800)
committerOleg Drokin <green@whamcloud.com>
Thu, 13 Jun 2019 04:34:15 +0000 (04:34 +0000)
Configurable rule based auto PCC caching for newly created files
can significantly benefit users for readwrite PCC. It can
determine which file can use a cache on PCC directly without any
admission control for high priority user/group/project or filename
with wildcard support. Meanwhile, we can enforce a quota limitation
of capacity usage for each user/group/project to providing caching
isolation.

Similar to NRS TBF command line, it supports logical conditional
conjunction and disjunction operations among different user/group/
project or filename with the wildcard support.

The command line to add this kind of rule is as follow:
lctl pcc add /mnt/lustre /mnt/pcc
"projid={500 1000}&fname={*.h5},uid={1001} rwid=1 roid=1"
It means that Project ID of 500, 1000 AND file suffix name is "h5"
OR User ID is 1001 can be auto cached on PCC for newly create file
on the client. "rwid" means RW-PCC attach ID (which is
usually archive ID); "roid" means RO-PCC attach ID. By defualt,
RO-PCC attach id is setting same with RW-PCC attach ID for a
shared PCC backend.

Test-Parameters: clientcount=3 testlist=sanity-pcc,sanity-pcc,sanity-pcc
Signed-off-by: Qian Yingjin <qian@ddn.com>
Change-Id: I628975b3e097e98d6b93f1c6acd855aaacdaa8b3
Reviewed-on: https://review.whamcloud.com/34751
Reviewed-by: Li Xi <lixi@ddn.com>
Reviewed-by: Patrick Farrell <pfarrell@whamcloud.com>
Tested-by: Jenkins
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/doc/lctl-pcc.8
lustre/doc/lfs-pcc.1
lustre/llite/namei.c
lustre/llite/pcc.c
lustre/llite/pcc.h
lustre/tests/sanity-pcc.sh

index 45ba840..759a670 100644 (file)
@@ -18,11 +18,15 @@ on a Lustre filesystem client instances with the mount point referenced by
 .IR mntpath .
 The parameter
 .IR param
-is a string to config the PCC backend such as read-write attach id (archive ID)
-and auto-caching project id. i.e. the string "2 100" means that the read-write
-attach id is 2, and the project ID is 100 for the PCC backend. On this client
-any subsequently created files with the project ID of 100 will be persistently
-cached automatically.
+is a string in the form of name-value pairs to config the PCC backend such as
+read-write attach id (archive ID) or read-only attach id and auto caching rule.
+i.e. for the string "projid={500}&fname={*.h5} rwid=2", the first substring of
+the config parameter is the auto caching rule. Where "&" represents the logical
+conjunction operator while "," represents the logical disjunction operator. The
+example rule means that new files are only auto cached if the project ID is 500
+and the suffix of the file name is "h5". "rwid" represents the read-write
+attach id (2) which value is same as the archive ID of the copytool agent
+running on this PCC node.
 .TP
 .B lctl pcc del <\fImntpath\fR> <\fIpccpath\fR>
 Delete a PCC backend specified by path
index c709c8d..56a760f 100644 (file)
@@ -34,14 +34,19 @@ Display the PCC state for given files.
 For RW-PCC, it is HSM ARCHIVE ID to choose which backend for cache files.
 .TP
 .B --mnt | -m
-Specifies Lustre mount point.
+Specify the Lustre mount point.
 .TP
 Before using RW-PCC, you need to configure HSM root and Archive ID mapping properly:
 .TP
-.B lfs pcc add $MNTPATH $PCCPATH \ "$ARCHIVE_ID $PROJID"
-Add one PCC backend to the Lustre client, you need to specify hsm root,
-archive ID, and project ID. On this client any subsequently created
-files with this project ID will be persistently cached automatically.
+.B lfs pcc add $MNTPATH $PCCPATH \ "$PARAM"
+Add one PCC backend to the Lustre client. For RW-PCC, when a file is being
+created, a rule-based policy is used to determine whether it will be cached.
+The rule expression supports logical conditional conjunction and disjunction
+operations among different users, groups, projects, or filenames including
+wildcards. You need to specify auto create caching rule and archive ID in
+.B $PARAM.
+On this client any subsequently created files matching the condition of auto
+caching rule will be persistently cached automatically.
 .TP
 .B lfs pcc del $MNTPATH $PCCPATH
  Delete one PCC backend
@@ -57,8 +62,14 @@ Enable HSM on the appropriate MDT.
 .B # lhsmtool_posix --daemon --hsm-root /mnt/pcc/ --archive=1 /mnt/lustre
 Launch one copytool on client node to connect cache storage.
 .TP
-.B # lfs pcc add /mnt/lustre /mnt/pcc \ "1\ 100"
-Add HSM root and Archive ID mapping for RW-PCC.
+.B # lfs pcc add /mnt/lustre /mnt/pcc \ "projid={500,1000}&fname={*.h5},uid=1001 rwid=1"
+Add HSM root and Archive ID (referenced by
+.IB rwid
+name-value pair) mapping for RW-PCC. Where "&" represents the logical
+conjunction operator while "," represents the logical disjunction operator.
+The example rule means that new files are only auto cached if the project ID is
+either 500 or 1000 and the suffix of the file name is “h5” or the user ID is
+1001.
 .TP
 .B $ lfs pcc attach -i 1 /mnt/lustre/file
 Attach an existing file into PCC and migrate data from lustre to Cache Device,
index 0464951..901b213 100644 (file)
@@ -835,7 +835,7 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
                lum->lmm_pattern = LOV_PATTERN_F_RELEASED | LOV_PATTERN_RAID0;
                op_data->op_data = lum;
                op_data->op_data_size = sizeof(*lum);
-               op_data->op_archive_id = dataset->pccd_id;
+               op_data->op_archive_id = dataset->pccd_rwid;
 
                rc = obd_fid_alloc(NULL, ll_i2mdexp(parent), &op_data->op_fid2,
                                   op_data);
@@ -1004,9 +1004,14 @@ static int ll_atomic_open(struct inode *dir, struct dentry *dentry,
                /* Volatile file is used for HSM restore, so do not use PCC */
                if (!filename_is_volatile(dentry->d_name.name,
                                          dentry->d_name.len, NULL)) {
-                       dataset = pcc_dataset_get(&sbi->ll_pcc_super,
-                                                 ll_i2info(dir)->lli_projid,
-                                                 0);
+                       struct pcc_matcher item;
+
+                       item.pm_uid = from_kuid(&init_user_ns, current_uid());
+                       item.pm_gid = from_kgid(&init_user_ns, current_gid());
+                       item.pm_projid = ll_i2info(dir)->lli_projid;
+                       item.pm_name = &dentry->d_name;
+                       dataset = pcc_dataset_match_get(&sbi->ll_pcc_super,
+                                                       &item);
                        pca.pca_dataset = dataset;
                }
        }
index eca1f16..32e5f65 100644 (file)
@@ -128,23 +128,538 @@ int pcc_super_init(struct pcc_super *super)
        return 0;
 }
 
+/* Rule based auto caching */
+static void pcc_id_list_free(struct list_head *id_list)
+{
+       struct pcc_match_id *id, *n;
+
+       list_for_each_entry_safe(id, n, id_list, pmi_linkage) {
+               list_del_init(&id->pmi_linkage);
+               OBD_FREE_PTR(id);
+       }
+}
+
+static void pcc_fname_list_free(struct list_head *fname_list)
+{
+       struct pcc_match_fname *fname, *n;
+
+       list_for_each_entry_safe(fname, n, fname_list, pmf_linkage) {
+               OBD_FREE(fname->pmf_name, strlen(fname->pmf_name) + 1);
+               list_del_init(&fname->pmf_linkage);
+               OBD_FREE_PTR(fname);
+       }
+}
+
+static void pcc_expression_free(struct pcc_expression *expr)
+{
+       LASSERT(expr->pe_field >= PCC_FIELD_UID &&
+               expr->pe_field < PCC_FIELD_MAX);
+       switch (expr->pe_field) {
+       case PCC_FIELD_UID:
+       case PCC_FIELD_GID:
+       case PCC_FIELD_PROJID:
+               pcc_id_list_free(&expr->pe_cond);
+               break;
+       case PCC_FIELD_FNAME:
+               pcc_fname_list_free(&expr->pe_cond);
+               break;
+       default:
+               LBUG();
+       }
+       OBD_FREE_PTR(expr);
+}
+
+static void pcc_conjunction_free(struct pcc_conjunction *conjunction)
+{
+       struct pcc_expression *expression, *n;
+
+       LASSERT(list_empty(&conjunction->pc_linkage));
+       list_for_each_entry_safe(expression, n,
+                                &conjunction->pc_expressions,
+                                pe_linkage) {
+               list_del_init(&expression->pe_linkage);
+               pcc_expression_free(expression);
+       }
+       OBD_FREE_PTR(conjunction);
+}
+
+static void pcc_rule_conds_free(struct list_head *cond_list)
+{
+       struct pcc_conjunction *conjunction, *n;
+
+       list_for_each_entry_safe(conjunction, n, cond_list, pc_linkage) {
+               list_del_init(&conjunction->pc_linkage);
+               pcc_conjunction_free(conjunction);
+       }
+}
+
+static void pcc_cmd_fini(struct pcc_cmd *cmd)
+{
+       if (cmd->pccc_cmd == PCC_ADD_DATASET) {
+               if (!list_empty(&cmd->u.pccc_add.pccc_conds))
+                       pcc_rule_conds_free(&cmd->u.pccc_add.pccc_conds);
+               if (cmd->u.pccc_add.pccc_conds_str)
+                       OBD_FREE(cmd->u.pccc_add.pccc_conds_str,
+                                strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
+       }
+}
+
+#define PCC_DISJUNCTION_DELIM  (',')
+#define PCC_CONJUNCTION_DELIM  ('&')
+#define PCC_EXPRESSION_DELIM   ('=')
+
+static int
+pcc_fname_list_add(struct cfs_lstr *id, struct list_head *fname_list)
+{
+       struct pcc_match_fname *fname;
+
+       OBD_ALLOC(fname, sizeof(struct pcc_match_fname));
+       if (fname == NULL)
+               return -ENOMEM;
+
+       OBD_ALLOC(fname->pmf_name, id->ls_len + 1);
+       if (fname->pmf_name == NULL) {
+               OBD_FREE(fname, sizeof(struct pcc_match_fname));
+               return -ENOMEM;
+       }
+
+       memcpy(fname->pmf_name, id->ls_str, id->ls_len);
+       list_add_tail(&fname->pmf_linkage, fname_list);
+       return 0;
+}
+
+static int
+pcc_fname_list_parse(char *str, int len, struct list_head *fname_list)
+{
+       struct cfs_lstr src;
+       struct cfs_lstr res;
+       int rc = 0;
+
+       ENTRY;
+
+       src.ls_str = str;
+       src.ls_len = len;
+       INIT_LIST_HEAD(fname_list);
+       while (src.ls_str) {
+               rc = cfs_gettok(&src, ' ', &res);
+               if (rc == 0) {
+                       rc = -EINVAL;
+                       break;
+               }
+               rc = pcc_fname_list_add(&res, fname_list);
+               if (rc)
+                       break;
+       }
+       if (rc)
+               pcc_fname_list_free(fname_list);
+       RETURN(rc);
+}
+
+static int
+pcc_id_list_parse(char *str, int len, struct list_head *id_list,
+                 enum pcc_field type)
+{
+       struct cfs_lstr src;
+       struct cfs_lstr res;
+       int rc = 0;
+
+       ENTRY;
+
+       if (type != PCC_FIELD_UID && type != PCC_FIELD_GID &&
+           type != PCC_FIELD_PROJID)
+               RETURN(-EINVAL);
+
+       src.ls_str = str;
+       src.ls_len = len;
+       INIT_LIST_HEAD(id_list);
+       while (src.ls_str) {
+               struct pcc_match_id *id;
+               __u32 id_val;
+
+               if (cfs_gettok(&src, ' ', &res) == 0)
+                       GOTO(out, rc = -EINVAL);
+
+               if (!cfs_str2num_check(res.ls_str, res.ls_len,
+                                      &id_val, 0, (u32)~0U))
+                       GOTO(out, rc = -EINVAL);
+
+               OBD_ALLOC_PTR(id);
+               if (id == NULL)
+                       GOTO(out, rc = -ENOMEM);
+
+               id->pmi_id = id_val;
+               list_add_tail(&id->pmi_linkage, id_list);
+       }
+out:
+       if (rc)
+               pcc_id_list_free(id_list);
+       RETURN(rc);
+}
+
+static inline bool
+pcc_check_field(struct cfs_lstr *field, char *str)
+{
+       int len = strlen(str);
+
+       return (field->ls_len == len &&
+               strncmp(field->ls_str, str, len) == 0);
+}
+
+static int
+pcc_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
+{
+       struct pcc_expression *expr;
+       struct cfs_lstr field;
+       int rc = 0;
+
+       OBD_ALLOC(expr, sizeof(struct pcc_expression));
+       if (expr == NULL)
+               return -ENOMEM;
+
+       rc = cfs_gettok(src, PCC_EXPRESSION_DELIM, &field);
+       if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
+           src->ls_str[src->ls_len - 1] != '}')
+               GOTO(out, rc = -EINVAL);
+
+       /* Skip '{' and '}' */
+       src->ls_str++;
+       src->ls_len -= 2;
+
+       if (pcc_check_field(&field, "uid")) {
+               if (pcc_id_list_parse(src->ls_str,
+                                     src->ls_len,
+                                     &expr->pe_cond,
+                                     PCC_FIELD_UID) < 0)
+                       GOTO(out, rc = -EINVAL);
+               expr->pe_field = PCC_FIELD_UID;
+       } else if (pcc_check_field(&field, "gid")) {
+               if (pcc_id_list_parse(src->ls_str,
+                                     src->ls_len,
+                                     &expr->pe_cond,
+                                     PCC_FIELD_GID) < 0)
+                       GOTO(out, rc = -EINVAL);
+               expr->pe_field = PCC_FIELD_GID;
+       } else if (pcc_check_field(&field, "projid")) {
+               if (pcc_id_list_parse(src->ls_str,
+                                     src->ls_len,
+                                     &expr->pe_cond,
+                                     PCC_FIELD_PROJID) < 0)
+                       GOTO(out, rc = -EINVAL);
+               expr->pe_field = PCC_FIELD_PROJID;
+       } else if (pcc_check_field(&field, "fname")) {
+               if (pcc_fname_list_parse(src->ls_str,
+                                        src->ls_len,
+                                        &expr->pe_cond) < 0)
+                       GOTO(out, rc = -EINVAL);
+               expr->pe_field = PCC_FIELD_FNAME;
+       } else {
+               GOTO(out, rc = -EINVAL);
+       }
+
+       list_add_tail(&expr->pe_linkage, cond_list);
+       return 0;
+out:
+       OBD_FREE_PTR(expr);
+       return rc;
+}
+
+static int
+pcc_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
+{
+       struct pcc_conjunction *conjunction;
+       struct cfs_lstr expr;
+       int rc = 0;
+
+       OBD_ALLOC(conjunction, sizeof(struct pcc_conjunction));
+       if (conjunction == NULL)
+               return -ENOMEM;
+
+       INIT_LIST_HEAD(&conjunction->pc_expressions);
+       list_add_tail(&conjunction->pc_linkage, cond_list);
+
+       while (src->ls_str) {
+               rc = cfs_gettok(src, PCC_CONJUNCTION_DELIM, &expr);
+               if (rc == 0) {
+                       rc = -EINVAL;
+                       break;
+               }
+               rc = pcc_expression_parse(&expr,
+                                         &conjunction->pc_expressions);
+               if (rc)
+                       break;
+       }
+       return rc;
+}
+
+static int pcc_conds_parse(char *str, int len, struct list_head *cond_list)
+{
+       struct cfs_lstr src;
+       struct cfs_lstr res;
+       int rc = 0;
+
+       src.ls_str = str;
+       src.ls_len = len;
+       INIT_LIST_HEAD(cond_list);
+       while (src.ls_str) {
+               rc = cfs_gettok(&src, PCC_DISJUNCTION_DELIM, &res);
+               if (rc == 0) {
+                       rc = -EINVAL;
+                       break;
+               }
+               rc = pcc_conjunction_parse(&res, cond_list);
+               if (rc)
+                       break;
+       }
+       return rc;
+}
+
+static int pcc_id_parse(struct pcc_cmd *cmd, const char *id)
+{
+       int rc;
+
+       OBD_ALLOC(cmd->u.pccc_add.pccc_conds_str, strlen(id) + 1);
+       if (cmd->u.pccc_add.pccc_conds_str == NULL)
+               return -ENOMEM;
+
+       memcpy(cmd->u.pccc_add.pccc_conds_str, id, strlen(id));
+
+       rc = pcc_conds_parse(cmd->u.pccc_add.pccc_conds_str,
+                            strlen(cmd->u.pccc_add.pccc_conds_str),
+                            &cmd->u.pccc_add.pccc_conds);
+       if (rc)
+               pcc_cmd_fini(cmd);
+
+       return rc;
+}
+
+static int
+pcc_parse_value_pair(struct pcc_cmd *cmd, char *buffer)
+{
+       char *key, *val;
+       unsigned long id;
+       int rc;
+
+       val = buffer;
+       key = strsep(&val, "=");
+       if (val == NULL || strlen(val) == 0)
+               return -EINVAL;
+
+       /* Key of the value pair */
+       if (strcmp(key, "rwid") == 0) {
+               rc = kstrtoul(val, 10, &id);
+               if (rc)
+                       return rc;
+               if (id <= 0)
+                       return -EINVAL;
+               cmd->u.pccc_add.pccc_rwid = id;
+       } else if (strcmp(key, "roid") == 0) {
+               rc = kstrtoul(val, 10, &id);
+               if (rc)
+                       return rc;
+               if (id <= 0)
+                       return -EINVAL;
+               cmd->u.pccc_add.pccc_roid = id;
+       } else {
+               return -EINVAL;
+       }
+
+       return 0;
+}
+
+static int
+pcc_parse_value_pairs(struct pcc_cmd *cmd, char *buffer)
+{
+       char *val;
+       char *token;
+       int rc;
+
+       val = buffer;
+       while (val != NULL && strlen(val) != 0) {
+               token = strsep(&val, " ");
+               rc = pcc_parse_value_pair(cmd, token);
+               if (rc)
+                       return rc;
+       }
+
+       return 0;
+}
+
+static void
+pcc_dataset_rule_fini(struct pcc_match_rule *rule)
+{
+       if (!list_empty(&rule->pmr_conds))
+               pcc_rule_conds_free(&rule->pmr_conds);
+       LASSERT(rule->pmr_conds_str != NULL);
+       OBD_FREE(rule->pmr_conds_str, strlen(rule->pmr_conds_str) + 1);
+}
+
+static int
+pcc_dataset_rule_init(struct pcc_match_rule *rule, struct pcc_cmd *cmd)
+{
+       int rc = 0;
+
+       LASSERT(cmd->u.pccc_add.pccc_conds_str);
+       OBD_ALLOC(rule->pmr_conds_str,
+                 strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
+       if (rule->pmr_conds_str == NULL)
+               return -ENOMEM;
+
+       memcpy(rule->pmr_conds_str,
+              cmd->u.pccc_add.pccc_conds_str,
+              strlen(cmd->u.pccc_add.pccc_conds_str));
+
+       INIT_LIST_HEAD(&rule->pmr_conds);
+       if (!list_empty(&cmd->u.pccc_add.pccc_conds))
+               rc = pcc_conds_parse(rule->pmr_conds_str,
+                                         strlen(rule->pmr_conds_str),
+                                         &rule->pmr_conds);
+
+       if (rc)
+               pcc_dataset_rule_fini(rule);
+
+       return rc;
+}
+
+/* Rule Matching */
+static int
+pcc_id_list_match(struct list_head *id_list, __u32 id_val)
+{
+       struct pcc_match_id *id;
+
+       list_for_each_entry(id, id_list, pmi_linkage) {
+               if (id->pmi_id == id_val)
+                       return 1;
+       }
+       return 0;
+}
+
+static bool
+cfs_match_wildcard(const char *pattern, const char *content)
+{
+       if (*pattern == '\0' && *content == '\0')
+               return true;
+
+       if (*pattern == '*' && *(pattern + 1) != '\0' && *content == '\0')
+               return false;
+
+       while (*pattern == *content) {
+               pattern++;
+               content++;
+               if (*pattern == '\0' && *content == '\0')
+                       return true;
+
+               if (*pattern == '*' && *(pattern + 1) != '\0' &&
+                   *content == '\0')
+                       return false;
+       }
+
+       if (*pattern == '*')
+               return (cfs_match_wildcard(pattern + 1, content) ||
+                       cfs_match_wildcard(pattern, content + 1));
+
+       return false;
+}
+
+static int
+pcc_fname_list_match(struct list_head *fname_list, const char *name)
+{
+       struct pcc_match_fname *fname;
+
+       list_for_each_entry(fname, fname_list, pmf_linkage) {
+               if (cfs_match_wildcard(fname->pmf_name, name))
+                       return 1;
+       }
+       return 0;
+}
+
+static int
+pcc_expression_match(struct pcc_expression *expr, struct pcc_matcher *matcher)
+{
+       switch (expr->pe_field) {
+       case PCC_FIELD_UID:
+               return pcc_id_list_match(&expr->pe_cond, matcher->pm_uid);
+       case PCC_FIELD_GID:
+               return pcc_id_list_match(&expr->pe_cond, matcher->pm_gid);
+       case PCC_FIELD_PROJID:
+               return pcc_id_list_match(&expr->pe_cond, matcher->pm_projid);
+       case PCC_FIELD_FNAME:
+               return pcc_fname_list_match(&expr->pe_cond,
+                                           matcher->pm_name->name);
+       default:
+               return 0;
+       }
+}
+
+static int
+pcc_conjunction_match(struct pcc_conjunction *conjunction,
+                     struct pcc_matcher *matcher)
+{
+       struct pcc_expression *expr;
+       int matched;
+
+       list_for_each_entry(expr, &conjunction->pc_expressions, pe_linkage) {
+               matched = pcc_expression_match(expr, matcher);
+               if (!matched)
+                       return 0;
+       }
+
+       return 1;
+}
+
+static int
+pcc_cond_match(struct pcc_match_rule *rule, struct pcc_matcher *matcher)
+{
+       struct pcc_conjunction *conjunction;
+       int matched;
+
+       list_for_each_entry(conjunction, &rule->pmr_conds, pc_linkage) {
+               matched = pcc_conjunction_match(conjunction, matcher);
+               if (matched)
+                       return 1;
+       }
+
+       return 0;
+}
+
+struct pcc_dataset*
+pcc_dataset_match_get(struct pcc_super *super, struct pcc_matcher *matcher)
+{
+       struct pcc_dataset *dataset;
+       struct pcc_dataset *selected = NULL;
+
+       spin_lock(&super->pccs_lock);
+       list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
+               if (pcc_cond_match(&dataset->pccd_rule, matcher)) {
+                       atomic_inc(&dataset->pccd_refcount);
+                       selected = dataset;
+                       break;
+               }
+       }
+       spin_unlock(&super->pccs_lock);
+       if (selected)
+               CDEBUG(D_CACHE, "PCC create, matched %s - %d:%d:%d:%s\n",
+                      dataset->pccd_rule.pmr_conds_str,
+                      matcher->pm_uid, matcher->pm_gid,
+                      matcher->pm_projid, matcher->pm_name->name);
+
+       return selected;
+}
+
 /**
  * pcc_dataset_add - Add a Cache policy to control which files need be
  * cached and where it will be cached.
  *
- * @super: superblock of pcc
- * @pathname: root path of pcc
- * @id: HSM archive ID
- * @projid: files with specified project ID will be cached.
+ * @super:     superblock of pcc
+ * @cmd:       pcc command
  */
 static int
-pcc_dataset_add(struct pcc_super *super, const char *pathname,
-               __u32 archive_id, __u32 projid)
+pcc_dataset_add(struct pcc_super *super, struct pcc_cmd *cmd)
 {
-       int rc;
+       char *pathname = cmd->pccc_pathname;
        struct pcc_dataset *dataset;
        struct pcc_dataset *tmp;
        bool found = false;
+       int rc;
 
        OBD_ALLOC_PTR(dataset);
        if (dataset == NULL)
@@ -156,13 +671,23 @@ pcc_dataset_add(struct pcc_super *super, const char *pathname,
                return rc;
        }
        strncpy(dataset->pccd_pathname, pathname, PATH_MAX);
-       dataset->pccd_id = archive_id;
-       dataset->pccd_projid = projid;
+       dataset->pccd_rwid = cmd->u.pccc_add.pccc_rwid;
+       dataset->pccd_roid = cmd->u.pccc_add.pccc_roid;
        atomic_set(&dataset->pccd_refcount, 1);
 
+       rc = pcc_dataset_rule_init(&dataset->pccd_rule, cmd);
+       if (rc) {
+               pcc_dataset_put(dataset);
+               return rc;
+       }
+
        spin_lock(&super->pccs_lock);
        list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) {
-               if (tmp->pccd_id == archive_id) {
+               if (strcmp(tmp->pccd_pathname, pathname) == 0 ||
+                   (dataset->pccd_rwid != 0 &&
+                    dataset->pccd_rwid == tmp->pccd_rwid) ||
+                   (dataset->pccd_roid != 0 &&
+                    dataset->pccd_roid == tmp->pccd_roid)) {
                        found = true;
                        break;
                }
@@ -180,23 +705,21 @@ pcc_dataset_add(struct pcc_super *super, const char *pathname,
 }
 
 struct pcc_dataset *
-pcc_dataset_get(struct pcc_super *super, __u32 projid, __u32 archive_id)
+pcc_dataset_get(struct pcc_super *super, enum lu_pcc_type type, __u32 id)
 {
        struct pcc_dataset *dataset;
        struct pcc_dataset *selected = NULL;
 
-       if (projid == 0 && archive_id == 0)
+       if (id == 0)
                return NULL;
 
        /*
-        * archive ID is unique in the list, projid might be duplicate,
+        * archive ID (read-write ID) or read-only ID is unique in the list,
         * we just return last added one as first priority.
         */
        spin_lock(&super->pccs_lock);
        list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
-               if (projid && dataset->pccd_projid != projid)
-                       continue;
-               if (archive_id && dataset->pccd_id != archive_id)
+               if (type == LU_PCC_READWRITE && dataset->pccd_rwid != id)
                        continue;
                atomic_inc(&dataset->pccd_refcount);
                selected = dataset;
@@ -204,8 +727,8 @@ pcc_dataset_get(struct pcc_super *super, __u32 projid, __u32 archive_id)
        }
        spin_unlock(&super->pccs_lock);
        if (selected)
-               CDEBUG(D_CACHE, "matched projid %u, PCC create\n",
-                      selected->pccd_projid);
+               CDEBUG(D_CACHE, "matched id %u, PCC mode %d\n", id, type);
+
        return selected;
 }
 
@@ -213,6 +736,7 @@ void
 pcc_dataset_put(struct pcc_dataset *dataset)
 {
        if (atomic_dec_and_test(&dataset->pccd_refcount)) {
+               pcc_dataset_rule_fini(&dataset->pccd_rule);
                path_put(&dataset->pccd_path);
                OBD_FREE_PTR(dataset);
        }
@@ -243,8 +767,8 @@ static void
 pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
 {
        seq_printf(m, "%s:\n", dataset->pccd_pathname);
-       seq_printf(m, "  rwid: %u\n", dataset->pccd_id);
-       seq_printf(m, "  autocache: projid=%u\n", dataset->pccd_projid);
+       seq_printf(m, "  rwid: %u\n", dataset->pccd_rwid);
+       seq_printf(m, "  autocache: %s\n", dataset->pccd_rule.pmr_conds_str);
 }
 
 int
@@ -292,7 +816,6 @@ pcc_cmd_parse(char *buffer, unsigned long count)
        static struct pcc_cmd *cmd;
        char *token;
        char *val;
-       unsigned long tmp;
        int rc = 0;
 
        OBD_ALLOC_PTR(cmd);
@@ -326,28 +849,35 @@ pcc_cmd_parse(char *buffer, unsigned long count)
        cmd->pccc_pathname = token;
 
        if (cmd->pccc_cmd == PCC_ADD_DATASET) {
-               /* archive ID */
-               token = strsep(&val, " ");
-               if (val == NULL)
+               /* List of ID */
+               LASSERT(val);
+               token = val;
+               val = strrchr(token, '}');
+               if (!val)
                        GOTO(out_free_cmd, rc = -EINVAL);
 
-               rc = kstrtoul(token, 10, &tmp);
-               if (rc != 0)
-                       GOTO(out_free_cmd, rc = -EINVAL);
-               if (tmp == 0)
+               /* Skip '}' */
+               val++;
+               if (*val == '\0') {
+                       val = NULL;
+               } else if (*val == ' ') {
+                       *val = '\0';
+                       val++;
+               } else {
                        GOTO(out_free_cmd, rc = -EINVAL);
-               cmd->u.pccc_add.pccc_id = tmp;
+               }
 
-               token = val;
-               rc = kstrtoul(token, 10, &tmp);
-               if (rc != 0)
-                       GOTO(out_free_cmd, rc = -EINVAL);
-               if (tmp == 0)
-                       GOTO(out_free_cmd, rc = -EINVAL);
-               cmd->u.pccc_add.pccc_projid = tmp;
-       }
+               rc = pcc_id_parse(cmd, token);
+               if (rc)
+                       GOTO(out_free_cmd, rc);
 
+               rc = pcc_parse_value_pairs(cmd, val);
+               if (rc)
+                       GOTO(out_cmd_fini, rc = -EINVAL);
+       }
        goto out;
+out_cmd_fini:
+       pcc_cmd_fini(cmd);
 out_free_cmd:
        OBD_FREE_PTR(cmd);
 out:
@@ -368,9 +898,7 @@ int pcc_cmd_handle(char *buffer, unsigned long count,
 
        switch (cmd->pccc_cmd) {
        case PCC_ADD_DATASET:
-               rc = pcc_dataset_add(super, cmd->pccc_pathname,
-                                     cmd->u.pccc_add.pccc_id,
-                                     cmd->u.pccc_add.pccc_projid);
+               rc = pcc_dataset_add(super, cmd);
                break;
        case PCC_DEL_DATASET:
                rc = pcc_dataset_del(super, cmd->pccc_pathname);
@@ -383,6 +911,7 @@ int pcc_cmd_handle(char *buffer, unsigned long count,
                break;
        }
 
+       pcc_cmd_fini(cmd);
        OBD_FREE_PTR(cmd);
        return rc;
 }
@@ -1126,7 +1655,8 @@ static int pcc_inode_remove(struct pcc_inode *pcci)
        dentry = pcci->pcci_path.dentry;
        rc = ll_vfs_unlink(dentry->d_parent->d_inode, dentry);
        if (rc)
-               CWARN("failed to unlink cached file, rc = %d\n", rc);
+               CWARN("failed to unlink PCC file %.*s, rc = %d\n",
+                     dentry->d_name.len, dentry->d_name.name, rc);
 
        return rc;
 }
@@ -1328,7 +1858,10 @@ out_unlock:
 
                rc2 = ll_vfs_unlink(pcc_dentry->d_parent->d_inode, pcc_dentry);
                if (rc2)
-                       CWARN("failed to unlink PCC file, rc = %d\n", rc2);
+                       CWARN("%s: failed to unlink PCC file %.*s, rc = %d\n",
+                             ll_i2sbi(inode)->ll_fsname,
+                             pcc_dentry->d_name.len, pcc_dentry->d_name.name,
+                             rc2);
 
                dput(pcc_dentry);
        }
@@ -1434,8 +1967,8 @@ int pcc_readwrite_attach(struct file *file, struct inode *inode,
        if (rc)
                RETURN(rc);
 
-       dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super, 0,
-                                 archive_id);
+       dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super,
+                                 LU_PCC_READWRITE, archive_id);
        if (dataset == NULL)
                RETURN(-ENOENT);
 
@@ -1495,7 +2028,9 @@ out_dentry:
                rc2 = ll_vfs_unlink(dentry->d_parent->d_inode, dentry);
                revert_creds(old_cred);
                if (rc2)
-                       CWARN("failed to unlink PCC file, rc = %d\n", rc2);
+                       CWARN("%s: failed to unlink PCC file %.*s, rc = %d\n",
+                             ll_i2sbi(inode)->ll_fsname, dentry->d_name.len,
+                             dentry->d_name.name, rc2);
 
                dput(dentry);
        }
index 7e4996e..0840a0b 100644 (file)
@@ -42,13 +42,64 @@ extern struct kmem_cache *pcc_inode_slab;
 
 #define LPROCFS_WR_PCC_MAX_CMD 4096
 
+/* User/Group/Project ID */
+struct pcc_match_id {
+       __u32                   pmi_id;
+       struct list_head        pmi_linkage;
+};
+
+/* wildcard file name */
+struct pcc_match_fname {
+       char                    *pmf_name;
+       struct list_head         pmf_linkage;
+};
+
+enum pcc_field {
+       PCC_FIELD_UID,
+       PCC_FIELD_GID,
+       PCC_FIELD_PROJID,
+       PCC_FIELD_FNAME,
+       PCC_FIELD_MAX
+};
+
+struct pcc_expression {
+       enum pcc_field          pe_field;
+       struct list_head        pe_cond;
+       struct list_head        pe_linkage;
+};
+
+struct pcc_conjunction {
+       /* link to disjunction */
+       struct list_head        pc_linkage;
+       /* list of logical conjunction */
+       struct list_head        pc_expressions;
+};
+
+/**
+ * Match rule for auto PCC-cached files.
+ */
+struct pcc_match_rule {
+       char                    *pmr_conds_str;
+       struct list_head         pmr_conds;
+};
+
+struct pcc_matcher {
+       __u32            pm_uid;
+       __u32            pm_gid;
+       __u32            pm_projid;
+       struct qstr     *pm_name;
+};
+
 struct pcc_dataset {
-       __u32                   pccd_id;         /* Archive ID */
-       __u32                   pccd_projid;     /* Project ID */
+       __u32                   pccd_rwid;       /* Archive ID */
+       __u32                   pccd_roid;       /* Readonly ID */
+       struct pcc_match_rule   pccd_rule;       /* Match rule */
+       __u32                   pccd_rwonly:1, /* Only use as RW-PCC */
+                               pccd_roonly:1; /* Only use as RO-PCC */
        char                    pccd_pathname[PATH_MAX]; /* full path */
        struct path             pccd_path;       /* Root path */
        struct list_head        pccd_linkage;  /* Linked to pccs_datasets */
-       atomic_t                pccd_refcount; /* reference count */
+       atomic_t                pccd_refcount; /* Reference count */
 };
 
 struct pcc_super {
@@ -102,8 +153,10 @@ struct pcc_cmd {
        char                                    *pccc_pathname;
        union {
                struct pcc_cmd_add {
-                       __u32                    pccc_id;
-                       __u32                    pccc_projid;
+                       __u32                    pccc_rwid;
+                       __u32                    pccc_roid;
+                       struct list_head         pccc_conds;
+                       char                    *pccc_conds_str;
                } pccc_add;
                struct pcc_cmd_del {
                        __u32                    pccc_pad;
@@ -148,8 +201,8 @@ int pcc_inode_create(struct super_block *sb, struct pcc_dataset *dataset,
                     struct lu_fid *fid, struct dentry **pcc_dentry);
 int pcc_inode_create_fini(struct pcc_dataset *dataset, struct inode *inode,
                           struct dentry *pcc_dentry);
-struct pcc_dataset *pcc_dataset_get(struct pcc_super *super, __u32 projid,
-                                   __u32 archive_id);
+struct pcc_dataset *pcc_dataset_match_get(struct pcc_super *super,
+                                         struct pcc_matcher *matcher);
 void pcc_dataset_put(struct pcc_dataset *dataset);
 void pcc_inode_free(struct inode *inode);
 void pcc_layout_invalidate(struct inode *inode);
index 32c4ec6..7517005 100644 (file)
@@ -175,7 +175,7 @@ setup_pcc_mapping() {
        local hsm_root=${hsm_root:-$(hsm_root "$facet")}
        local param="$2"
 
-       [ -z "$param" ] && param="$HSM_ARCHIVE_NUMBER\ 100"
+       [ -z "$param" ] && param="projid={100}\ rwid=$HSM_ARCHIVE_NUMBER"
        stack_trap "cleanup_pcc_mapping $facet" EXIT
        do_facet $facet $LCTL pcc add $MOUNT $hsm_root -p $param
 }
@@ -577,7 +577,7 @@ test_3b() {
        # Start all of the copytools and setup PCC
        for n in $(seq $AGTCOUNT); do
                copytool setup -f agt$n -a $n -m $MOUNT
-               setup_pcc_mapping agt$n "$n\ 100"
+               setup_pcc_mapping agt$n "projid={100}\ rwid=$n"
        done
 
        mkdir -p $DIR/$tdir || error "mkdir $DIR/$tdir failed"
@@ -966,6 +966,157 @@ test_12() {
 }
 run_test 12 "RW-PCC attach races with concurrent HSM remove"
 
+test_rule_id() {
+       local idstr="${1}id"
+       local rule="${idstr}={$2}"
+       local myRUNAS="$3"
+       local file=$DIR/$tdir/$tfile
+
+       setup_pcc_mapping $SINGLEAGT "$rule\ rwid=$HSM_ARCHIVE_NUMBER"
+       $LCTL pcc list $MOUNT
+
+       do_facet $SINGLEAGT mkdir -p $DIR/$tdir
+       chmod 777 $DIR/$tdir || error "chmod 0777 $DIR/$tdir failed"
+
+       rm -f $file || error "rm $file failed"
+       do_facet $SINGLEAGT $myRUNAS dd if=/dev/zero of=$file bs=1024 count=1 ||
+               error "failed to dd write to $file"
+       check_lpcc_state $file "readwrite"
+       do_facet $SINGLEAGT $myRUNAS dd if=$file of=/dev/null bs=1024 count=1 ||
+               error "failed to dd read from $file"
+       do_facet $SINGLEAGT $myRUNAS $TRUNCATE $file 256 ||
+               error "failed to truncate $file"
+       do_facet $SINGLEAGT $myRUNAS $TRUNCATE $file 2048 ||
+               error "failed to truncate $file"
+       do_facet $SINGLEAGT $myRUNAS dd if=/dev/zero of=$file bs=1024 count=1 ||
+               error "failed to dd write from $file"
+       check_lpcc_state $file "readwrite"
+
+       do_facet $SINGLEAGT $myRUNAS $LFS pcc detach $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none"
+
+       cleanup_pcc_mapping
+}
+
+test_13a() {
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       test_rule_id "u" "500" "runas -u 500"
+       test_rule_id "g" "500" "runas -u 500 -g 500"
+}
+run_test 13a "Test auto RW-PCC create caching for UID/GID rule"
+
+test_13b() {
+       local file
+
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping $SINGLEAGT \
+               "fname={*.h5\ suffix.*\ Mid*dle}\ rwid=$HSM_ARCHIVE_NUMBER"
+       $LCTL pcc list $MOUNT
+
+       do_facet $SINGLEAGT mkdir -p $DIR/$tdir
+       chmod 777 $DIR/$tdir || error "chmod 0777 $DIR/$tdir failed"
+
+       file=$DIR/$tdir/prefix.h5
+       do_facet $SINGLEAGT dd if=/dev/zero of=$file bs=1024 count=1 ||
+               error "failed to dd write to $file"
+       check_lpcc_state $file "readwrite"
+       do_facet $SINGLEAGT $myRUNAS $LFS pcc detach $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none"
+       rm $file || error "rm $file failed"
+
+       file=$DIR/$tdir/suffix.doc
+       do_facet $SINGLEAGT $RUNAS dd if=/dev/zero of=$file bs=1024 count=1 ||
+               error "failed to dd write to $file"
+       check_lpcc_state $file "readwrite"
+       do_facet $SINGLEAGT $myRUNAS $LFS pcc detach $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none"
+       rm $file || error "rm $file failed"
+
+       file=$DIR/$tdir/MidPADdle
+       do_facet $SINGLEAGT $RUNAS dd if=/dev/zero of=$file bs=1024 count=1 ||
+               error "failed to dd write to $file"
+       check_lpcc_state $file "readwrite"
+       do_facet $SINGLEAGT $myRUNAS $LFS pcc detach $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none"
+       rm $file || error "rm $file failed"
+
+       file=$DIR/$tdir/Midpad
+       do_facet $SINGLEAGT $RUNAS dd if=/dev/zero of=$file bs=1024 count=1 ||
+               error "failed to dd write to $file"
+       check_lpcc_state $file "none"
+       rm $file || error "rm $file failed"
+
+       cleanup_pcc_mapping
+}
+run_test 13b "Test auto RW-PCC create caching for file name with wildcard"
+
+test_13c() {
+       local file
+       local myRUNAS
+
+       ! is_project_quota_supported &&
+               echo "Skip project quota is not supported" && return 0
+
+       enable_project_quota
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping $SINGLEAGT \
+               "projid={100\ 200}\&fname={*.h5},uid={500}\&gid={1000}\ rwid=$HSM_ARCHIVE_NUMBER"
+       $LCTL pcc list $MOUNT
+       do_facet $SINGLEAGT mkdir -p $DIR/$tdir
+       chmod 777 $DIR/$tdir || error "chmod 0777 $DIR/$tdir failed"
+
+       mkdir -p $DIR/$tdir/proj || error "mkdir $DIR/$tdir/proj failed"
+       mkdir -p $DIR/$tdir/proj2 || error "mkdir $DIR/$tdir/proj2 failed"
+       $LFS project -sp 100 $DIR/$tdir/proj ||
+               error "failed to set project for $DIR/$tdir/proj"
+       $LFS project -sp 200 $DIR/$tdir/proj2 ||
+               error "failed to set project for $DIR/$tdir/proj2"
+
+       file=$DIR/$tdir/proj/notcache
+       do_facet $SINGLEAGT dd if=/dev/zero of=$file bs=1024 count=1 ||
+               error "failed to dd write to $file"
+       check_lpcc_state $file "none"
+       rm $file || error "rm $file failed"
+
+       file=$DIR/$tdir/proj/autocache.h5
+       do_facet $SINGLEAGT dd if=/dev/zero of=$file bs=1024 count=1 ||
+               error "failed to dd write to $file"
+       check_lpcc_state $file "readwrite"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "failed to detach $file"
+       rm $file || error "rm $file failed"
+
+       file=$DIR/$tdir/proj2/notcache
+       do_facet $SINGLEAGT dd if=/dev/zero of=$file bs=1024 count=1 ||
+               error "failed to dd write to $file"
+       check_lpcc_state $file "none"
+       rm $file || error "rm $file failed"
+
+       file=$DIR/$tdir/proj2/autocache.h5
+       do_facet $SINGLEAGT dd if=/dev/zero of=$file bs=1024 count=1 ||
+               error "failed to dd write to $file"
+       check_lpcc_state $file "readwrite"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "failed to detach $file"
+       rm $file || error "rm $file failed"
+
+       file=$DIR/$tdir/ugidcache
+       myRUNAS="runas -u 500 -g 1000"
+       do_facet $SINGLEAGT $myRUNAS dd if=/dev/zero of=$file bs=1024 count=1 ||
+               error "failed to dd write to $file"
+       check_lpcc_state $file "readwrite"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "failed to detach $file"
+       rm $file || error "rm $file failed"
+
+       cleanup_pcc_mapping
+}
+run_test 13c "Check auto RW-PCC create caching for UID/GID/ProjID/fname rule"
+
 complete $SECONDS
 check_and_cleanup_lustre
 exit_status