Whamcloud - gitweb
LU-17674 build: use nop_mnt_idmap in inode_owner_or_capable
[fs/lustre-release.git] / lustre / llite / pcc.c
index b80173e..d9ed477 100644 (file)
@@ -205,12 +205,12 @@ static void pcc_cmd_fini(struct pcc_cmd *cmd)
        }
 }
 
-#define PCC_DISJUNCTION_DELIM  (',')
-#define PCC_CONJUNCTION_DELIM  ('&')
-#define PCC_EXPRESSION_DELIM   ('=')
+#define PCC_DISJUNCTION_DELIM  (",")
+#define PCC_CONJUNCTION_DELIM  ("&")
+#define PCC_EXPRESSION_DELIM   ("=")
 
 static int
-pcc_fname_list_add(struct cfs_lstr *id, struct list_head *fname_list)
+pcc_fname_list_add(char *id, struct list_head *fname_list)
 {
        struct pcc_match_fname *fname;
 
@@ -218,50 +218,42 @@ pcc_fname_list_add(struct cfs_lstr *id, struct list_head *fname_list)
        if (fname == NULL)
                return -ENOMEM;
 
-       OBD_ALLOC(fname->pmf_name, id->ls_len + 1);
+       OBD_ALLOC(fname->pmf_name, strlen(id) + 1);
        if (fname->pmf_name == NULL) {
                OBD_FREE_PTR(fname);
                return -ENOMEM;
        }
 
-       memcpy(fname->pmf_name, id->ls_str, id->ls_len);
+       strcpy(fname->pmf_name, id);
        list_add_tail(&fname->pmf_linkage, fname_list);
        return 0;
 }
 
 static int
