Whamcloud - gitweb
LU-10499 pcc: add readonly mode for PCC 05/38305/38
authorQian Yingjin <qian@ddn.com>
Mon, 23 Jul 2018 14:19:25 +0000 (22:19 +0800)
committerOleg Drokin <green@whamcloud.com>
Sat, 23 Mar 2024 05:50:50 +0000 (05:50 +0000)
Readonly Persistent Client Cache (RO-PCC) shares the same framework
with Readwrite Persistent Client Cache, expect that no HSM mechanism
is used in readonly mode of PCC. Instead, RO-PCC adds a new flag
field in the file object's layout named LCM_FL_PCC_RDONLY to
indicate that the file is in PCC read-only state. It is protected
under the layout lock.

After introducing the readonly feature for the layout, the IO path
has some changes. For read, if the file has been valid RO-PCC
cached, the file data can be read from PCC directly; Otherwise, it
will read data using normal I/O path from OSTs. For data modifying
operations (write or truncate), it must clear the readonly flag of
the layout on MDT (which will invaliate the RO-PCC cached state on
clients via layout lock blocking callback), and then it can perform
I/O.

For RO-PCC, as the PCC cached file is actual a replication of
Lustre file, when data read on PCC failed, it can tolerate this
error by falling back to normal read path: read data from OSTs.

Refer to paper (LPCC: hierarchical persistent client caching for
Lustre) for more design details.

Test-Parameters: clientcount=3 testlist=sanity-pcc,sanity-pcc,sanity-pcc
Signed-off-by: Qian Yingjin <qian@ddn.com>
Change-Id: I6badd72e00a106a0f68950621ce6f82471731a95
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/38305
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: James Simmons <jsimmons@infradead.org>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
20 files changed:
lustre/include/cl_object.h
lustre/llite/file.c
lustre/llite/llite_mmap.c
lustre/llite/pcc.c
lustre/llite/pcc.h
lustre/llite/vvp_io.c
lustre/lod/lod_lov.c
lustre/lod/lod_object.c
lustre/lov/lov_ea.c
lustre/lov/lov_internal.h
lustre/lov/lov_io.c
lustre/lov/lov_object.c
lustre/lov/lov_pack.c
lustre/mdd/mdd_object.c
lustre/tests/sanity-pcc.sh
lustre/utils/lfs.c
lustre/utils/liblustreapi.c
lustre/utils/liblustreapi_layout.c
lustre/utils/liblustreapi_pcc.c
lustre/utils/obd.c

index 5634944..233455d 100644 (file)
@@ -294,6 +294,8 @@ struct cl_layout {
        bool            cl_is_composite;
        /** Whether layout is a HSM released one */
        bool            cl_is_released;
+       /** Whether layout is a readonly one */
+       bool            cl_is_rdonly;
 };
 
 enum coo_inode_opc {
@@ -1866,6 +1868,11 @@ struct cl_io {
         */
                             ci_need_write_intent:1,
        /**
+        * File is in PCC-RO state, need MDS intervention to complete
+        * a data modifying operation.
+        */
+                            ci_need_pccro_clear:1,
+       /**
         * Check if layout changed after the IO finishes. Mainly for HSM
         * requirement. If IO occurs to openning files, it doesn't need to
         * verify layout because HSM won't release openning files.
index 1f1c940..a577dea 100644 (file)
@@ -4744,6 +4744,29 @@ out_ladvise:
                rc = ll_heat_set(inode, flags);
                RETURN(rc);
        }
+       case LL_IOC_PCC_ATTACH: {
+               struct lu_pcc_attach *attach;
+
+               if (!S_ISREG(inode->i_mode))
+                       RETURN(-EINVAL);
+
+               if (!inode_owner_or_capable(&init_user_ns, inode))
+                       RETURN(-EPERM);
+
+               OBD_ALLOC_PTR(attach);
+               if (attach == NULL)
+                       RETURN(-ENOMEM);
+
+               if (copy_from_user(attach,
+                                  (const struct lu_pcc_attach __user *)arg,
+                                  sizeof(*attach)))
+                       GOTO(out_pcc, rc = -EFAULT);
+
+               rc = pcc_ioctl_attach(file, inode, attach);
+out_pcc:
+               OBD_FREE_PTR(attach);
+               RETURN(rc);
+       }
        case LL_IOC_PCC_DETACH: {
                struct lu_pcc_detach *detach;
 
index cda751d..b73572c 100644 (file)
@@ -97,7 +97,7 @@ struct vm_area_struct *our_vma(struct mm_struct *mm, unsigned long addr,
  */
 static struct cl_io *
 ll_fault_io_init(struct lu_env *env, struct vm_area_struct *vma,
-               pgoff_t index, bool mkwrite)
+                pgoff_t index, bool mkwrite)
 {
        struct file            *file = vma->vm_file;
        struct inode           *inode = file_inode(file);
index 99447f9..6b8ee73 100644 (file)
@@ -511,16 +511,23 @@ pcc_parse_value_pairs(struct pcc_cmd *cmd, char *buffer)
                        return -EINVAL;
                /*
                 * By default, a PCC backend can provide caching service for
-                * both RW-PCC and RO-PCC.
+                * both PCC-RW and PCC-RO.
                 */
                if ((cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCC_ALL) == 0)
                        cmd->u.pccc_add.pccc_flags |= PCC_DATASET_PCC_ALL;
 
-               /* For RW-PCC, the value of @rwid must be non zero. */
-               if (cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC &&
-                   cmd->u.pccc_add.pccc_rwid == 0)
+               if (cmd->u.pccc_add.pccc_rwid == 0 &&
+                   cmd->u.pccc_add.pccc_roid == 0)
                        return -EINVAL;
 
+               if (cmd->u.pccc_add.pccc_rwid == 0 &&
+                   cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC)
+                       cmd->u.pccc_add.pccc_rwid = cmd->u.pccc_add.pccc_roid;
+
+               if (cmd->u.pccc_add.pccc_roid == 0 &&
+                   cmd->u.pccc_add.pccc_flags & PCC_DATASET_ROPCC)
+                       cmd->u.pccc_add.pccc_roid = cmd->u.pccc_add.pccc_rwid;
+
                break;
        case PCC_DEL_DATASET:
        case PCC_CLEAR_ALL:
@@ -772,6 +779,9 @@ pcc_dataset_get(struct pcc_super *super, enum lu_pcc_type type, __u32 id)
                if (type == LU_PCC_READWRITE && (dataset->pccd_rwid != id ||
                    !(dataset->pccd_flags & PCC_DATASET_RWPCC)))
                        continue;
+               if (type == LU_PCC_READONLY && (dataset->pccd_roid != id ||
+                   !(dataset->pccd_flags & PCC_DATASET_ROPCC)))
+                       continue;
                atomic_inc(&dataset->pccd_refcount);
                selected = dataset;
                break;
@@ -1241,6 +1251,10 @@ static int pcc_try_dataset_attach(struct inode *inode, __u32 gen,
            !(dataset->pccd_flags & PCC_DATASET_RWPCC))
                RETURN(0);
 
+       if (type == LU_PCC_READONLY &&
+           !(dataset->pccd_flags & PCC_DATASET_ROPCC))
+               RETURN(0);
+
        rc = pcc_fid2dataset_path(pathname, PCC_DATASET_MAX_PATH,
                                  &lli->lli_fid);
 
@@ -1419,6 +1433,9 @@ static int pcc_try_auto_attach(struct inode *inode, bool *cached,
        if (clt.cl_is_released)
                rc = pcc_try_datasets_attach(inode, iot, clt.cl_layout_gen,
                                             LU_PCC_READWRITE, cached);
+       else if (clt.cl_is_rdonly)
+               rc = pcc_try_datasets_attach(inode, iot, clt.cl_layout_gen,
+                                            LU_PCC_READONLY, cached);
 
        RETURN(rc);
 }
@@ -1429,9 +1446,11 @@ static inline bool pcc_may_auto_attach(struct inode *inode,
        struct ll_inode_info *lli = ll_i2info(inode);
        struct pcc_super *super = ll_i2pccs(inode);
 
+       ENTRY;
+
        /* Known the file was not in any PCC backend. */
        if (lli->lli_pcc_dsflags & PCC_DATASET_NONE)
-               return false;
+               RETURN(false);
 
        /*
         * lli_pcc_generation == 0 means that the file was never attached into
@@ -1446,16 +1465,16 @@ static inline bool pcc_may_auto_attach(struct inode *inode,
         * immediately in pcc_try_auto_attach().
         */
        if (super->pccs_generation != lli->lli_pcc_generation)
-               return true;
+               RETURN(true);
 
        /* The cached setting @lli_pcc_dsflags is valid */
        if (iot == PIT_OPEN)
-               return lli->lli_pcc_dsflags & PCC_DATASET_OPEN_ATTACH;
+               RETURN(lli->lli_pcc_dsflags & PCC_DATASET_OPEN_ATTACH);
 
        if (iot == PIT_GETATTR)
-               return lli->lli_pcc_dsflags & PCC_DATASET_STAT_ATTACH;
+               RETURN(lli->lli_pcc_dsflags & PCC_DATASET_STAT_ATTACH);
 
-       return lli->lli_pcc_dsflags & PCC_DATASET_IO_ATTACH;
+       RETURN(lli->lli_pcc_dsflags & PCC_DATASET_IO_ATTACH);
 }
 
 int pcc_file_open(struct inode *inode, struct file *file)
@@ -1544,6 +1563,28 @@ out:
        RETURN_EXIT;
 }
 
+/* Tolerate the IO failure on PCC and fall back to normal Lustre IO path */
+static bool pcc_io_tolerate(struct pcc_inode *pcci,
+                           enum pcc_io_type iot, int rc)
+{
+       if (pcci->pcci_type == LU_PCC_READWRITE) {
+               if (iot == PIT_WRITE && (rc == -ENOSPC || rc == -EDQUOT))
+                       return false;
+               /* Handle the ->page_mkwrite failure tolerance separately
+                * in pcc_page_mkwrite().
+                */
+       } else if (pcci->pcci_type == LU_PCC_READONLY) {
+               if ((iot == PIT_READ || iot == PIT_GETATTR ||
+                    iot == PIT_SPLICE_READ) && rc < 0 && rc != -ENOMEM)
+                       return false;
+               if (iot == PIT_FAULT && (rc & VM_FAULT_SIGBUS) &&
+                   !(rc & VM_FAULT_OOM))
+                       return false;
+       }
+
+       return true;
+}
+
 static void pcc_io_init(struct inode *inode, enum pcc_io_type iot, bool *cached)
 {
        struct pcc_inode *pcci;
@@ -1552,8 +1593,21 @@ static void pcc_io_init(struct inode *inode, enum pcc_io_type iot, bool *cached)
        pcci = ll_i2pcci(inode);
        if (pcci && pcc_inode_has_layout(pcci)) {
                LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
-               atomic_inc(&pcci->pcci_active_ios);
-               *cached = true;
+               if (pcci->pcci_type == LU_PCC_READONLY &&
+                   (iot == PIT_WRITE || iot == PIT_SETATTR ||
+                    iot == PIT_PAGE_MKWRITE)) {
+                       /* Fall back to normal I/O path */
+                       *cached = false;
+                       /* For mmap write, we need to detach the file from
+                        * RO-PCC, release the page got from ->fault(), and
+                        * then retry the memory fault handling (->fault()
+                        * and ->page_mkwrite()).
+                        * These are done in pcc_page_mkwrite();
+                        */
+               } else {
+                       atomic_inc(&pcci->pcci_active_ios);
+                       *cached = true;
+               }
        } else {
                *cached = false;
                if (pcc_may_auto_attach(inode, iot)) {
@@ -1568,11 +1622,14 @@ static void pcc_io_init(struct inode *inode, enum pcc_io_type iot, bool *cached)
        pcc_inode_unlock(inode);
 }
 
-static void pcc_io_fini(struct inode *inode)
+static void pcc_io_fini(struct inode *inode, enum pcc_io_type iot,
+                       int rc, bool *cached)
 {
        struct pcc_inode *pcci = ll_i2pcci(inode);
 
-       LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0);
+       LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0 && *cached);
+
+       *cached = pcc_io_tolerate(pcci, iot, rc);
        if (atomic_dec_and_test(&pcci->pcci_active_ios))
                wake_up(&pcci->pcci_waitq);
 }
@@ -1633,6 +1690,10 @@ ssize_t pcc_file_read_iter(struct kiocb *iocb,
        if (!*cached)
                RETURN(0);
 
+       /* Fake I/O error on RO-PCC */
+       if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
+               GOTO(out, result = -EIO);
+
        iocb->ki_filp = pccf->pccf_file;
        /* generic_file_aio_read does not support ext4-dax,
         * __pcc_file_read_iter uses ->aio_read hook directly
@@ -1640,8 +1701,8 @@ ssize_t pcc_file_read_iter(struct kiocb *iocb,
         */
        result = __pcc_file_read_iter(iocb, iter);
        iocb->ki_filp = file;
-
-       pcc_io_fini(inode);
+out:
+       pcc_io_fini(inode, PIT_READ, result, cached);
        RETURN(result);
 }
 
@@ -1717,7 +1778,7 @@ ssize_t pcc_file_write_iter(struct kiocb *iocb,
        result = __pcc_file_write_iter(iocb, iter);
        iocb->ki_filp = file;
 out:
-       pcc_io_fini(inode);
+       pcc_io_fini(inode, PIT_WRITE, result, cached);
        RETURN(result);
 }
 
@@ -1757,7 +1818,7 @@ int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
        revert_creds(old_cred);
        inode_unlock(pcc_dentry->d_inode);
 
-       pcc_io_fini(inode);
+       pcc_io_fini(inode, PIT_SETATTR, rc, cached);
        RETURN(rc);
 }
 
@@ -1820,7 +1881,7 @@ int pcc_inode_getattr(struct inode *inode, u32 request_mask,
 
        ll_inode_size_unlock(inode);
 out:
-       pcc_io_fini(inode);
+       pcc_io_fini(inode, PIT_GETATTR, rc, cached);
        RETURN(rc);
 }
 
@@ -1848,7 +1909,7 @@ ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos,
 
        result = default_file_splice_read(pcc_file, ppos, pipe, count, flags);
 
-       pcc_io_fini(inode);
+       pcc_io_fini(inode, PIT_SPLICE_READ, result, &cached);
        RETURN(result);
 }
 #endif /* HAVE_DEFAULT_FILE_SPLICE_READ_EXPORT */
@@ -1858,7 +1919,8 @@ int pcc_fsync(struct file *file, loff_t start, loff_t end,
 {
        struct inode *inode = file_inode(file);
        struct ll_file_data *fd = file->private_data;
-       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+       struct pcc_file *pccf = &fd->fd_pcc_file;
+       struct file *pcc_file = pccf->pccf_file;
        int rc;
 
        ENTRY;
@@ -1868,6 +1930,22 @@ int pcc_fsync(struct file *file, loff_t start, loff_t end,
                RETURN(0);
        }
 
+       if (!S_ISREG(inode->i_mode)) {
+               *cached = false;
+               RETURN(0);
+       }
+
+       /*
+        * After the file is attached into RO-PCC, its dirty pages on this
+        * client may not be flushed. So fsync() should fall back to normal
+        * Lustre I/O path flushing dirty data to OSTs. And flush on RO-PCC
+        * copy is meaningless.
+        */
+       if (pccf->pccf_type == LU_PCC_READONLY) {
+               *cached = false;
+               RETURN(-EAGAIN);
+       }
+
        pcc_io_init(inode, PIT_FSYNC, cached);
        if (!*cached)
                RETURN(0);
@@ -1875,7 +1953,7 @@ int pcc_fsync(struct file *file, loff_t start, loff_t end,
        rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
                                                start, end, datasync);
 
-       pcc_io_fini(inode);
+       pcc_io_fini(inode, PIT_FSYNC, rc, cached);
        RETURN(rc);
 }
 
@@ -2011,6 +2089,7 @@ int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
                 * __do_page_fault and retry the memory fault handling.
                 */
                if (page->mapping == pcc_file->f_mapping) {
+                       pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
                        *cached = true;
                        mmap_read_unlock(mm);
                        RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
@@ -2023,12 +2102,8 @@ int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
         * This fault injection can also be used to simulate -ENOSPC and
         * -EDQUOT failure of underlying PCC backend fs.
         */
-       if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE)) {
-               pcc_io_fini(inode);
-               pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
-               mmap_read_unlock(mm);
-               RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
-       }
+       if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE))
+               GOTO(out, rc = VM_FAULT_SIGBUS);
 
        vma->vm_file = pcc_file;
 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
@@ -2038,7 +2113,18 @@ int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
 #endif
        vma->vm_file = file;
 
-       pcc_io_fini(inode);
+out:
+       pcc_io_fini(inode, PIT_PAGE_MKWRITE, rc, cached);
+
+       /* VM_FAULT_SIGBUG usually means that underlying PCC backend fs returns
+        * -EIO, -ENOSPC or -EDQUOT. Thus we can retry this IO from the normal
+        * Lustre I/O path.
+        */
+       if (rc & VM_FAULT_SIGBUS) {
+               pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
+               mmap_read_unlock(mm);
+               RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
+       }
        RETURN(rc);
 }
 
@@ -2059,10 +2145,19 @@ int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
                RETURN(0);
        }
 
