Whamcloud - gitweb
LU-11467 utils: add lfs mirror delete command
[fs/lustre-release.git] / lustre / llite / pcc.c
index 2de2715..b99c702 100644 (file)
 
 struct kmem_cache *pcc_inode_slab;
 
-void pcc_super_init(struct pcc_super *super)
+int pcc_super_init(struct pcc_super *super)
 {
-       spin_lock_init(&super->pccs_lock);
+       struct cred *cred;
+
+       super->pccs_cred = cred = prepare_creds();
+       if (!cred)
+               return -ENOMEM;
+
+       /* Never override disk quota limits or use reserved space */
+       cap_lower(cred->cap_effective, CAP_SYS_RESOURCE);
+       init_rwsem(&super->pccs_rw_sem);
        INIT_LIST_HEAD(&super->pccs_datasets);
+
+       return 0;
+}
+
+/* Rule based auto caching */
+static void pcc_id_list_free(struct list_head *id_list)
+{
+       struct pcc_match_id *id, *n;
+
+       list_for_each_entry_safe(id, n, id_list, pmi_linkage) {
+               list_del_init(&id->pmi_linkage);
+               OBD_FREE_PTR(id);
+       }
+}
+
+static void pcc_fname_list_free(struct list_head *fname_list)
+{
+       struct pcc_match_fname *fname, *n;
+
+       list_for_each_entry_safe(fname, n, fname_list, pmf_linkage) {
+               OBD_FREE(fname->pmf_name, strlen(fname->pmf_name) + 1);
+               list_del_init(&fname->pmf_linkage);
+               OBD_FREE_PTR(fname);
+       }
+}
+
+static void pcc_expression_free(struct pcc_expression *expr)
+{
+       LASSERT(expr->pe_field >= PCC_FIELD_UID &&
+               expr->pe_field < PCC_FIELD_MAX);
+       switch (expr->pe_field) {
+       case PCC_FIELD_UID:
+       case PCC_FIELD_GID:
+       case PCC_FIELD_PROJID:
+               pcc_id_list_free(&expr->pe_cond);
+               break;
+       case PCC_FIELD_FNAME:
+               pcc_fname_list_free(&expr->pe_cond);
+               break;
+       default:
+               LBUG();
+       }
+       OBD_FREE_PTR(expr);
+}
+
+static void pcc_conjunction_free(struct pcc_conjunction *conjunction)
+{
+       struct pcc_expression *expression, *n;
+
+       LASSERT(list_empty(&conjunction->pc_linkage));
+       list_for_each_entry_safe(expression, n,
+                                &conjunction->pc_expressions,
+                                pe_linkage) {
+               list_del_init(&expression->pe_linkage);
+               pcc_expression_free(expression);
+       }
+       OBD_FREE_PTR(conjunction);
+}
+
+static void pcc_rule_conds_free(struct list_head *cond_list)
+{
+       struct pcc_conjunction *conjunction, *n;
+
+       list_for_each_entry_safe(conjunction, n, cond_list, pc_linkage) {
+               list_del_init(&conjunction->pc_linkage);
+               pcc_conjunction_free(conjunction);
+       }
+}
+
+static void pcc_cmd_fini(struct pcc_cmd *cmd)
+{
+       if (cmd->pccc_cmd == PCC_ADD_DATASET) {
+               if (!list_empty(&cmd->u.pccc_add.pccc_conds))
+                       pcc_rule_conds_free(&cmd->u.pccc_add.pccc_conds);
+               if (cmd->u.pccc_add.pccc_conds_str)
+                       OBD_FREE(cmd->u.pccc_add.pccc_conds_str,
+                                strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
+       }
+}
+
+#define PCC_DISJUNCTION_DELIM  (',')
+#define PCC_CONJUNCTION_DELIM  ('&')
+#define PCC_EXPRESSION_DELIM   ('=')
+
+static int
+pcc_fname_list_add(struct cfs_lstr *id, struct list_head *fname_list)
+{
+       struct pcc_match_fname *fname;
+
+       OBD_ALLOC(fname, sizeof(struct pcc_match_fname));
+       if (fname == NULL)
+               return -ENOMEM;
+
+       OBD_ALLOC(fname->pmf_name, id->ls_len + 1);
+       if (fname->pmf_name == NULL) {
+               OBD_FREE(fname, sizeof(struct pcc_match_fname));
+               return -ENOMEM;
+       }
+
+       memcpy(fname->pmf_name, id->ls_str, id->ls_len);
+       list_add_tail(&fname->pmf_linkage, fname_list);
+       return 0;
+}
+
+static int
+pcc_fname_list_parse(char *str, int len, struct list_head *fname_list)
+{
+       struct cfs_lstr src;
+       struct cfs_lstr res;
+       int rc = 0;
+
+       ENTRY;
+
+       src.ls_str = str;
+       src.ls_len = len;
+       INIT_LIST_HEAD(fname_list);
+       while (src.ls_str) {
+               rc = cfs_gettok(&src, ' ', &res);
+               if (rc == 0) {
+                       rc = -EINVAL;
+                       break;
+               }
+               rc = pcc_fname_list_add(&res, fname_list);
+               if (rc)
+                       break;
+       }
+       if (rc)
+               pcc_fname_list_free(fname_list);
+       RETURN(rc);
+}
+
+static int
+pcc_id_list_parse(char *str, int len, struct list_head *id_list,
+                 enum pcc_field type)
+{
+       struct cfs_lstr src;
+       struct cfs_lstr res;
+       int rc = 0;
+
+       ENTRY;
+
+       if (type != PCC_FIELD_UID && type != PCC_FIELD_GID &&
+           type != PCC_FIELD_PROJID)
+               RETURN(-EINVAL);
+
+       src.ls_str = str;
+       src.ls_len = len;
+       INIT_LIST_HEAD(id_list);
+       while (src.ls_str) {
+               struct pcc_match_id *id;
+               __u32 id_val;
+
+               if (cfs_gettok(&src, ' ', &res) == 0)
+                       GOTO(out, rc = -EINVAL);
+
+               if (!cfs_str2num_check(res.ls_str, res.ls_len,
+                                      &id_val, 0, (u32)~0U))
+                       GOTO(out, rc = -EINVAL);
+
+               OBD_ALLOC_PTR(id);
+               if (id == NULL)
+                       GOTO(out, rc = -ENOMEM);
+
+               id->pmi_id = id_val;
+               list_add_tail(&id->pmi_linkage, id_list);
+       }
+out:
+       if (rc)
+               pcc_id_list_free(id_list);
+       RETURN(rc);
+}
+
+static inline bool
+pcc_check_field(struct cfs_lstr *field, char *str)
+{
+       int len = strlen(str);
+
+       return (field->ls_len == len &&
+               strncmp(field->ls_str, str, len) == 0);
+}
+
+static int
+pcc_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
+{
+       struct pcc_expression *expr;
+       struct cfs_lstr field;
+       int rc = 0;
+
+       OBD_ALLOC(expr, sizeof(struct pcc_expression));
+       if (expr == NULL)
+               return -ENOMEM;
+
+       rc = cfs_gettok(src, PCC_EXPRESSION_DELIM, &field);
+       if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
+           src->ls_str[src->ls_len - 1] != '}')
+               GOTO(out, rc = -EINVAL);
+
+       /* Skip '{' and '}' */
+       src->ls_str++;
+       src->ls_len -= 2;
+
+       if (pcc_check_field(&field, "uid")) {
+               if (pcc_id_list_parse(src->ls_str,
+                                     src->ls_len,
+                                     &expr->pe_cond,
+                                     PCC_FIELD_UID) < 0)
+                       GOTO(out, rc = -EINVAL);
+               expr->pe_field = PCC_FIELD_UID;
+       } else if (pcc_check_field(&field, "gid")) {
+               if (pcc_id_list_parse(src->ls_str,
+                                     src->ls_len,
+                                     &expr->pe_cond,
+                                     PCC_FIELD_GID) < 0)
+                       GOTO(out, rc = -EINVAL);
+               expr->pe_field = PCC_FIELD_GID;
+       } else if (pcc_check_field(&field, "projid")) {
+               if (pcc_id_list_parse(src->ls_str,
+                                     src->ls_len,
+                                     &expr->pe_cond,
+                                     PCC_FIELD_PROJID) < 0)
+                       GOTO(out, rc = -EINVAL);
+               expr->pe_field = PCC_FIELD_PROJID;
+       } else if (pcc_check_field(&field, "fname")) {
+               if (pcc_fname_list_parse(src->ls_str,
+                                        src->ls_len,
+                                        &expr->pe_cond) < 0)
+                       GOTO(out, rc = -EINVAL);
+               expr->pe_field = PCC_FIELD_FNAME;
+       } else {
+               GOTO(out, rc = -EINVAL);
+       }
+
+       list_add_tail(&expr->pe_linkage, cond_list);
+       return 0;
+out:
+       OBD_FREE_PTR(expr);
+       return rc;
+}
+
+static int
+pcc_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
+{
+       struct pcc_conjunction *conjunction;
+       struct cfs_lstr expr;
+       int rc = 0;
+
+       OBD_ALLOC(conjunction, sizeof(struct pcc_conjunction));
+       if (conjunction == NULL)
+               return -ENOMEM;
+
+       INIT_LIST_HEAD(&conjunction->pc_expressions);
+       list_add_tail(&conjunction->pc_linkage, cond_list);
+
+       while (src->ls_str) {
+               rc = cfs_gettok(src, PCC_CONJUNCTION_DELIM, &expr);
+               if (rc == 0) {
+                       rc = -EINVAL;
+                       break;
+               }
+               rc = pcc_expression_parse(&expr,
+                                         &conjunction->pc_expressions);
+               if (rc)
+                       break;
+       }
+       return rc;
+}
+
+static int pcc_conds_parse(char *str, int len, struct list_head *cond_list)
+{
+       struct cfs_lstr src;
+       struct cfs_lstr res;
+       int rc = 0;
+
+       src.ls_str = str;
+       src.ls_len = len;
+       INIT_LIST_HEAD(cond_list);
+       while (src.ls_str) {
+               rc = cfs_gettok(&src, PCC_DISJUNCTION_DELIM, &res);
+               if (rc == 0) {
+                       rc = -EINVAL;
+                       break;
+               }
+               rc = pcc_conjunction_parse(&res, cond_list);
+               if (rc)
+                       break;
+       }
+       return rc;
+}
+
+static int pcc_id_parse(struct pcc_cmd *cmd, const char *id)
+{
+       int rc;
+
+       OBD_ALLOC(cmd->u.pccc_add.pccc_conds_str, strlen(id) + 1);
+       if (cmd->u.pccc_add.pccc_conds_str == NULL)
+               return -ENOMEM;
+
+       memcpy(cmd->u.pccc_add.pccc_conds_str, id, strlen(id));
+
+       rc = pcc_conds_parse(cmd->u.pccc_add.pccc_conds_str,
+                            strlen(cmd->u.pccc_add.pccc_conds_str),
+                            &cmd->u.pccc_add.pccc_conds);
+       if (rc)
+               pcc_cmd_fini(cmd);
+
+       return rc;
+}
+
+static int
+pcc_parse_value_pair(struct pcc_cmd *cmd, char *buffer)
+{
+       char *key, *val;
+       unsigned long id;
+       int rc;
+
+       val = buffer;
+       key = strsep(&val, "=");
+       if (val == NULL || strlen(val) == 0)
+               return -EINVAL;
+
+       /* Key of the value pair */
+       if (strcmp(key, "rwid") == 0) {
+               rc = kstrtoul(val, 10, &id);
+               if (rc)
+                       return rc;
+               if (id <= 0)
+                       return -EINVAL;
+               cmd->u.pccc_add.pccc_rwid = id;
+       } else if (strcmp(key, "roid") == 0) {
+               rc = kstrtoul(val, 10, &id);
+               if (rc)
+                       return rc;
+               if (id <= 0)
+                       return -EINVAL;
+               cmd->u.pccc_add.pccc_roid = id;
+       } else if (strcmp(key, "open_attach") == 0) {
+               rc = kstrtoul(val, 10, &id);
+               if (rc)
+                       return rc;
+               if (id > 0)
+                       cmd->u.pccc_add.pccc_flags |= PCC_DATASET_OPEN_ATTACH;
+       } else if (strcmp(key, "rwpcc") == 0) {
+               rc = kstrtoul(val, 10, &id);
+               if (rc)
+                       return rc;
+               if (id > 0)
+                       cmd->u.pccc_add.pccc_flags |= PCC_DATASET_RWPCC;
+       } else if (strcmp(key, "ropcc") == 0) {
+               rc = kstrtoul(val, 10, &id);
+               if (rc)
+                       return rc;
+               if (id > 0)
+                       cmd->u.pccc_add.pccc_flags |= PCC_DATASET_ROPCC;
+       } else {
+               return -EINVAL;
+       }
+
+       return 0;
+}
+
+static int
+pcc_parse_value_pairs(struct pcc_cmd *cmd, char *buffer)
+{
+       char *val;
+       char *token;
+       int rc;
+
+       val = buffer;
+       while (val != NULL && strlen(val) != 0) {
+               token = strsep(&val, " ");
+               rc = pcc_parse_value_pair(cmd, token);
+               if (rc)
+                       return rc;
+       }
+
+       switch (cmd->pccc_cmd) {
+       case PCC_ADD_DATASET:
+               if (cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC &&
+                   cmd->u.pccc_add.pccc_flags & PCC_DATASET_ROPCC)
+                       return -EINVAL;
+               /*
+                * By default, a PCC backend can provide caching service for
+                * both RW-PCC and RO-PCC.
+                */
+               if ((cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCC_ALL) == 0)
+                       cmd->u.pccc_add.pccc_flags |= PCC_DATASET_PCC_ALL;
+               break;
+       case PCC_DEL_DATASET:
+       case PCC_CLEAR_ALL:
+               break;
+       default:
+               return -EINVAL;
+       }
+       return 0;
+}
+
+static void
+pcc_dataset_rule_fini(struct pcc_match_rule *rule)
+{
+       if (!list_empty(&rule->pmr_conds))
+               pcc_rule_conds_free(&rule->pmr_conds);
+       LASSERT(rule->pmr_conds_str != NULL);
+       OBD_FREE(rule->pmr_conds_str, strlen(rule->pmr_conds_str) + 1);
+}
+
+static int
+pcc_dataset_rule_init(struct pcc_match_rule *rule, struct pcc_cmd *cmd)
+{
+       int rc = 0;
+
+       LASSERT(cmd->u.pccc_add.pccc_conds_str);
+       OBD_ALLOC(rule->pmr_conds_str,
+                 strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
+       if (rule->pmr_conds_str == NULL)
+               return -ENOMEM;
+
+       memcpy(rule->pmr_conds_str,
+              cmd->u.pccc_add.pccc_conds_str,
+              strlen(cmd->u.pccc_add.pccc_conds_str));
+
+       INIT_LIST_HEAD(&rule->pmr_conds);
+       if (!list_empty(&cmd->u.pccc_add.pccc_conds))
+               rc = pcc_conds_parse(rule->pmr_conds_str,
+                                         strlen(rule->pmr_conds_str),
+                                         &rule->pmr_conds);
+
+       if (rc)
+               pcc_dataset_rule_fini(rule);
+
+       return rc;
+}
+
+/* Rule Matching */
+static int
+pcc_id_list_match(struct list_head *id_list, __u32 id_val)
+{
+       struct pcc_match_id *id;
+
+       list_for_each_entry(id, id_list, pmi_linkage) {
+               if (id->pmi_id == id_val)
+                       return 1;
+       }
+       return 0;
+}
+
+static bool
+cfs_match_wildcard(const char *pattern, const char *content)
+{
+       if (*pattern == '\0' && *content == '\0')
+               return true;
+
+       if (*pattern == '*' && *(pattern + 1) != '\0' && *content == '\0')
+               return false;
+
+       while (*pattern == *content) {
+               pattern++;
+               content++;
+               if (*pattern == '\0' && *content == '\0')
+                       return true;
+
+               if (*pattern == '*' && *(pattern + 1) != '\0' &&
+                   *content == '\0')
+                       return false;
+       }
+
+       if (*pattern == '*')
+               return (cfs_match_wildcard(pattern + 1, content) ||
+                       cfs_match_wildcard(pattern, content + 1));
+
+       return false;
+}
+
+static int
+pcc_fname_list_match(struct list_head *fname_list, const char *name)
+{
+       struct pcc_match_fname *fname;
+
+       list_for_each_entry(fname, fname_list, pmf_linkage) {
+               if (cfs_match_wildcard(fname->pmf_name, name))
+                       return 1;
+       }
+       return 0;
+}
+
+static int
+pcc_expression_match(struct pcc_expression *expr, struct pcc_matcher *matcher)
+{
+       switch (expr->pe_field) {
+       case PCC_FIELD_UID:
+               return pcc_id_list_match(&expr->pe_cond, matcher->pm_uid);
+       case PCC_FIELD_GID:
+               return pcc_id_list_match(&expr->pe_cond, matcher->pm_gid);
+       case PCC_FIELD_PROJID:
+               return pcc_id_list_match(&expr->pe_cond, matcher->pm_projid);
+       case PCC_FIELD_FNAME:
+               return pcc_fname_list_match(&expr->pe_cond,
+                                           matcher->pm_name->name);
+       default:
+               return 0;
+       }
+}
+
+static int
+pcc_conjunction_match(struct pcc_conjunction *conjunction,
+                     struct pcc_matcher *matcher)
+{
+       struct pcc_expression *expr;
+       int matched;
+
+       list_for_each_entry(expr, &conjunction->pc_expressions, pe_linkage) {
+               matched = pcc_expression_match(expr, matcher);
+               if (!matched)
+                       return 0;
+       }
+
+       return 1;
+}
+
+static int
+pcc_cond_match(struct pcc_match_rule *rule, struct pcc_matcher *matcher)
+{
+       struct pcc_conjunction *conjunction;
+       int matched;
+
+       list_for_each_entry(conjunction, &rule->pmr_conds, pc_linkage) {
+               matched = pcc_conjunction_match(conjunction, matcher);
+               if (matched)
+                       return 1;
+       }
+
+       return 0;
+}
+
+struct pcc_dataset*
+pcc_dataset_match_get(struct pcc_super *super, struct pcc_matcher *matcher)
+{
+       struct pcc_dataset *dataset;
+       struct pcc_dataset *selected = NULL;
+
+       down_read(&super->pccs_rw_sem);
+       list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
+               if (!(dataset->pccd_flags & PCC_DATASET_RWPCC))
+                       continue;
+
+               if (pcc_cond_match(&dataset->pccd_rule, matcher)) {
+                       atomic_inc(&dataset->pccd_refcount);
+                       selected = dataset;
+                       break;
+               }
+       }
+       up_read(&super->pccs_rw_sem);
+       if (selected)
+               CDEBUG(D_CACHE, "PCC create, matched %s - %d:%d:%d:%s\n",
+                      dataset->pccd_rule.pmr_conds_str,
+                      matcher->pm_uid, matcher->pm_gid,
+                      matcher->pm_projid, matcher->pm_name->name);
+
+       return selected;
 }
 
 /**
  * pcc_dataset_add - Add a Cache policy to control which files need be
  * cached and where it will be cached.
  *
- * @super: superblock of pcc
- * @pathname: root path of pcc
- * @id: HSM archive ID
- * @projid: files with specified project ID will be cached.
+ * @super:     superblock of pcc
+ * @cmd:       pcc command
  */
 static int