-pcc_fname_list_parse(char *str, int len, struct list_head *fname_list)
+pcc_fname_list_parse(char *str, struct list_head *fname_list)
 {
-       struct cfs_lstr src;
-       struct cfs_lstr res;
        int rc = 0;
 
        ENTRY;
 
-       src.ls_str = str;
-       src.ls_len = len;
        INIT_LIST_HEAD(fname_list);
-       while (src.ls_str) {
-               rc = cfs_gettok(&src, ' ', &res);
-               if (rc == 0) {
-                       rc = -EINVAL;
-                       break;
-               }
-               rc = pcc_fname_list_add(&res, fname_list);
-               if (rc)
-                       break;
+       while (rc == 0 && str) {
+               char *fname = strsep(&str, " ");
+
+               if (*fname)
+                       rc = pcc_fname_list_add(fname, fname_list);
        }
+       if (list_empty(fname_list))
+               rc = -EINVAL;
        if (rc)
                pcc_fname_list_free(fname_list);
        RETURN(rc);
 }
 
 static int
-pcc_id_list_parse(char *str, int len, struct list_head *id_list,
+pcc_id_list_parse(char *str, struct list_head *id_list,
                  enum pcc_field type)
 {
-       struct cfs_lstr src;
-       struct cfs_lstr res;
        int rc = 0;
 
        ENTRY;
@@ -270,19 +262,18 @@ pcc_id_list_parse(char *str, int len, struct list_head *id_list,
            type != PCC_FIELD_PROJID)
                RETURN(-EINVAL);
 
-       src.ls_str = str;
-       src.ls_len = len;
        INIT_LIST_HEAD(id_list);
-       while (src.ls_str) {
+       while (str) {
+               char *num;
                struct pcc_match_id *id;
-               __u32 id_val;
-
-               if (cfs_gettok(&src, ' ', &res) == 0)
-                       GOTO(out, rc = -EINVAL);
+               unsigned long id_val;
 
-               if (!cfs_str2num_check(res.ls_str, res.ls_len,
-                                      &id_val, 0, (u32)~0U))
-                       GOTO(out, rc = -EINVAL);
+               num = strsep(&str, " ");
+               if (!*num)
+                       continue;
+               rc = kstrtoul(num, 0, &id_val);
+               if (rc)
+                       GOTO(out, rc);
 
                OBD_ALLOC_PTR(id);
                if (id == NULL)
@@ -291,66 +282,59 @@ pcc_id_list_parse(char *str, int len, struct list_head *id_list,
                id->pmi_id = id_val;
                list_add_tail(&id->pmi_linkage, id_list);
        }
+       if (list_empty(id_list))
+               rc = -EINVAL;
 out:
        if (rc)
                pcc_id_list_free(id_list);
        RETURN(rc);
 }
 
-static inline bool
-pcc_check_field(struct cfs_lstr *field, char *str)
-{
-       int len = strlen(str);
-
-       return (field->ls_len == len &&
-               strncmp(field->ls_str, str, len) == 0);
-}
-
 static int
-pcc_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
+pcc_expression_parse(char *str, struct list_head *cond_list)
 {
        struct pcc_expression *expr;
-       struct cfs_lstr field;
+       char *field;
+       int len;
        int rc = 0;
 
        OBD_ALLOC_PTR(expr);
        if (expr == NULL)
                return -ENOMEM;
 
-       rc = cfs_gettok(src, PCC_EXPRESSION_DELIM, &field);
-       if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
-           src->ls_str[src->ls_len - 1] != '}')
+       field = strim(strsep(&str, PCC_EXPRESSION_DELIM));
+       if (!*field || !str)
+               /* No LHS or no '=' */
+               GOTO(out, rc = -EINVAL);
+       str = skip_spaces(str);
+       len = strlen(str);
+       if (str[0] != '{' || str[len - 1] != '}')
                GOTO(out, rc = -EINVAL);
 
        /* Skip '{' and '}' */
-       src->ls_str++;
-       src->ls_len -= 2;
+       str[len - 1] = '\0';
+       str += 1;
 
-       if (pcc_check_field(&field, "uid")) {
-               if (pcc_id_list_parse(src->ls_str,
-                                     src->ls_len,
+       if (strcmp(field, "uid") == 0) {
+               if (pcc_id_list_parse(str,
                                      &expr->pe_cond,
                                      PCC_FIELD_UID) < 0)
                        GOTO(out, rc = -EINVAL);
                expr->pe_field = PCC_FIELD_UID;
-       } else if (pcc_check_field(&field, "gid")) {
-               if (pcc_id_list_parse(src->ls_str,
-                                     src->ls_len,
+       } else if (strcmp(field, "gid") == 0) {
+               if (pcc_id_list_parse(str,
                                      &expr->pe_cond,
                                      PCC_FIELD_GID) < 0)
                        GOTO(out, rc = -EINVAL);
                expr->pe_field = PCC_FIELD_GID;
-       } else if (pcc_check_field(&field, "projid")) {
-               if (pcc_id_list_parse(src->ls_str,
-                                     src->ls_len,
+       } else if (strcmp(field, "projid") == 0) {
+               if (pcc_id_list_parse(str,
                                      &expr->pe_cond,
                                      PCC_FIELD_PROJID) < 0)
                        GOTO(out, rc = -EINVAL);
                expr->pe_field = PCC_FIELD_PROJID;
-       } else if (pcc_check_field(&field, "fname")) {
-               if (pcc_fname_list_parse(src->ls_str,
-                                        src->ls_len,
-                                        &expr->pe_cond) < 0)
+       } else if (strcmp(field, "fname") == 0) {
+               if (pcc_fname_list_parse(str, &expr->pe_cond) < 0)
                        GOTO(out, rc = -EINVAL);
                expr->pe_field = PCC_FIELD_FNAME;
        } else {
@@ -365,10 +349,9 @@ out:
 }
 
 static int
-pcc_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
+pcc_conjunction_parse(char *str, struct list_head *cond_list)
 {
        struct pcc_conjunction *conjunction;
-       struct cfs_lstr expr;
        int rc = 0;
 
        OBD_ALLOC_PTR(conjunction);
@@ -378,39 +361,31 @@ pcc_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
        INIT_LIST_HEAD(&conjunction->pc_expressions);
        list_add_tail(&conjunction->pc_linkage, cond_list);
 
-       while (src->ls_str) {
-               rc = cfs_gettok(src, PCC_CONJUNCTION_DELIM, &expr);
-               if (rc == 0) {
-                       rc = -EINVAL;
-                       break;
-               }
-               rc = pcc_expression_parse(&expr,
-                                         &conjunction->pc_expressions);
-               if (rc)
-                       break;
+       while (rc == 0 && str) {
+               char *expr = strsep(&str, PCC_CONJUNCTION_DELIM);
+
+               rc = pcc_expression_parse(expr, &conjunction->pc_expressions);
        }
        return rc;
 }
 
-static int pcc_conds_parse(char *str, int len, struct list_head *cond_list)
+static int pcc_conds_parse(char *orig, struct list_head *cond_list)
 {
-       struct cfs_lstr src;
-       struct cfs_lstr res;
+       char *str;
        int rc = 0;
 
-       src.ls_str = str;
-       src.ls_len = len;
+       orig = kstrdup(orig, GFP_KERNEL);
+       if (!orig)
+               return -ENOMEM;
+       str = orig;
+
        INIT_LIST_HEAD(cond_list);
-       while (src.ls_str) {
-               rc = cfs_gettok(&src, PCC_DISJUNCTION_DELIM, &res);
-               if (rc == 0) {
-                       rc = -EINVAL;
-                       break;
-               }
-               rc = pcc_conjunction_parse(&res, cond_list);
-               if (rc)
-                       break;
+       while (rc == 0 && str) {
+               char *term = strsep(&str, PCC_DISJUNCTION_DELIM);
+
+               rc = pcc_conjunction_parse(term, cond_list);
        }
+       kfree(orig);
        return rc;
 }
 
@@ -425,7 +400,6 @@ static int pcc_id_parse(struct pcc_cmd *cmd, const char *id)
        memcpy(cmd->u.pccc_add.pccc_conds_str, id, strlen(id));
 
        rc = pcc_conds_parse(cmd->u.pccc_add.pccc_conds_str,
-                            strlen(cmd->u.pccc_add.pccc_conds_str),
                             &cmd->u.pccc_add.pccc_conds);
        if (rc)
                pcc_cmd_fini(cmd);
@@ -537,16 +511,23 @@ pcc_parse_value_pairs(struct pcc_cmd *cmd, char *buffer)
                        return -EINVAL;
                /*
                 * By default, a PCC backend can provide caching service for
-                * both RW-PCC and RO-PCC.
+                * both PCC-RW and PCC-RO.
                 */
                if ((cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCC_ALL) == 0)
                        cmd->u.pccc_add.pccc_flags |= PCC_DATASET_PCC_ALL;
 
-               /* For RW-PCC, the value of @rwid must be non zero. */
-               if (cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC &&
-                   cmd->u.pccc_add.pccc_rwid == 0)
+               if (cmd->u.pccc_add.pccc_rwid == 0 &&
+                   cmd->u.pccc_add.pccc_roid == 0)
                        return -EINVAL;
 
+               if (cmd->u.pccc_add.pccc_rwid == 0 &&
+                   cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC)
+                       cmd->u.pccc_add.pccc_rwid = cmd->u.pccc_add.pccc_roid;
+
+               if (cmd->u.pccc_add.pccc_roid == 0 &&
+                   cmd->u.pccc_add.pccc_flags & PCC_DATASET_ROPCC)
+                       cmd->u.pccc_add.pccc_roid = cmd->u.pccc_add.pccc_rwid;
+
                break;
        case PCC_DEL_DATASET:
        case PCC_CLEAR_ALL:
@@ -584,8 +565,7 @@ pcc_dataset_rule_init(struct pcc_match_rule *rule, struct pcc_cmd *cmd)
        INIT_LIST_HEAD(&rule->pmr_conds);
        if (!list_empty(&cmd->u.pccc_add.pccc_conds))
                rc = pcc_conds_parse(rule->pmr_conds_str,
-                                         strlen(rule->pmr_conds_str),
-                                         &rule->pmr_conds);
+                                    &rule->pmr_conds);
 
        if (rc)
                pcc_dataset_rule_fini(rule);
@@ -781,7 +761,7 @@ pcc_dataset_add(struct pcc_super *super, struct pcc_cmd *cmd)
        return rc;
 }
 
-struct pcc_dataset *
+static struct pcc_dataset *
 pcc_dataset_get(struct pcc_super *super, enum lu_pcc_type type, __u32 id)
 {
        struct pcc_dataset *dataset;
@@ -799,6 +779,9 @@ pcc_dataset_get(struct pcc_super *super, enum lu_pcc_type type, __u32 id)
                if (type == LU_PCC_READWRITE && (dataset->pccd_rwid != id ||
                    !(dataset->pccd_flags & PCC_DATASET_RWPCC)))
                        continue;
+               if (type == LU_PCC_READONLY && (dataset->pccd_roid != id ||
+                   !(dataset->pccd_flags & PCC_DATASET_ROPCC)))
+                       continue;
                atomic_inc(&dataset->pccd_refcount);
                selected = dataset;
                break;
@@ -1057,18 +1040,18 @@ void pcc_inode_free(struct inode *inode)
  * reduce overhead:
  * (fid->f_oid >> 16 & oxFFFF)/FID
  */
-#define MAX_PCC_DATABASE_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
+#define PCC_DATASET_MAX_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
 static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid)
 {
-       return snprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
-                       DFID_NOBRACE,
-                       (fid)->f_oid       & 0xFFFF,
-                       (fid)->f_oid >> 16 & 0xFFFF,
-                       (unsigned int)((fid)->f_seq       & 0xFFFF),
-                       (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
-                       (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
-                       (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
-                       PFID(fid));
+       return scnprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
+                        DFID_NOBRACE,
+                        (fid)->f_oid       & 0xFFFF,
+                        (fid)->f_oid >> 16 & 0xFFFF,
+                        (unsigned int)((fid)->f_seq       & 0xFFFF),
+                        (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
+                        (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
+                        (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
+                        PFID(fid));
 }
 
 static inline const struct cred *pcc_super_cred(struct super_block *sb)
@@ -1129,27 +1112,12 @@ static int pcc_get_layout_info(struct inode *inode, struct cl_layout *clt)
                RETURN(PTR_ERR(env));
 
        rc = cl_object_layout_get(env, lli->lli_clob, clt);
-       if (rc)
+       if (rc < 0)
                CDEBUG(D_INODE, "Cannot get layout for "DFID"\n",
                       PFID(ll_inode2fid(inode)));
 
        cl_env_put(env, &refcheck);
-       RETURN(rc);
-}
-
-static int pcc_fid2dataset_fullpath(char *buf, int sz, struct lu_fid *fid,
-                                   struct pcc_dataset *dataset)
-{
-       return snprintf(buf, sz, "%s/%04x/%04x/%04x/%04x/%04x/%04x/"
-                       DFID_NOBRACE,
-                       dataset->pccd_pathname,
-                       (fid)->f_oid       & 0xFFFF,
-                       (fid)->f_oid >> 16 & 0xFFFF,
-                       (unsigned int)((fid)->f_seq       & 0xFFFF),
-                       (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
-                       (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
-                       (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
-                       PFID(fid));
+       RETURN(rc < 0 ? rc : 0);
 }
 
 /* Must be called with pcci->pcci_lock held */
@@ -1198,6 +1166,72 @@ static inline bool pcc_inode_has_layout(struct pcc_inode *pcci)
        return pcci->pcci_layout_gen != CL_LAYOUT_GEN_NONE;
 }
 
+static struct dentry *pcc_lookup(struct dentry *base, char *pathname)
+{
+       char *ptr = NULL, *component;
+       struct dentry *parent;
+       struct dentry *child = ERR_PTR(-ENOENT);
+
+       ptr = pathname;
+
+       /* skip any leading '/' to reach the first path component */
+       while (*ptr == '/')
+               ptr++;
+
+       /* store the start of the first path component */
+       component = ptr;
+
+       parent = dget(base);
+       while (ptr) {
+               /* find the start of the next component - if we don't find it,
+                * the current component is the last component
+                */
+               ptr = strchr(ptr, '/');
+               /* put a NUL char in place of the '/' before the next component
+                * so we can treat this component as a string; note the full
+                * path string is NUL terminated so this is not needed for the
+                * last component
+                */
+               if (ptr)
+                       *ptr = '\0';
+
+               /* look up the current component */
+               inode_lock(parent->d_inode);
+               child = lookup_one_len(component, parent, strlen(component));
+               inode_unlock(parent->d_inode);
+
+               /* repair the path string: put '/' back in place of the NUL */
+               if (ptr)
+                       *ptr = '/';
+
+               dput(parent);
+
+               if (IS_ERR_OR_NULL(child))
+                       break;
+
+               /* we may find a cached negative dentry */
+               if (!d_is_positive(child)) {
+                       dput(child);
+                       child = NULL;
+                       break;
+               }
+
+               /* descend into the next level of the path */
+               parent = child;
+
+               /* move the pointer past the '/' to the next component */
+               if (ptr)
+                       ptr++;
+               component = ptr;
+       }
+
+       /* NULL child means we didn't find anything */
+       if (!child)
+               child = ERR_PTR(-ENOENT);
+
+       return child;
+}
+
 static int pcc_try_dataset_attach(struct inode *inode, __u32 gen,
                                  enum lu_pcc_type type,
                                  struct pcc_dataset *dataset,
@@ -1206,9 +1240,8 @@ static int pcc_try_dataset_attach(struct inode *inode, __u32 gen,
        struct ll_inode_info *lli = ll_i2info(inode);
        struct pcc_inode *pcci = lli->lli_pcc_inode;
        const struct cred *old_cred;
-       struct dentry *pcc_dentry;
-       struct path path;
-       char *pathname;
+       struct dentry *pcc_dentry = NULL;
+       char pathname[PCC_DATASET_MAX_PATH];
        __u32 pcc_gen;
        int rc;
 
@@ -1218,24 +1251,29 @@ static int pcc_try_dataset_attach(struct inode *inode, __u32 gen,
            !(dataset->pccd_flags & PCC_DATASET_RWPCC))
                RETURN(0);
 
-       OBD_ALLOC(pathname, PATH_MAX);
-       if (pathname == NULL)
-               RETURN(-ENOMEM);
+       if (type == LU_PCC_READONLY &&
+           !(dataset->pccd_flags & PCC_DATASET_ROPCC))
+               RETURN(0);
 
-       pcc_fid2dataset_fullpath(pathname, PATH_MAX, &lli->lli_fid, dataset);
+       rc = pcc_fid2dataset_path(pathname, PCC_DATASET_MAX_PATH,
+                                 &lli->lli_fid);
 
        old_cred = override_creds(pcc_super_cred(inode->i_sb));
-       rc = kern_path(pathname, LOOKUP_FOLLOW, &path);
-       if (rc)
+       pcc_dentry = pcc_lookup(dataset->pccd_path.dentry, pathname);
+       if (IS_ERR(pcc_dentry)) {
+               rc = PTR_ERR(pcc_dentry);
+               CDEBUG(D_CACHE, "%s: path lookup error on "DFID":%s: rc = %d\n",
+                      ll_i2sbi(inode)->ll_fsname, PFID(&lli->lli_fid),
+                      pathname, rc);
                /* ignore this error */
                GOTO(out, rc = 0);
+       }
 
-       pcc_dentry = path.dentry;
        rc = ll_vfs_getxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
                             &pcc_gen, sizeof(pcc_gen));
        if (rc < 0)
                /* ignore this error */
-               GOTO(out_put_path, rc = 0);
+               GOTO(out_put_pcc_dentry, rc = 0);
 
        rc = 0;
        /* The file is still valid cached in PCC, attach it immediately. */
@@ -1245,7 +1283,7 @@ static int pcc_try_dataset_attach(struct inode *inode, __u32 gen,
                if (!pcci) {
                        OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
                        if (pcci == NULL)
-                               GOTO(out_put_path, rc = -ENOMEM);
+                               GOTO(out_put_pcc_dentry, rc = -ENOMEM);
 
                        pcc_inode_init(pcci, lli);
                        dget(pcc_dentry);
@@ -1267,11 +1305,10 @@ static int pcc_try_dataset_attach(struct inode *inode, __u32 gen,
                pcc_layout_gen_set(pcci, gen);
                *cached = true;
        }
-out_put_path:
-       path_put(&path);
+out_put_pcc_dentry:
+       dput(pcc_dentry);
 out:
        revert_creds(old_cred);
-       OBD_FREE(pathname, PATH_MAX);
        RETURN(rc);
 }
 
@@ -1396,6 +1433,9 @@ static int pcc_try_auto_attach(struct inode *inode, bool *cached,
        if (clt.cl_is_released)
                rc = pcc_try_datasets_attach(inode, iot, clt.cl_layout_gen,
                                             LU_PCC_READWRITE, cached);
+       else if (clt.cl_is_rdonly)
+               rc = pcc_try_datasets_attach(inode, iot, clt.cl_layout_gen,
+                                            LU_PCC_READONLY, cached);
 
        RETURN(rc);
 }
@@ -1406,9 +1446,11 @@ static inline bool pcc_may_auto_attach(struct inode *inode,
        struct ll_inode_info *lli = ll_i2info(inode);
        struct pcc_super *super = ll_i2pccs(inode);
 
+       ENTRY;
+
        /* Known the file was not in any PCC backend. */
        if (lli->lli_pcc_dsflags & PCC_DATASET_NONE)
-               return false;
+               RETURN(false);
 
        /*
         * lli_pcc_generation == 0 means that the file was never attached into
@@ -1423,16 +1465,16 @@ static inline bool pcc_may_auto_attach(struct inode *inode,
         * immediately in pcc_try_auto_attach().
         */
        if (super->pccs_generation != lli->lli_pcc_generation)
-               return true;
+               RETURN(true);
 
        /* The cached setting @lli_pcc_dsflags is valid */
        if (iot == PIT_OPEN)
-               return lli->lli_pcc_dsflags & PCC_DATASET_OPEN_ATTACH;
+               RETURN(lli->lli_pcc_dsflags & PCC_DATASET_OPEN_ATTACH);
 
        if (iot == PIT_GETATTR)
-               return lli->lli_pcc_dsflags & PCC_DATASET_STAT_ATTACH;
+               RETURN(lli->lli_pcc_dsflags & PCC_DATASET_STAT_ATTACH);
 
-       return lli->lli_pcc_dsflags & PCC_DATASET_IO_ATTACH;
+       RETURN(lli->lli_pcc_dsflags & PCC_DATASET_IO_ATTACH);
 }
 
 int pcc_file_open(struct inode *inode, struct file *file)
@@ -1451,6 +1493,9 @@ int pcc_file_open(struct inode *inode, struct file *file)
        if (!S_ISREG(inode->i_mode))
                RETURN(0);
 
+       if (IS_ENCRYPTED(inode))
+               RETURN(0);
+
        pcc_inode_lock(inode);
        pcci = ll_i2pcci(inode);
 
@@ -1518,6 +1563,28 @@ out:
        RETURN_EXIT;
 }
 
+/* Tolerate the IO failure on PCC and fall back to normal Lustre IO path */
+static bool pcc_io_tolerate(struct pcc_inode *pcci,
+                           enum pcc_io_type iot, int rc)
+{
+       if (pcci->pcci_type == LU_PCC_READWRITE) {
+               if (iot == PIT_WRITE && (rc == -ENOSPC || rc == -EDQUOT))
+                       return false;
+               /* Handle the ->page_mkwrite failure tolerance separately
+                * in pcc_page_mkwrite().
+                */
+       } else if (pcci->pcci_type == LU_PCC_READONLY) {
+               if ((iot == PIT_READ || iot == PIT_GETATTR ||
+                    iot == PIT_SPLICE_READ) && rc < 0 && rc != -ENOMEM)
+                       return false;
+               if (iot == PIT_FAULT && (rc & VM_FAULT_SIGBUS) &&
+                   !(rc & VM_FAULT_OOM))
+                       return false;
+       }
+
+       return true;
+}
+
 static void pcc_io_init(struct inode *inode, enum pcc_io_type iot, bool *cached)
 {
        struct pcc_inode *pcci;
@@ -1526,8 +1593,21 @@ static void pcc_io_init(struct inode *inode, enum pcc_io_type iot, bool *cached)
        pcci = ll_i2pcci(inode);
        if (pcci && pcc_inode_has_layout(pcci)) {
                LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
-               atomic_inc(&pcci->pcci_active_ios);
-               *cached = true;
+               if (pcci->pcci_type == LU_PCC_READONLY &&
+                   (iot == PIT_WRITE || iot == PIT_SETATTR ||
+                    iot == PIT_PAGE_MKWRITE)) {
+                       /* Fall back to normal I/O path */
+                       *cached = false;
+                       /* For mmap write, we need to detach the file from
+                        * RO-PCC, release the page obtained from ->fault(), and
+                        * then retry the memory fault handling (->fault()
+                        * and ->page_mkwrite()).
+                        * These are done in pcc_page_mkwrite().
+                        */
+               } else {
+                       atomic_inc(&pcci->pcci_active_ios);
+                       *cached = true;
+               }
        } else {
                *cached = false;
                if (pcc_may_auto_attach(inode, iot)) {
@@ -1542,13 +1622,16 @@ static void pcc_io_init(struct inode *inode, enum pcc_io_type iot, bool *cached)
        pcc_inode_unlock(inode);
 }
 
-static void pcc_io_fini(struct inode *inode)
+static void pcc_io_fini(struct inode *inode, enum pcc_io_type iot,
+                       int rc, bool *cached)
 {
        struct pcc_inode *pcci = ll_i2pcci(inode);
 
-       LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0);
+       LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0 && *cached);
+
+       *cached = pcc_io_tolerate(pcci, iot, rc);
        if (atomic_dec_and_test(&pcci->pcci_active_ios))
-               wake_up_all(&pcci->pcci_waitq);
+               wake_up(&pcci->pcci_waitq);
 }
 
 
@@ -1607,6 +1690,10 @@ ssize_t pcc_file_read_iter(struct kiocb *iocb,
        if (!*cached)
                RETURN(0);
 
+       /* Fake I/O error on RO-PCC */
+       if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
+               GOTO(out, result = -EIO);
+
        iocb->ki_filp = pccf->pccf_file;
        /* generic_file_aio_read does not support ext4-dax,
         * __pcc_file_read_iter uses ->aio_read hook directly
@@ -1614,8 +1701,8 @@ ssize_t pcc_file_read_iter(struct kiocb *iocb,
         */
        result = __pcc_file_read_iter(iocb, iter);
        iocb->ki_filp = file;
-
-       pcc_io_fini(inode);
+out:
+       pcc_io_fini(inode, PIT_READ, result, cached);
        RETURN(result);
 }
 
@@ -1679,7 +1766,7 @@ ssize_t pcc_file_write_iter(struct kiocb *iocb,
        if (!*cached)
                RETURN(0);
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
+       if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
                GOTO(out, result = -ENOSPC);
 
        iocb->ki_filp = pccf->pccf_file;
@@ -1691,7 +1778,7 @@ ssize_t pcc_file_write_iter(struct kiocb *iocb,
        result = __pcc_file_write_iter(iocb, iter);
        iocb->ki_filp = file;
 out:
-       pcc_io_fini(inode);
+       pcc_io_fini(inode, PIT_WRITE, result, cached);
        RETURN(result);
 }
 
@@ -1722,11 +1809,16 @@ int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
        pcc_dentry = pcci->pcci_path.dentry;
        inode_lock(pcc_dentry->d_inode);
        old_cred = override_creds(pcc_super_cred(inode->i_sb));
+#ifdef HAVE_USER_NAMESPACE_ARG
+       rc = pcc_dentry->d_inode->i_op->setattr(&nop_mnt_idmap, pcc_dentry,
+                                               &attr2);
+#else
        rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
+#endif
        revert_creds(old_cred);
        inode_unlock(pcc_dentry->d_inode);
 
-       pcc_io_fini(inode);
+       pcc_io_fini(inode, PIT_SETATTR, rc, cached);
        RETURN(rc);
 }
 
@@ -1760,16 +1852,16 @@ int pcc_inode_getattr(struct inode *inode, u32 request_mask,
                GOTO(out, rc);
 
        ll_inode_size_lock(inode);
-       if (ll_file_test_and_clear_flag(lli, LLIF_UPDATE_ATIME) ||
-           inode->i_atime.tv_sec < lli->lli_atime)
-               inode->i_atime.tv_sec = lli->lli_atime;
+       if (test_and_clear_bit(LLIF_UPDATE_ATIME, &lli->lli_flags) ||
+           inode_get_atime_sec(inode) < lli->lli_atime)
+               inode_set_atime(inode, lli->lli_atime, 0);
 
-       inode->i_mtime.tv_sec = lli->lli_mtime;
-       inode->i_ctime.tv_sec = lli->lli_ctime;
+       inode_set_mtime(inode, lli->lli_mtime, 0);
+       inode_set_ctime(inode, lli->lli_ctime, 0);
 
-       atime = inode->i_atime.tv_sec;
-       mtime = inode->i_mtime.tv_sec;
-       ctime = inode->i_ctime.tv_sec;
+       atime = inode_get_atime_sec(inode);
+       mtime = inode_get_mtime_sec(inode);
+       ctime = inode_get_ctime_sec(inode);
 
        if (atime < stat.atime.tv_sec)
                atime = stat.atime.tv_sec;
@@ -1783,53 +1875,52 @@ int pcc_inode_getattr(struct inode *inode, u32 request_mask,
        i_size_write(inode, stat.size);
        inode->i_blocks = stat.blocks;
 
-       inode->i_atime.tv_sec = atime;
-       inode->i_mtime.tv_sec = mtime;
-       inode->i_ctime.tv_sec = ctime;
+       inode_set_atime(inode, atime, 0);
+       inode_set_mtime(inode, mtime, 0);
+       inode_set_ctime(inode, ctime, 0);
 
        ll_inode_size_unlock(inode);
 out:
-       pcc_io_fini(inode);
+       pcc_io_fini(inode, PIT_GETATTR, rc, cached);
        RETURN(rc);
 }
 
+#ifdef HAVE_DEFAULT_FILE_SPLICE_READ_EXPORT
 ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos,
                             struct pipe_inode_info *pipe,
-                            size_t count, unsigned int flags,
-                            bool *cached)
+                            size_t count, unsigned int flags)
 {
        struct inode *inode = file_inode(in_file);
        struct ll_file_data *fd = in_file->private_data;
        struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+       bool cached = false;
        ssize_t result;
 
        ENTRY;
 
-       *cached = false;
        if (!pcc_file)
-               RETURN(0);
+               RETURN(default_file_splice_read(in_file, ppos, pipe,
+                                               count, flags));
 
-       if (!file_inode(pcc_file)->i_fop->splice_read)
-               RETURN(-ENOTSUPP);
+       pcc_io_init(inode, PIT_SPLICE_READ, &cached);
+       if (!cached)
+               RETURN(default_file_splice_read(in_file, ppos, pipe,
+                                               count, flags));
 
-       pcc_io_init(inode, PIT_SPLICE_READ, cached);
-       if (!*cached)
-               RETURN(0);
+       result = default_file_splice_read(pcc_file, ppos, pipe, count, flags);
 
-       result = file_inode(pcc_file)->i_fop->splice_read(pcc_file,
-                                                         ppos, pipe, count,
-                                                         flags);
-
-       pcc_io_fini(inode);
+       pcc_io_fini(inode, PIT_SPLICE_READ, result, &cached);
        RETURN(result);
 }
+#endif /* HAVE_DEFAULT_FILE_SPLICE_READ_EXPORT */
 
 int pcc_fsync(struct file *file, loff_t start, loff_t end,
              int datasync, bool *cached)
 {
        struct inode *inode = file_inode(file);
        struct ll_file_data *fd = file->private_data;
-       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+       struct pcc_file *pccf = &fd->fd_pcc_file;
+       struct file *pcc_file = pccf->pccf_file;
        int rc;
 
        ENTRY;
@@ -1839,6 +1930,22 @@ int pcc_fsync(struct file *file, loff_t start, loff_t end,
                RETURN(0);
        }
 
+       if (!S_ISREG(inode->i_mode)) {
+               *cached = false;
+               RETURN(0);
+       }
+
+       /*
+        * After the file is attached into RO-PCC, its dirty pages on this
+        * client may not be flushed. So fsync() should fall back to normal
+        * Lustre I/O path to flush dirty data to OSTs. Flushing the RO-PCC
+        * copy is meaningless.
+        */
+       if (pccf->pccf_type == LU_PCC_READONLY) {
+               *cached = false;
+               RETURN(-EAGAIN);
+       }
+
        pcc_io_init(inode, PIT_FSYNC, cached);
        if (!*cached)
                RETURN(0);
@@ -1846,7 +1953,7 @@ int pcc_fsync(struct file *file, loff_t start, loff_t end,
        rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
                                                start, end, datasync);
 
-       pcc_io_fini(inode);
+       pcc_io_fini(inode, PIT_FSYNC, rc, cached);
        RETURN(rc);
 }
 
@@ -1956,12 +2063,12 @@ int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
                       "%s: PCC backend fs not support ->page_mkwrite()\n",
                       ll_i2sbi(inode)->ll_fsname);
                pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
-               up_read(&mm->mmap_sem);
+               mmap_read_unlock(mm);
                *cached = true;
                RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
        }
        /* Pause to allow for a race with concurrent detach */
-       OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
+       CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
 
        pcc_io_init(inode, PIT_PAGE_MKWRITE, cached);
        if (!*cached) {
@@ -1982,8 +2089,9 @@ int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
                 * __do_page_fault and retry the memory fault handling.
                 */
                if (page->mapping == pcc_file->f_mapping) {
+                       pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
                        *cached = true;
-                       up_read(&mm->mmap_sem);
+                       mmap_read_unlock(mm);
                        RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
                }
 
@@ -1994,12 +2102,8 @@ int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
         * This fault injection can also be used to simulate -ENOSPC and
         * -EDQUOT failure of underlying PCC backend fs.
         */
-       if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE)) {
-               pcc_io_fini(inode);
-               pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
-               up_read(&mm->mmap_sem);
-               RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
-       }
+       if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE))
+               GOTO(out, rc = VM_FAULT_SIGBUS);
 
        vma->vm_file = pcc_file;
 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
@@ -2009,7 +2113,18 @@ int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
 #endif
        vma->vm_file = file;
 
-       pcc_io_fini(inode);
+out:
+       pcc_io_fini(inode, PIT_PAGE_MKWRITE, rc, cached);
+
+       /* VM_FAULT_SIGBUS usually means that underlying PCC backend fs returns
+        * -EIO, -ENOSPC or -EDQUOT. Thus we can retry this IO from the normal
+        * Lustre I/O path.
+        */
+       if (rc & VM_FAULT_SIGBUS) {
+               pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
+               mmap_read_unlock(mm);
+               RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
+       }
        RETURN(rc);
 }
 
@@ -2030,10 +2145,19 @@ int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
                RETURN(0);
        }
 
+       if (!S_ISREG(inode->i_mode)) {
+               *cached = false;
+               RETURN(0);
+       }
+
        pcc_io_init(inode, PIT_FAULT, cached);
        if (!*cached)
                RETURN(0);
 
+       /* Tolerate the mmap read failure for RO-PCC */
+       if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
+               GOTO(out, rc = VM_FAULT_SIGBUS);
+
        vma->vm_file = pcc_file;
 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
        rc = pcc_vm_ops->fault(vmf);
@@ -2041,8 +2165,8 @@ int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
        rc = pcc_vm_ops->fault(vma, vmf);
 #endif
        vma->vm_file = file;
-
-       pcc_io_fini(inode);
+out:
+       pcc_io_fini(inode, PIT_FAULT, rc, cached);
        RETURN(rc);
 }
 
@@ -2083,13 +2207,15 @@ void pcc_layout_invalidate(struct inode *inode)
 
 static int pcc_inode_remove(struct inode *inode, struct dentry *pcc_dentry)
 {
+       struct dentry *parent = dget_parent(pcc_dentry);
        int rc;
 
-       rc = ll_vfs_unlink(pcc_dentry->d_parent->d_inode, pcc_dentry);
+       rc = vfs_unlink(&nop_mnt_idmap, d_inode(parent), pcc_dentry);
        if (rc)
                CWARN("%s: failed to unlink PCC file %pd, rc = %d\n",
                      ll_i2sbi(inode)->ll_fsname, pcc_dentry, rc);
 
+       dput(parent);
        return rc;
 }
 
@@ -2109,7 +2235,7 @@ pcc_mkdir(struct dentry *base, const char *name, umode_t mode)
        if (d_is_positive(dentry))
                goto out;
 
-       rc = vfs_mkdir(dir, dentry, mode);
+       rc = vfs_mkdir(&nop_mnt_idmap, dir, dentry, mode);
        if (rc) {
                dput(dentry);
                dentry = ERR_PTR(rc);
@@ -2165,7 +2291,7 @@ pcc_create(struct dentry *base, const char *name, umode_t mode)
        if (d_is_positive(dentry))
                goto out;
 
-       rc = vfs_create(dir, dentry, mode, false);
+       rc = vfs_create(&nop_mnt_idmap, dir, dentry, mode, false);
        if (rc) {
                dput(dentry);
                dentry = ERR_PTR(rc);
@@ -2185,11 +2311,11 @@ static int __pcc_inode_create(struct pcc_dataset *dataset,
        struct dentry *child;
        int rc = 0;
 
-       OBD_ALLOC(path, MAX_PCC_DATABASE_PATH);
+       OBD_ALLOC(path, PCC_DATASET_MAX_PATH);
        if (path == NULL)
                return -ENOMEM;
 
-       pcc_fid2dataset_path(path, MAX_PCC_DATABASE_PATH, fid);
+       pcc_fid2dataset_path(path, PCC_DATASET_MAX_PATH, fid);
 
        base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0);
        if (IS_ERR(base)) {
@@ -2197,7 +2323,7 @@ static int __pcc_inode_create(struct pcc_dataset *dataset,
                GOTO(out, rc);
        }
 
-       snprintf(path, MAX_PCC_DATABASE_PATH, DFID_NOBRACE, PFID(fid));
+       snprintf(path, PCC_DATASET_MAX_PATH, DFID_NOBRACE, PFID(fid));
        child = pcc_create(base, path, 0);
        if (IS_ERR(child)) {
                rc = PTR_ERR(child);
@@ -2208,7 +2334,7 @@ static int __pcc_inode_create(struct pcc_dataset *dataset,
 out_base:
        dput(base);
 out:
-       OBD_FREE(path, MAX_PCC_DATABASE_PATH);
+       OBD_FREE(path, PCC_DATASET_MAX_PATH);
        return rc;
 }
 
@@ -2216,8 +2342,8 @@ out:
  * Reset uid, gid or size for the PCC copy masked by @valid.
  * TODO: Set the project ID for PCC copy.
  */
-int pcc_inode_reset_iattr(struct dentry *dentry, unsigned int valid,
-                         kuid_t uid, kgid_t gid, loff_t size)
+static int pcc_inode_reset_iattr(struct dentry *dentry, unsigned int valid,
+                                kuid_t uid, kgid_t gid, loff_t size)
 {
        struct inode *inode = dentry->d_inode;
        struct iattr attr;
@@ -2231,7 +2357,7 @@ int pcc_inode_reset_iattr(struct dentry *dentry, unsigned int valid,
        attr.ia_size = size;
 
        inode_lock(inode);
-       rc = notify_change(dentry, &attr, NULL);
+       rc = notify_change(&nop_mnt_idmap, dentry, &attr, NULL);
        inode_unlock(inode);
 
        RETURN(rc);
@@ -2252,8 +2378,8 @@ int pcc_inode_create(struct super_block *sb, struct pcc_dataset *dataset,
 int pcc_inode_create_fini(struct inode *inode, struct pcc_create_attach *pca)
 {
        struct dentry *pcc_dentry = pca->pca_dentry;
-       struct pcc_super *super = ll_i2pccs(inode);
        const struct cred *old_cred;
+       struct pcc_super *super;
        struct pcc_inode *pcci;
        int rc;
 
@@ -2265,6 +2391,8 @@ int pcc_inode_create_fini(struct inode *inode, struct pcc_create_attach *pca)
        if (!inode)
                GOTO(out_dataset_put, rc = 0);
 
+       super = ll_i2pccs(inode);
+
        LASSERT(pcc_dentry);
 
        old_cred = override_creds(super->pccs_cred);
@@ -2284,7 +2412,8 @@ int pcc_inode_create_fini(struct inode *inode, struct pcc_create_attach *pca)
 
        rc = pcc_layout_xattr_set(pcci, 0);
        if (rc) {
-               (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
+               if (!pcci->pcci_unlinked)
+                       (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
                pcc_inode_put(pcci);
                GOTO(out_unlock, rc);
        }
@@ -2315,12 +2444,16 @@ void pcc_create_attach_cleanup(struct super_block *sb,
                return;
 
        if (pca->pca_dentry) {
+               struct dentry *parent;
+               struct inode *i_dir;
                const struct cred *old_cred;
                int rc;
 
                old_cred = override_creds(pcc_super_cred(sb));
-               rc = ll_vfs_unlink(pca->pca_dentry->d_parent->d_inode,
-                                  pca->pca_dentry);
+               parent = dget_parent(pca->pca_dentry);
+               i_dir = d_inode(parent);
+               rc = vfs_unlink(&nop_mnt_idmap, i_dir, pca->pca_dentry);
+               dput(parent);
                if (rc)
                        CWARN("%s: failed to unlink PCC file %pd: rc = %d\n",
                              ll_s2sbi(sb)->ll_fsname, pca->pca_dentry, rc);
@@ -2407,15 +2540,11 @@ out_unlock:
        RETURN(rc);
 }
 
-int pcc_readwrite_attach(struct file *file, struct inode *inode,
-                        __u32 archive_id)
+static int pcc_attach_data_archive(struct file *file, struct inode *inode,
+                                  struct pcc_dataset *dataset,
+                                  struct dentry **dentry)
 {
-       struct pcc_dataset *dataset;
-       struct ll_inode_info *lli = ll_i2info(inode);
-       struct pcc_super *super = ll_i2pccs(inode);
-       struct pcc_inode *pcci;
        const struct cred *old_cred;
-       struct dentry *dentry;
        struct file *pcc_filp;
        struct path path;
        ssize_t ret;
@@ -2423,29 +2552,20 @@ int pcc_readwrite_attach(struct file *file, struct inode *inode,
 
        ENTRY;
 
-       rc = pcc_attach_allowed_check(inode);
-       if (rc)
-               RETURN(rc);
-
-       dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super,
-                                 LU_PCC_READWRITE, archive_id);
-       if (dataset == NULL)
-               RETURN(-ENOENT);
-
-       old_cred = override_creds(super->pccs_cred);
-       rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry);
+       old_cred = override_creds(pcc_super_cred(inode->i_sb));
+       rc = __pcc_inode_create(dataset, &ll_i2info(inode)->lli_fid, dentry);
        if (rc)
-               GOTO(out_dataset_put, rc);
+               GOTO(out_cred, rc);
 
        path.mnt = dataset->pccd_path.mnt;
-       path.dentry = dentry;
+       path.dentry = *dentry;
        pcc_filp = dentry_open(&path, O_WRONLY | O_LARGEFILE, current_cred());
        if (IS_ERR_OR_NULL(pcc_filp)) {
                rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
                GOTO(out_dentry, rc);
        }
 
-       rc = pcc_inode_reset_iattr(dentry, ATTR_UID | ATTR_GID,
+       rc = pcc_inode_reset_iattr(*dentry, ATTR_UID | ATTR_GID,
                                   old_cred->uid, old_cred->gid, 0);
        if (rc)
                GOTO(out_fput, rc);
@@ -2459,13 +2579,47 @@ int pcc_readwrite_attach(struct file *file, struct inode *inode,
         * copy after copy data. Otherwise, it may get wrong file size after
         * re-attach a file. See LU-13023 for details.
         */
-       rc = pcc_inode_reset_iattr(dentry, ATTR_SIZE, KUIDT_INIT(0),
+       rc = pcc_inode_reset_iattr(*dentry, ATTR_SIZE, KUIDT_INIT(0),
                                   KGIDT_INIT(0), ret);
+out_fput:
+       fput(pcc_filp);
+out_dentry:
+       if (rc) {
+               pcc_inode_remove(inode, *dentry);
+               dput(*dentry);
+       }
+out_cred:
+       revert_creds(old_cred);
+       RETURN(rc);
+}
+
+int pcc_readwrite_attach(struct file *file, struct inode *inode,
+                        __u32 archive_id)
+{
+       struct pcc_dataset *dataset;
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct pcc_super *super = ll_i2pccs(inode);
+       struct pcc_inode *pcci;
+       struct dentry *dentry;
+       int rc;
+
+       ENTRY;
+
+       rc = pcc_attach_allowed_check(inode);
        if (rc)
-               GOTO(out_fput, rc);
+               RETURN(rc);
+
+       dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super,
+                                 LU_PCC_READWRITE, archive_id);
+       if (dataset == NULL)
+               RETURN(-ENOENT);
+
+       rc = pcc_attach_data_archive(file, inode, dataset, &dentry);
+       if (rc)
+               GOTO(out_dataset_put, rc);
 
        /* Pause to allow for a race with concurrent HSM remove */
-       OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
+       CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
 
        pcc_inode_lock(inode);
        pcci = ll_i2pcci(inode);
@@ -2478,16 +2632,16 @@ int pcc_readwrite_attach(struct file *file, struct inode *inode,
                             dentry, LU_PCC_READWRITE);
 out_unlock:
        pcc_inode_unlock(inode);
-out_fput:
-       fput(pcc_filp);
-out_dentry:
        if (rc) {
+               const struct cred *old_cred;
+
+               old_cred = override_creds(pcc_super_cred(inode->i_sb));
                (void) pcc_inode_remove(inode, dentry);
+               revert_creds(old_cred);
                dput(dentry);
        }
 out_dataset_put:
        pcc_dataset_put(dataset);
-       revert_creds(old_cred);
 
        RETURN(rc);
 }
@@ -2537,7 +2691,8 @@ int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
 
 out_put:
        if (rc) {
-               (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
+               if (!pcci->pcci_unlinked)
+                       (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
                pcc_inode_put(pcci);
        }
 out_unlock:
@@ -2547,6 +2702,178 @@ out_unlock:
        RETURN(rc);
 }
 
+static int pcc_layout_rdonly_set(struct inode *inode, __u32 *gen)
+
+{
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct lu_extent ext = {
+               .e_start = 0,
+               .e_end = OBD_OBJECT_EOF,
+       };
+       struct cl_layout clt = {
+               .cl_layout_gen = 0,
+               .cl_is_released = false,
+               .cl_is_rdonly = false,
+       };
+       int retries = 0;
+       int rc;
+
+       ENTRY;
+
+repeat:
+       rc = pcc_get_layout_info(inode, &clt);
+       if (rc)
+               RETURN(rc);
+
+       /*
+        * For the HSM released file, restore the data first.
+        */
+       if (clt.cl_is_released) {
+               retries++;
+               if (retries > 2)
+                       RETURN(-EBUSY);
+
+               if (ll_layout_version_get(lli) != CL_LAYOUT_GEN_NONE) {
+                       rc = ll_layout_restore(inode, 0, OBD_OBJECT_EOF);
+                       if (rc) {
+                               CDEBUG(D_CACHE, DFID" RESTORE failure: %d\n",
+                                      PFID(&lli->lli_fid), rc);
+                               RETURN(rc);
+                       }
+               }
+               rc = ll_layout_refresh(inode, gen);
+               if (rc)
+                       RETURN(rc);
+
+               goto repeat;
+       }
+
+
+       if (!clt.cl_is_rdonly) {
+               rc = ll_layout_write_intent(inode, LAYOUT_INTENT_PCCRO_SET,
+                                           &ext);
+               if (rc)
+                       RETURN(rc);
+
+               rc = ll_layout_refresh(inode, gen);
+               if (rc)
+                       RETURN(rc);
+       } else { /* Readonly layout */
+               *gen = clt.cl_layout_gen;
+       }
+
+       RETURN(rc);
+}
+
+static int pcc_readonly_ioctl_attach(struct file *file,
+                                    struct inode *inode,
+                                    struct lu_pcc_attach *attach)
+{
+       struct ll_sb_info *sbi = ll_i2sbi(inode);
+       struct pcc_super *super = ll_i2pccs(inode);
+       struct ll_inode_info *lli = ll_i2info(inode);
+       const struct cred *old_cred;
+       struct pcc_dataset *dataset;
+       struct pcc_inode *pcci;
+       struct dentry *dentry;
+       bool attached = false;
+       bool unlinked = false;
+       __u32 gen;
+       int rc;
+
+       ENTRY;
+
+       if (!test_bit(LL_SBI_LAYOUT_LOCK, sbi->ll_flags))
+               RETURN(-EOPNOTSUPP);
+
+       rc = pcc_attach_allowed_check(inode);
+       if (rc)
+               RETURN(rc);
+
+       rc = pcc_layout_rdonly_set(inode, &gen);
+       if (rc)
+               RETURN(rc);
+
+       dataset = pcc_dataset_get(&ll_s2sbi(inode->i_sb)->ll_pcc_super,
+                                 LU_PCC_READONLY, attach->pcca_id);
+       if (dataset == NULL)
+               RETURN(-ENOENT);
+
+       rc = pcc_attach_data_archive(file, inode, dataset, &dentry);
+       if (rc)
+               GOTO(out_dataset_put, rc);
+
+       mutex_lock(&lli->lli_layout_mutex);
+       pcc_inode_lock(inode);
+       old_cred = override_creds(super->pccs_cred);
+       lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
+       if (gen != ll_layout_version_get(lli))
+               GOTO(out_put_unlock, rc = -ESTALE);
+
+       pcci = ll_i2pcci(inode);
+       if (!pcci) {
+               OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
+               if (pcci == NULL)
+                       GOTO(out_put_unlock, rc = -ENOMEM);
+
+               pcc_inode_attach_set(super, dataset, lli, pcci,
+                                    dentry, LU_PCC_READONLY);
+       } else {
+               atomic_inc(&pcci->pcci_refcount);
+               path_put(&pcci->pcci_path);
+               pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
+               pcci->pcci_path.dentry = dentry;
+               pcci->pcci_type = LU_PCC_READONLY;
+       }
+       attached = true;
+       rc = pcc_layout_xattr_set(pcci, gen);
+       if (rc) {
+               pcci->pcci_type = LU_PCC_NONE;
+               unlinked = pcci->pcci_unlinked;
+               GOTO(out_put_unlock, rc);
+       }
+
+       pcc_layout_gen_set(pcci, gen);
+out_put_unlock:
+       if (rc) {
+               if (!unlinked)
+                       (void) pcc_inode_remove(inode, dentry);
+               if (attached)
+                       pcc_inode_put(pcci);
+               else
+                       dput(dentry);
+       }
+       revert_creds(old_cred);
+       pcc_inode_unlock(inode);
+       mutex_unlock(&lli->lli_layout_mutex);
+out_dataset_put:
+       pcc_dataset_put(dataset);
+
+       RETURN(rc);
+}
+
+int pcc_ioctl_attach(struct file *file, struct inode *inode,
+                    struct lu_pcc_attach *attach)
+{
+       int rc = 0;
+
+       ENTRY;
+
+       switch (attach->pcca_type) {
+       case LU_PCC_READWRITE:
+               rc = -EOPNOTSUPP;
+               break;
+       case LU_PCC_READONLY:
+               rc = pcc_readonly_ioctl_attach(file, inode, attach);
+               break;
+       default:
+               rc = -EINVAL;
+               break;
+       }
+
+       RETURN(rc);
+}
+
 static int pcc_hsm_remove(struct inode *inode)
 {
        struct hsm_user_request *hur;
@@ -2593,6 +2920,7 @@ int pcc_ioctl_detach(struct inode *inode, __u32 opt)
 {
        struct ll_inode_info *lli = ll_i2info(inode);
        struct pcc_inode *pcci;
+       const struct cred *old_cred;
        bool hsm_remove = false;
        int rc = 0;
 
@@ -2619,13 +2947,25 @@ int pcc_ioctl_detach(struct inode *inode, __u32 opt)
 
                __pcc_layout_invalidate(pcci);
                pcc_inode_put(pcci);
+       } else if (pcci->pcci_type == LU_PCC_READONLY) {
+               __pcc_layout_invalidate(pcci);
+
+               if (opt == PCC_DETACH_OPT_UNCACHE && !pcci->pcci_unlinked) {
+                       old_cred =  override_creds(pcc_super_cred(inode->i_sb));
+                       rc = pcc_inode_remove(inode, pcci->pcci_path.dentry);
+                       revert_creds(old_cred);
+                       if (!rc)
+                               pcci->pcci_unlinked = true;
+               }
+
+               pcc_inode_put(pcci);
+       } else {
+               rc = -EOPNOTSUPP;
        }
 
 out_unlock:
        pcc_inode_unlock(inode);
        if (hsm_remove) {
-               const struct cred *old_cred;
-
                old_cred = override_creds(pcc_super_cred(inode->i_sb));
                rc = pcc_hsm_remove(inode);
                revert_creds(old_cred);