+       if (!S_ISREG(inode->i_mode)) {
+               *cached = false;
+               RETURN(0);
+       }
+
        pcc_io_init(inode, PIT_FAULT, cached);
        if (!*cached)
                RETURN(0);
 
+       /* Tolerate the mmap read failure for RO-PCC */
+       if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
+               GOTO(out, rc = VM_FAULT_SIGBUS);
+
        vma->vm_file = pcc_file;
 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
        rc = pcc_vm_ops->fault(vmf);
@@ -2070,8 +2165,8 @@ int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
        rc = pcc_vm_ops->fault(vma, vmf);
 #endif
        vma->vm_file = file;
-
-       pcc_io_fini(inode);
+out:
+       pcc_io_fini(inode, PIT_FAULT, rc, cached);
        RETURN(rc);
 }
 
@@ -2317,7 +2412,8 @@ int pcc_inode_create_fini(struct inode *inode, struct pcc_create_attach *pca)
 
        rc = pcc_layout_xattr_set(pcci, 0);
        if (rc) {
-               (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
+               if (!pcci->pcci_unlinked)
+                       (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
                pcc_inode_put(pcci);
                GOTO(out_unlock, rc);
        }
@@ -2444,15 +2540,11 @@ out_unlock:
        RETURN(rc);
 }
 
-int pcc_readwrite_attach(struct file *file, struct inode *inode,
-                        __u32 archive_id)
+static int pcc_attach_data_archive(struct file *file, struct inode *inode,
+                                  struct pcc_dataset *dataset,
+                                  struct dentry **dentry)
 {
-       struct pcc_dataset *dataset;
-       struct ll_inode_info *lli = ll_i2info(inode);
-       struct pcc_super *super = ll_i2pccs(inode);
-       struct pcc_inode *pcci;
        const struct cred *old_cred;
-       struct dentry *dentry;
        struct file *pcc_filp;
        struct path path;
        ssize_t ret;
@@ -2460,29 +2552,20 @@ int pcc_readwrite_attach(struct file *file, struct inode *inode,
 
        ENTRY;
 
-       rc = pcc_attach_allowed_check(inode);
-       if (rc)
-               RETURN(rc);
-
-       dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super,
-                                 LU_PCC_READWRITE, archive_id);
-       if (dataset == NULL)
-               RETURN(-ENOENT);
-
-       old_cred = override_creds(super->pccs_cred);
-       rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry);
+       old_cred = override_creds(pcc_super_cred(inode->i_sb));
+       rc = __pcc_inode_create(dataset, &ll_i2info(inode)->lli_fid, dentry);
        if (rc)
-               GOTO(out_dataset_put, rc);
+               GOTO(out_cred, rc);
 
        path.mnt = dataset->pccd_path.mnt;
-       path.dentry = dentry;
+       path.dentry = *dentry;
        pcc_filp = dentry_open(&path, O_WRONLY | O_LARGEFILE, current_cred());
        if (IS_ERR_OR_NULL(pcc_filp)) {
                rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
                GOTO(out_dentry, rc);
        }
 
-       rc = pcc_inode_reset_iattr(dentry, ATTR_UID | ATTR_GID,
+       rc = pcc_inode_reset_iattr(*dentry, ATTR_UID | ATTR_GID,
                                   old_cred->uid, old_cred->gid, 0);
        if (rc)
                GOTO(out_fput, rc);
@@ -2496,10 +2579,44 @@ int pcc_readwrite_attach(struct file *file, struct inode *inode,
         * copy after copy data. Otherwise, it may get wrong file size after
         * re-attach a file. See LU-13023 for details.
         */
-       rc = pcc_inode_reset_iattr(dentry, ATTR_SIZE, KUIDT_INIT(0),
+       rc = pcc_inode_reset_iattr(*dentry, ATTR_SIZE, KUIDT_INIT(0),
                                   KGIDT_INIT(0), ret);
+out_fput:
+       fput(pcc_filp);
+out_dentry:
+       if (rc) {
+               pcc_inode_remove(inode, *dentry);
+               dput(*dentry);
+       }
+out_cred:
+       revert_creds(old_cred);
+       RETURN(rc);
+}
+
+int pcc_readwrite_attach(struct file *file, struct inode *inode,
+                        __u32 archive_id)
+{
+       struct pcc_dataset *dataset;
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct pcc_super *super = ll_i2pccs(inode);
+       struct pcc_inode *pcci;
+       struct dentry *dentry;
+       int rc;
+
+       ENTRY;
+
+       rc = pcc_attach_allowed_check(inode);
        if (rc)
-               GOTO(out_fput, rc);
+               RETURN(rc);
+
+       dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super,
+                                 LU_PCC_READWRITE, archive_id);
+       if (dataset == NULL)
+               RETURN(-ENOENT);
+
+       rc = pcc_attach_data_archive(file, inode, dataset, &dentry);
+       if (rc)
+               GOTO(out_dataset_put, rc);
 
        /* Pause to allow for a race with concurrent HSM remove */
        CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
@@ -2515,16 +2632,16 @@ int pcc_readwrite_attach(struct file *file, struct inode *inode,
                             dentry, LU_PCC_READWRITE);
 out_unlock:
        pcc_inode_unlock(inode);
-out_fput:
-       fput(pcc_filp);
-out_dentry:
        if (rc) {
+               const struct cred *old_cred;
+
+               old_cred = override_creds(pcc_super_cred(inode->i_sb));
                (void) pcc_inode_remove(inode, dentry);
+               revert_creds(old_cred);
                dput(dentry);
        }
 out_dataset_put:
        pcc_dataset_put(dataset);
-       revert_creds(old_cred);
 
        RETURN(rc);
 }
@@ -2574,7 +2691,8 @@ int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
 
 out_put:
        if (rc) {
-               (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
+               if (!pcci->pcci_unlinked)
+                       (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
                pcc_inode_put(pcci);
        }
 out_unlock:
@@ -2584,6 +2702,178 @@ out_unlock:
        RETURN(rc);
 }
 
+static int pcc_layout_rdonly_set(struct inode *inode, __u32 *gen)
+
+{
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct lu_extent ext = {
+               .e_start = 0,
+               .e_end = OBD_OBJECT_EOF,
+       };
+       struct cl_layout clt = {
+               .cl_layout_gen = 0,
+               .cl_is_released = false,
+               .cl_is_rdonly = false,
+       };
+       int retries = 0;
+       int rc;
+
+       ENTRY;
+
+repeat:
+       rc = pcc_get_layout_info(inode, &clt);
+       if (rc)
+               RETURN(rc);
+
+       /*
+        * For the HSM released file, restore the data first.
+        */
+       if (clt.cl_is_released) {
+               retries++;
+               if (retries > 2)
+                       RETURN(-EBUSY);
+
+               if (ll_layout_version_get(lli) != CL_LAYOUT_GEN_NONE) {
+                       rc = ll_layout_restore(inode, 0, OBD_OBJECT_EOF);
+                       if (rc) {
+                               CDEBUG(D_CACHE, DFID" RESTORE failure: %d\n",
+                                      PFID(&lli->lli_fid), rc);
+                               RETURN(rc);
+                       }
+               }
+               rc = ll_layout_refresh(inode, gen);
+               if (rc)
+                       RETURN(rc);
+
+               goto repeat;
+       }
+
+
+       if (!clt.cl_is_rdonly) {
+               rc = ll_layout_write_intent(inode, LAYOUT_INTENT_PCCRO_SET,
+                                           &ext);
+               if (rc)
+                       RETURN(rc);
+
+               rc = ll_layout_refresh(inode, gen);
+               if (rc)
+                       RETURN(rc);
+       } else { /* Readonly layout */
+               *gen = clt.cl_layout_gen;
+       }
+
+       RETURN(rc);
+}
+
+static int pcc_readonly_ioctl_attach(struct file *file,
+                                    struct inode *inode,
+                                    struct lu_pcc_attach *attach)
+{
+       struct ll_sb_info *sbi = ll_i2sbi(inode);
+       struct pcc_super *super = ll_i2pccs(inode);
+       struct ll_inode_info *lli = ll_i2info(inode);
+       const struct cred *old_cred;
+       struct pcc_dataset *dataset;
+       struct pcc_inode *pcci;
+       struct dentry *dentry;
+       bool attached = false;
+       bool unlinked = false;
+       __u32 gen;
+       int rc;
+
+       ENTRY;
+
+       if (!test_bit(LL_SBI_LAYOUT_LOCK, sbi->ll_flags))
+               RETURN(-EOPNOTSUPP);
+
+       rc = pcc_attach_allowed_check(inode);
+       if (rc)
+               RETURN(rc);
+
+       rc = pcc_layout_rdonly_set(inode, &gen);
+       if (rc)
+               RETURN(rc);
+
+       dataset = pcc_dataset_get(&ll_s2sbi(inode->i_sb)->ll_pcc_super,
+                                 LU_PCC_READONLY, attach->pcca_id);
+       if (dataset == NULL)
+               RETURN(-ENOENT);
+
+       rc = pcc_attach_data_archive(file, inode, dataset, &dentry);
+       if (rc)
+               GOTO(out_dataset_put, rc);
+
+       mutex_lock(&lli->lli_layout_mutex);
+       pcc_inode_lock(inode);
+       old_cred = override_creds(super->pccs_cred);
+       lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
+       if (gen != ll_layout_version_get(lli))
+               GOTO(out_put_unlock, rc = -ESTALE);
+
+       pcci = ll_i2pcci(inode);
+       if (!pcci) {
+               OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
+               if (pcci == NULL)
+                       GOTO(out_put_unlock, rc = -ENOMEM);
+
+               pcc_inode_attach_set(super, dataset, lli, pcci,
+                                    dentry, LU_PCC_READONLY);
+       } else {
+               atomic_inc(&pcci->pcci_refcount);
+               path_put(&pcci->pcci_path);
+               pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
+               pcci->pcci_path.dentry = dentry;
+               pcci->pcci_type = LU_PCC_READONLY;
+       }
+       attached = true;
+       rc = pcc_layout_xattr_set(pcci, gen);
+       if (rc) {
+               pcci->pcci_type = LU_PCC_NONE;
+               unlinked = pcci->pcci_unlinked;
+               GOTO(out_put_unlock, rc);
+       }
+
+       pcc_layout_gen_set(pcci, gen);
+out_put_unlock:
+       if (rc) {
+               if (!unlinked)
+                       (void) pcc_inode_remove(inode, dentry);
+               if (attached)
+                       pcc_inode_put(pcci);
+               else
+                       dput(dentry);
+       }
+       revert_creds(old_cred);
+       pcc_inode_unlock(inode);
+       mutex_unlock(&lli->lli_layout_mutex);
+out_dataset_put:
+       pcc_dataset_put(dataset);
+
+       RETURN(rc);
+}
+
+int pcc_ioctl_attach(struct file *file, struct inode *inode,
+                    struct lu_pcc_attach *attach)
+{
+       int rc = 0;
+
+       ENTRY;
+
+       switch (attach->pcca_type) {
+       case LU_PCC_READWRITE:
+               rc = -EOPNOTSUPP;
+               break;
+       case LU_PCC_READONLY:
+               rc = pcc_readonly_ioctl_attach(file, inode, attach);
+               break;
+       default:
+               rc = -EINVAL;
+               break;
+       }
+
+       RETURN(rc);
+}
+
 static int pcc_hsm_remove(struct inode *inode)
 {
        struct hsm_user_request *hur;
@@ -2630,6 +2920,7 @@ int pcc_ioctl_detach(struct inode *inode, __u32 opt)
 {
        struct ll_inode_info *lli = ll_i2info(inode);
        struct pcc_inode *pcci;
+       const struct cred *old_cred;
        bool hsm_remove = false;
        int rc = 0;
 
@@ -2656,13 +2947,25 @@ int pcc_ioctl_detach(struct inode *inode, __u32 opt)
 
                __pcc_layout_invalidate(pcci);
                pcc_inode_put(pcci);
+       } else if (pcci->pcci_type == LU_PCC_READONLY) {
+               __pcc_layout_invalidate(pcci);
+
+               if (opt == PCC_DETACH_OPT_UNCACHE && !pcci->pcci_unlinked) {
+                       old_cred =  override_creds(pcc_super_cred(inode->i_sb));
+                       rc = pcc_inode_remove(inode, pcci->pcci_path.dentry);
+                       revert_creds(old_cred);
+                       if (!rc)
+                               pcci->pcci_unlinked = true;
+               }
+
+               pcc_inode_put(pcci);
+       } else {
+               rc = -EOPNOTSUPP;
        }
 
 out_unlock:
        pcc_inode_unlock(inode);
        if (hsm_remove) {
-               const struct cred *old_cred;
-
                old_cred = override_creds(pcc_super_cred(inode->i_sb));
                rc = pcc_hsm_remove(inode);
                revert_creds(old_cred);
index 067daef..b1c1f7e 100644 (file)
@@ -147,9 +147,11 @@ struct pcc_inode {
         */
        atomic_t                 pcci_refcount;
        /* Whether readonly or readwrite PCC */
-       enum lu_pcc_type         pcci_type;
+       enum lu_pcc_type         pcci_type:8;
        /* Whether the inode attr is cached locally */
        bool                     pcci_attr_valid;
+       /* Whether the PCC inode is unlinked at detach */
+       bool                     pcci_unlinked;
        /* Layout generation */
        __u32                    pcci_layout_gen;
        /*
@@ -183,10 +185,8 @@ enum pcc_io_type {
        PIT_FAULT,
        /* fsync system call handling */
        PIT_FSYNC,
-#ifdef HAVE_DEFAULT_FILE_SPLICE_READ_EXPORT
        /* splice_read system call */
        PIT_SPLICE_READ,
-#endif
        /* open system call */
        PIT_OPEN
 };
@@ -229,6 +229,8 @@ int pcc_readwrite_attach(struct file *file, struct inode *inode,
 int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
                              __u32 gen, bool lease_broken, int rc,
                              bool attached);
+int pcc_ioctl_attach(struct file *file, struct inode *inode,
+                    struct lu_pcc_attach *attach);
 int pcc_ioctl_detach(struct inode *inode, __u32 opt);
 int pcc_ioctl_state(struct file *file, struct inode *inode,
                    struct lu_pcc_state *state);
index abaec68..62e9903 100644 (file)
@@ -357,7 +357,7 @@ static void vvp_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
        }
 
        /* dynamic layout change needed, send layout intent RPC. */
-       if (io->ci_need_write_intent) {
+       if (io->ci_need_write_intent || io->ci_need_pccro_clear) {
                enum layout_intent_opc opc = LAYOUT_INTENT_WRITE;
 
                io->ci_need_write_intent = 0;
@@ -372,6 +372,11 @@ static void vvp_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
                if (cl_io_is_trunc(io))
                        opc = LAYOUT_INTENT_TRUNC;
 
+               if (io->ci_need_pccro_clear) {
+                       io->ci_need_pccro_clear = 0;
+                       opc = LAYOUT_INTENT_PCCRO_CLEAR;
+               }
+
                rc = ll_layout_write_intent(inode, opc, &io->ci_write_intent);
                io->ci_result = rc;
                if (!rc)
index 9f59dd3..de080bf 100644 (file)
@@ -1319,6 +1319,7 @@ int lod_parse_striping(const struct lu_env *env, struct lod_object *lo,
                comp_cnt = le16_to_cpu(comp_v1->lcm_entry_count);
                if (comp_cnt == 0)
                        GOTO(out, rc = -EINVAL);
+
                lo->ldo_layout_gen = le32_to_cpu(comp_v1->lcm_layout_gen);
                lo->ldo_is_composite = 1;
                mirror_cnt = le16_to_cpu(comp_v1->lcm_mirror_count) + 1;
index 5615bbb..5cb2dd2 100644 (file)
@@ -7768,6 +7768,7 @@ restart:
                for (i = 0; i < lo->ldo_mirror_count; i++) {
                        if (i == primary)
                                continue;
+
                        rc = lod_declare_update_extents(env, lo, &pri_extent,
                                                        th, i, 0);
                        /* if update_extents changed the layout, it may have
index 17cf85c..c9dfe73 100644 (file)
@@ -460,7 +460,9 @@ lsme_unpack_foreign(struct lov_obd *lov, void *buf, size_t buf_size,
 {
        struct lov_stripe_md_entry *lsme;
        struct lov_foreign_md *lfm = buf;
+       size_t length;
        __u32 magic;
+       __u32 type;
 
        ENTRY;
 
@@ -468,6 +470,27 @@ lsme_unpack_foreign(struct lov_obd *lov, void *buf, size_t buf_size,
        if (magic != LOV_MAGIC_FOREIGN)
                RETURN(ERR_PTR(-EINVAL));
 
+       type = le32_to_cpu(lfm->lfm_type);
+       if (!lov_foreign_type_supported(type)) {
+               CDEBUG(D_LAYOUT, "Unsupported foreign type: %u\n", type);
+               RETURN(ERR_PTR(-EINVAL));
+       }
+
+       length = le32_to_cpu(lfm->lfm_length);
+       if (lov_foreign_size_le(lfm) > buf_size) {
+               CDEBUG(D_LAYOUT, "LOV EA HSM too small: %zu, need %zu\n",
+                      buf_size, lov_foreign_size_le(lfm));
+               RETURN(ERR_PTR(-EINVAL));
+       }
+
+       if (lov_hsm_type_supported(type) &&
+           length < sizeof(struct lov_hsm_base)) {
+               CDEBUG(D_LAYOUT,
+                      "Invalid LOV HSM len: %zu, should be larger than %zu\n",
+                      length, sizeof(struct lov_hsm_base));
+               RETURN(ERR_PTR(-EINVAL));
+       }
+
        OBD_ALLOC_LARGE(lsme, sizeof(*lsme));
        if (!lsme)
                RETURN(ERR_PTR(-ENOMEM));
@@ -475,6 +498,13 @@ lsme_unpack_foreign(struct lov_obd *lov, void *buf, size_t buf_size,
        lsme->lsme_magic = magic;
        lsme->lsme_pattern = LOV_PATTERN_FOREIGN;
        lsme->lsme_flags = 0;
+       lsme->lsme_length = length;
+       lsme->lsme_type = type;
+       lsme->lsme_foreign_flags = le32_to_cpu(lfm->lfm_flags);
+
+       /* TODO: Initialize for other kind of foreign layout such as DAOS. */
+       if (lov_hsm_type_supported(type))
+               lov_foreign_hsm_to_cpu(&lsme->lsme_hsm, lfm);
 
        if (maxbytes)
                *maxbytes = MAX_LFS_FILESIZE;
@@ -505,7 +535,10 @@ lsme_unpack_comp(struct lov_obd *lov, struct lov_mds_md *lmm,
            !(lov_pattern(le32_to_cpu(lmm->lmm_pattern)) & LOV_PATTERN_MDT))
                RETURN(ERR_PTR(-EINVAL));
 
-       if (magic == LOV_MAGIC_V1) {
+       if (magic == LOV_MAGIC_FOREIGN) {
+               return lsme_unpack_foreign(lov, lmm, lmm_buf_size,
+                                          inited, maxbytes);
+       } else if (magic == LOV_MAGIC_V1) {
                return lsme_unpack(lov, lmm, lmm_buf_size, NULL,
                                   inited, lmm->lmm_objects, maxbytes);
        } else if (magic == LOV_MAGIC_V3) {
@@ -548,6 +581,7 @@ lsm_unpackmd_comp_md_v1(struct lov_obd *lov, void *buf, size_t buf_size)
        lsm->lsm_entry_count = entry_count;
        lsm->lsm_mirror_count = le16_to_cpu(lcm->lcm_mirror_count);
        lsm->lsm_flags = le16_to_cpu(lcm->lcm_flags);
+       lsm->lsm_is_rdonly = lsm->lsm_flags & LCM_FL_PCC_RDONLY;
        lsm->lsm_is_released = true;
        lsm->lsm_maxbytes = LLONG_MIN;
 
@@ -591,7 +625,8 @@ lsm_unpackmd_comp_md_v1(struct lov_obd *lov, void *buf, size_t buf_size)
                 * pressume that unrecognized magic component also has valid
                 * lsme_id/lsme_flags/lsme_extent
                 */
-               if (!(lsme->lsme_pattern & LOV_PATTERN_F_RELEASED))
+               if (!(lsme->lsme_magic == LOV_MAGIC_FOREIGN) &&
+                   !(lsme->lsme_pattern & LOV_PATTERN_F_RELEASED))
                        lsm->lsm_is_released = false;
 
                lsm->lsm_entries[i] = lsme;
@@ -711,26 +746,36 @@ void dump_lsm(unsigned int level, const struct lov_stripe_md *lsm)
        for (i = 0; i < lsm->lsm_entry_count; i++) {
                struct lov_stripe_md_entry *lse = lsm->lsm_entries[i];
 
-               CDEBUG(level, DEXT ": id: %u, flags: %x, "
-                      "magic 0x%08X, layout_gen %u, "
-                      "stripe count %u, sstripe size %u, "
-                      "pool: ["LOV_POOLNAMEF"]\n",
-                      PEXT(&lse->lsme_extent), lse->lsme_id, lse->lsme_flags,
-                      lse->lsme_magic, lse->lsme_layout_gen,
-                      lse->lsme_stripe_count, lse->lsme_stripe_size,
-                      lse->lsme_pool_name);
-               if (!lsme_inited(lse) ||
-                   lse->lsme_pattern & LOV_PATTERN_F_RELEASED ||
-                   !lov_supported_comp_magic(lse->lsme_magic) ||
-                   !lov_pattern_supported(lov_pattern(lse->lsme_pattern)))
-                       continue;
-               for (j = 0; j < lse->lsme_stripe_count; j++) {
-                       CDEBUG(level, "   oinfo:%p: ostid: "DOSTID
-                              " ost idx: %d gen: %d\n",
-                              lse->lsme_oinfo[j],
-                              POSTID(&lse->lsme_oinfo[j]->loi_oi),
-                              lse->lsme_oinfo[j]->loi_ost_idx,
-                              lse->lsme_oinfo[j]->loi_ost_gen);
+               if (lsme_is_foreign(lse)) {
+                       CDEBUG_LIMIT(level,
+                                  "HSM layout "DEXT ": id %u, flags: %08x, magic 0x%08X, length %u, type %x, flags %08x, archive_id %llu, archive_ver %llu, archive_uuid '%.*s'\n",
+                                  PEXT(&lse->lsme_extent), lse->lsme_id,
+                                  lse->lsme_flags, lse->lsme_magic,
+                                  lse->lsme_length, lse->lsme_type,
+                                  lse->lsme_foreign_flags,
+                                  lse->lsme_archive_id, lse->lsme_archive_ver,
+                                  (int)sizeof(lse->lsme_uuid), lse->lsme_uuid);
+               } else {
+                       CDEBUG_LIMIT(level,
+                                  DEXT ": id: %u, flags: %x, magic 0x%08X, layout_gen %u, stripe count %u, sstripe size %u, pool: ["LOV_POOLNAMEF"]\n",
+                                  PEXT(&lse->lsme_extent), lse->lsme_id,
+                                  lse->lsme_flags, lse->lsme_magic,
+                                  lse->lsme_layout_gen, lse->lsme_stripe_count,
+                                  lse->lsme_stripe_size, lse->lsme_pool_name);
+                       if (!lsme_inited(lse) ||
+                           lse->lsme_pattern & LOV_PATTERN_F_RELEASED ||
+                           !lov_supported_comp_magic(lse->lsme_magic) ||
+                           !lov_pattern_supported(
+                                       lov_pattern(lse->lsme_pattern)))
+                               continue;
+                       for (j = 0; j < lse->lsme_stripe_count; j++) {
+                               CDEBUG_LIMIT(level,
+                                          "   oinfo:%p: ostid: "DOSTID" ost idx: %d gen: %d\n",
+                                          lse->lsme_oinfo[j],
+                                          POSTID(&lse->lsme_oinfo[j]->loi_oi),
+                                          lse->lsme_oinfo[j]->loi_ost_idx,
+                                          lse->lsme_oinfo[j]->loi_ost_gen);
+                       }
                }
        }
 }
index e2546ec..b56afbb 100644 (file)
@@ -47,13 +47,33 @@ struct lov_stripe_md_entry {
        u32                     lsme_flags;
        u32                     lsme_pattern;
        u64                     lsme_timestamp;
-       u32                     lsme_stripe_size;
-       u16                     lsme_stripe_count;
-       u16                     lsme_layout_gen;
-       char                    lsme_pool_name[LOV_MAXPOOLNAME + 1];
-       struct lov_oinfo       *lsme_oinfo[];
+       union {
+               struct { /* For stripe objects */
+                       u32     lsme_stripe_size;
+                       u16     lsme_stripe_count;
+                       u16     lsme_layout_gen;
+                       char    lsme_pool_name[LOV_MAXPOOLNAME + 1];
+                       struct lov_oinfo        *lsme_oinfo[];
+               };
+               struct { /* For foreign layout (i.e. HSM, DAOS) */
+                       u32     lsme_length;
+                       u32     lsme_type;
+                       u32     lsme_foreign_flags;
+                       u32     lsme_padding;
+                       union {
+                               /* inline HSM layout data */
+                               struct lov_hsm_base      lsme_hsm;
+                               /* Other kind of foreign layout (i.e. DAOS) */
+                               char                    *lsme_value;
+                       };
+               };
+       };
 };
 
+#define lsme_archive_id                lsme_hsm.lhb_archive_id
+#define lsme_archive_ver       lsme_hsm.lhb_archive_ver
+#define lsme_uuid              lsme_hsm.lhb_uuid
+
 static inline bool lsme_is_dom(struct lov_stripe_md_entry *lsme)
 {
        return (lov_pattern(lsme->lsme_pattern) & LOV_PATTERN_MDT);
@@ -87,6 +107,7 @@ struct lov_stripe_md {
        u32             lsm_layout_gen;
        u16             lsm_flags;
        bool            lsm_is_released;
+       bool            lsm_is_rdonly;
        u16             lsm_mirror_count;
        u16             lsm_entry_count;
        struct lov_stripe_md_entry *lsm_entries[];
@@ -120,6 +141,11 @@ static inline bool lsm_is_composite(__u32 magic)
        return magic == LOV_MAGIC_COMP_V1;
 }
 
+static inline bool lsm_is_rdonly(const struct lov_stripe_md *lsm)
+{
+       return lsm->lsm_is_rdonly;
+}
+
 static inline size_t lov_comp_md_size(const struct lov_stripe_md *lsm)
 {
        struct lov_stripe_md_entry *lsme;
@@ -142,13 +168,16 @@ static inline size_t lov_comp_md_size(const struct lov_stripe_md *lsm)
 
                lsme = lsm->lsm_entries[entry];
 
-               if (lsme_inited(lsme))
-                       stripe_count = lsme->lsme_stripe_count;
-               else
-                       stripe_count = 0;
+               if (lsme->lsme_magic == LOV_MAGIC_FOREIGN) {
+                       size += lov_foreign_md_size(lsme->lsme_length);
+               } else {
+                       if (lsme_inited(lsme))
+                               stripe_count = lsme->lsme_stripe_count;
+                       else
+                               stripe_count = 0;
 
-               size += lov_mds_md_size(stripe_count,
-                                       lsme->lsme_magic);
+                       size += lov_mds_md_size(stripe_count, lsme->lsme_magic);
+               }
        }
 
        return size;
index 3b5d58e..5cca7f3 100644 (file)
@@ -502,15 +502,21 @@ static int lov_io_slice_init(struct lov_io *lio,
 {
        int index;
        int result = 0;
+       bool rdonly;
        ENTRY;
 
        io->ci_result = 0;
        lio->lis_object = obj;
        lio->lis_cached_entry = LIS_CACHE_ENTRY_NONE;
 
+       rdonly = lsm_is_rdonly(obj->lo_lsm);
        switch (io->ci_type) {
        case CIT_READ:
        case CIT_WRITE:
+               if (io->ci_type == CIT_WRITE && rdonly) {
+                       io->ci_need_pccro_clear = 1;
+                       GOTO(out, result = 1);
+               }
                lio->lis_pos = io->u.ci_rw.crw_pos;
                lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_bytes;
                lio->lis_io_endpos = lio->lis_endpos;
@@ -532,9 +538,17 @@ static int lov_io_slice_init(struct lov_io *lio,
 
        case CIT_SETATTR:
                if (cl_io_is_fallocate(io)) {
+                       if (rdonly) {
+                               io->ci_need_pccro_clear = 1;
+                               GOTO(out, result = 1);
+                       }
                        lio->lis_pos = io->u.ci_setattr.sa_falloc_offset;
                        lio->lis_endpos = io->u.ci_setattr.sa_falloc_end;
                } else if (cl_io_is_trunc(io)) {
+                       if (rdonly) {
+                               io->ci_need_pccro_clear = 1;
+                               GOTO(out, result = 1);
+                       }
                        lio->lis_pos = io->u.ci_setattr.sa_attr.lvb_size;
                        lio->lis_endpos = OBD_OBJECT_EOF;
                } else {
@@ -551,6 +565,11 @@ static int lov_io_slice_init(struct lov_io *lio,
        case CIT_FAULT: {
                pgoff_t index = io->u.ci_fault.ft_index;
 
+               if (cl_io_is_mkwrite(io) && rdonly) {
+                       io->ci_need_pccro_clear = 1;
+                       GOTO(out, result = -ENODATA);
+               }
+
                lio->lis_pos = index << PAGE_SHIFT;
                lio->lis_endpos = (index + 1) << PAGE_SHIFT;
                break;
index a265e6a..dabc8e9 100644 (file)
@@ -1059,6 +1059,9 @@ static int lov_attr_get_composite(const struct lu_env *env,
                if (!lsm_entry_inited(lov->lo_lsm, index))
                        continue;
 
+               if (lsm_entry_is_foreign(lov->lo_lsm, index))
+                       continue;
+
                result = entry->lle_comp_ops->lco_getattr(env, lov, index,
                                                          entry, &lov_attr);
                if (result < 0)
@@ -2277,6 +2280,7 @@ static int lov_object_layout_get(const struct lu_env *env,
 
        cl->cl_size = lov_comp_md_size(lsm);
        cl->cl_layout_gen = lsm->lsm_layout_gen;
+       cl->cl_is_rdonly = lsm->lsm_is_rdonly;
        cl->cl_is_released = lsm->lsm_is_released;
        cl->cl_is_composite = lsm_is_composite(lsm->lsm_magic);
 
@@ -2404,6 +2408,9 @@ int lov_read_and_clear_async_rc(struct cl_object *clob)
                                    !lov_supported_comp_magic(lse->lsme_magic))
                                        break;
 
+                               if (lsme_is_foreign(lse))
+                                       break;
+
                                for (j = 0; j < lse->lsme_stripe_count; j++) {
                                        struct lov_oinfo *loi =
                                                        lse->lsme_oinfo[j];
index 42f1446..8399f3e 100644 (file)
@@ -182,17 +182,71 @@ static ssize_t lov_lsm_pack_foreign(const struct lov_stripe_md *lsm, void *buf,
        RETURN(lfm_size);
 }
 
+unsigned int lov_lsme_pack_foreign(struct lov_stripe_md_entry *lsme, void *lmm)
+{
+       struct lov_foreign_md *lfm = (struct lov_foreign_md *)lmm;
+
+       lfm->lfm_magic = cpu_to_le32(lsme->lsme_magic);
+       lfm->lfm_length = cpu_to_le32(lsme->lsme_length);
+       lfm->lfm_type = cpu_to_le32(lsme->lsme_type);
+       lfm->lfm_flags = cpu_to_le32(lsme->lsme_foreign_flags);
+
+       /* TODO: support for foreign layout other than HSM, i.e. DAOS. */
+       if (lov_hsm_type_supported(lsme->lsme_type))
+               lov_foreign_hsm_to_le(lfm, &lsme->lsme_hsm);
+
+       return lov_foreign_md_size(lsme->lsme_length);
+}
+
+unsigned int lov_lsme_pack_v1v3(struct lov_stripe_md_entry *lsme,
+                               struct lov_mds_md *lmm)
+{
+       struct lov_ost_data_v1 *lmm_objects;
+       __u16 stripe_count;
+       unsigned int i;
+
+       lmm->lmm_magic = cpu_to_le32(lsme->lsme_magic);
+       /* lmm->lmm_oi not set */
+       lmm->lmm_pattern = cpu_to_le32(lsme->lsme_pattern);
+       lmm->lmm_stripe_size = cpu_to_le32(lsme->lsme_stripe_size);
+       lmm->lmm_stripe_count = cpu_to_le16(lsme->lsme_stripe_count);
+       lmm->lmm_layout_gen = cpu_to_le16(lsme->lsme_layout_gen);
+
+       if (lsme->lsme_magic == LOV_MAGIC_V3) {
+               struct lov_mds_md_v3 *lmmv3 = (struct lov_mds_md_v3 *)lmm;
+
+               strlcpy(lmmv3->lmm_pool_name, lsme->lsme_pool_name,
+                       sizeof(lmmv3->lmm_pool_name));
+               lmm_objects = lmmv3->lmm_objects;
+       } else {
+               lmm_objects = ((struct lov_mds_md_v1 *)lmm)->lmm_objects;
+       }
+
+       if (lsme_inited(lsme) && !(lsme->lsme_pattern & LOV_PATTERN_F_RELEASED))
+               stripe_count = lsme->lsme_stripe_count;
+       else
+               stripe_count = 0;
+
+       for (i = 0; i < stripe_count; i++) {
+               struct lov_oinfo *loi = lsme->lsme_oinfo[i];
+
+               ostid_cpu_to_le(&loi->loi_oi, &lmm_objects[i].l_ost_oi);
+               lmm_objects[i].l_ost_gen = cpu_to_le32(loi->loi_ost_gen);
+               lmm_objects[i].l_ost_idx = cpu_to_le32(loi->loi_ost_idx);
+       }
+
+       return lov_mds_md_size(stripe_count, lsme->lsme_magic);
+}
+
 ssize_t lov_lsm_pack(const struct lov_stripe_md *lsm, void *buf,
                     size_t buf_size)
 {
        struct lov_comp_md_v1 *lcmv1 = buf;
        struct lov_comp_md_entry_v1 *lcme;
-       struct lov_ost_data_v1 *lmm_objects;
        size_t lmm_size;
        unsigned int entry;
        unsigned int offset;
        unsigned int size;
-       unsigned int i;
 
        ENTRY;
 
@@ -221,7 +275,6 @@ ssize_t lov_lsm_pack(const struct lov_stripe_md *lsm, void *buf,
        for (entry = 0; entry < lsm->lsm_entry_count; entry++) {
                struct lov_stripe_md_entry *lsme;
                struct lov_mds_md *lmm;
-               __u16 stripe_count;
 
                lsme = lsm->lsm_entries[entry];
                lcme = &lcmv1->lcm_entries[entry];
@@ -238,42 +291,10 @@ ssize_t lov_lsm_pack(const struct lov_stripe_md *lsm, void *buf,
                lcme->lcme_offset = cpu_to_le32(offset);
 
                lmm = (struct lov_mds_md *)((char *)lcmv1 + offset);
-               lmm->lmm_magic = cpu_to_le32(lsme->lsme_magic);
-               /* lmm->lmm_oi not set */
-               lmm->lmm_pattern = cpu_to_le32(lsme->lsme_pattern);
-               lmm->lmm_stripe_size = cpu_to_le32(lsme->lsme_stripe_size);
-               lmm->lmm_stripe_count = cpu_to_le16(lsme->lsme_stripe_count);
-               lmm->lmm_layout_gen = cpu_to_le16(lsme->lsme_layout_gen);
-
-               if (lsme->lsme_magic == LOV_MAGIC_V3) {
-                       struct lov_mds_md_v3 *lmmv3 =
-                                               (struct lov_mds_md_v3 *)lmm;
-
-                       strlcpy(lmmv3->lmm_pool_name, lsme->lsme_pool_name,
-                               sizeof(lmmv3->lmm_pool_name));
-                       lmm_objects = lmmv3->lmm_objects;
-               } else {
-                       lmm_objects =
-                               ((struct lov_mds_md_v1 *)lmm)->lmm_objects;
-               }
-
-               if (lsme_inited(lsme) &&
-                   !(lsme->lsme_pattern & LOV_PATTERN_F_RELEASED))
-                       stripe_count = lsme->lsme_stripe_count;
+               if (lsme->lsme_magic == LOV_MAGIC_FOREIGN)
+                       size = lov_lsme_pack_foreign(lsme, lmm);
                else
-                       stripe_count = 0;
-
-               for (i = 0; i < stripe_count; i++) {
-                       struct lov_oinfo *loi = lsme->lsme_oinfo[i];
-
-                       ostid_cpu_to_le(&loi->loi_oi, &lmm_objects[i].l_ost_oi);
-                       lmm_objects[i].l_ost_gen =
-                                       cpu_to_le32(loi->loi_ost_gen);
-                       lmm_objects[i].l_ost_idx =
-                                       cpu_to_le32(loi->loi_ost_idx);
-               }
-
-               size = lov_mds_md_size(stripe_count, lsme->lsme_magic);
+                       size = lov_lsme_pack_v1v3(lsme, lmm);
                lcme->lcme_size = cpu_to_le32(size);
                offset += size;
        } /* for each layout component */
index 3222e49..58783f9 100644 (file)
@@ -3442,6 +3442,7 @@ out:
 
        RETURN(rc);
 }
+
 /**
  * Layout change callback for object.
  *
index 24a875a..a8448a0 100755 (executable)
@@ -187,6 +187,17 @@ setup_pcc_mapping() {
        do_facet $facet $LCTL pcc add $MOUNT $hsm_root -p $param
 }
 
+umount_loopdev() {
+       local facet=$1
+       local mntpt=$2
+       local rc
+
+       do_facet $facet lsof $mntpt || true
+       do_facet $facet $UMOUNT $mntpt
+       rc=$?
+       return $rc
+}
+
 setup_loopdev() {
        local facet=$1
        local file=$2
@@ -202,7 +213,7 @@ setup_loopdev() {
        do_facet $facet file $file
        do_facet $facet mount -t ext4 -o loop,usrquota,grpquota $file $mntpt ||
                error "mount -o loop,usrquota,grpquota $file $mntpt failed"
-       stack_trap "do_facet $facet $UMOUNT $mntpt" EXIT
+       stack_trap "umount_loopdev $facet $mntpt" EXIT
 }
 
 lpcc_rw_test() {
@@ -373,8 +384,8 @@ test_1e() {
 
        do_facet $SINGLEAGT $RUNAS $LFS pcc detach $file ||
                error "failed to detach file $file"
-       check_lpcc_state $file "none"
        wait_request_state $(path2fid $file) REMOVE SUCCEED
+       check_lpcc_state $file "none"
 }
 run_test 1e "Test RW-PCC with non-root user"
 
@@ -442,13 +453,18 @@ test_1g() {
 
        dd if=/dev/zero of=$file bs=1024 count=1 ||
                error "failed to dd write to $file"
+       chmod 600 $file || error "chmod 600 $file failed"
        do_facet $SINGLEAGT $RUNAS dd if=/dev/zero of=$file bs=1024 count=1 &&
-               error "non-root user can dd write to $file"
+               error "non-root user can dd write $file"
+       do_facet $SINGLEAGT $RUNAS dd if=$file of=/dev/null bs=1024 count=1 &&
+               error "non-root user can dd read $file"
        do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
                error "failed to attach file $file"
        check_lpcc_state $file "readwrite"
        do_facet $SINGLEAGT $RUNAS dd if=/dev/zero of=$file bs=1024 count=1 &&
                error "non-root user can dd write to $file"
+       do_facet $SINGLEAGT $RUNAS dd if=$file of=/dev/null bs=1024 count=1 &&
+               error "non-root user can dd read $file"
        chmod 777 $DIR2/$tfile || error "chmod 777 $DIR2/$tfile failed"
        do_facet $SINGLEAGT $RUNAS dd if=/dev/zero of=$file bs=1024 count=1 ||
                error "non-root user cannot write $file with permission (777)"
@@ -458,8 +474,8 @@ test_1g() {
        chown $RUNAS_ID $file || error "chown $RUNAS_ID $file failed"
        do_facet $SINGLEAGT $RUNAS $LFS pcc detach $file ||
                error "failed to detach file $file"
-       check_lpcc_state $file "none"
        wait_request_state $(path2fid $file) REMOVE SUCCEED
+       check_lpcc_state $file "none"
        do_facet $SINGLEAGT $RUNAS dd if=$file of=/dev/null bs=1024 count=1 ||
                error "non-root user cannot read to $file with permisson (777)"
 }
@@ -607,16 +623,17 @@ run_test 2c "Test multi open on different mount points when creating"
 
 test_3a() {
        local file=$DIR/$tdir/$tfile
+       local file2=$DIR2/$tdir/$tfile
 
        copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
        setup_pcc_mapping $SINGLEAGT \
                "projid={100}\ rwid=$HSM_ARCHIVE_NUMBER\ auto_attach=0"
 
        mkdir -p $DIR/$tdir || error "mkdir $DIR/$tdir failed"
-       dd if=/dev/zero of=$file bs=1024 count=1 ||
+       dd if=/dev/zero of=$file2 bs=1024 count=1 ||
                error "failed to dd write to $file"
 
-       echo "Start to attach/detach the file: $file"
+       echo "Start to RW-PCC attach/detach the file: $file"
        do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
                error "failed to attach file $file"
        check_lpcc_state $file "readwrite"
@@ -624,13 +641,32 @@ test_3a() {
                error "failed to detach file $file"
        check_lpcc_state $file "none"
 
-       echo "Repeat to attach/detach the same file: $file"
+       echo "Repeat to RW-PCC attach/detach the same file: $file"
        do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
                error "failed to attach file $file"
        check_lpcc_state $file "readwrite"
        do_facet $SINGLEAGT $LFS pcc detach -k $file ||
                error "failed to detach file $file"
        check_lpcc_state $file "none"
+
+       rm -f $file || error "failed to remove $file"
+       echo "ropcc_data" > $file
+
+       echo "Start to RO-PCC attach/detach the file: $file"
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to attach file $file"
+       check_lpcc_state $file "readonly"
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none"
+
+       echo "Repeat to RO-PCC attach/detach the same file: $file"
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to attach file $file"
+       check_lpcc_state $file "readonly"
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none"
 }
 run_test 3a "Repeat attach/detach operations"
 
@@ -642,7 +678,7 @@ test_3b() {
 
        # Start all of the copytools and setup PCC
        for n in $(seq $AGTCOUNT); do
-               copytool setup -f agt$n -a $n -m $MOUNT
+               copytool setup -f agt$n -a $n -m $MOUNT -h $(hsm_root agt$n)
                setup_pcc_mapping agt$n "projid={100}\ rwid=$n\ auto_attach=0"
        done
 
@@ -650,7 +686,7 @@ test_3b() {
        dd if=/dev/zero of=$file bs=1024 count=1 ||
                error "failed to dd write to $file"
 
-       echo "Start to attach/detach $file on $agt1_HOST"
+       echo "Start to RW-PCC attach/detach $file on $agt1_HOST"
        do_facet agt1 $LFS pcc attach -i 1 $file ||
                error "failed to attach file $file"
        check_lpcc_state $file "readwrite" agt1
@@ -658,7 +694,7 @@ test_3b() {
                error "failed to detach file $file"
        check_lpcc_state $file "none" agt1
 
-       echo "Repeat to attach/detach $file on $agt2_HOST"
+       echo "Repeat to RW-PCC attach/detach $file on $agt2_HOST"
        do_facet agt2 $LFS pcc attach -i 2 $file ||
                error "failed to attach file $file"
        check_lpcc_state $file "readwrite" agt2
@@ -666,7 +702,7 @@ test_3b() {
                error "failed to detach file $file"
        check_lpcc_state $file "none" agt2
 
-       echo "Try attach on two agents"
+       echo "Try RW-PCC attach on two agents"
        do_facet agt1 $LFS pcc attach -i 1 $file ||
                error "failed to attach file $file"
        check_lpcc_state $file "readwrite" agt1
@@ -679,6 +715,37 @@ test_3b() {
        do_facet agt2 $LFS pcc detach -k $file ||
                error "failed to detach file $file"
        check_lpcc_state $file "none" agt2
+
+       echo "Start to RO-PCC attach/detach $file on $agt1_HOST"
+       do_facet agt1 $LFS pcc attach -r -i 1 $file ||
+               error "failed to attach file $file"
+       check_lpcc_state $file "readonly" agt1
+       do_facet agt1 $LFS pcc detach -k $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none" agt1
+
+       echo "Repeat to RO-PCC attach/detach $file on $agt2_HOST"
+       do_facet agt2 $LFS pcc attach -r -i 2 $file ||
+               error "failed to attach file $file"
+       check_lpcc_state $file "readonly" agt2
+       do_facet agt2 $LFS pcc detach -k $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none" agt2
+
+       echo "Try RO-PCC attach on two agents"
+       do_facet agt1 $LFS pcc attach -r -i 1 $file ||
+               error "failed to attach file $file"
+       check_lpcc_state $file "readonly" agt1
+       do_facet agt2 $LFS pcc attach -r -i 2 $file ||
+               error "failed to attach file $file"
+       check_lpcc_state $file "readonly" agt2
+       check_lpcc_state $file "readonly" agt1
+       do_facet agt2 $LFS pcc detach -k $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none" agt2
+       do_facet agt1 $LFS pcc detach -k $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none" agt1
 }
 run_test 3b "Repeat attach/detach operations on multiple clients"
 
@@ -915,11 +982,13 @@ test_usrgrp_quota() {
        local loopfile="$TMP/$tfile"
        local mntpt="/mnt/pcc.$tdir"
        local hsm_root="$mntpt/$tdir"
+       local state="readwrite"
        local ug=$1
+       local ro=$2
        local id=$RUNAS_ID
 
        [[ $ug == "g" ]] && id=$RUNAS_GID
-
+       [[ -z $ro ]] || state="readonly"
        setup_loopdev $SINGLEAGT $loopfile $mntpt 50
        do_facet $SINGLEAGT quotacheck -c$ug $mntpt ||
                error "quotacheck -c$ug $mntpt failed"
@@ -946,20 +1015,24 @@ test_usrgrp_quota() {
                error "chown $RUNAS_ID:$RUNAS_GID $file1 failed"
        chown $RUNAS_ID:$RUNAS_GID $file2 ||
                error "chown $RUNAS_ID:$RUNAS_GID $file2 failed"
-       do_facet $SINGLEAGT $RUNAS $LFS pcc attach -i $HSM_ARCHIVE_NUMBER \
+       do_facet $SINGLEAGT $RUNAS $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $ro \
                $file1 || error "attach $file1 failed"
-       do_facet $SINGLEAGT $RUNAS $LFS pcc attach -i $HSM_ARCHIVE_NUMBER \
+       do_facet $SINGLEAGT $RUNAS $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $ro \
                $file2 && error "attach $file2 should fail due to quota limit"
-       check_lpcc_state $file1 "readwrite"
+       check_lpcc_state $file1 $state
        check_lpcc_state $file2 "none"
 
+       if [[ -n $ro ]]; then
+               do_facet $SINGLEAGT $LFS pcc detach $file1 ||
+                       error "detach $file1 failed"
+               return 0
+       fi
+
+       echo "Test -EDQUOT error tolerance for RW-PCC"
        do_facet $SINGLEAGT $RUNAS dd if=/dev/zero of=$file1 bs=1M count=30 ||
                error "dd write $file1 failed"
        # -EDQUOT error should be tolerated via fallback to normal Lustre path.
        check_lpcc_state $file1 "none"
-       do_facet $SINGLEAGT $LFS pcc detach -k $file1 ||
-               error "failed to detach file $file"
-       rm $file1 $file2
 }
 
 test_10a() {
@@ -972,6 +1045,16 @@ test_10b() {
 }
 run_test 10b "Test RW-PCC with group quota on loop PCC device"
 
+test_10c() {
+       test_usrgrp_quota "u" "-r"
+}
+run_test 10c "Test RO-PCC with user quota on loop PCC device"
+
+test_10d() {
+       test_usrgrp_quota "g" "-r"
+}
+run_test 10d "Test RO-PCC with group quota on loop PCC device"
+
 test_11() {
        local loopfile="$TMP/$tfile"
        local mntpt="/mnt/pcc.$tdir"
@@ -1230,6 +1313,19 @@ test_14() {
                set_param ldlm.namespaces.*mdc*.lru_size=clear
        check_file_data $SINGLEAGT $file "autodetach_data"
        check_lpcc_state $file "none"
+
+       rm $file || error "rm $file failed"
+       do_facet $SINGLEAGT "echo -n ro_autodetach_data > $file"
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "PCC attach $file failed"
+       check_lpcc_state $file "readonly"
+
+       # Revoke the layout lock, the PCC-cached file will be
+       # detached automatically.
+       do_facet $SINGLEAGT $LCTL \
+               set_param ldlm.namespaces.*mdc*.lru_size=clear
+       check_file_data $SINGLEAGT $file "ro_autodetach_data"
+       check_lpcc_state $file "none"
 }
 run_test 14 "Revocation of the layout lock should detach the file automatically"
 
@@ -1246,7 +1342,7 @@ test_15() {
        mkdir_on_mdt0 $DIR/$tdir || error "mkdir $DIR/$tdir failed"
        chmod 777 $DIR/$tdir || error "chmod 777 $DIR/$tdir failed"
 
-       echo "Check open attach for non-root user"
+       echo "Verify open attach for non-root user"
        do_facet $SINGLEAGT $RUNAS dd if=/dev/zero of=$file bs=1024 count=1 ||
                error "failed to dd write to $file"
        do_facet $SINGLEAGT $RUNAS $LFS pcc attach -i $HSM_ARCHIVE_NUMBER \
@@ -1268,10 +1364,10 @@ test_15() {
                error "PCC detach $file failed"
        rm $file || error "rm $file failed"
 
-       echo "check open attach for root user"
+       echo "Verify auto attach at open for RW-PCC"
        do_facet $SINGLEAGT "echo -n autoattach_data > $file"
        do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER \
-               $file || error "PCC attach $file failed"
+               $file || error "RW-PCC attach $file failed"
        check_lpcc_state $file "readwrite"
 
        # Revoke the layout lock, the PCC-cached file will be
@@ -1285,7 +1381,7 @@ test_15() {
        # is not changed, so the file is still valid cached in PCC,
        # and can be reused from PCC cache directly.
        do_facet $SINGLEAGT $LFS pcc detach -k $file ||
-               error "PCC detach $file failed"
+               error "RW-PCC detach $file failed"
        check_lpcc_state $file "readwrite"
        # HSM released exists archived status
        check_hsm_flags $file "0x0000000d"
@@ -1299,6 +1395,27 @@ test_15() {
        # HSM exists archived status
        check_hsm_flags $file "0x00000009"
 
+       echo "Verify auto attach at open for RO-PCC"
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER -r $file ||
+               error "RO-PCC attach $file failed"
+       check_lpcc_state $file "readonly"
+
+       # Revoke the layout lock, the PCC-cached file will be
+       # detached automatically.
+       do_facet $SINGLEAGT $LCTL \
+               set_param ldlm.namespaces.*mdc*.lru_size=clear
+       check_file_data $SINGLEAGT $file "autoattach_data"
+       check_lpcc_state $file "readonly"
+
+       # Detach the file with "-k" option, as the file layout generation
+       # is not changed, so the file is still valid cached in PCC,
+       # and can be reused from PCC cache directly.
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "RO-PCC detach $file failed"
+       check_lpcc_state $file "readonly"
+       check_file_data $SINGLEAGT $file "autoattach_data"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "RO-PCC detach $file failed"
 }
 run_test 15 "Test auto attach at open when file is still valid cached"
 
@@ -1313,10 +1430,11 @@ test_16() {
        copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
        setup_pcc_mapping
 
+       echo "Test detach for RW-PCC"
        do_facet $SINGLEAGT "echo -n detach_data > $file"
        lpcc_path=$(lpcc_fid2path $hsm_root $file)
        do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER \
-               $file || error "PCC attach $file failed"
+               $file || error "RW-PCC attach $file failed"
        check_lpcc_state $file "readwrite"
        # HSM released exists archived status
        check_hsm_flags $file "0x0000000d"
@@ -1332,7 +1450,7 @@ test_16() {
        echo "Test for the default detach"
        # Permanent detach by default, it will remove the PCC copy
        do_facet $SINGLEAGT $LFS pcc detach $file ||
-               error "PCC detach $file failed"
+               error "RW-PCC detach $file failed"
        wait_request_state $(path2fid $file) REMOVE SUCCEED
        check_lpcc_state $file "none"
        # File is removed from PCC backend
@@ -1340,6 +1458,21 @@ test_16() {
        do_facet $SINGLEAGT "[ -f $lpcc_path ]" &&
                error "RW-PCC cached file '$lpcc_path' should be removed"
 
+       echo "Test detach for RO-PCC"
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER -r $file ||
+               error "RO-PCC attach $file failed"
+       check_lpcc_state $file "readonly"
+
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "RO-PCC detach $file failed"
+       check_lpcc_state $file "readonly"
+
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "RO-PCC detach $file failed"
+       check_lpcc_state $file "none"
+       do_facet $SINGLEAGT "[ -f $lpcc_path ]" &&
+               error "RO-PCC cached file '$lpcc_path' should be removed"
+
        return 0
 }
 run_test 16 "Test detach with different options"
@@ -1491,6 +1624,812 @@ test_20() {
 }
 run_test 20 "Auto attach works after the inode was once evicted from cache"
 
+test_21a() {
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping $SINGLEAGT \
+               "projid={100}\ rwid=$HSM_ARCHIVE_NUMBER\ auto_attach=0"
+
+       do_facet $SINGLEAGT "echo -n pccro_as_mirror_layout > $file"
+       echo "Plain layout info before PCC-RO attach '$file':"
+       $LFS getstripe -v $file
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "RW-PCC attach $file failed"
+       check_lpcc_state $file "readonly"
+       echo -e "\nFLR layout info after PCC-RO attach '$file':"
+       $LFS getstripe -v $file
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none"
+       echo -e "\nFLR layout info after PCC-RO detach '$file':"
+       $LFS getstripe -v $file
+
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to attach file $file"
+       check_lpcc_state $file "readonly"
+       echo -e "\nFLR layout info after RO-PCC attach $file again:"
+       $LFS getstripe -v $file
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none"
+       echo -e "\nFLR layout info after RO-PCC detach '$file' again:"
+       $LFS getstripe -v $file
+}
+run_test 21a "PCC-RO storing as a plain HSM mirror component for plain layout"
+
+test_21b() {
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping $SINGLEAGT \
+               "projid={100}\ rwid=$HSM_ARCHIVE_NUMBER\ auto_attach=0"
+
+       $LFS mirror create -N -S 4M -c 2 -N -S 1M -c -1  $file ||
+               error "create mirrored file $file failed"
+       #do_facet $SINGLEAGT "echo -n pccro_as_mirror_layout > $file"
+       echo "FLR layout before PCC-RO attach '$file':"
+       $LFS getstripe -v $file
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to attach file $file"
+       check_lpcc_state $file "readonly"
+       echo -e "\nFLR layout after PCC-RO attach '$file':"
+       $LFS getstripe -v $file
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none"
+       echo -e "\nFLR layout info after PCC-RO detach '$file':"
+       $LFS getstripe -v $file
+
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to attach file $file"
+       check_lpcc_state $file "readonly"
+       echo -e "\nFLR layout after PCC-RO attach '$file' again:"
+       $LFS getstripe -v $file
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none"
+       echo -e "\nFLR layout info after PCC-RO detach '$file':"
+       $LFS getstripe -v $file
+}
+run_test 21b "PCC-RO stroing as a plain HSM mirror component for FLR layouts"
+
+test_21c() {
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
+       local file2=$DIR2/$tfile
+       local fid
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping $SINGLEAGT \
+               "projid={100}\ rwid=$HSM_ARCHIVE_NUMBER\ auto_attach=0"
+
+       do_facet $SINGLEAGT "echo -n pccro_hsm_release > $file"
+       fid=$(path2fid $file)
+       $LFS hsm_archive --archive $HSM_ARCHIVE_NUMBER $file ||
+               error "Archive $file failed"
+       wait_request_state $fid ARCHIVE SUCCEED
+       $LFS hsm_state $file
+
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER -r $file ||
+               error "RO-PCC attach $file failed"
+       # HSM exists archived status
+       check_hsm_flags $file "0x00000009"
+       check_lpcc_state $file "readonly"
+       check_file_data $SINGLEAGT $file "pccro_hsm_release"
+
+       $LFS hsm_release $file || error "HSM released $file failed"
+       $LFS getstripe $file
+       $LFS hsm_state $file
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "failed to detach $file"
+       check_lpcc_state $file "none"
+       unlink $file || error "unlink $file failed"
+}
+run_test 21c "Verify HSM release works storing PCC-RO as HSM mirror component"
+
+test_21d() {
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping
+
+       echo "pccro_init_data" > $file
+       $LFS getstripe $file
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to PCC-RO attach file $file"
+       check_lpcc_state $file "readonly"
+       echo "PCC-RO attach '$file':"
+       $LFS getstripe -v $file
+
+       echo "Write invalidated PCC-RO cache:"
+       echo -n "write_mod_data" > $file
+       check_lpcc_state $file "none"
+       $LFS getstripe -v $file
+       check_file_data $SINGLEAGT $file "write_mod_data"
+}
+run_test 21d "Write should invalidate PCC-RO caching"
+
+test_21e() {
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping
+
+       echo "pccro_init_data" > $file
+       $LFS getstripe $file
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to PCC-RO attach file $file"
+       check_lpcc_state $file "readonly"
+       echo "PCC-RO attach '$file':"
+       $LFS getstripe -v $file
+
+       echo "Trucate invalidate PCC-RO file '$file':"
+       $TRUNCATE $file 256 || error "failed to truncate $file"
+       $LFS getstripe -v $file
+       check_lpcc_state $file "none"
+       check_file_size $SINGLEAGT $file 256
+}
+run_test 21e "Truncate should invalidate PCC-RO caching"
+
+test_21f() {
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping
+
+       echo "pccro_mmap_data" > $file
+       $LFS getstripe $file
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to PCC-RO attach file $file"
+       check_lpcc_state $file "readonly"
+       echo "PCC-RO attach '$file':"
+       $LFS getstripe -v $file
+
+       echo "Mmap write invalidate PCC-RO caching:"
+       # Mmap write will invalidate the RO-PCC cache
+       do_facet $SINGLEAGT $MULTIOP $file OSMWUc ||
+               error "mmap write $file failed"
+       check_lpcc_state $file "none"
+       $LFS getstripe -v $file
+       # After mmap-write by MULTIOP, the first character of the content
+       # will be increased with 1.
+       content=$(do_facet $SINGLEAGT $MMAP_CAT $file)
+       [[ $content == "qccro_mmap_data" ]] ||
+               error "mmap_cat data mismatch: $content"
+}
+run_test 21f "mmap write should invalidate PCC-RO caching"
+
+test_21g() {
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping $SINGLEAGT \
+               "projid={100}\ rwid=$HSM_ARCHIVE_NUMBER\ auto_attach=0"
+
+       $LFS mirror create -N -S 4M -c 2 -N -S 1M -c -1  $file ||
+               error "create mirrored file '$file' failed"
+       do_facet $SINGLEAGT "echo -n pccro_as_mirror_layout > $file"
+       echo "FLR layout before PCC-RO attach '$file':"
+       $LFS getstripe -v $file
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to PCC-RO attach '$file'"
+       echo "FLR layout after PCC-RO attach '$file':"
+       $LFS getstripe -v $file
+       echo "Layout after Write invalidate '$file':"
+       echo -n pccro_write_invalidate_mirror > $file
+       $LFS getstripe -v $file
+}
+run_test 21g "PCC-RO for file under FLR write pending state"
+
+test_21h() {
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping $SINGLEAGT \
+               "projid={100}\ rwid=$HSM_ARCHIVE_NUMBER\ auto_attach=0"
+
+       $LFS mirror create -N -S 4M -c 2 -N -S 1M -c -1  $file ||
+               error "create mirrored file $file failed"
+       #do_facet $SINGLEAGT "echo -n pccro_as_mirror_layout > $file"
+       echo "FLR layout before PCC-RO attach '$file':"
+       $LFS getstripe -v $file
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to attach file $file"
+       check_lpcc_state $file "readonly"
+       echo -e "\nFLR layout after PCC-RO attach '$file':"
+       $LFS getstripe -v $file
+
+       $LFS mirror extend -N -S 8M -c -1 $file ||
+               error "mirror extend $file failed"
+       echo -e "\nFLR layout after extend a mirror:"
+       $LFS getstripe -v $file
+       $LFS pcc state $file
+       check_lpcc_state $file "none"
+
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to attach file $file"
+       check_lpcc_state $file "readonly"
+       echo -e "\nFLR layout after PCC-RO attach '$file' again:"
+       $LFS getstripe -v $file
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none"
+}
+run_test 21h "Extend mirror once file was PCC-RO cached"
+
+test_21i() {
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
+       local file2=$DIR2/$tfile
+       local fid
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping $SINGLEAGT \
+               "projid={100}\ rwid=$HSM_ARCHIVE_NUMBER\ auto_attach=0"
+
+       do_facet $SINGLEAGT "echo -n hsm_release_pcc_file > $file"
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
+               error "RW-PCC attach $file failed"
+       check_lpcc_state $file "readwrite"
+       # HSM released exists archived status
+       check_hsm_flags $file "0x0000000d"
+
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "RW-PCC detach $file failed"
+       check_lpcc_state $file "none"
+       # HSM released exists archived status
+       check_hsm_flags $file "0x0000000d"
+
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to PCC-RO attach $file"
+
+       $LFS hsm_state $file
+       $LFS hsm_release $file || error "HSM released $file failed"
+       echo "Layout after HSM release $file:"
+       $LFS getstripe -v $file
+       echo "PCC state $file:"
+       $LFS pcc state $file
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER -r $file ||
+               error "RO-PCC attach $file failed"
+       echo "Layout after PCC-RO attach $file again:"
+       $LFS getstripe -v $file
+       echo "PCC state:"
+       $LFS pcc state $file
+
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "RW-PCC detach $file failed"
+       check_lpcc_state $file "none"
+}
+run_test 21i "HSM release increase layout gen, should invalidate PCC-RO cache"
+
+test_22() {
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
+       local file2=$DIR2/$tfile
+       local fid
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping $SINGLEAGT \
+               "projid={100}\ rwid=$HSM_ARCHIVE_NUMBER\ auto_attach=0"
+
+       do_facet $SINGLEAGT "echo -n roattach_data > $file"
+
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
+               error "RW-PCC attach $file failed"
+       check_lpcc_state $file "readwrite"
+       # HSM released exists archived status
+       check_hsm_flags $file "0x0000000d"
+
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "RW-PCC detach $file failed"
+       check_lpcc_state $file "none"
+       # HSM released exists archived status
+       check_hsm_flags $file "0x0000000d"
+
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER -r $file ||
+               error "RO-PCC attach $file failed"
+       echo "Layout after PCC-RO attach $file:"
+       $LFS getstripe -v $file
+       # HSM exists archived status
+       check_hsm_flags $file "0x00000009"
+       check_lpcc_state $file "readonly"
+       check_file_data $SINGLEAGT $file "roattach_data"
+
+       $LFS hsm_release $file || error "HSM released $file failed"
+       echo "Layout after HSM release $file:"
+       $LFS getstripe -v $file
+       # HSM released exists archived status
+       check_hsm_flags $file "0x0000000d"
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER -r $file ||
+               error "RO-PCC attach $file failed"
+       echo "Layout after PCC-RO attach $file again:"
+       $LFS getstripe -v $file
+       check_lpcc_state $file "readonly"
+       check_file_data $SINGLEAGT $file "roattach_data"
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "failed to detach $file"
+       echo "Layout after PCC-RO detach $file:"
+       $LFS getstripe -v $file
+       rm -f $file2 || error "rm -f $file failed"
+       do_facet $SINGLEAGT "echo -n roattach_data2 > $file"
+       fid=$(path2fid $file)
+       $LFS hsm_archive --archive $HSM_ARCHIVE_NUMBER $file ||
+               error "Archive $file failed"
+       wait_request_state $fid ARCHIVE SUCCEED
+       $LFS hsm_release $file || error "HSM released $file failed"
+       # HSM released exists archived status
+       check_hsm_flags $file "0x0000000d"
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER -r $file ||
+               error "RO-PCC attach $file failed"
+       check_lpcc_state $file "readonly"
+       check_file_data $SINGLEAGT $file "roattach_data2"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "RO-PCC detach $file failed"
+}
+run_test 22 "Test RO-PCC attach for the HSM released file"
+
+test_23() {
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
+       local -a lpcc_path
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping
+
+       echo "ropcc_data" > $file
+       lpcc_path=$(lpcc_fid2path $hsm_root $file)
+
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to RO-PCC attach file $file"
+       check_lpcc_state $file "readonly"
+       check_lpcc_data $SINGLEAGT $lpcc_path $file "ropcc_data"
+
+       local content=$(do_facet $SINGLEAGT $MMAP_CAT $file)
+
+       [[ $content == "ropcc_data" ]] ||
+               error "mmap_cat data mismatch: $content"
+       check_lpcc_state $file "readonly"
+
+       echo -n "write_mod_data" > $file
+       echo "Write should invalidate the RO-PCC cache:"
+       $LFS getstripe -v $file
+       check_lpcc_state $file "none"
+       check_file_data $SINGLEAGT $file "write_mod_data"
+
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to RO-PCC attach file $file"
+       check_lpcc_state $file "readonly"
+       echo "PCC-RO attach '$file' again:"
+       $LFS getstripe -v $file
+
+       echo "Truncate invalidate the RO-PCC cache:"
+       $TRUNCATE $file 256 || error "failed to truncate $file"
+       $LFS getstripe -v $file
+       echo "Finish trucate operation"
+       check_lpcc_state $file "none"
+       check_file_size $SINGLEAGT $file 256
+
+       echo "Mmap write invalidates RO-PCC caching"
+       echo -n mmap_write_data > $file || error "echo write $file failed"
+       $LFS getstripe -v $file
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to RO-PCC attach file $file"
+       check_lpcc_state $file "readonly"
+       echo "PCC-RO attach '$file' again:"
+       $LFS getstripe -v $file
+       echo "Mmap write $file via multiop"
+       # Mmap write will invalidate the RO-PCC cache
+       do_facet $SINGLEAGT $MULTIOP $file OSMWUc ||
+               error "mmap write $file failed"
+       check_lpcc_state $file "none"
+       $LFS getstripe -v $file
+       # After mmap-write by MULTIOP, the first character of the content
+       # increases 1.
+       content=$(do_facet $SINGLEAGT $MMAP_CAT $file)
+       [[ $content == "nmap_write_data" ]] ||
+               error "mmap_cat data mismatch: $content"
+}
+run_test 23 "Test write/truncate/mmap-write invalidating RO-PCC caching"
+
+test_24a() {
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tdir/$tfile
+       local -a lpcc_path
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping $SINGLEAGT \
+               "projid={100}\ rwid=$HSM_ARCHIVE_NUMBER\ auto_attach=0"
+       $LCTL pcc list $MOUNT
+       mkdir -p $DIR/$tdir
+       chmod 777 $DIR/$tdir
+
+       do_facet $SINGLEAGT $RUNAS dd if=/dev/zero of=$file bs=1024 count=1 ||
+               error "failed to dd write to $file"
+       do_facet $SINGLEAGT $RUNAS $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER \
+               $file || error "failed to attach file $file"
+       check_lpcc_state $file "readonly"
+       do_facet $SINGLEAGT $RUNAS dd if=$file of=/dev/null bs=1024 count=1 ||
+               error "failed to dd read from $file"
+       check_lpcc_state $file "readonly"
+
+       do_facet $SINGLEAGT $RUNAS $LFS pcc detach -k $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none"
+
+       # non-root user is forbidden to access PCC file directly
+       lpcc_path=$(lpcc_fid2path $hsm_root $file)
+       do_facet $SINGLEAGT $RUNAS touch $lpcc_path &&
+               error "non-root user can touch access PCC file $lpcc_path"
+       do_facet $SINGLEAGT $RUNAS dd if=$lpcc_path of=/dev/null bs=1024 \
+               count=1 && error "non-root user can read PCC file $lpcc_path"
+       do_facet $SINGLEAGT $RUNAS dd if=/dev/zero of=$lpcc_path bs=1024 \
+               count=1 && error "non-root user can write PCC file $lpcc_path"
+
+       do_facet $SINGLEAGT $RUNAS $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER \
+               $file || error "failed to attach file $file"
+       check_lpcc_state $file "readonly"
+
+       # Test RO-PCC detach as non-root user
+       do_facet $SINGLEAGT $RUNAS $LFS pcc detach $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none"
+       do_facet $SINGLEAGT "[ -f $lpcc_path ]" &&
+               error "RO-PCC cached file '$lpcc_path' should be removed"
+
+       return 0
+}
+run_test 24a "Test RO-PCC with non-root user"
+
+test_24b() {
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tdir/$tfile
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping
+
+       mkdir -p $DIR/$tdir || error "mkdir $DIR/$tdir failed"
+       dd if=/dev/zero of=$file bs=1024 count=1 ||
+               error "failed to dd write $file"
+       chmod 600 $file || error "chmod 600 $file failed"
+       do_facet $SINGLEAGT $RUNAS dd if=/dev/zero of=$file bs=1024 count=1 &&
+               error "non-root user can dd write $file"
+       do_facet $SINGLEAGT $RUNAS dd if=$file of=/dev/null bs=1024 count=1 &&
+               error "non-root user can dd read $file"
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to attach file $file"
+       check_lpcc_state $file "readonly"
+       do_facet $SINGLEAGT $RUNAS dd if=/dev/zero of=$file bs=1024 count=1 &&
+               error "non-root user can dd write $file"
+       do_facet $SINGLEAGT $RUNAS dd if=$file of=/dev/null bs=1024 count=1 &&
+               error "non-root user can dd read $file"
+       chmod 777 $file || error "chmod 777 $file failed"
+       do_facet $SINGLEAGT $RUNAS dd if=$file of=/dev/null bs=1024 count=1 ||
+               error "non-root user cannot read $file with permission (777)"
+       check_lpcc_state $file "readonly"
+
+       do_facet $SINGLEAGT $RUNAS $LFS pcc detach $file &&
+               error "non-root user or non owner can detach $file"
+       chown $RUNAS_ID $file || error "chown $RUNAS_ID $file failed"
+       do_facet $SINGLEAGT $RUNAS $LFS pcc detach $file ||
+               error "failed to detach file $file"
+       check_lpcc_state $file "none"
+       do_facet $SINGLEAGT $RUNAS dd if=$file of=/dev/null bs=1024 count=1 ||
+               error "non-root user cannot read $file with permission (777)"
+}
+run_test 24b "General permission test for RO-PCC"
+
+test_25() {
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tdir/$tfile
+       local content
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping
+
+       mkdir -p $DIR/$tdir || error "mkdir $DIR/$tdir failed"
+
+       echo "ro_fake_mmap_cat_err" > $file
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to attach RO-PCC file $file"
+       check_lpcc_state $file "readonly"
+       check_file_data $SINGLEAGT $file "ro_fake_mmap_cat_err"
+
+       # define OBD_FAIL_LLITE_PCC_FAKE_ERROR  0x1411
+       do_facet $SINGLEAGT $LCTL set_param fail_loc=0x1411
+       content=$(do_facet $SINGLEAGT $MMAP_CAT $file)
+       [[ $content == "ro_fake_mmap_cat_err" ]] ||
+               error "failed to fall back to Lustre I/O path for mmap-read"
+       # Above mmap read will return VM_FAULT_SIGBUS failure and
+       # retry the IO on normal IO path.
+       check_lpcc_state $file "readonly"
+       check_file_data $SINGLEAGT $file "ro_fake_mmap_cat_err"
+
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "failed to detach RO-PCC file $file"
+       check_lpcc_state $file "none"
+
+       do_facet $SINGLEAGT $LCTL set_param fail_loc=0
+       echo "ro_fake_cat_err" > $file
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "failed to attach RO-PCC file $file"
+       check_lpcc_state $file "readonly"
+       check_file_data $SINGLEAGT $file "ro_fake_cat_err"
+
+       # define OBD_FAIL_LLITE_PCC_FAKE_ERROR  0x1411
+       do_facet $SINGLEAGT $LCTL set_param fail_loc=0x1411
+       # Fake read I/O will return -EIO failure and
+       # retry the IO on normal IO path.
+       check_file_data $SINGLEAGT $file "ro_fake_cat_err"
+       check_lpcc_state $file "readonly"
+
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "failed to detach RO-PCC file $file"
+       check_lpcc_state $file "none"
+}
+run_test 25 "Tolerate fake read failure for RO-PCC"
+
+test_26() {
+       local agt_host=$(facet_active_host $SINGLEAGT)
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER" -h "$hsm_root"
+       setup_pcc_mapping $SINGLEAGT \
+               "projid={100}\ rwid=$HSM_ARCHIVE_NUMBER\ auto_attach=0"
+
+       echo -n attach_keep_open > $file
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "attach $file failed"
+       check_lpcc_state $file "readonly"
+       rmultiop_start $agt_host $file O_c || error "multiop $file failed"
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "detach $file failed"
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "attach $file failed"
+       check_lpcc_state $file "readonly"
+       check_file_data $SINGLEAGT $file "attach_keep_open"
+       check_lpcc_state $file "readonly"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "detach $file failed"
+       rmultiop_stop $agt_host || error "multiop $file close failed"
+
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "attach $file failed"
+       check_lpcc_state $file "readonly"
+       rmultiop_start $agt_host $file O_c || error "multiop $file failed"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "detach $file failed"
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "attach $file failed"
+       check_lpcc_state $file "readonly"
+       check_file_data $SINGLEAGT $file "attach_keep_open"
+       check_lpcc_state $file "readonly"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "detach $file failed"
+       rmultiop_stop $agt_host || error "multiop $file close failed"
+
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "attach $file failed"
+       check_lpcc_state $file "readonly"
+       rmultiop_start $agt_host $file O_c || error "multiop $file failed"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "detach $file failed"
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "attach $file failed"
+       check_lpcc_state $file "readonly"
+       check_file_data $SINGLEAGT $file "attach_keep_open"
+       check_lpcc_state $file "readonly"
+       rmultiop_stop $agt_host || error "multiop $file close failed"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "detach $file failed"
+
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
+               error "attach $file failed"
+       check_lpcc_state $file "readwrite"
+       rmultiop_start $agt_host $file O_c || error "multiop $file failed"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "detach $file failed"
+       wait_request_state $(path2fid $file) REMOVE SUCCEED
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "attach $file failed"
+       check_lpcc_state $file "readonly"
+       check_file_data $SINGLEAGT $file "attach_keep_open"
+       check_lpcc_state $file "readonly"
+       rmultiop_stop $agt_host || error "multiop $file close failed"
+       check_lpcc_state $file "readonly"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "detach $file failed"
+
+       rm $file || error "rm $file failed"
+       echo -n attach_keep_open > $file
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
+               error "attach $file failed"
+       check_lpcc_state $file "readwrite"
+       rmultiop_start $agt_host $file O_c || error "multiop $file failed"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "detach $file failed"
+       wait_request_state $(path2fid $file) REMOVE SUCCEED
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "attach $file failed"
+       check_lpcc_state $file "readonly"
+       check_file_data $SINGLEAGT $file "attach_keep_open"
+       check_lpcc_state $file "readonly"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "detach $file failed"
+       rmultiop_stop $agt_host || error "multiop $file close failed"
+       check_lpcc_state $file "none"
+}
+run_test 26 "Repeat the attach/detach when the file has multiple openers"
+
+test_27() {
+       local agt_host=$(facet_active_host $SINGLEAGT)
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER" -h "$hsm_root"
+       setup_pcc_mapping $SINGLEAGT \
+               "projid={100}\ rwid=$HSM_ARCHIVE_NUMBER\ open_attach=1"
+
+       echo -n auto_attach_multi_open > $file
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
+               error "attach $file failed"
+       check_lpcc_state $file "readwrite"
+       rmultiop_start $agt_host $file O_c || error "multiop $file failed"
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "detach $file failed"
+       check_lpcc_state $file "readwrite"
+       check_file_data $SINGLEAGT $file "auto_attach_multi_open"
+       check_lpcc_state $file "readwrite"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "detach $file failed"
+       wait_request_state $(path2fid $file) REMOVE SUCCEED
+       check_lpcc_state $file "none"
+       rmultiop_stop $agt_host || error "multiop $file close failed"
+
+       rm $file || error "rm $file failed"
+       echo -n auto_attach_multi_open > $file
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
+               error "attach $file failed"
+       check_lpcc_state $file "readwrite"
+       rmultiop_start $agt_host $file O_c || error "multiop $file failed"
+       do_facet $SINGLEAGT $LCTL \
+               set_param ldlm.namespaces.*mdc*.lru_size=clear
+       check_lpcc_state $file "readwrite"
+       check_file_data $SINGLEAGT $file "auto_attach_multi_open"
+       check_lpcc_state $file "readwrite"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "detach $file failed"
+       wait_request_state $(path2fid $file) REMOVE SUCCEED
+       check_lpcc_state $file "none"
+       rmultiop_stop $agt_host || error "multiop $file close failed"
+
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "attach $file failed"
+       check_lpcc_state $file "readonly"
+       rmultiop_start $agt_host $file O_c || error "multiop $file failed"
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "detach $file failed"
+       check_lpcc_state $file "readonly"
+       check_file_data $SINGLEAGT $file "auto_attach_multi_open"
+       check_lpcc_state $file "readonly"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "detach $file failed"
+       check_lpcc_state $file "none"
+       rmultiop_stop $agt_host || error "multiop $file close failed"
+
+       do_facet $SINGLEAGT $LFS pcc attach -r -i $HSM_ARCHIVE_NUMBER $file ||
+               error "attach $file failed"
+       check_lpcc_state $file "readonly"
+       rmultiop_start $agt_host $file O_c || error "multiop $file failed"
+       do_facet $SINGLEAGT $LCTL \
+               set_param ldlm.namespaces.*mdc*.lru_size=clear
+       check_lpcc_state $file "readonly"
+       check_file_data $SINGLEAGT $file "auto_attach_multi_open"
+       check_lpcc_state $file "readonly"
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "detach $file failed"
+       check_lpcc_state $file "none"
+       rmultiop_stop $agt_host || error "multiop $file close failed"
+}
+run_test 27 "Auto attach at open when the file has multiple openers"
+
+test_28() {
+       local agt_host=$(facet_active_host $SINGLEAGT)
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
+       local file2=$DIR2/$tfile
+       local multipid
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER" -h "$hsm_root"
+       setup_pcc_mapping $SINGLEAGT \
+               "projid={100}\ rwid=$HSM_ARCHIVE_NUMBER\ auto_attach=0"
+
+       echo -n rw_attach_hasopen_fail > $file
+       rmultiop_start $agt_host $file O_c || error "multiop $file failed"
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file &&
+               error "attach $file should fail"
+       rmultiop_stop $agt_host || error "multiop $file close failed"
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
+               error "attach $file should fail"
+       check_lpcc_state $file "readwrite"
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "detach $file failed"
+       check_lpcc_state $file "none"
+
+       multiop_bg_pause $file2 O_c || error "multiop $file2 failed"
+       multipid=$!
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file &&
+               error "attach $file should fail"
+       kill -USR1 $multipid
+       wait $multipid || error "multiop $file2 close failed"
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
+               error "attach $file should fail"
+       check_lpcc_state $file "readwrite"
+       do_facet $SINGLEAGT $LFS pcc detach -k $file ||
+               error "detach $file failed"
+       check_lpcc_state $file "none"
+}
+run_test 28 "RW-PCC attach should fail when the file has cluster-wide openers"
+
 #test 101: containers and PCC
 #LU-15170: Test mount namespaces with PCC
 #This tests the cases where the PCC mount is not present in the container by
index 2242bc4..7ac0367 100644 (file)
@@ -4787,6 +4787,8 @@ static int name2layout(__u32 *layout, char *name)
                        *layout |= LOV_PATTERN_MDT;
                else if (strcmp(layout_name, "overstriping") == 0)
                        *layout |= LOV_PATTERN_OVERSTRIPING;
+               else if (strcmp(layout_name, "foreign") == 0)
+                       *layout |= LOV_PATTERN_FOREIGN;
                else
                        return -1;
        }
@@ -13341,30 +13343,36 @@ static int lfs_pcc_attach(int argc, char **argv)
        struct option long_opts[] = {
        { .val = 'h',   .name = "help", .has_arg = no_argument },
        { .val = 'i',   .name = "id",   .has_arg = required_argument },
+       { .val = 'r',   .name = "readonly",     .has_arg = no_argument },
        { .name = NULL } };
        int c;
        int rc = 0;
-       __u32 archive_id = 0;
+       __u32 attach_id = 0;
        const char *path;
        char *end;
        char fullpath[PATH_MAX];
        enum lu_pcc_type type = LU_PCC_READWRITE;
 
        optind = 0;
-       while ((c = getopt_long(argc, argv, "hi:",
+       while ((c = getopt_long(argc, argv, "hi:r",
                                long_opts, NULL)) != -1) {
                switch (c) {
                case 'i':
                        errno = 0;
-                       archive_id = strtoul(optarg, &end, 0);
+                       attach_id = strtoul(optarg, &end, 0);
                        if (errno != 0 || *end != '\0' ||
-                           archive_id == 0 || archive_id > UINT32_MAX) {
+                           attach_id == 0 || attach_id > UINT32_MAX) {
                                fprintf(stderr,
                                        "error: %s: bad archive ID '%s'\n",
                                        progname, optarg);
                                return CMD_HELP;
                        }
                        break;
+               case 'r':
+                       type = LU_PCC_READONLY;
+                       break;
+               case '?':
+                       return CMD_HELP;
                default:
                        fprintf(stderr, "%s: unrecognized option '%s'\n",
                                progname, argv[optind - 1]);
@@ -13374,7 +13382,7 @@ static int lfs_pcc_attach(int argc, char **argv)
                }
        }
 
-       if (archive_id == 0) {
+       if (attach_id == 0) {
                fprintf(stderr, "%s: must specify attach ID\n", argv[0]);
                return CMD_HELP;
        }
@@ -13397,11 +13405,11 @@ static int lfs_pcc_attach(int argc, char **argv)
                        continue;
                }
 
-               rc2 = llapi_pcc_attach(fullpath, archive_id, type);
+               rc2 = llapi_pcc_attach(fullpath, attach_id, type);
                if (rc2 < 0) {
                        fprintf(stderr,
-                               "%s: cannot attach '%s' to PCC with archive ID '%u': %s\n",
-                               argv[0], path, archive_id, strerror(-rc2));
+                               "%s: cannot attach '%s' to PCC with attach ID '%u': %s\n",
+                               argv[0], path, attach_id, strerror(-rc2));
                        if (rc == 0)
                                rc = rc2;
                }
@@ -13412,33 +13420,37 @@ static int lfs_pcc_attach(int argc, char **argv)
 static int lfs_pcc_attach_fid(int argc, char **argv)
 {
        struct option long_opts[] = {
-       { .val = 'h',   .name = "help", .has_arg = no_argument },
-       { .val = 'i',   .name = "id",   .has_arg = required_argument },
-       { .val = 'm',   .name = "mnt",  .has_arg = required_argument },
+       { .val = 'h',   .name = "help",         .has_arg = no_argument },
+       { .val = 'i',   .name = "id",           .has_arg = required_argument },
+       { .val = 'r',   .name = "readonly",     .has_arg = no_argument },
+       { .val = 'm',   .name = "mnt",          .has_arg = required_argument },
        { .name = NULL } };
        int c;
        int rc = 0;
-       __u32 archive_id = 0;
+       __u32 attach_id = 0;
        char *end;
        const char *mntpath = NULL;
        const char *fidstr;
        enum lu_pcc_type type = LU_PCC_READWRITE;
 
        optind = 0;
-       while ((c = getopt_long(argc, argv, "hi:m:",
+       while ((c = getopt_long(argc, argv, "hi:m:r",
                                long_opts, NULL)) != -1) {
                switch (c) {
                case 'i':
                        errno = 0;
-                       archive_id = strtoul(optarg, &end, 0);
+                       attach_id = strtoul(optarg, &end, 0);
                        if (errno != 0 || *end != '\0' ||
-                           archive_id > UINT32_MAX) {
+                           attach_id > UINT32_MAX) {
                                fprintf(stderr,
-                                       "error: %s: bad archive ID '%s'\n",
+                                       "error: %s: bad attach ID '%s'\n",
                                        argv[0], optarg);
                                return CMD_HELP;
                        }
                        break;
+               case 'r':
+                       type = LU_PCC_READONLY;
+                       break;
                case 'm':
                        mntpath = optarg;
                        break;
@@ -13451,7 +13463,7 @@ static int lfs_pcc_attach_fid(int argc, char **argv)
                }
        }
 
-       if (archive_id == 0) {
+       if (attach_id == 0) {
                fprintf(stderr, "%s: must specify an archive ID\n", argv[0]);
                return CMD_HELP;
        }
@@ -13473,11 +13485,11 @@ static int lfs_pcc_attach_fid(int argc, char **argv)
                fidstr = argv[optind++];
 
                rc2 = llapi_pcc_attach_fid_str(mntpath, fidstr,
-                                              archive_id, type);
+                                              attach_id, type);
                if (rc2 < 0) {
                        fprintf(stderr,
-                               "%s: cannot attach '%s' on '%s' to PCC with archive ID '%u': %s\n",
-                               argv[0], fidstr, mntpath, archive_id,
+                               "%s: cannot attach '%s' on '%s' to PCC with attach ID '%u': %s\n",
+                               argv[0], fidstr, mntpath, attach_id,
                                strerror(rc2));
                }
                if (rc == 0 && rc2 < 0)
index 333d938..6536ec2 100644 (file)
@@ -93,8 +93,12 @@ static int llapi_msg_level = LLAPI_MSG_MAX;
 const char *liblustreapi_cmd;
 
 struct lustre_foreign_type lu_foreign_types[] = {
-       {.lft_type = LU_FOREIGN_TYPE_NONE, .lft_name = "none"},
-       {.lft_type = LU_FOREIGN_TYPE_SYMLINK, .lft_name = "symlink"},
+       {.lft_type = LU_FOREIGN_TYPE_NONE,      .lft_name = "none"},
+       {.lft_type = LU_FOREIGN_TYPE_POSIX,     .lft_name = "posix"},
+       {.lft_type = LU_FOREIGN_TYPE_PCCRW,     .lft_name = "pccrw"},
+       {.lft_type = LU_FOREIGN_TYPE_PCCRO,     .lft_name = "pccro"},
+       {.lft_type = LU_FOREIGN_TYPE_S3,        .lft_name = "S3"},
+       {.lft_type = LU_FOREIGN_TYPE_SYMLINK,   .lft_name = "symlink"},
        /* must be the last element */
        {.lft_type = LU_FOREIGN_TYPE_UNKNOWN, .lft_name = NULL}
        /* array max dimension must be <= UINT32_MAX */
@@ -2612,6 +2616,8 @@ static char *layout2name(__u32 layout_pattern)
 {
        if (layout_pattern & LOV_PATTERN_F_RELEASED)
                return "released";
+       else if (layout_pattern & LOV_PATTERN_FOREIGN)
+               return "foreign";
        else if (layout_pattern == LOV_PATTERN_MDT)
                return "mdt";
        else if (layout_pattern == LOV_PATTERN_RAID0)
@@ -2954,6 +2960,87 @@ static void lov_dump_user_lmm_v1v3(struct lov_user_md *lum, char *pool_name,
        llapi_printf(LLAPI_MSG_NORMAL, "\n");
 }
 
+static void hsm_flags2str(__u32 hsm_flags)
+{
+       bool found = false;
+       int i = 0;
+
+       if (!hsm_flags) {
+               llapi_printf(LLAPI_MSG_NORMAL, "0");
+               return;
+       }
+       for (i = 0; i < ARRAY_SIZE(hsm_flags_table); i++) {
+               if (hsm_flags & hsm_flags_table[i].hfn_flag) {
+                       if (found)
+                               llapi_printf(LLAPI_MSG_NORMAL, ",");
+                       llapi_printf(LLAPI_MSG_NORMAL, "%s",
+                                    hsm_flags_table[i].hfn_name);
+                       found = true;
+               }
+       }
+       if (hsm_flags) {
+               if (found)
+                       llapi_printf(LLAPI_MSG_NORMAL, ",");
+               llapi_printf(LLAPI_MSG_NORMAL, "%#x", hsm_flags);
+       }
+}
+
+static uint32_t check_foreign_type(uint32_t foreign_type)
+{
+       uint32_t i;
+
+       for (i = 0; i < LU_FOREIGN_TYPE_UNKNOWN; i++) {
+               if (lu_foreign_types[i].lft_name == NULL)
+                       break;
+               if (foreign_type == lu_foreign_types[i].lft_type)
+                       return i;
+       }
+
+       return LU_FOREIGN_TYPE_UNKNOWN;
+}
+
+void lov_dump_hsm_lmm(void *lum, char *path, int depth,
+                     enum llapi_layout_verbose verbose,
+                     enum lov_dump_flags flags)
+{
+       struct lov_hsm_md *lhm = lum;
+       bool indent = flags & LDF_INDENT;
+       bool is_dir = flags & LDF_IS_DIR;
+       char *space = indent ? "      " : "";
+
+       if (!is_dir) {
+               uint32_t type = check_foreign_type(lhm->lhm_type);
+
+               llapi_printf(LLAPI_MSG_NORMAL, "%slhm_magic:         0x%08X\n",
+                            space, lhm->lhm_magic);
+               llapi_printf(LLAPI_MSG_NORMAL, "%slhm_pattern:       hsm\n",
+                            space);
+               llapi_printf(LLAPI_MSG_NORMAL, "%slhm_length:        %u\n",
+                            space, lhm->lhm_length);
+               llapi_printf(LLAPI_MSG_NORMAL, "%slhm_type:          0x%08X",
+                            space, lhm->lhm_type);
+               if (type < LU_FOREIGN_TYPE_UNKNOWN)
+                       llapi_printf(LLAPI_MSG_NORMAL, " (%s)\n",
+                                    lu_foreign_types[type].lft_name);
+               else
+                       llapi_printf(LLAPI_MSG_NORMAL, " (unknown)\n");
+
+               llapi_printf(LLAPI_MSG_NORMAL, "%slhm_flags:         ", space);
+               hsm_flags2str(lhm->lhm_flags);
+               llapi_printf(LLAPI_MSG_NORMAL, "\n");
+
+               if (!lov_hsm_type_supported(lhm->lhm_type))
+                       return;
+
+               llapi_printf(LLAPI_MSG_NORMAL, "%slhm_archive_id:    %llu\n",
+                            space, (unsigned long long)lhm->lhm_archive_id);
+               llapi_printf(LLAPI_MSG_NORMAL, "%slhm_archive_ver:   %llu\n",
+                            space, (unsigned long long)lhm->lhm_archive_ver);
+               llapi_printf(LLAPI_MSG_NORMAL, "%slhm_archive_uuid:  '%.*s'\n",
+                            space, UUID_MAX, lhm->lhm_archive_uuid);
+       }
+}
+
 static void lmv_dump_user_lmm(struct lmv_user_md *lum, char *pool_name,
                              char *path, int obdindex, int depth,
                              enum llapi_layout_verbose verbose,
@@ -3507,6 +3594,9 @@ static void lov_dump_comp_v1(struct find_param *param, char *path,
                                continue;
 
                        v1 = lov_comp_entry(comp_v1, i);
+                       if (v1->lmm_magic == LOV_MAGIC_FOREIGN)
+                               continue;
+
                        objects = lov_v1v3_objects(v1);
 
                        for (j = 0; j < v1->lmm_stripe_count; j++) {
@@ -3606,6 +3696,9 @@ static void lov_dump_comp_v1(struct find_param *param, char *path,
                        if (obdindex != OBD_NOT_FOUND) {
                                flags |= LDF_SKIP_OBJS;
                                v1 = lov_comp_entry(comp_v1, i);
+                               if (v1->lmm_magic == LOV_MAGIC_FOREIGN)
+                                       continue;
+
                                objects = lov_v1v3_objects(v1);
 
                                for (j = 0; j < v1->lmm_stripe_count; j++) {
@@ -3626,13 +3719,19 @@ static void lov_dump_comp_v1(struct find_param *param, char *path,
                lov_dump_comp_v1_entry(param, flags, i);
 
                v1 = lov_comp_entry(comp_v1, i);
-               objects = lov_v1v3_objects(v1);
-               lov_v1v3_pool_name(v1, pool_name);
+               if (v1->lmm_magic == LOV_MAGIC_FOREIGN) {
+                       lov_dump_hsm_lmm(v1, path, param->fp_max_depth,
+                                        param->fp_verbose, flags);
+               } else {
+                       objects = lov_v1v3_objects(v1);
+                       lov_v1v3_pool_name(v1, pool_name);
 
-               ext = entry->lcme_flags & LCME_FL_EXTENSION ? LDF_EXTENSION : 0;
-               lov_dump_user_lmm_v1v3(v1, pool_name, objects, path, obdindex,
-                                      param->fp_max_depth, param->fp_verbose,
-                                      flags | ext);
+                       ext = entry->lcme_flags & LCME_FL_EXTENSION ?
+                             LDF_EXTENSION : 0;
+                       lov_dump_user_lmm_v1v3(v1, pool_name, objects, path,
+                                              obdindex, param->fp_max_depth,
+                                              param->fp_verbose, flags | ext);
+               }
        }
        if (print_last_init_comp(param)) {
                /**
@@ -3648,14 +3747,20 @@ static void lov_dump_comp_v1(struct find_param *param, char *path,
                lov_dump_comp_v1_entry(param, flags, i);
 
                v1 = lov_comp_entry(comp_v1, i);
-               objects = lov_v1v3_objects(v1);
-               lov_v1v3_pool_name(v1, pool_name);
-
-               entry = &comp_v1->lcm_entries[i];
-               ext = entry->lcme_flags & LCME_FL_EXTENSION ? LDF_EXTENSION : 0;
-               lov_dump_user_lmm_v1v3(v1, pool_name, objects, path, obdindex,
-                                      param->fp_max_depth, param->fp_verbose,
-                                      flags | ext);
+               if (v1->lmm_magic == LOV_MAGIC_FOREIGN) {
+                       lov_dump_hsm_lmm(v1, path, param->fp_max_depth,
+                                        param->fp_verbose, flags);
+               } else {
+                       objects = lov_v1v3_objects(v1);
+                       lov_v1v3_pool_name(v1, pool_name);
+
+                       entry = &comp_v1->lcm_entries[i];
+                       ext = entry->lcme_flags & LCME_FL_EXTENSION ?
+                             LDF_EXTENSION : 0;
+                       lov_dump_user_lmm_v1v3(v1, pool_name, objects, path,
+                                              obdindex, param->fp_max_depth,
+                                              param->fp_verbose, flags | ext);
+               }
        }
 }
 
@@ -3759,20 +3864,6 @@ static void lov_dump_plain_user_lmm(struct find_param *param, char *path,
        }
 }
 
-static uint32_t check_foreign_type(uint32_t foreign_type)
-{
-       uint32_t i;
-
-       for (i = 0; i < LU_FOREIGN_TYPE_UNKNOWN; i++) {
-               if (lu_foreign_types[i].lft_name == NULL)
-                       break;
-               if (foreign_type == lu_foreign_types[i].lft_type)
-                       return i;
-       }
-
-       return LU_FOREIGN_TYPE_UNKNOWN;
-}
-
 static void lov_dump_foreign_lmm(struct find_param *param, char *path,
                                 enum lov_dump_flags flags)
 {
index 5291869..da74518 100644 (file)
 
 /**
  * Layout component, which contains all attributes of a plain
- * V1/V3 layout.
+ * V1/V3/FOREIGN(HSM) layout.
  */
 struct llapi_layout_comp {
        uint64_t        llc_pattern;
-       uint64_t        llc_stripe_size;
-       uint64_t        llc_stripe_count;
-       uint64_t        llc_stripe_offset;
-       /* Add 1 so user always gets back a null terminated string. */
-       char            llc_pool_name[LOV_MAXPOOLNAME + 1];
-       /** Number of objects in llc_objects array if was initialized. */
-       uint32_t        llc_objects_count;
-       struct          lov_user_ost_data_v1 *llc_objects;
+       union {
+               struct { /* For plain layout. */
+                       uint64_t        llc_stripe_size;
+                       uint64_t        llc_stripe_count;
+                       uint64_t        llc_stripe_offset;
+                       /**
+                        * Add 1 so user always gets back a null terminated
+                        * string.
+                        */
+                       char            llc_pool_name[LOV_MAXPOOLNAME + 1];
+                       /**
+                        * Number of objects in llc_objects array if was
+                        * initialized.
+                        */
+                       uint32_t        llc_objects_count;
+                       struct lov_user_ost_data_v1 *llc_objects;
+               };
+               struct { /* For FOREIGN/HSM layout. */
+                       uint32_t         llc_length;
+                       uint32_t         llc_type;
+                       uint32_t         llc_hsm_flags;
+                       union {
+                               struct lov_hsm_base      llc_hsm;
+                               char                    *llc_value;
+                       };
+               };
+       };
+
        /* fields used only for composite layouts */
        struct lu_extent        llc_extent;     /* [start, end) of component */
        uint32_t                llc_id;         /* unique ID of component */
@@ -68,6 +88,10 @@ struct llapi_layout_comp {
        bool            llc_ondisk;
 };
 
+#define llc_archive_id llc_hsm.lhb_archive_id
+#define llc_archive_ver        llc_hsm.lhb_archive_ver
+#define llc_uuid       llc_hsm.lhb_uuid
+
 /**
  * An Opaque data type abstracting the layout of a Lustre file.
  */
@@ -96,6 +120,9 @@ static int llapi_layout_objects_in_lum(struct lov_user_md *lum, size_t lum_size)
        uint32_t magic;
        size_t base_size;
 
+       if (lum->lmm_magic == __swab32(LOV_MAGIC_FOREIGN))
+               return 0;
+
        if (lum_size < lov_user_md_size(0, LOV_MAGIC_V1))
                return 0;
 
@@ -162,24 +189,41 @@ llapi_layout_swab_lov_user_md(struct lov_user_md *lum, int lum_size)
                                        ent->lcme_offset);
                        lum_size = ent->lcme_size;
                }
-               obj_count = llapi_layout_objects_in_lum(lum, lum_size);
 
                lum->lmm_magic = __swab32(lum->lmm_magic);
-               lum->lmm_pattern = __swab32(lum->lmm_pattern);
-               lum->lmm_stripe_size = __swab32(lum->lmm_stripe_size);
-               lum->lmm_stripe_count = __swab16(lum->lmm_stripe_count);
-               lum->lmm_stripe_offset = __swab16(lum->lmm_stripe_offset);
-
-               if (lum->lmm_magic != LOV_MAGIC_V1) {
-                       struct lov_user_md_v3 *v3;
-                       v3 = (struct lov_user_md_v3 *)lum;
-                       lod = v3->lmm_objects;
+               if (lum->lmm_magic == LOV_MAGIC_FOREIGN) {
+                       struct lov_hsm_md *lhm;
+
+                       lhm = (struct lov_hsm_md *)lum;
+                       lhm->lhm_length = __swab32(lhm->lhm_length);
+                       lhm->lhm_type = __swab32(lhm->lhm_type);
+                       lhm->lhm_flags = __swab32(lhm->lhm_flags);
+                       if (!lov_hsm_type_supported(lhm->lhm_type))
+                               continue;
+
+                       lhm->lhm_archive_id = __swab64(lhm->lhm_archive_id);
+                       lhm->lhm_archive_ver = __swab64(lhm->lhm_archive_ver);
                } else {
-                       lod = lum->lmm_objects;
-               }
+                       obj_count = llapi_layout_objects_in_lum(lum, lum_size);
+
+                       lum->lmm_pattern = __swab32(lum->lmm_pattern);
+                       lum->lmm_stripe_size = __swab32(lum->lmm_stripe_size);
+                       lum->lmm_stripe_count = __swab16(lum->lmm_stripe_count);
+                       lum->lmm_stripe_offset =
+                               __swab16(lum->lmm_stripe_offset);
 
-               for (j = 0; j < obj_count; j++)
-                       lod[j].l_ost_idx = __swab32(lod[j].l_ost_idx);
+                       if (lum->lmm_magic != LOV_MAGIC_V1) {
+                               struct lov_user_md_v3 *v3;
+
+                               v3 = (struct lov_user_md_v3 *)lum;
+                               lod = v3->lmm_objects;
+                       } else {
+                               lod = lum->lmm_objects;
+                       }
+
+                       for (j = 0; j < obj_count; j++)
+                               lod[j].l_ost_idx = __swab32(lod[j].l_ost_idx);
+               }
        }
 }
 
@@ -274,14 +318,52 @@ static struct llapi_layout_comp *__llapi_comp_alloc(unsigned int num_stripes)
 }
 
 /**
+ * Allocate storage for a HSM component with \a length buffer.
+ *
+ * \retval     valid pointer if allocation succeeds
+ * \retval     NULL if allocate fails
+ */
+static struct llapi_layout_comp *__llapi_comp_hsm_alloc(uint32_t length)
+{
+       struct llapi_layout_comp *comp;
+
+       if (lov_foreign_md_size(length) > XATTR_SIZE_MAX) {
+               errno = EINVAL;
+               return NULL;
+       }
+
+       comp = calloc(1, sizeof(*comp));
+       if (comp == NULL) {
+               errno = ENOMEM;
+               return NULL;
+       }
+
+       comp->llc_pattern = LLAPI_LAYOUT_FOREIGN;
+       comp->llc_length = length;
+       comp->llc_type = LU_FOREIGN_TYPE_UNKNOWN;
+       comp->llc_hsm_flags = 0;
+       comp->llc_archive_id = 0;
+       comp->llc_archive_ver = 0;
+       comp->llc_extent.e_start = 0;
+       comp->llc_extent.e_end = LUSTRE_EOF;
+       comp->llc_flags = 0;
+       comp->llc_id = 0;
+       INIT_LIST_HEAD(&comp->llc_list);
+
+       return comp;
+}
+
+/**
  * Free memory allocated for \a comp
  *
  * \param[in] comp     previously allocated by __llapi_comp_alloc()
  */
 static void __llapi_comp_free(struct llapi_layout_comp *comp)
 {
-       if (comp->llc_objects != NULL)
+       if (comp->llc_pattern != LLAPI_LAYOUT_FOREIGN &&
+           comp->llc_objects != NULL) {
                free(comp->llc_objects);
+       }
        free(comp);
 }
 
@@ -416,6 +498,9 @@ static bool llapi_layout_lum_valid(struct lov_user_md *lum, int lum_size)
                        lum = (struct lov_user_md *)((char *)comp_v1 +
                                comp_v1->lcm_entries[i].lcme_offset);
                        lum_size = comp_v1->lcm_entries[i].lcme_size;
+
+                       if (lum->lmm_magic == LOV_MAGIC_FOREIGN)
+                               continue;
                }
                obj_count = llapi_layout_objects_in_lum(lum, lum_size);
 
@@ -451,7 +536,7 @@ struct llapi_layout *llapi_layout_get_by_xattr(void *lov_xattr,
        struct lov_user_md *v1;
        struct llapi_layout *layout = NULL;
        struct llapi_layout_comp *comp;
-       int i, ent_count = 0, obj_count;
+       int i, ent_count = 0, obj_count = 0;
 
        if (lov_xattr == NULL || lov_xattr_size <= 0) {
                errno = EINVAL;
@@ -532,10 +617,34 @@ struct llapi_layout *llapi_layout_get_by_xattr(void *lov_xattr,
                        ent = NULL;
                }
 
-               obj_count = llapi_layout_objects_in_lum(v1, lov_xattr_size);
-               comp = __llapi_comp_alloc(obj_count);
-               if (comp == NULL)
-                       goto out_layout;
+               if (v1->lmm_magic == LOV_MAGIC_FOREIGN) {
+                       struct lov_hsm_md *lhm;
+
+                       lhm = (struct lov_hsm_md *)v1;
+                       if (!lov_hsm_type_supported(lhm->lhm_type))
+                               goto out_layout;
+
+                       if (lhm->lhm_length != sizeof(struct lov_hsm_base))
+                               goto out_layout;
+
+                       comp = __llapi_comp_hsm_alloc(lhm->lhm_length);
+                       if (comp == NULL)
+                               goto out_layout;
+
+                       comp->llc_length = lhm->lhm_length;
+                       comp->llc_type = lhm->lhm_type;
+                       comp->llc_hsm_flags = lhm->lhm_flags;
+                       comp->llc_archive_id = lhm->lhm_archive_id;
+                       comp->llc_archive_ver = lhm->lhm_archive_ver;
+                       memcpy(comp->llc_uuid, lhm->lhm_archive_uuid,
+                              sizeof(comp->llc_uuid));
+               } else {
+                       obj_count = llapi_layout_objects_in_lum(v1,
+                                                               lov_xattr_size);
+                       comp = __llapi_comp_alloc(obj_count);
+                       if (comp == NULL)
+                               goto out_layout;
+               }
 
                if (ent != NULL) {
                        comp->llc_extent.e_start = ent->lcme_extent.e_start;
@@ -551,6 +660,11 @@ struct llapi_layout *llapi_layout_get_by_xattr(void *lov_xattr,
                        comp->llc_flags = 0;
                }
 
+               if (v1->lmm_magic == LOV_MAGIC_FOREIGN) {
+                       comp->llc_pattern = LLAPI_LAYOUT_FOREIGN;
+                       goto comp_add;
+               }
+
                if (v1->lmm_pattern == LOV_PATTERN_RAID0)
                        comp->llc_pattern = LLAPI_LAYOUT_RAID0;
                else if (v1->lmm_pattern == (LOV_PATTERN_RAID0 |
@@ -559,8 +673,8 @@ struct llapi_layout *llapi_layout_get_by_xattr(void *lov_xattr,
                else if (v1->lmm_pattern & LOV_PATTERN_MDT)
                        comp->llc_pattern = LLAPI_LAYOUT_MDT;
                else
-                       /* Lustre only supports RAID0, overstripping
-                        * and DoM for now.
+                       /* Lustre only supports RAID0, overstripping,
+                        * DoM and FOREIGN/HSM for now.
                         */
                        comp->llc_pattern = v1->lmm_pattern;
 
@@ -584,10 +698,16 @@ struct llapi_layout *llapi_layout_get_by_xattr(void *lov_xattr,
 
                if (v1->lmm_magic != LOV_USER_MAGIC_V1) {
                        const struct lov_user_md_v3 *lumv3;
+                       size_t size = sizeof(comp->llc_pool_name);
+                       int rc;
+
                        lumv3 = (struct lov_user_md_v3 *)v1;
-                       snprintf(comp->llc_pool_name,
-                                sizeof(comp->llc_pool_name),
-                                "%s", lumv3->lmm_pool_name);
+                       rc = snprintf(comp->llc_pool_name, size,
+                                     "%s", lumv3->lmm_pool_name);
+                       /* Avoid GCC 7 format-truncation warning. */
+                       if (rc > size)
+                               comp->llc_pool_name[size - 1] = 0;
+
                        memcpy(comp->llc_objects, lumv3->lmm_objects,
                               obj_count * sizeof(lumv3->lmm_objects[0]));
                } else {
@@ -600,7 +720,7 @@ struct llapi_layout *llapi_layout_get_by_xattr(void *lov_xattr,
                if (obj_count != 0)
                        comp->llc_stripe_offset =
                                comp->llc_objects[0].l_ost_idx;
-
+comp_add:
                comp->llc_ondisk = true;
                list_add_tail(&comp->llc_list, &layout->llot_comp_list);
                layout->llot_cur_comp = comp;
@@ -630,6 +750,9 @@ __u32 llapi_pattern_to_lov(uint64_t pattern)
        case LLAPI_LAYOUT_MDT:
                lov_pattern = LOV_PATTERN_MDT;
                break;
+       case LLAPI_LAYOUT_FOREIGN:
+               lov_pattern = LOV_PATTERN_FOREIGN;
+               break;
        case LLAPI_LAYOUT_OVERSTRIPING:
                lov_pattern = LOV_PATTERN_OVERSTRIPING | LOV_PATTERN_RAID0;
                break;
@@ -1160,7 +1283,10 @@ int llapi_layout_stripe_count_get(const struct llapi_layout *layout,
                return -1;
        }
 
-       *count = comp->llc_stripe_count;
+       if (comp->llc_pattern == LLAPI_LAYOUT_FOREIGN)
+               *count = 0;
+       else
+               *count = comp->llc_stripe_count;
 
        return 0;
 }
@@ -1254,6 +1380,12 @@ static int layout_stripe_size_get(const struct llapi_layout *layout,
                return -1;
        }
 
+       /* FIXME: return a component rather than FOREIGN/HSM component. */
+       if (comp->llc_pattern == LLAPI_LAYOUT_FOREIGN) {
+               errno = EINVAL;
+               return -1;
+       }
+
        comp_ext = comp->llc_flags & LCME_FL_EXTENSION;
        if ((comp_ext && !extension) || (!comp_ext && extension)) {
                errno = EINVAL;
@@ -1299,6 +1431,11 @@ static int layout_stripe_size_set(struct llapi_layout *layout,
        if (comp == NULL)
                return -1;
 
+       if (comp->llc_pattern == LLAPI_LAYOUT_FOREIGN) {
+               errno = EINVAL;
+               return -1;
+       }
+
        comp_ext = comp->llc_flags & LCME_FL_EXTENSION;
        if ((comp_ext && !extension) || (!comp_ext && extension)) {
                errno = EINVAL;
@@ -1378,7 +1515,8 @@ int llapi_layout_pattern_set(struct llapi_layout *layout, uint64_t pattern)
 
        if (pattern != LLAPI_LAYOUT_DEFAULT &&
            pattern != LLAPI_LAYOUT_RAID0 && pattern != LLAPI_LAYOUT_MDT
-           && pattern != LLAPI_LAYOUT_OVERSTRIPING) {
+           && pattern != LLAPI_LAYOUT_OVERSTRIPING &&
+           pattern != LLAPI_LAYOUT_FOREIGN) {
                errno = EOPNOTSUPP;
                return -1;
        }
@@ -1421,6 +1559,11 @@ int llapi_layout_ost_index_set(struct llapi_layout *layout, int stripe_number,
        if (comp == NULL)
                return -1;
 
+       if (comp->llc_pattern == LLAPI_LAYOUT_FOREIGN) {
+               errno = EINVAL;
+               return -1;
+       }
+
        if (!llapi_layout_stripe_index_is_valid(ost_index)) {
                errno = EINVAL;
                return -1;
@@ -1481,6 +1624,11 @@ int llapi_layout_ost_index_get(const struct llapi_layout *layout,
        if (comp == NULL)
                return -1;
 
+       if (comp->llc_pattern == LLAPI_LAYOUT_FOREIGN) {
+               errno = EINVAL;
+               return -1;
+       }
+
        if (index == NULL) {
                errno = EINVAL;
                return -1;
@@ -1525,6 +1673,11 @@ int llapi_layout_pool_name_get(const struct llapi_layout *layout, char *dest,
                return -1;
        }
 
+       if (comp->llc_pattern == LLAPI_LAYOUT_FOREIGN) {
+               errno = EINVAL;
+               return -1;
+       }
+
        strncpy(dest, comp->llc_pool_name, n);
 
        return 0;
@@ -1553,6 +1706,11 @@ int llapi_layout_pool_name_set(struct llapi_layout *layout,
                return -1;
        }
 
+       if (comp->llc_pattern == LLAPI_LAYOUT_FOREIGN) {
+               errno = EINVAL;
+               return -1;
+       }
+
        strncpy(comp->llc_pool_name, pool_name, sizeof(comp->llc_pool_name));
        return 0;
 }
@@ -1701,6 +1859,12 @@ const char *llapi_layout_flags_string(uint32_t flags)
                return "wp";
        case LCM_FL_SYNC_PENDING:
                return "sp";
+       case LCM_FL_RDONLY | LCM_FL_PCC_RDONLY:
+               return "ro,pccro";
+       case LCM_FL_WRITE_PENDING | LCM_FL_PCC_RDONLY:
+               return "wp,pccro";
+       case LCM_FL_SYNC_PENDING | LCM_FL_PCC_RDONLY:
+               return "sp,pccro";
        }
 
        return "0";
@@ -3332,6 +3496,7 @@ enum llapi_layout_comp_sanity_error {
        LSE_START_GT_END,
        LSE_ALIGN_END,
        LSE_ALIGN_EXT,
+       LSE_FOREIGN_EXTENSION,
        LSE_LAST,
 };
 
@@ -3368,6 +3533,8 @@ const char *const llapi_layout_strerror[] =
                "The component end must be aligned by the stripe size",
        [LSE_ALIGN_EXT] =
                "The extension size must be aligned by the stripe size",
+       [LSE_FOREIGN_EXTENSION] =
+               "FOREIGN components can't be extension space",
 };
 
 struct llapi_layout_sanity_args {
@@ -3504,6 +3671,15 @@ static int llapi_layout_sanity_cb(struct llapi_layout *layout,
                }
        }
 
+       if (comp->llc_pattern == LLAPI_LAYOUT_FOREIGN ||
+           comp->llc_pattern == LOV_PATTERN_FOREIGN) {
+               /* FOREING/HSM components can't be extension components */
+               if (comp->llc_flags & LCME_FL_EXTENSION) {
+                       args->lsa_rc = LSE_FOREIGN_EXTENSION;
+                       goto out_err;
+               }
+       }
+
        /* Extent sanity checks */
        /* Must set previous component extent before adding another */
        if (prev && prev->llc_extent.e_start == 0 &&
index 47192e5..b0c128f 100644 (file)
@@ -45,7 +45,7 @@
  * Fetch and attach a file to readwrite PCC.
  *
  */
-static int llapi_readwrite_pcc_attach_fd(int fd, __u32 archive_id)
+static int llapi_pcc_attach_rw_fd(int fd, __u32 archive_id)
 {
        int rc;
        struct ll_ioc_lease *data;
@@ -82,7 +82,7 @@ static int llapi_readwrite_pcc_attach_fd(int fd, __u32 archive_id)
        return rc;
 }
 
-static int llapi_readwrite_pcc_attach(const char *path, __u32 archive_id)
+static int llapi_pcc_attach_rw(const char *path, __u32 archive_id)
 {
        int fd;
        int rc;
@@ -95,7 +95,51 @@ static int llapi_readwrite_pcc_attach(const char *path, __u32 archive_id)
                return rc;
        }
 
-       rc = llapi_readwrite_pcc_attach_fd(fd, archive_id);
+       rc = llapi_pcc_attach_rw_fd(fd, archive_id);
+
+       close(fd);
+       return rc;
+}
+
+static int llapi_pcc_attach_ro_fd(int fd, __u32 roid)
+{
+       struct lu_pcc_attach attach;
+       int rc;
+
+       attach.pcca_id = roid;
+       attach.pcca_type = LU_PCC_READONLY;
+       rc = ioctl(fd, LL_IOC_PCC_ATTACH, &attach);
+       if (rc) {
+               rc = -errno;
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "cannot attach the file to PCC with ID %u failed",
+                           roid);
+       }
+
+       return rc;
+}
+
+static int llapi_pcc_attach_ro(const char *path, __u32 roid)
+{
+       int fd;
+       int rc;
+
+       if (strlen(path) <= 0 || path[0] != '/') {
+               rc = -EINVAL;
+               llapi_err_noerrno(LLAPI_MSG_ERROR, "invalid file path: %s",
+                                 path);
+               return rc;
+       }
+
+       fd = open(path, O_RDONLY);
+       if (fd < 0) {
+               rc = -errno;
+               llapi_error(LLAPI_MSG_ERROR, rc, "open file: %s failed",
+                           path);
+               return rc;
+       }
+
+       rc = llapi_pcc_attach_ro_fd(fd, roid);
 
        close(fd);
        return rc;
@@ -107,7 +151,10 @@ int llapi_pcc_attach(const char *path, __u32 id, enum lu_pcc_type type)
 
        switch (type & LU_PCC_TYPE_MASK) {
        case LU_PCC_READWRITE:
-               rc = llapi_readwrite_pcc_attach(path, id);
+               rc = llapi_pcc_attach_rw(path, id);
+               break;
+       case LU_PCC_READONLY:
+               rc = llapi_pcc_attach_ro(path, id);
                break;
        default:
                rc = -EINVAL;
@@ -116,9 +163,9 @@ int llapi_pcc_attach(const char *path, __u32 id, enum lu_pcc_type type)
        return rc;
 }
 
-static int llapi_readwrite_pcc_attach_fid(const char *mntpath,
-                                         const struct lu_fid *fid,
-                                         __u32 id)
+static int llapi_pcc_attach_rw_fid(const char *mntpath,
+                                  const struct lu_fid *fid,
+                                  __u32 rwid)
 {
        int rc;
        int fd;
@@ -132,7 +179,29 @@ static int llapi_readwrite_pcc_attach_fid(const char *mntpath,
                return rc;
        }
 
-       rc = llapi_readwrite_pcc_attach_fd(fd, id);
+       rc = llapi_pcc_attach_rw_fd(fd, rwid);
+
+       close(fd);
+       return rc;
+}
+
+static int llapi_pcc_attach_ro_fid(const char *mntpath,
+                                  const struct lu_fid *fid,
+                                  __u32 roid)
+{
+       int rc;
+       int fd;
+
+       fd = llapi_open_by_fid(mntpath, fid, O_RDONLY);
+       if (fd < 0) {
+               rc = -errno;
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "llapi_open_by_fid for " DFID "failed",
+                           PFID(fid));
+               return rc;
+       }
+
+       rc = llapi_pcc_attach_ro_fd(fd, roid);
 
        close(fd);
        return rc;
@@ -145,7 +214,10 @@ int llapi_pcc_attach_fid(const char *mntpath, const struct lu_fid *fid,
 
        switch (type & LU_PCC_TYPE_MASK) {
        case LU_PCC_READWRITE:
-               rc = llapi_readwrite_pcc_attach_fid(mntpath, fid, id);
+               rc = llapi_pcc_attach_rw_fid(mntpath, fid, id);
+               break;
+       case LU_PCC_READONLY:
+               rc = llapi_pcc_attach_ro_fid(mntpath, fid, id);
                break;
        default:
                rc = -EINVAL;
index c478905..89e6ec8 100644 (file)
@@ -5743,8 +5743,8 @@ int jt_pcc_add(int argc, char **argv)
 
        if (optind + 2 != argc) {
                fprintf(stderr,
-                       "%s: must specify mount path and PCC path %d:%d\n",
-                       jt_cmdname(argv[0]), optind, argc);
+                       "%s: must specify mount path and PCC path\n",
+                       jt_cmdname(argv[0]));
                return CMD_HELP;
        }
 
@@ -5769,7 +5769,7 @@ int jt_pcc_del(int argc, char **argv)
 
        optind = 1;
        if (argc != 3) {
-               fprintf(stderr, "%s: require 3 arguments\n",
+               fprintf(stderr, "%s: require 2 arguments\n",
                        jt_cmdname(argv[0]));
                return CMD_HELP;
        }
@@ -5793,7 +5793,7 @@ int jt_pcc_clear(int argc, char **argv)
 
        optind = 1;
        if (argc != 2) {
-               fprintf(stderr, "%s: require 2 arguments\n",
+               fprintf(stderr, "%s: require 1 arguments\n",
                        jt_cmdname(argv[0]));
                return CMD_HELP;
        }
@@ -5814,7 +5814,7 @@ int jt_pcc_list(int argc, char **argv)
 
        optind = 1;
        if (argc != 2) {
-               fprintf(stderr, "%s: require 2 arguments\n",
+               fprintf(stderr, "%s: require 1 arguments\n",
                        jt_cmdname(argv[0]));
                return CMD_HELP;
        }