-pcc_dataset_add(struct pcc_super *super, const char *pathname,
-               __u32 archive_id, __u32 projid)
+pcc_dataset_add(struct pcc_super *super, struct pcc_cmd *cmd)
 {
-       int rc;
+       char *pathname = cmd->pccc_pathname;
        struct pcc_dataset *dataset;
        struct pcc_dataset *tmp;
        bool found = false;
+       int rc;
 
        OBD_ALLOC_PTR(dataset);
        if (dataset == NULL)
@@ -146,20 +710,31 @@ pcc_dataset_add(struct pcc_super *super, const char *pathname,
                return rc;
        }
        strncpy(dataset->pccd_pathname, pathname, PATH_MAX);
-       dataset->pccd_id = archive_id;
-       dataset->pccd_projid = projid;
+       dataset->pccd_rwid = cmd->u.pccc_add.pccc_rwid;
+       dataset->pccd_roid = cmd->u.pccc_add.pccc_roid;
+       dataset->pccd_flags = cmd->u.pccc_add.pccc_flags;
        atomic_set(&dataset->pccd_refcount, 1);
 
-       spin_lock(&super->pccs_lock);
+       rc = pcc_dataset_rule_init(&dataset->pccd_rule, cmd);
+       if (rc) {
+               pcc_dataset_put(dataset);
+               return rc;
+       }
+
+       down_write(&super->pccs_rw_sem);
        list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) {
-               if (tmp->pccd_id == archive_id) {
+               if (strcmp(tmp->pccd_pathname, pathname) == 0 ||
+                   (dataset->pccd_rwid != 0 &&
+                    dataset->pccd_rwid == tmp->pccd_rwid) ||
+                   (dataset->pccd_roid != 0 &&
+                    dataset->pccd_roid == tmp->pccd_roid)) {
                        found = true;
                        break;
                }
        }
        if (!found)
                list_add(&dataset->pccd_linkage, &super->pccs_datasets);
-       spin_unlock(&super->pccs_lock);
+       up_write(&super->pccs_rw_sem);
 
        if (found) {
                pcc_dataset_put(dataset);
@@ -170,32 +745,31 @@ pcc_dataset_add(struct pcc_super *super, const char *pathname,
 }
 
 struct pcc_dataset *
-pcc_dataset_get(struct pcc_super *super, __u32 projid, __u32 archive_id)
+pcc_dataset_get(struct pcc_super *super, enum lu_pcc_type type, __u32 id)
 {
        struct pcc_dataset *dataset;
        struct pcc_dataset *selected = NULL;
 
-       if (projid == 0 && archive_id == 0)
+       if (id == 0)
                return NULL;
 
        /*
-        * archive ID is unique in the list, projid might be duplicate,
+        * archive ID (read-write ID) or read-only ID is unique in the list,
         * we just return last added one as first priority.
         */
-       spin_lock(&super->pccs_lock);
+       down_read(&super->pccs_rw_sem);
        list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
-               if (projid && dataset->pccd_projid != projid)
-                       continue;
-               if (archive_id && dataset->pccd_id != archive_id)
+               if (type == LU_PCC_READWRITE && (dataset->pccd_rwid != id ||
+                   !(dataset->pccd_flags & PCC_DATASET_RWPCC)))
                        continue;
                atomic_inc(&dataset->pccd_refcount);
                selected = dataset;
                break;
        }
-       spin_unlock(&super->pccs_lock);
+       up_read(&super->pccs_rw_sem);
        if (selected)
-               CDEBUG(D_CACHE, "matched projid %u, PCC create\n",
-                      selected->pccd_projid);
+               CDEBUG(D_CACHE, "matched id %u, PCC mode %d\n", id, type);
+
        return selected;
 }
 
@@ -203,6 +777,7 @@ void
 pcc_dataset_put(struct pcc_dataset *dataset)
 {
        if (atomic_dec_and_test(&dataset->pccd_refcount)) {
+               pcc_dataset_rule_fini(&dataset->pccd_rule);
                path_put(&dataset->pccd_path);
                OBD_FREE_PTR(dataset);
        }
@@ -215,17 +790,17 @@ pcc_dataset_del(struct pcc_super *super, char *pathname)
        struct pcc_dataset *dataset;
        int rc = -ENOENT;
 
-       spin_lock(&super->pccs_lock);
+       down_write(&super->pccs_rw_sem);
        list_for_each_safe(l, tmp, &super->pccs_datasets) {
                dataset = list_entry(l, struct pcc_dataset, pccd_linkage);
                if (strcmp(dataset->pccd_pathname, pathname) == 0) {
-                       list_del(&dataset->pccd_linkage);
+                       list_del_init(&dataset->pccd_linkage);
                        pcc_dataset_put(dataset);
                        rc = 0;
                        break;
                }
        }
-       spin_unlock(&super->pccs_lock);
+       up_write(&super->pccs_rw_sem);
        return rc;
 }
 
@@ -233,8 +808,9 @@ static void
 pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
 {
        seq_printf(m, "%s:\n", dataset->pccd_pathname);
-       seq_printf(m, "  rwid: %u\n", dataset->pccd_id);
-       seq_printf(m, "  autocache: projid=%u\n", dataset->pccd_projid);
+       seq_printf(m, "  rwid: %u\n", dataset->pccd_rwid);
+       seq_printf(m, "  flags: %x\n", dataset->pccd_flags);
+       seq_printf(m, "  autocache: %s\n", dataset->pccd_rule.pmr_conds_str);
 }
 
 int
@@ -242,25 +818,32 @@ pcc_super_dump(struct pcc_super *super, struct seq_file *m)
 {
        struct pcc_dataset *dataset;
 
-       spin_lock(&super->pccs_lock);
+       down_read(&super->pccs_rw_sem);
        list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
                pcc_dataset_dump(dataset, m);
        }
-       spin_unlock(&super->pccs_lock);
+       up_read(&super->pccs_rw_sem);
        return 0;
 }
 
-void pcc_super_fini(struct pcc_super *super)
+static void pcc_remove_datasets(struct pcc_super *super)
 {
        struct pcc_dataset *dataset, *tmp;
 
+       down_write(&super->pccs_rw_sem);
        list_for_each_entry_safe(dataset, tmp,
                                 &super->pccs_datasets, pccd_linkage) {
                list_del(&dataset->pccd_linkage);
                pcc_dataset_put(dataset);
        }
+       up_write(&super->pccs_rw_sem);
 }
 
+void pcc_super_fini(struct pcc_super *super)
+{
+       pcc_remove_datasets(super);
+       put_cred(super->pccs_cred);
+}
 
 static bool pathname_is_valid(const char *pathname)
 {
@@ -277,7 +860,6 @@ pcc_cmd_parse(char *buffer, unsigned long count)
        static struct pcc_cmd *cmd;
        char *token;
        char *val;
-       unsigned long tmp;
        int rc = 0;
 
        OBD_ALLOC_PTR(cmd);
@@ -311,28 +893,35 @@ pcc_cmd_parse(char *buffer, unsigned long count)
        cmd->pccc_pathname = token;
 
        if (cmd->pccc_cmd == PCC_ADD_DATASET) {
-               /* archive ID */
-               token = strsep(&val, " ");
-               if (val == NULL)
+               /* List of ID */
+               LASSERT(val);
+               token = val;
+               val = strrchr(token, '}');
+               if (!val)
                        GOTO(out_free_cmd, rc = -EINVAL);
 
-               rc = kstrtoul(token, 10, &tmp);
-               if (rc != 0)
+               /* Skip '}' */
+               val++;
+               if (*val == '\0') {
+                       val = NULL;
+               } else if (*val == ' ') {
+                       *val = '\0';
+                       val++;
+               } else {
                        GOTO(out_free_cmd, rc = -EINVAL);
-               if (tmp == 0)
-                       GOTO(out_free_cmd, rc = -EINVAL);
-               cmd->u.pccc_add.pccc_id = tmp;
+               }
 
-               token = val;
-               rc = kstrtoul(token, 10, &tmp);
-               if (rc != 0)
-                       GOTO(out_free_cmd, rc = -EINVAL);
-               if (tmp == 0)
-                       GOTO(out_free_cmd, rc = -EINVAL);
-               cmd->u.pccc_add.pccc_projid = tmp;
-       }
+               rc = pcc_id_parse(cmd, token);
+               if (rc)
+                       GOTO(out_free_cmd, rc);
 
+               rc = pcc_parse_value_pairs(cmd, val);
+               if (rc)
+                       GOTO(out_cmd_fini, rc = -EINVAL);
+       }
        goto out;
+out_cmd_fini:
+       pcc_cmd_fini(cmd);
 out_free_cmd:
        OBD_FREE_PTR(cmd);
 out:
@@ -353,21 +942,20 @@ int pcc_cmd_handle(char *buffer, unsigned long count,
 
        switch (cmd->pccc_cmd) {
        case PCC_ADD_DATASET:
-               rc = pcc_dataset_add(super, cmd->pccc_pathname,
-                                     cmd->u.pccc_add.pccc_id,
-                                     cmd->u.pccc_add.pccc_projid);
+               rc = pcc_dataset_add(super, cmd);
                break;
        case PCC_DEL_DATASET:
                rc = pcc_dataset_del(super, cmd->pccc_pathname);
                break;
        case PCC_CLEAR_ALL:
-               pcc_super_fini(super);
+               pcc_remove_datasets(super);
                break;
        default:
                rc = -EINVAL;
                break;
        }
 
+       pcc_cmd_fini(cmd);
        OBD_FREE_PTR(cmd);
        return rc;
 }
@@ -382,17 +970,26 @@ static inline void pcc_inode_unlock(struct inode *inode)
        mutex_unlock(&ll_i2info(inode)->lli_pcc_lock);
 }
 
-static void pcc_inode_init(struct pcc_inode *pcci)
+static void pcc_inode_init(struct pcc_inode *pcci, struct ll_inode_info *lli)
 {
+       pcci->pcci_lli = lli;
+       lli->lli_pcc_inode = pcci;
+       lli->lli_pcc_state = PCC_STATE_FL_NONE;
        atomic_set(&pcci->pcci_refcount, 0);
        pcci->pcci_type = LU_PCC_NONE;
+       pcci->pcci_layout_gen = CL_LAYOUT_GEN_NONE;
+       atomic_set(&pcci->pcci_active_ios, 0);
+       init_waitqueue_head(&pcci->pcci_waitq);
 }
 
 static void pcc_inode_fini(struct pcc_inode *pcci)
 {
+       struct ll_inode_info *lli = pcci->pcci_lli;
+
        path_put(&pcci->pcci_path);
        pcci->pcci_type = LU_PCC_NONE;
        OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
+       lli->lli_pcc_inode = NULL;
 }
 
 static void pcc_inode_get(struct pcc_inode *pcci)
@@ -408,13 +1005,11 @@ static void pcc_inode_put(struct pcc_inode *pcci)
 
 void pcc_inode_free(struct inode *inode)
 {
-       struct ll_inode_info *lli = ll_i2info(inode);
-       struct pcc_inode *pcci = lli->lli_pcc_inode;
+       struct pcc_inode *pcci = ll_i2pcci(inode);
 
        if (pcci) {
                WARN_ON(atomic_read(&pcci->pcci_refcount) > 1);
                pcc_inode_put(pcci);
-               lli->lli_pcc_inode = NULL;
        }
 }
 
@@ -438,20 +1033,274 @@ static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid)
                        PFID(fid));
 }
 
+static inline const struct cred *pcc_super_cred(struct super_block *sb)
+{
+       return ll_s2sbi(sb)->ll_pcc_super.pccs_cred;
+}
+
 void pcc_file_init(struct pcc_file *pccf)
 {
        pccf->pccf_file = NULL;
        pccf->pccf_type = LU_PCC_NONE;
 }
 
+static inline bool pcc_open_attach_enabled(struct pcc_dataset *dataset)
+{
+       return dataset->pccd_flags & PCC_DATASET_OPEN_ATTACH;
+}
+
+static const char pcc_xattr_layout[] = XATTR_USER_PREFIX "PCC.layout";
+
+static int pcc_layout_xattr_set(struct pcc_inode *pcci, __u32 gen)
+{
+       struct dentry *pcc_dentry = pcci->pcci_path.dentry;
+       struct ll_inode_info *lli = pcci->pcci_lli;
+       int rc;
+
+       ENTRY;
+
+       if (!(lli->lli_pcc_state & PCC_STATE_FL_OPEN_ATTACH))
+               RETURN(0);
+
+#ifndef HAVE_VFS_SETXATTR
+       if (!pcc_dentry->d_inode->i_op->setxattr)
+               RETURN(-ENOTSUPP);
+
+       rc = pcc_dentry->d_inode->i_op->setxattr(pcc_dentry, pcc_xattr_layout,
+                                                &gen, sizeof(gen), 0);
+#else
+       rc = __vfs_setxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
+                           &gen, sizeof(gen), 0);
+#endif
+       RETURN(rc);
+}
+
+static int pcc_get_layout_info(struct inode *inode, struct cl_layout *clt)
+{
+       struct lu_env *env;
+       struct ll_inode_info *lli = ll_i2info(inode);
+       __u16 refcheck;
+       int rc;
+
+       ENTRY;
+
+       if (!lli->lli_clob)
+               RETURN(-EINVAL);
+
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               RETURN(PTR_ERR(env));
+
+       rc = cl_object_layout_get(env, lli->lli_clob, clt);
+       if (rc)
+               CDEBUG(D_INODE, "Cannot get layout for "DFID"\n",
+                      PFID(ll_inode2fid(inode)));
+
+       cl_env_put(env, &refcheck);
+       RETURN(rc);
+}
+
+static int pcc_fid2dataset_fullpath(char *buf, int sz, struct lu_fid *fid,
+                                   struct pcc_dataset *dataset)
+{
+       return snprintf(buf, sz, "%s/%04x/%04x/%04x/%04x/%04x/%04x/"
+                       DFID_NOBRACE,
+                       dataset->pccd_pathname,
+                       (fid)->f_oid       & 0xFFFF,
+                       (fid)->f_oid >> 16 & 0xFFFF,
+                       (unsigned int)((fid)->f_seq       & 0xFFFF),
+                       (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
+                       (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
+                       (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
+                       PFID(fid));
+}
+
+/* Must be called with pcci->pcci_lock held */
+static void pcc_inode_attach_init(struct pcc_dataset *dataset,
+                                 struct pcc_inode *pcci,
+                                 struct dentry *dentry,
+                                 enum lu_pcc_type type)
+{
+       pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
+       pcci->pcci_path.dentry = dentry;
+       LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
+       atomic_set(&pcci->pcci_refcount, 1);
+       pcci->pcci_type = type;
+       pcci->pcci_attr_valid = false;
+
+       if (pcc_open_attach_enabled(dataset)) {
+               struct ll_inode_info *lli = pcci->pcci_lli;
+
+               lli->lli_pcc_state |= PCC_STATE_FL_OPEN_ATTACH;
+       }
+}
+
+static inline void pcc_layout_gen_set(struct pcc_inode *pcci,
+                                     __u32 gen)
+{
+       pcci->pcci_layout_gen = gen;
+}
+
+static inline bool pcc_inode_has_layout(struct pcc_inode *pcci)
+{
+       return pcci->pcci_layout_gen != CL_LAYOUT_GEN_NONE;
+}
+
+static int pcc_try_dataset_attach(struct inode *inode, __u32 gen,
+                                 enum lu_pcc_type type,
+                                 struct pcc_dataset *dataset,
+                                 bool *cached)
+{
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct pcc_inode *pcci = lli->lli_pcc_inode;
+       const struct cred *old_cred;
+       struct dentry *pcc_dentry;
+       struct path path;
+       char *pathname;
+       __u32 pcc_gen;
+       int rc;
+
+       ENTRY;
+
+       if (type == LU_PCC_READWRITE &&
+           !(dataset->pccd_flags & PCC_DATASET_RWPCC))
+               RETURN(0);
+
+       OBD_ALLOC(pathname, PATH_MAX);
+       if (pathname == NULL)
+               RETURN(-ENOMEM);
+
+       pcc_fid2dataset_fullpath(pathname, PATH_MAX, &lli->lli_fid, dataset);
+
+       old_cred = override_creds(pcc_super_cred(inode->i_sb));
+       rc = kern_path(pathname, LOOKUP_FOLLOW, &path);
+       if (rc)
+               /* ignore this error */
+               GOTO(out, rc = 0);
+
+       pcc_dentry = path.dentry;
+#ifndef HAVE_VFS_SETXATTR
+       if (!pcc_dentry->d_inode->i_op->getxattr)
+               /* ignore this error */
+               GOTO(out_put_path, rc = 0);
+
+       rc = pcc_dentry->d_inode->i_op->getxattr(pcc_dentry, pcc_xattr_layout,
+                                                &pcc_gen, sizeof(pcc_gen));
+#else
+       rc = __vfs_getxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
+                           &pcc_gen, sizeof(pcc_gen));
+#endif
+
+       if (rc < 0)
+               /* ignore this error */
+               GOTO(out_put_path, rc = 0);
+
+       rc = 0;
+       /* The file is still valid cached in PCC, attach it immediately. */
+       if (pcc_gen == gen) {
+               CDEBUG(D_CACHE, DFID" L.Gen (%d) consistent, auto attached.\n",
+                      PFID(&lli->lli_fid), gen);
+               if (!pcci) {
+                       OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
+                       if (pcci == NULL)
+                               GOTO(out_put_path, rc = -ENOMEM);
+
+                       pcc_inode_init(pcci, lli);
+                       dget(pcc_dentry);
+                       pcc_inode_attach_init(dataset, pcci, pcc_dentry, type);
+               } else {
+                       /*
+                        * This happened when a file was once attached into
+                        * PCC, and some processes keep this file opened
+                        * (pcci->refcount > 1) and corresponding PCC file
+                        * without any I/O activity, and then this file was
+                        * detached by the manual detach command or the
+                        * revocation of the layout lock (i.e. cached LRU lock
+                        * shrinking).
+                        */
+                       pcc_inode_get(pcci);
+                       pcci->pcci_type = type;
+               }
+               pcc_layout_gen_set(pcci, gen);
+               *cached = true;
+       }
+out_put_path:
+       path_put(&path);
+out:
+       revert_creds(old_cred);
+       OBD_FREE(pathname, PATH_MAX);
+       RETURN(rc);
+}
+
+static int pcc_try_datasets_attach(struct inode *inode, __u32 gen,
+                                  enum lu_pcc_type type, bool *cached)
+{
+       struct pcc_dataset *dataset, *tmp;
+       struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
+       int rc = 0;
+
+       ENTRY;
+
+       down_read(&super->pccs_rw_sem);
+       list_for_each_entry_safe(dataset, tmp,
+                                &super->pccs_datasets, pccd_linkage) {
+               if (!pcc_open_attach_enabled(dataset))
+                       continue;
+               rc = pcc_try_dataset_attach(inode, gen, type, dataset, cached);
+               if (rc < 0 || (!rc && *cached))
+                       break;
+       }
+       up_read(&super->pccs_rw_sem);
+
+       RETURN(rc);
+}
+
+static int pcc_try_open_attach(struct inode *inode, bool *cached)
+{
+       struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
+       struct cl_layout clt = {
+               .cl_layout_gen = 0,
+               .cl_is_released = false,
+       };
+       int rc;
+
+       ENTRY;
+
+       /*
+        * Quick check whether there is PCC device.
+        */
+       if (list_empty(&super->pccs_datasets))
+               RETURN(0);
+
+       /*
+        * The file layout lock was cancelled. And this open does not
+        * obtain valid layout lock from MDT (i.e. the file is being
+        * HSM restoring).
+        */
+       if (ll_layout_version_get(ll_i2info(inode)) == CL_LAYOUT_GEN_NONE)
+               RETURN(0);
+
+       rc = pcc_get_layout_info(inode, &clt);
+       if (rc)
+               RETURN(rc);
+
+       if (clt.cl_is_released)
+               rc = pcc_try_datasets_attach(inode, clt.cl_layout_gen,
+                                            LU_PCC_READWRITE, cached);
+
+       RETURN(rc);
+}
+
 int pcc_file_open(struct inode *inode, struct file *file)
 {
        struct pcc_inode *pcci;
+       struct ll_inode_info *lli = ll_i2info(inode);
        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
        struct pcc_file *pccf = &fd->fd_pcc_file;
        struct file *pcc_file;
        struct path *path;
        struct qstr *dname;
+       bool cached = false;
        int rc = 0;
 
        ENTRY;
@@ -461,12 +1310,19 @@ int pcc_file_open(struct inode *inode, struct file *file)
 
        pcc_inode_lock(inode);
        pcci = ll_i2pcci(inode);
-       if (!pcci)
-               GOTO(out_unlock, rc = 0);
 
-       if (atomic_read(&pcci->pcci_refcount) == 0)
+       if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
                GOTO(out_unlock, rc = 0);
 
+       if (!pcci || !pcc_inode_has_layout(pcci)) {
+               rc = pcc_try_open_attach(inode, &cached);
+               if (rc < 0 || !cached)
+                       GOTO(out_unlock, rc);
+
+               if (!pcci)
+                       pcci = ll_i2pcci(inode);
+       }
+
        pcc_inode_get(pcci);
        WARN_ON(pccf->pccf_file);
 
@@ -474,11 +1330,13 @@ int pcc_file_open(struct inode *inode, struct file *file)
        dname = &path->dentry->d_name;
        CDEBUG(D_CACHE, "opening pcc file '%.*s'\n", dname->len,
               dname->name);
+
 #ifdef HAVE_DENTRY_OPEN_USE_PATH
-       pcc_file = dentry_open(path, file->f_flags, current_cred());
+       pcc_file = dentry_open(path, file->f_flags,
+                              pcc_super_cred(inode->i_sb));
 #else
-       pcc_file = dentry_open(path->dentry, path->mnt,
-                              file->f_flags, current_cred());
+       pcc_file = dentry_open(path->dentry, path->mnt, file->f_flags,
+                              pcc_super_cred(inode->i_sb));
 #endif
        if (IS_ERR_OR_NULL(pcc_file)) {
                rc = pcc_file == NULL ? -EINVAL : PTR_ERR(pcc_file);
@@ -522,6 +1380,68 @@ void pcc_file_release(struct inode *inode, struct file *file)
        pccf->pccf_file = NULL;
 out:
        pcc_inode_unlock(inode);
+       RETURN_EXIT;
+}
+
+static void pcc_io_init(struct inode *inode, bool *cached)
+{
+       struct pcc_inode *pcci;
+
+       pcc_inode_lock(inode);
+       pcci = ll_i2pcci(inode);
+       if (pcci && pcc_inode_has_layout(pcci)) {
+               LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
+               atomic_inc(&pcci->pcci_active_ios);
+               *cached = true;
+       } else {
+               *cached = false;
+       }
+       pcc_inode_unlock(inode);
+}
+
+static void pcc_io_fini(struct inode *inode)
+{
+       struct pcc_inode *pcci = ll_i2pcci(inode);
+
+       LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0);
+       if (atomic_dec_and_test(&pcci->pcci_active_ios))
+               wake_up_all(&pcci->pcci_waitq);
+}
+
+
+static ssize_t
+__pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
+{
+       struct file *file = iocb->ki_filp;
+
+#ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
+       return file->f_op->read_iter(iocb, iter);
+#else
+       struct iovec iov;
+       struct iov_iter i;
+       ssize_t bytes = 0;
+
+       iov_for_each(iov, i, *iter) {
+               ssize_t res;
+
+               res = file->f_op->aio_read(iocb, &iov, 1, iocb->ki_pos);
+               if (-EIOCBQUEUED == res)
+                       res = wait_on_sync_kiocb(iocb);
+               if (res <= 0) {
+                       if (bytes == 0)
+                               bytes = res;
+                       break;
+               }
+
+               bytes += res;
+               if (res < iov.iov_len)
+                       break;
+       }
+
+       if (bytes > 0)
+               iov_iter_advance(iter, bytes);
+       return bytes;
+#endif
 }
 
 ssize_t pcc_file_read_iter(struct kiocb *iocb,
@@ -530,6 +1450,7 @@ ssize_t pcc_file_read_iter(struct kiocb *iocb,
        struct file *file = iocb->ki_filp;
        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
        struct pcc_file *pccf = &fd->fd_pcc_file;
+       struct inode *inode = file_inode(file);
        ssize_t result;
 
        ENTRY;
@@ -538,12 +1459,20 @@ ssize_t pcc_file_read_iter(struct kiocb *iocb,
                *cached = false;
                RETURN(0);
        }
-       *cached = true;
-       iocb->ki_filp = pccf->pccf_file;
 
-       result = generic_file_read_iter(iocb, iter);
+       pcc_io_init(inode, cached);
+       if (!*cached)
+               RETURN(0);
+
+       iocb->ki_filp = pccf->pccf_file;
+       /* generic_file_aio_read does not support ext4-dax,
+        * __pcc_file_read_iter uses ->aio_read hook directly
+        * to add support for ext4-dax.
+        */
+       result = __pcc_file_read_iter(iocb, iter);
        iocb->ki_filp = file;
 
+       pcc_io_fini(inode);
        RETURN(result);
 }
 
@@ -582,130 +1511,449 @@ __pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
 #endif
 }
 
-ssize_t pcc_file_write_iter(struct kiocb *iocb,
-                           struct iov_iter *iter, bool *cached)
+ssize_t pcc_file_write_iter(struct kiocb *iocb,
+                           struct iov_iter *iter, bool *cached)
+{
+       struct file *file = iocb->ki_filp;
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+       struct pcc_file *pccf = &fd->fd_pcc_file;
+       struct inode *inode = file_inode(file);
+       ssize_t result;
+
+       ENTRY;
+
+       if (pccf->pccf_file == NULL) {
+               *cached = false;
+               RETURN(0);
+       }
+
+       if (pccf->pccf_type != LU_PCC_READWRITE) {
+               *cached = false;
+               RETURN(-EAGAIN);
+       }
+
+       pcc_io_init(inode, cached);
+       if (!*cached)
+               RETURN(0);
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
+               GOTO(out, result = -ENOSPC);
+
+       iocb->ki_filp = pccf->pccf_file;
+
+       /* Since __pcc_file_write_iter makes write calls via
+        * the normal vfs interface to the local PCC file system,
+        * the inode lock is not needed.
+        */
+       result = __pcc_file_write_iter(iocb, iter);
+       iocb->ki_filp = file;
+out:
+       pcc_io_fini(inode);
+       RETURN(result);
+}
+
+int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
+                     bool *cached)
+{
+       int rc;
+       const struct cred *old_cred;
+       struct iattr attr2 = *attr;
+       struct dentry *pcc_dentry;
+       struct pcc_inode *pcci;
+
+       ENTRY;
+
+       if (!S_ISREG(inode->i_mode)) {
+               *cached = false;
+               RETURN(0);
+       }
+
+       pcc_io_init(inode, cached);
+       if (!*cached)
+               RETURN(0);
+
+       attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME |
+                        ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET |
+                        ATTR_CTIME | ATTR_UID | ATTR_GID);
+       pcci = ll_i2pcci(inode);
+       pcc_dentry = pcci->pcci_path.dentry;
+       inode_lock(pcc_dentry->d_inode);
+       old_cred = override_creds(pcc_super_cred(inode->i_sb));
+       rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
+       revert_creds(old_cred);
+       inode_unlock(pcc_dentry->d_inode);
+
+       pcc_io_fini(inode);
+       RETURN(rc);
+}
+
+int pcc_inode_getattr(struct inode *inode, bool *cached)
+{
+       struct ll_inode_info *lli = ll_i2info(inode);
+       const struct cred *old_cred;
+       struct kstat stat;
+       s64 atime;
+       s64 mtime;
+       s64 ctime;
+       int rc;
+
+       ENTRY;
+
+       if (!S_ISREG(inode->i_mode)) {
+               *cached = false;
+               RETURN(0);
+       }
+
+       pcc_io_init(inode, cached);
+       if (!*cached)
+               RETURN(0);
+
+       old_cred = override_creds(pcc_super_cred(inode->i_sb));
+       rc = ll_vfs_getattr(&ll_i2pcci(inode)->pcci_path, &stat);
+       revert_creds(old_cred);
+       if (rc)
+               GOTO(out, rc);
+
+       ll_inode_size_lock(inode);
+       if (inode->i_atime.tv_sec < lli->lli_atime ||
+           lli->lli_update_atime) {
+               inode->i_atime.tv_sec = lli->lli_atime;
+               lli->lli_update_atime = 0;
+       }
+       inode->i_mtime.tv_sec = lli->lli_mtime;
+       inode->i_ctime.tv_sec = lli->lli_ctime;
+
+       atime = inode->i_atime.tv_sec;
+       mtime = inode->i_mtime.tv_sec;
+       ctime = inode->i_ctime.tv_sec;
+
+       if (atime < stat.atime.tv_sec)
+               atime = stat.atime.tv_sec;
+
+       if (ctime < stat.ctime.tv_sec)
+               ctime = stat.ctime.tv_sec;
+
+       if (mtime < stat.mtime.tv_sec)
+               mtime = stat.mtime.tv_sec;
+
+       i_size_write(inode, stat.size);
+       inode->i_blocks = stat.blocks;
+
+       inode->i_atime.tv_sec = atime;
+       inode->i_mtime.tv_sec = mtime;
+       inode->i_ctime.tv_sec = ctime;
+
+       ll_inode_size_unlock(inode);
+out:
+       pcc_io_fini(inode);
+       RETURN(rc);
+}
+
+ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos,
+                            struct pipe_inode_info *pipe,
+                            size_t count, unsigned int flags,
+                            bool *cached)
+{
+       struct inode *inode = file_inode(in_file);
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(in_file);
+       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+       ssize_t result;
+
+       ENTRY;
+
+       *cached = false;
+       if (!pcc_file)
+               RETURN(0);
+
+       if (!file_inode(pcc_file)->i_fop->splice_read)
+               RETURN(-ENOTSUPP);
+
+       pcc_io_init(inode, cached);
+       if (!*cached)
+               RETURN(0);
+
+       result = file_inode(pcc_file)->i_fop->splice_read(pcc_file,
+                                                         ppos, pipe, count,
+                                                         flags);
+
+       pcc_io_fini(inode);
+       RETURN(result);
+}
+
+int pcc_fsync(struct file *file, loff_t start, loff_t end,
+             int datasync, bool *cached)
+{
+       struct inode *inode = file_inode(file);
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+       int rc;
+
+       ENTRY;
+
+       if (!pcc_file) {
+               *cached = false;
+               RETURN(0);
+       }
+
+       pcc_io_init(inode, cached);
+       if (!*cached)
+               RETURN(0);
+
+       rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
+                                               start, end, datasync);
+
+       pcc_io_fini(inode);
+       RETURN(rc);
+}
+
+int pcc_file_mmap(struct file *file, struct vm_area_struct *vma,
+                 bool *cached)
+{
+       struct inode *inode = file_inode(file);
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+       struct pcc_inode *pcci;
+       int rc = 0;
+
+       ENTRY;
+
+       if (!pcc_file || !file_inode(pcc_file)->i_fop->mmap) {
+               *cached = false;
+               RETURN(0);
+       }
+
+       pcc_inode_lock(inode);
+       pcci = ll_i2pcci(inode);
+       if (pcci && pcc_inode_has_layout(pcci)) {
+               LASSERT(atomic_read(&pcci->pcci_refcount) > 1);
+               *cached = true;
+               vma->vm_file = pcc_file;
+               rc = file_inode(pcc_file)->i_fop->mmap(pcc_file, vma);
+               vma->vm_file = file;
+               /* Save the vm ops of backend PCC */
+               vma->vm_private_data = (void *)vma->vm_ops;
+       } else {
+               *cached = false;
+       }
+       pcc_inode_unlock(inode);
+
+       RETURN(rc);
+}
+
+void pcc_vm_open(struct vm_area_struct *vma)
+{
+       struct pcc_inode *pcci;
+       struct file *file = vma->vm_file;
+       struct inode *inode = file_inode(file);
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+       struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
+
+       ENTRY;
+
+       if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->open)
+               RETURN_EXIT;
+
+       pcc_inode_lock(inode);
+       pcci = ll_i2pcci(inode);
+       if (pcci && pcc_inode_has_layout(pcci)) {
+               vma->vm_file = pcc_file;
+               pcc_vm_ops->open(vma);
+               vma->vm_file = file;
+       }
+       pcc_inode_unlock(inode);
+       EXIT;
+}
+
+void pcc_vm_close(struct vm_area_struct *vma)
+{
+       struct file *file = vma->vm_file;
+       struct inode *inode = file_inode(file);
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+       struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
+
+       ENTRY;
+
+       if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->close)
+               RETURN_EXIT;
+
+       pcc_inode_lock(inode);
+       /* Layout lock maybe revoked here */
+       vma->vm_file = pcc_file;
+       pcc_vm_ops->close(vma);
+       vma->vm_file = file;
+       pcc_inode_unlock(inode);
+       EXIT;
+}
+
+int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
+                    bool *cached)
 {
-       struct file *file = iocb->ki_filp;
+       struct page *page = vmf->page;
+       struct mm_struct *mm = vma->vm_mm;
+       struct file *file = vma->vm_file;
+       struct inode *inode = file_inode(file);
        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
-       struct pcc_file *pccf = &fd->fd_pcc_file;
-       ssize_t result;
+       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+       struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
+       int rc;
 
        ENTRY;
 
-       if (pccf->pccf_file == NULL) {
+       if (!pcc_file || !pcc_vm_ops) {
                *cached = false;
                RETURN(0);
        }
-       *cached = true;
 
-       if (pccf->pccf_type != LU_PCC_READWRITE)
-               RETURN(-EWOULDBLOCK);
+       if (!pcc_vm_ops->page_mkwrite &&
+           page->mapping == pcc_file->f_mapping) {
+               CDEBUG(D_MMAP,
+                      "%s: PCC backend fs not support ->page_mkwrite()\n",
+                      ll_i2sbi(inode)->ll_fsname);
+               pcc_ioctl_detach(inode, PCC_DETACH_OPT_NONE);
+               up_read(&mm->mmap_sem);
+               *cached = true;
+               RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
+       }
+       /* Pause to allow for a race with concurrent detach */
+       OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
+
+       pcc_io_init(inode, cached);
+       if (!*cached) {
+               /* This happens when the file is detached from PCC after got
+                * the fault page via ->fault() on the inode of the PCC copy.
+                * Here it can not simply fall back to normal Lustre I/O path.
+                * The reason is that the address space of fault page used by
+                * ->page_mkwrite() is still the one of PCC inode. In the
+                * normal Lustre ->page_mkwrite() I/O path, it will be wrongly
+                * handled as the address space of the fault page is not
+                * consistent with the one of the Lustre inode (though the
+                * fault page was truncated).
+                * As the file is detached from PCC, the fault page must
+                * be released frist, and retry the mmap write (->fault() and
+                * ->page_mkwrite).
+                * We use an ugly and tricky method by returning
+                * VM_FAULT_NOPAGE | VM_FAULT_RETRY to the caller
+                * __do_page_fault and retry the memory fault handling.
+                */
+               if (page->mapping == pcc_file->f_mapping) {
+                       *cached = true;
+                       up_read(&mm->mmap_sem);
+                       RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
+               }
 
-       iocb->ki_filp = pccf->pccf_file;
+               RETURN(0);
+       }
 
-       /* Since __pcc_file_write_iter makes write calls via
-        * the normal vfs interface to the local PCC file system,
-        * the inode lock is not needed.
+       /*
+        * This fault injection can also be used to simulate -ENOSPC and
+        * -EDQUOT failure of underlying PCC backend fs.
         */
-       result = __pcc_file_write_iter(iocb, iter);
-       iocb->ki_filp = file;
-       RETURN(result);
+       if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE)) {
+               pcc_io_fini(inode);
+               pcc_ioctl_detach(inode, PCC_DETACH_OPT_NONE);
+               up_read(&mm->mmap_sem);
+               RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
+       }
+
+       vma->vm_file = pcc_file;
+#ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
+       rc = pcc_vm_ops->page_mkwrite(vmf);
+#else
+       rc = pcc_vm_ops->page_mkwrite(vma, vmf);
+#endif
+       vma->vm_file = file;
+
+       pcc_io_fini(inode);
+       RETURN(rc);
 }
 
-int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
-                     bool *cached)
+int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
+             bool *cached)
 {
+       struct file *file = vma->vm_file;
+       struct inode *inode = file_inode(file);
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+       struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
        int rc;
-       struct pcc_inode *pcci;
-       struct iattr attr2 = *attr;
-       struct dentry *pcc_dentry;
 
        ENTRY;
 
-       if (!S_ISREG(inode->i_mode)) {
+       if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->fault) {
                *cached = false;
                RETURN(0);
        }
 
-       pcc_inode_lock(inode);
-       pcci = ll_i2pcci(inode);
-       if (pcci == NULL || atomic_read(&pcci->pcci_refcount) == 0)
-               GOTO(out_unlock, rc = 0);
+       pcc_io_init(inode, cached);
+       if (!*cached)
+               RETURN(0);
 
-       *cached = true;
-       attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME |
-                        ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET |
-                        ATTR_CTIME);
-       pcc_dentry = pcci->pcci_path.dentry;
-       inode_lock(pcc_dentry->d_inode);
-       rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
-       inode_unlock(pcc_dentry->d_inode);
-out_unlock:
-       pcc_inode_unlock(inode);
+       vma->vm_file = pcc_file;
+#ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
+       rc = pcc_vm_ops->fault(vmf);
+#else
+       rc = pcc_vm_ops->fault(vma, vmf);
+#endif
+       vma->vm_file = file;
+
+       pcc_io_fini(inode);
        RETURN(rc);
 }
 
-int pcc_inode_getattr(struct inode *inode, bool *cached)
+static void pcc_layout_wait(struct pcc_inode *pcci)
+{
+       struct l_wait_info lwi = { 0 };
+
+       while (atomic_read(&pcci->pcci_active_ios) > 0) {
+               CDEBUG(D_CACHE, "Waiting for IO completion: %d\n",
+                      atomic_read(&pcci->pcci_active_ios));
+               l_wait_event(pcci->pcci_waitq,
+                            atomic_read(&pcci->pcci_active_ios) == 0, &lwi);
+       }
+}
+
+static void __pcc_layout_invalidate(struct pcc_inode *pcci)
+{
+       pcci->pcci_type = LU_PCC_NONE;
+       pcc_layout_gen_set(pcci, CL_LAYOUT_GEN_NONE);
+       pcc_layout_wait(pcci);
+}
+
+void pcc_layout_invalidate(struct inode *inode)
 {
-       struct ll_inode_info *lli = ll_i2info(inode);
        struct pcc_inode *pcci;
-       struct kstat stat;
-       s64 atime;
-       s64 mtime;
-       s64 ctime;
-       int rc;
 
        ENTRY;
 
-       if (!S_ISREG(inode->i_mode)) {
-               *cached = false;
-               RETURN(0);
-       }
-
        pcc_inode_lock(inode);
        pcci = ll_i2pcci(inode);
-       if (pcci == NULL || atomic_read(&pcci->pcci_refcount) == 0)
-               GOTO(out_unlock, rc = 0);
+       if (pcci && pcc_inode_has_layout(pcci)) {
+               LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
+               __pcc_layout_invalidate(pcci);
 
-       *cached = true;
-       rc = ll_vfs_getattr(&pcci->pcci_path, &stat);
-       if (rc)
-               GOTO(out_unlock, rc);
+               CDEBUG(D_CACHE, "Invalidate "DFID" layout gen %d\n",
+                      PFID(&ll_i2info(inode)->lli_fid), pcci->pcci_layout_gen);
 
-       ll_inode_size_lock(inode);
-       if (inode->i_atime.tv_sec < lli->lli_atime ||
-           lli->lli_update_atime) {
-               inode->i_atime.tv_sec = lli->lli_atime;
-               lli->lli_update_atime = 0;
+               pcc_inode_put(pcci);
        }
-       inode->i_mtime.tv_sec = lli->lli_mtime;
-       inode->i_ctime.tv_sec = lli->lli_ctime;
-
-       atime = inode->i_atime.tv_sec;
-       mtime = inode->i_mtime.tv_sec;
-       ctime = inode->i_ctime.tv_sec;
-
-       if (atime < stat.atime.tv_sec)
-               atime = stat.atime.tv_sec;
-
-       if (ctime < stat.ctime.tv_sec)
-               ctime = stat.ctime.tv_sec;
-
-       if (mtime < stat.mtime.tv_sec)
-               mtime = stat.mtime.tv_sec;
+       pcc_inode_unlock(inode);
 
-       i_size_write(inode, stat.size);
-       inode->i_blocks = stat.blocks;
+       EXIT;
+}
 
-       inode->i_atime.tv_sec = atime;
-       inode->i_mtime.tv_sec = mtime;
-       inode->i_ctime.tv_sec = ctime;
+static int pcc_inode_remove(struct inode *inode, struct dentry *pcc_dentry)
+{
+       int rc;
 
-       ll_inode_size_unlock(inode);
+       rc = ll_vfs_unlink(pcc_dentry->d_parent->d_inode, pcc_dentry);
+       if (rc)
+               CWARN("%s: failed to unlink PCC file %.*s, rc = %d\n",
+                     ll_i2sbi(inode)->ll_fsname, pcc_dentry->d_name.len,
+                     pcc_dentry->d_name.name, rc);
 
-out_unlock:
-       pcc_inode_unlock(inode);
-       RETURN(rc);
+       return rc;
 }
 
 /* Create directory under base if directory does not exist */
@@ -752,9 +2000,10 @@ pcc_mkdir_p(struct dentry *root, char *path, umode_t mode)
                *ptr = '\0';
                child = pcc_mkdir(parent, entry_name, mode);
                *ptr = '/';
+               dput(parent);
                if (IS_ERR(child))
                        break;
-               dput(parent);
+
                parent = child;
                ptr++;
                entry_name = ptr;
@@ -790,20 +2039,6 @@ out:
        return dentry;
 }
 
-/* Must be called with pcci->pcci_lock held */
-static void pcc_inode_attach_init(struct pcc_dataset *dataset,
-                                 struct pcc_inode *pcci,
-                                 struct dentry *dentry,
-                                 enum lu_pcc_type type)
-{
-       pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
-       pcci->pcci_path.dentry = dentry;
-       LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
-       atomic_set(&pcci->pcci_refcount, 1);
-       pcci->pcci_type = type;
-       pcci->pcci_attr_valid = false;
-}
-
 static int __pcc_inode_create(struct pcc_dataset *dataset,
                              struct lu_fid *fid,
                              struct dentry **dentry)
@@ -819,14 +2054,14 @@ static int __pcc_inode_create(struct pcc_dataset *dataset,
 
        pcc_fid2dataset_path(path, MAX_PCC_DATABASE_PATH, fid);
 
-       base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0700);
+       base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0);
        if (IS_ERR(base)) {
                rc = PTR_ERR(base);
                GOTO(out, rc);
        }
 
        snprintf(path, MAX_PCC_DATABASE_PATH, DFID_NOBRACE, PFID(fid));
-       child = pcc_create(base, path, 0600);
+       child = pcc_create(base, path, 0);
        if (IS_ERR(child)) {
                rc = PTR_ERR(child);
                GOTO(out_base, rc);
@@ -840,32 +2075,84 @@ out:
        return rc;
 }
 
-int pcc_inode_create(struct pcc_dataset *dataset, struct lu_fid *fid,
-                    struct dentry **pcc_dentry)
+/* TODO: Set the project ID for PCC copy */
+int pcc_inode_store_ugpid(struct dentry *dentry, kuid_t uid, kgid_t gid)
+{
+       struct inode *inode = dentry->d_inode;
+       struct iattr attr;
+       int rc;
+
+       ENTRY;
+
+       attr.ia_valid = ATTR_UID | ATTR_GID;
+       attr.ia_uid = uid;
+       attr.ia_gid = gid;
+
+       inode_lock(inode);
+       rc = notify_change(dentry, &attr, NULL);
+       inode_unlock(inode);
+
+       RETURN(rc);
+}
+
+int pcc_inode_create(struct super_block *sb, struct pcc_dataset *dataset,
+                    struct lu_fid *fid, struct dentry **pcc_dentry)
 {
-       return __pcc_inode_create(dataset, fid, pcc_dentry);
+       const struct cred *old_cred;
+       int rc;
+
+       old_cred = override_creds(pcc_super_cred(sb));
+       rc = __pcc_inode_create(dataset, fid, pcc_dentry);
+       revert_creds(old_cred);
+       return rc;
 }
 
 int pcc_inode_create_fini(struct pcc_dataset *dataset, struct inode *inode,
                          struct dentry *pcc_dentry)
 {
-       struct ll_inode_info *lli = ll_i2info(inode);
+       const struct cred *old_cred;
        struct pcc_inode *pcci;
+       int rc = 0;
 
        ENTRY;
 
+       old_cred = override_creds(pcc_super_cred(inode->i_sb));
+       pcc_inode_lock(inode);
        LASSERT(ll_i2pcci(inode) == NULL);
        OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
        if (pcci == NULL)
-               RETURN(-ENOMEM);
+               GOTO(out_put, rc = -ENOMEM);
 
-       pcc_inode_init(pcci);
-       pcc_inode_lock(inode);
+       rc = pcc_inode_store_ugpid(pcc_dentry, old_cred->suid,
+                                  old_cred->sgid);
+       if (rc)
+               GOTO(out_put, rc);
+
+       pcc_inode_init(pcci, ll_i2info(inode));
        pcc_inode_attach_init(dataset, pcci, pcc_dentry, LU_PCC_READWRITE);
-       lli->lli_pcc_inode = pcci;
-       pcc_inode_unlock(inode);
 
-       RETURN(0);
+       rc = pcc_layout_xattr_set(pcci, 0);
+       if (rc) {
+               (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
+               pcc_inode_put(pcci);
+               GOTO(out_unlock, rc);
+       }
+
+       /* Set the layout generation of newly created file with 0 */
+       pcc_layout_gen_set(pcci, 0);
+
+out_put:
+       if (rc) {
+               (void) pcc_inode_remove(inode, pcc_dentry);
+               dput(pcc_dentry);
+
+               if (pcci)
+                       OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
+       }
+out_unlock:
+       pcc_inode_unlock(inode);
+       revert_creds(old_cred);
+       RETURN(rc);
 }
 
 static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
@@ -874,7 +2161,7 @@ static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
        while (count > 0) {
                ssize_t size;
 
-               size = vfs_write(filp, (const void __user *)buf, count, offset);
+               size = cfs_kernel_write(filp, buf, count, offset);
                if (size < 0)
                        return size;
                count -= size;
@@ -887,7 +2174,6 @@ static int pcc_copy_data(struct file *src, struct file *dst)
 {
        int rc = 0;
        ssize_t rc2;
-       mm_segment_t oldfs;
        loff_t pos, offset = 0;
        size_t buf_len = 1048576;
        void *buf;
@@ -898,35 +2184,55 @@ static int pcc_copy_data(struct file *src, struct file *dst)
        if (buf == NULL)
                RETURN(-ENOMEM);
 
-       oldfs = get_fs();
-       set_fs(KERNEL_DS);
        while (1) {
                pos = offset;
-               rc2 = vfs_read(src, (void __user *)buf, buf_len, &pos);
+               rc2 = cfs_kernel_read(src, buf, buf_len, &pos);
                if (rc2 < 0)
-                       GOTO(out_fs, rc = rc2);
+                       GOTO(out_free, rc = rc2);
                else if (rc2 == 0)
                        break;
 
                pos = offset;
                rc = pcc_filp_write(dst, buf, rc2, &pos);
                if (rc < 0)
-                       GOTO(out_fs, rc);
+                       GOTO(out_free, rc);
                offset += rc2;
        }
 
-out_fs:
-       set_fs(oldfs);
+out_free:
        OBD_FREE_LARGE(buf, buf_len);
        RETURN(rc);
 }
 
+static int pcc_attach_allowed_check(struct inode *inode)
+{
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct pcc_inode *pcci;
+       int rc = 0;
+
+       ENTRY;
+
+       pcc_inode_lock(inode);
+       if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
+               GOTO(out_unlock, rc = -EBUSY);
+
+       pcci = ll_i2pcci(inode);
+       if (pcci && pcc_inode_has_layout(pcci))
+               GOTO(out_unlock, rc = -EEXIST);
+
+       lli->lli_pcc_state |= PCC_STATE_FL_ATTACHING;
+out_unlock:
+       pcc_inode_unlock(inode);
+       RETURN(rc);
+}
+
 int pcc_readwrite_attach(struct file *file, struct inode *inode,
                         __u32 archive_id)
 {
        struct pcc_dataset *dataset;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct pcc_inode *pcci;
+       const struct cred *old_cred;
        struct dentry *dentry;
        struct file *pcc_filp;
        struct path path;
@@ -934,30 +2240,21 @@ int pcc_readwrite_attach(struct file *file, struct inode *inode,
 
        ENTRY;
 
-       pcc_inode_lock(inode);
-       pcci = ll_i2pcci(inode);
-       if (pcci == NULL) {
-               OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
-               if (pcci == NULL) {
-                       pcc_inode_unlock(inode);
-                       RETURN(-ENOMEM);
-               }
-
-               pcc_inode_init(pcci);
-       } else if (atomic_read(&pcci->pcci_refcount) > 0) {
-               pcc_inode_unlock(inode);
-               RETURN(-EEXIST);
-       }
-       pcc_inode_unlock(inode);
+       rc = pcc_attach_allowed_check(inode);
+       if (rc)
+               RETURN(rc);
 
-       dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super, 0,
-                                 archive_id);
+       dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super,
+                                 LU_PCC_READWRITE, archive_id);
        if (dataset == NULL)
-               GOTO(out_free_pcci, rc = -ENOENT);
+               RETURN(-ENOENT);
 
+       old_cred = override_creds(pcc_super_cred(inode->i_sb));
        rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry);
-       if (rc)
+       if (rc) {
+               revert_creds(old_cred);
                GOTO(out_dataset_put, rc);
+       }
 
        path.mnt = dataset->pccd_path.mnt;
        path.dentry = dentry;
@@ -971,81 +2268,191 @@ int pcc_readwrite_attach(struct file *file, struct inode *inode,
 #endif
        if (IS_ERR_OR_NULL(pcc_filp)) {
                rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
+               revert_creds(old_cred);
                GOTO(out_dentry, rc);
        }
 
+       rc = pcc_inode_store_ugpid(dentry, old_cred->uid, old_cred->gid);
+       revert_creds(old_cred);
+       if (rc)
+               GOTO(out_fput, rc);
+
        rc = pcc_copy_data(file, pcc_filp);
        if (rc)
                GOTO(out_fput, rc);
 
+       /* Pause to allow for a race with concurrent HSM remove */
+       OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
+
        pcc_inode_lock(inode);
-       if (lli->lli_pcc_inode)
-               GOTO(out_unlock, rc = -EEXIST);
+       pcci = ll_i2pcci(inode);
+       LASSERT(!pcci);
+       OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
+       if (pcci == NULL)
+               GOTO(out_unlock, rc = -ENOMEM);
+
+       pcc_inode_init(pcci, lli);
        pcc_inode_attach_init(dataset, pcci, dentry, LU_PCC_READWRITE);
-       lli->lli_pcc_inode = pcci;
 out_unlock:
        pcc_inode_unlock(inode);
 out_fput:
        fput(pcc_filp);
 out_dentry:
-       if (rc)
+       if (rc) {
+               old_cred = override_creds(pcc_super_cred(inode->i_sb));
+               (void) pcc_inode_remove(inode, dentry);
+               revert_creds(old_cred);
                dput(dentry);
+       }
 out_dataset_put:
        pcc_dataset_put(dataset);
-out_free_pcci:
-       if (rc)
-               OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
        RETURN(rc);
-
 }
 
 int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
-                             bool lease_broken, int rc, bool attached)
+                             __u32 gen, bool lease_broken, int rc,
+                             bool attached)
 {
-       struct pcc_inode *pcci = ll_i2pcci(inode);
+       struct ll_inode_info *lli = ll_i2info(inode);
+       const struct cred *old_cred;
+       struct pcc_inode *pcci;
+       __u32 gen2;
 
        ENTRY;
 
-       if ((rc || lease_broken) && attached && pcci)
+       old_cred = override_creds(pcc_super_cred(inode->i_sb));
+       pcc_inode_lock(inode);
+       pcci = ll_i2pcci(inode);
+       lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
+       if (rc || lease_broken) {
+               if (attached && pcci)
+                       pcc_inode_put(pcci);
+
+               GOTO(out_unlock, rc);
+       }
+
+       /* PCC inode may be released due to layout lock revocatioin */
+       if (!pcci)
+               GOTO(out_unlock, rc = -ESTALE);
+
+       LASSERT(attached);
+       rc = pcc_layout_xattr_set(pcci, gen);
+       if (rc)
+               GOTO(out_put, rc);
+
+       rc = ll_layout_refresh(inode, &gen2);
+       if (!rc) {
+               if (gen2 == gen) {
+                       pcc_layout_gen_set(pcci, gen);
+               } else {
+                       CDEBUG(D_CACHE,
+                              DFID" layout changed from %d to %d.\n",
+                              PFID(ll_inode2fid(inode)), gen, gen2);
+                       GOTO(out_put, rc = -ESTALE);
+               }
+       }
+
+out_put:
+       if (rc) {
+               (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
                pcc_inode_put(pcci);
+       }
+out_unlock:
+       pcc_inode_unlock(inode);
+       revert_creds(old_cred);
+       RETURN(rc);
+}
+
+static int pcc_hsm_remove(struct inode *inode)
+{
+       struct hsm_user_request *hur;
+       __u32 gen;
+       int len;
+       int rc;
+
+       ENTRY;
+
+       rc = ll_layout_restore(inode, 0, OBD_OBJECT_EOF);
+       if (rc) {
+               CDEBUG(D_CACHE, DFID" RESTORE failure: %d\n",
+                      PFID(&ll_i2info(inode)->lli_fid), rc);
+               RETURN(rc);
+       }
+
+       ll_layout_refresh(inode, &gen);
 
+       len = sizeof(struct hsm_user_request) +
+             sizeof(struct hsm_user_item);
+       OBD_ALLOC(hur, len);
+       if (hur == NULL)
+               RETURN(-ENOMEM);
+
+       hur->hur_request.hr_action = HUA_REMOVE;
+       hur->hur_request.hr_archive_id = 0;
+       hur->hur_request.hr_flags = 0;
+       memcpy(&hur->hur_user_item[0].hui_fid, &ll_i2info(inode)->lli_fid,
+              sizeof(hur->hur_user_item[0].hui_fid));
+       hur->hur_user_item[0].hui_extent.offset = 0;
+       hur->hur_user_item[0].hui_extent.length = OBD_OBJECT_EOF;
+       hur->hur_request.hr_itemcount = 1;
+       rc = obd_iocontrol(LL_IOC_HSM_REQUEST, ll_i2sbi(inode)->ll_md_exp,
+                          len, hur, NULL);
+       if (rc)
+               CDEBUG(D_CACHE, DFID" HSM REMOVE failure: %d\n",
+                      PFID(&ll_i2info(inode)->lli_fid), rc);
+
+       OBD_FREE(hur, len);
        RETURN(rc);
 }
 
-int pcc_ioctl_detach(struct inode *inode)
+int pcc_ioctl_detach(struct inode *inode, __u32 opt)
 {
        struct ll_inode_info *lli = ll_i2info(inode);
-       struct pcc_inode *pcci = lli->lli_pcc_inode;
+       struct pcc_inode *pcci;
+       bool hsm_remove = false;
        int rc = 0;
-       int count;
 
        ENTRY;
 
        pcc_inode_lock(inode);
-       if (pcci == NULL)
+       pcci = lli->lli_pcc_inode;
+       if (!pcci || lli->lli_pcc_state & PCC_STATE_FL_ATTACHING ||
+           !pcc_inode_has_layout(pcci))
                GOTO(out_unlock, rc = 0);
 
-       count = atomic_read(&pcci->pcci_refcount);
-       if (count > 1)
-               GOTO(out_unlock, rc = -EBUSY);
-       else if (count == 0)
-               GOTO(out_unlock, rc = 0);
+       LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
+
+       if (pcci->pcci_type == LU_PCC_READWRITE) {
+               if (opt == PCC_DETACH_OPT_UNCACHE)
+                       hsm_remove = true;
+
+               __pcc_layout_invalidate(pcci);
+               pcc_inode_put(pcci);
+       }
 
-       pcc_inode_put(pcci);
-       lli->lli_pcc_inode = NULL;
 out_unlock:
        pcc_inode_unlock(inode);
+       if (hsm_remove) {
+               const struct cred *old_cred;
+
+               old_cred = override_creds(pcc_super_cred(inode->i_sb));
+               rc = pcc_hsm_remove(inode);
+               revert_creds(old_cred);
+       }
 
        RETURN(rc);
 }
 
-int pcc_ioctl_state(struct inode *inode, struct lu_pcc_state *state)
+int pcc_ioctl_state(struct file *file, struct inode *inode,
+                   struct lu_pcc_state *state)
 {
        int rc = 0;
        int count;
        char *buf;
        char *path;
        int buf_len = sizeof(state->pccs_path);
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+       struct pcc_file *pccf = &fd->fd_pcc_file;
        struct pcc_inode *pcci;
 
        ENTRY;
@@ -1067,19 +2474,20 @@ int pcc_ioctl_state(struct inode *inode, struct lu_pcc_state *state)
        count = atomic_read(&pcci->pcci_refcount);
        if (count == 0) {
                state->pccs_type = LU_PCC_NONE;
+               state->pccs_open_count = 0;
                GOTO(out_unlock, rc = 0);
        }
+
+       if (pcc_inode_has_layout(pcci))
+               count--;
+       if (pccf->pccf_file != NULL)
+               count--;
        state->pccs_type = pcci->pcci_type;
-       state->pccs_open_count = count - 1;
-       state->pccs_flags = pcci->pcci_attr_valid ?
-                           PCC_STATE_FLAG_ATTR_VALID : 0;
-#ifdef HAVE_DENTRY_PATH_RAW
+       state->pccs_open_count = count;
+       state->pccs_flags = ll_i2info(inode)->lli_pcc_state;
        path = dentry_path_raw(pcci->pcci_path.dentry, buf, buf_len);
        if (IS_ERR(path))
                GOTO(out_unlock, rc = PTR_ERR(path));
-#else
-       path = "UNKNOWN";
-#endif
 
        if (strlcpy(state->pccs_path, path, buf_len) >= buf_len)
                GOTO(out_unlock, rc = -ENAMETOOLONG);