Whamcloud - gitweb
LU-10092 pcc: Non-blocking PCC caching
[fs/lustre-release.git] / lustre / llite / pcc.c
index 2de2715..be3a9db 100644 (file)
@@ -382,17 +382,25 @@ static inline void pcc_inode_unlock(struct inode *inode)
        mutex_unlock(&ll_i2info(inode)->lli_pcc_lock);
 }
 
+/*
+ * Initialize @pcci and bind it to @lli: the two structures are pointed at
+ * each other, the reference count and the active I/O counter start at zero,
+ * the type is LU_PCC_NONE, no layout generation is recorded yet and the
+ * wait queue is set up.
+ */
-static void pcc_inode_init(struct pcc_inode *pcci)
+static void pcc_inode_init(struct pcc_inode *pcci, struct ll_inode_info *lli)
 {
+       pcci->pcci_lli = lli;
+       lli->lli_pcc_inode = pcci;
        atomic_set(&pcci->pcci_refcount, 0);
        pcci->pcci_type = LU_PCC_NONE;
+       pcci->pcci_layout_gen = CL_LAYOUT_GEN_NONE;
+       atomic_set(&pcci->pcci_active_ios, 0);
+       init_waitqueue_head(&pcci->pcci_waitq);
 }
 
+/*
+ * Tear down @pcci: release its path reference, reset the type, return the
+ * structure to pcc_inode_slab and clear the back-pointer held by the owning
+ * ll_inode_info.
+ */
 static void pcc_inode_fini(struct pcc_inode *pcci)
 {
+       struct ll_inode_info *lli = pcci->pcci_lli;
+
        path_put(&pcci->pcci_path);
        pcci->pcci_type = LU_PCC_NONE;
        OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
+       /* lli was read before the free above, so pcci is not touched here */
+       lli->lli_pcc_inode = NULL;
 }
 
 static void pcc_inode_get(struct pcc_inode *pcci)
@@ -408,13 +416,11 @@ static void pcc_inode_put(struct pcc_inode *pcci)
 
+/* Drop one reference on the PCC inode attached to @inode, if there is one. */
 void pcc_inode_free(struct inode *inode)
 {
-       struct ll_inode_info *lli = ll_i2info(inode);
-       struct pcc_inode *pcci = lli->lli_pcc_inode;
+       struct pcc_inode *pcci = ll_i2pcci(inode);
 
        if (pcci) {
+               /* more than one remaining reference is unexpected: warn */
                WARN_ON(atomic_read(&pcci->pcci_refcount) > 1);
                pcc_inode_put(pcci);
-               lli->lli_pcc_inode = NULL;
        }
 }
 
@@ -444,6 +450,11 @@ void pcc_file_init(struct pcc_file *pccf)
        pccf->pccf_type = LU_PCC_NONE;
 }
 
+/* Tell whether a layout generation is currently recorded in @pcci. */
+static inline bool pcc_inode_has_layout(struct pcc_inode *pcci)
+{
+       bool gen_unset = pcci->pcci_layout_gen == CL_LAYOUT_GEN_NONE;
+
+       return !gen_unset;
+}
+
 int pcc_file_open(struct inode *inode, struct file *file)
 {
        struct pcc_inode *pcci;
@@ -464,7 +475,8 @@ int pcc_file_open(struct inode *inode, struct file *file)
        if (!pcci)
                GOTO(out_unlock, rc = 0);
 
-       if (atomic_read(&pcci->pcci_refcount) == 0)
+       if (atomic_read(&pcci->pcci_refcount) == 0 ||
+           !pcc_inode_has_layout(pcci))
                GOTO(out_unlock, rc = 0);
 
        pcc_inode_get(pcci);
@@ -522,6 +534,74 @@ void pcc_file_release(struct inode *inode, struct file *file)
        pccf->pccf_file = NULL;
 out:
        pcc_inode_unlock(inode);
+       RETURN_EXIT;
+}
+
+/* Record @gen as the layout generation of @pcci. */
+static inline void pcc_layout_gen_set(struct pcc_inode *pcci, __u32 gen)
+{
+       pcci->pcci_layout_gen = gen;
+}
+
+/*
+ * Begin an I/O against the PCC copy of @inode.
+ *
+ * Under the PCC inode lock, check that a PCC inode with a recorded layout
+ * generation exists.  If so, the I/O is counted in pcci_active_ios and
+ * *@cached is set to true; otherwise *@cached is set to false and nothing
+ * is counted.
+ */
+static void pcc_io_init(struct inode *inode, bool *cached)
+{
+       struct pcc_inode *pcci;
+       bool usable;
+
+       pcc_inode_lock(inode);
+       pcci = ll_i2pcci(inode);
+       usable = pcci && pcc_inode_has_layout(pcci);
+       if (usable) {
+               LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
+               atomic_inc(&pcci->pcci_active_ios);
+       }
+       *cached = usable;
+       pcc_inode_unlock(inode);
+}
+
+/*
+ * End an I/O on the PCC copy of @inode: drop the active I/O count and, when
+ * it reaches zero, wake every waiter sleeping on pcci_waitq.
+ */
+static void pcc_io_fini(struct inode *inode)
+{
+       struct pcc_inode *pcci;
+
+       pcci = ll_i2pcci(inode);
+       LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0);
+       if (!atomic_dec_and_test(&pcci->pcci_active_ios))
+               return;
+
+       wake_up_all(&pcci->pcci_waitq);
+}
+
+
+/*
+ * Read through the file operations of the file currently set in
+ * iocb->ki_filp.
+ *
+ * With HAVE_FILE_OPERATIONS_READ_WRITE_ITER the request is handed to
+ * ->read_iter() as is.  Otherwise every segment of @iter is passed to
+ * ->aio_read() in turn and @iter is advanced by the total number of bytes
+ * read.
+ *
+ * Returns the number of bytes read; if nothing was read, the result of the
+ * first ->aio_read() call (zero or a negative value) is returned instead.
+ */
+static ssize_t
+__pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
+{
+       struct file *file = iocb->ki_filp;
+
+#ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
+       return file->f_op->read_iter(iocb, iter);
+#else
+       struct iovec iov;
+       struct iov_iter i;
+       ssize_t bytes = 0;
+
+       iov_for_each(iov, i, *iter) {
+               ssize_t res;
+
+               res = file->f_op->aio_read(iocb, &iov, 1, iocb->ki_pos);
+               /* the request was queued: wait for it to complete */
+               if (-EIOCBQUEUED == res)
+                       res = wait_on_sync_kiocb(iocb);
+               if (res <= 0) {
+                       /* keep the byte count if earlier segments were read */
+                       if (bytes == 0)
+                               bytes = res;
+                       break;
+               }
+
+               bytes += res;
+               /* short read on this segment: do not try the next one */
+               if (res < iov.iov_len)
+                       break;
+       }
+
+       if (bytes > 0)
+               iov_iter_advance(iter, bytes);
+       return bytes;
+#endif
 }
 
 ssize_t pcc_file_read_iter(struct kiocb *iocb,
@@ -530,6 +610,7 @@ ssize_t pcc_file_read_iter(struct kiocb *iocb,
        struct file *file = iocb->ki_filp;
        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
        struct pcc_file *pccf = &fd->fd_pcc_file;
+       struct inode *inode = file_inode(file);
        ssize_t result;
 
        ENTRY;
@@ -538,12 +619,20 @@ ssize_t pcc_file_read_iter(struct kiocb *iocb,
                *cached = false;
                RETURN(0);
        }
-       *cached = true;
-       iocb->ki_filp = pccf->pccf_file;
 
-       result = generic_file_read_iter(iocb, iter);
+       pcc_io_init(inode, cached);
+       if (!*cached)
+               RETURN(0);
+
+       iocb->ki_filp = pccf->pccf_file;
+       /* generic_file_aio_read does not support ext4-dax,
+        * __pcc_file_read_iter uses ->aio_read hook directly
+        * to add support for ext4-dax.
+        */
+       result = __pcc_file_read_iter(iocb, iter);
        iocb->ki_filp = file;
 
+       pcc_io_fini(inode);
        RETURN(result);
 }
 
@@ -588,6 +677,7 @@ ssize_t pcc_file_write_iter(struct kiocb *iocb,
        struct file *file = iocb->ki_filp;
        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
        struct pcc_file *pccf = &fd->fd_pcc_file;
+       struct inode *inode = file_inode(file);
        ssize_t result;
 
        ENTRY;
@@ -596,10 +686,18 @@ ssize_t pcc_file_write_iter(struct kiocb *iocb,
                *cached = false;
                RETURN(0);
        }
-       *cached = true;
 
-       if (pccf->pccf_type != LU_PCC_READWRITE)
-               RETURN(-EWOULDBLOCK);
+       if (pccf->pccf_type != LU_PCC_READWRITE) {
+               *cached = false;
+               RETURN(-EAGAIN);
+       }
+
+       pcc_io_init(inode, cached);
+       if (!*cached)
+               RETURN(0);
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
+               GOTO(out, result = -ENOSPC);
 
        iocb->ki_filp = pccf->pccf_file;
 
@@ -609,6 +707,8 @@ ssize_t pcc_file_write_iter(struct kiocb *iocb,
         */
        result = __pcc_file_write_iter(iocb, iter);
        iocb->ki_filp = file;
+out:
+       pcc_io_fini(inode);
        RETURN(result);
 }
 
@@ -616,9 +716,9 @@ int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
                      bool *cached)
 {
        int rc;
-       struct pcc_inode *pcci;
        struct iattr attr2 = *attr;
        struct dentry *pcc_dentry;
+       struct pcc_inode *pcci;
 
        ENTRY;
 
@@ -627,28 +727,26 @@ int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
                RETURN(0);
        }
 
-       pcc_inode_lock(inode);
-       pcci = ll_i2pcci(inode);
-       if (pcci == NULL || atomic_read(&pcci->pcci_refcount) == 0)
-               GOTO(out_unlock, rc = 0);
+       pcc_io_init(inode, cached);
+       if (!*cached)
+               RETURN(0);
 
-       *cached = true;
        attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME |
                         ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET |
                         ATTR_CTIME);
+       pcci = ll_i2pcci(inode);
        pcc_dentry = pcci->pcci_path.dentry;
        inode_lock(pcc_dentry->d_inode);
        rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
        inode_unlock(pcc_dentry->d_inode);
-out_unlock:
-       pcc_inode_unlock(inode);
+
+       pcc_io_fini(inode);
        RETURN(rc);
 }
 
 int pcc_inode_getattr(struct inode *inode, bool *cached)
 {
        struct ll_inode_info *lli = ll_i2info(inode);
-       struct pcc_inode *pcci;
        struct kstat stat;
        s64 atime;
        s64 mtime;
@@ -662,15 +760,13 @@ int pcc_inode_getattr(struct inode *inode, bool *cached)
                RETURN(0);
        }
 
-       pcc_inode_lock(inode);
-       pcci = ll_i2pcci(inode);
-       if (pcci == NULL || atomic_read(&pcci->pcci_refcount) == 0)
-               GOTO(out_unlock, rc = 0);
+       pcc_io_init(inode, cached);
+       if (!*cached)
+               RETURN(0);
 
-       *cached = true;
-       rc = ll_vfs_getattr(&pcci->pcci_path, &stat);
+       rc = ll_vfs_getattr(&ll_i2pcci(inode)->pcci_path, &stat);
        if (rc)
-               GOTO(out_unlock, rc);
+               GOTO(out, rc);
 
        ll_inode_size_lock(inode);
        if (inode->i_atime.tv_sec < lli->lli_atime ||
@@ -702,12 +798,311 @@ int pcc_inode_getattr(struct inode *inode, bool *cached)
        inode->i_ctime.tv_sec = ctime;
 
        ll_inode_size_unlock(inode);
+out:
+       pcc_io_fini(inode);
+       RETURN(rc);
+}
 
-out_unlock:
+/*
+ * splice_read on the PCC copy of @in_file.
+ *
+ * *@cached tells the caller whether the request was served from PCC.  It is
+ * left false when the file has no PCC file, when the PCC file has no
+ * ->splice_read() (-ENOTSUPP is returned in that case) or when
+ * pcc_io_init() finds no usable PCC inode.  Otherwise the result of the
+ * backend ->splice_read() is returned.
+ */
+ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos,
+                            struct pipe_inode_info *pipe,
+                            size_t count, unsigned int flags,
+                            bool *cached)
+{
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(in_file);
+       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+       struct inode *lu_inode = file_inode(in_file);
+       struct inode *cache_inode;
+       ssize_t nread;
+
+       ENTRY;
+
+       *cached = false;
+       if (pcc_file == NULL)
+               RETURN(0);
+
+       cache_inode = file_inode(pcc_file);
+       if (cache_inode->i_fop->splice_read == NULL)
+               RETURN(-ENOTSUPP);
+
+       pcc_io_init(lu_inode, cached);
+       if (*cached == false)
+               RETURN(0);
+
+       nread = cache_inode->i_fop->splice_read(pcc_file, ppos, pipe, count,
+                                               flags);
+       pcc_io_fini(lu_inode);
+
+       RETURN(nread);
+}
+
+/*
+ * fsync on the PCC copy of @file.
+ *
+ * *@cached is set to false, and 0 returned, when @file has no PCC file or
+ * when pcc_io_init() finds no usable PCC inode.  Otherwise the ->fsync()
+ * method of the PCC file is called, with the argument list selected at
+ * build time, and its result is returned with *@cached set to true.
+ *
+ * @start and @end are only used by the four-argument ->fsync() variant.
+ */
+int pcc_fsync(struct file *file, loff_t start, loff_t end,
+             int datasync, bool *cached)
+{
+       struct inode *inode = file_inode(file);
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+       int rc;
+
+       ENTRY;
+
+       if (!pcc_file) {
+               *cached = false;
+               RETURN(0);
+       }
+
+       pcc_io_init(inode, cached);
+       if (!*cached)
+               RETURN(0);
+
+#ifdef HAVE_FILE_FSYNC_4ARGS
+       rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
+                                               start, end, datasync);
+#elif defined(HAVE_FILE_FSYNC_2ARGS)
+       rc = file_inode(pcc_file)->i_fop->fsync(pcc_file, datasync);
+#else
+       /*
+        * The three-argument variant also takes a dentry.  No "dentry"
+        * variable exists in this function; pass the dentry of the PCC file
+        * that is being synced.
+        */
+       rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
+                               file_dentry(pcc_file), datasync);
+#endif
+
+       pcc_io_fini(inode);
+       RETURN(rc);
+}
+
+/*
+ * mmap @file through its PCC copy.
+ *
+ * When @file has a PCC file providing ->mmap() and the PCC inode has a
+ * recorded layout generation, the backend ->mmap() is called with
+ * vma->vm_file temporarily pointing at the PCC file, and the vm operations
+ * left in the VMA are saved in vma->vm_private_data.  *@cached is then true
+ * and the backend result is returned.  In every other case *@cached is
+ * false and 0 is returned.
+ */
+int pcc_file_mmap(struct file *file, struct vm_area_struct *vma,
+                 bool *cached)
+{
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+       struct inode *inode = file_inode(file);
+       struct pcc_inode *pcci;
+       int rc = 0;
+
+       ENTRY;
+
+       *cached = false;
+       if (pcc_file == NULL || file_inode(pcc_file)->i_fop->mmap == NULL)
+               RETURN(0);
+
+       pcc_inode_lock(inode);
+       pcci = ll_i2pcci(inode);
+       if (pcci != NULL && pcc_inode_has_layout(pcci)) {
+               LASSERT(atomic_read(&pcci->pcci_refcount) > 1);
+               *cached = true;
+               vma->vm_file = pcc_file;
+               rc = file_inode(pcc_file)->i_fop->mmap(pcc_file, vma);
+               vma->vm_file = file;
+               /* Keep the backend vm ops for the later VMA callbacks */
+               vma->vm_private_data = (void *)vma->vm_ops;
+       }
+       pcc_inode_unlock(inode);
+
+       RETURN(rc);
+}
+
+/*
+ * VMA open callback for a PCC-backed mapping.
+ *
+ * Nothing is done unless the file has a PCC file and the vm operations
+ * found in vma->vm_private_data provide ->open().  The backend ->open() is
+ * then called under the PCC inode lock, with vma->vm_file temporarily set
+ * to the PCC file, but only while the PCC inode still has a layout
+ * generation recorded.
+ */
+void pcc_vm_open(struct vm_area_struct *vma)
+{
+       struct vm_operations_struct *backend_ops = vma->vm_private_data;
+       struct file *lu_file = vma->vm_file;
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(lu_file);
+       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+       struct inode *inode = file_inode(lu_file);
+       struct pcc_inode *pcci;
+
+       ENTRY;
+
+       if (pcc_file == NULL || backend_ops == NULL ||
+           backend_ops->open == NULL)
+               RETURN_EXIT;
+
+       pcc_inode_lock(inode);
+       pcci = ll_i2pcci(inode);
+       if (pcci != NULL && pcc_inode_has_layout(pcci)) {
+               vma->vm_file = pcc_file;
+               backend_ops->open(vma);
+               vma->vm_file = lu_file;
+       }
+       pcc_inode_unlock(inode);
+       EXIT;
+}
+
+/*
+ * VMA close callback for a PCC-backed mapping.
+ *
+ * Nothing is done unless the file has a PCC file and the vm operations
+ * found in vma->vm_private_data provide ->close().  The backend ->close()
+ * is called under the PCC inode lock with vma->vm_file temporarily set to
+ * the PCC file.
+ */
+void pcc_vm_close(struct vm_area_struct *vma)
+{
+       struct vm_operations_struct *backend_ops = vma->vm_private_data;
+       struct file *lu_file = vma->vm_file;
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(lu_file);
+       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+       struct inode *inode = file_inode(lu_file);
+
+       ENTRY;
+
+       if (pcc_file == NULL || backend_ops == NULL ||
+           backend_ops->close == NULL)
+               RETURN_EXIT;
+
+       pcc_inode_lock(inode);
+       /*
+        * The layout lock may have been revoked by now, so the backend hook
+        * is called without checking pcc_inode_has_layout().
+        */
+       vma->vm_file = pcc_file;
+       backend_ops->close(vma);
+       vma->vm_file = lu_file;
        pcc_inode_unlock(inode);
+       EXIT;
+}
+
+/*
+ * ->page_mkwrite() for a PCC-backed mapping.
+ *
+ * *@cached is false, and 0 returned, when the file has no PCC file, when
+ * the vm operations found in vma->vm_private_data lack ->page_mkwrite(), or
+ * when the file is no longer cached and the fault page does not belong to
+ * the PCC file.  If the file is no longer cached but the fault page still
+ * belongs to the PCC file, mmap_sem is released and
+ * VM_FAULT_RETRY | VM_FAULT_NOPAGE is returned with *@cached set to true.
+ * Otherwise the backend ->page_mkwrite() result is returned.
+ */
+int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
+                    bool *cached)
+{
+       struct page *page = vmf->page;
+       struct mm_struct *mm = vma->vm_mm;
+       struct file *file = vma->vm_file;
+       struct inode *inode = file_inode(file);
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+       struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
+       int rc;
+
+       ENTRY;
+
+       if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->page_mkwrite) {
+               *cached = false;
+               RETURN(0);
+       }
+
+       /* Pause to allow for a race with concurrent detach */
+       OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
+
+       pcc_io_init(inode, cached);
+       if (!*cached) {
+               /* This happens when the file is detached from PCC after the
+                * fault page was obtained via ->fault() on the inode of the
+                * PCC copy.
+                * Here it can not simply fall back to normal Lustre I/O path.
+                * The reason is that the address space of fault page used by
+                * ->page_mkwrite() is still the one of PCC inode. In the
+                * normal Lustre ->page_mkwrite() I/O path, it will be wrongly
+                * handled as the address space of the fault page is not
+                * consistent with the one of the Lustre inode (though the
+                * fault page was truncated).
+                * As the file is detached from PCC, the fault page must
+                * be released first, and the mmap write (->fault() and
+                * ->page_mkwrite) retried.
+                * We use an ugly and tricky method by returning
+                * VM_FAULT_NOPAGE | VM_FAULT_RETRY to the caller
+                * __do_page_fault and retry the memory fault handling.
+                */
+               if (page->mapping == file_inode(pcc_file)->i_mapping) {
+                       *cached = true;
+                       up_read(&mm->mmap_sem);
+                       RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
+               }
+
+               RETURN(0);
+       }
+
+       /*
+        * This fault injection can also be used to simulate -ENOSPC and
+        * -EDQUOT failure of underlying PCC backend fs.
+        */
+       if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE)) {
+               /* end the counted I/O before detaching the file from PCC */
+               pcc_io_fini(inode);
+               pcc_ioctl_detach(inode);
+               up_read(&mm->mmap_sem);
+               RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
+       }
+
+       /* the backend hook is called with the VMA pointing at the PCC file */
+       vma->vm_file = pcc_file;
+#ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
+       rc = pcc_vm_ops->page_mkwrite(vmf);
+#else
+       rc = pcc_vm_ops->page_mkwrite(vma, vmf);
+#endif
+       vma->vm_file = file;
+
+       pcc_io_fini(inode);
+       RETURN(rc);
+}
+
+/*
+ * ->fault() for a PCC-backed mapping.
+ *
+ * *@cached is false, and 0 returned, when the file has no PCC file, when
+ * the vm operations found in vma->vm_private_data lack ->fault(), or when
+ * pcc_io_init() finds no usable PCC inode.  Otherwise the backend ->fault()
+ * is called with vma->vm_file temporarily set to the PCC file and its
+ * result is returned.
+ */
+int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
+             bool *cached)
+{
+       struct vm_operations_struct *backend_ops = vma->vm_private_data;
+       struct file *lu_file = vma->vm_file;
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(lu_file);
+       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+       struct inode *inode = file_inode(lu_file);
+       int rc;
+
+       ENTRY;
+
+       *cached = false;
+       if (pcc_file == NULL || backend_ops == NULL ||
+           backend_ops->fault == NULL)
+               RETURN(0);
+
+       pcc_io_init(inode, cached);
+       if (*cached == false)
+               RETURN(0);
+
+       vma->vm_file = pcc_file;
+#ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
+       rc = backend_ops->fault(vmf);
+#else
+       rc = backend_ops->fault(vma, vmf);
+#endif
+       vma->vm_file = lu_file;
+
+       pcc_io_fini(inode);
        RETURN(rc);
 }
 
+/* Sleep on pcci_waitq until the active I/O count of @pcci is back to zero. */
+static void pcc_layout_wait(struct pcc_inode *pcci)
+{
+       struct l_wait_info wait_info = { 0 };
+
+       for (;;) {
+               if (atomic_read(&pcci->pcci_active_ios) <= 0)
+                       break;
+
+               CDEBUG(D_CACHE, "Waiting for IO completion: %d\n",
+                      atomic_read(&pcci->pcci_active_ios));
+               l_wait_event(pcci->pcci_waitq,
+                            atomic_read(&pcci->pcci_active_ios) == 0,
+                            &wait_info);
+       }
+}
+
+/*
+ * Mark @pcci as carrying no PCC type and no layout generation, then wait
+ * for the I/Os already counted on it to finish.
+ */
+static void __pcc_layout_invalidate(struct pcc_inode *pcci)
+{
+       pcci->pcci_type = LU_PCC_NONE;
+       /* clear the generation before waiting, not after */
+       pcci->pcci_layout_gen = CL_LAYOUT_GEN_NONE;
+       pcc_layout_wait(pcci);
+}
+
+/*
+ * Invalidate the PCC layout of @inode.
+ *
+ * Under the PCC inode lock: when a PCC inode with a recorded layout
+ * generation exists, it is invalidated through __pcc_layout_invalidate()
+ * and one reference on it is dropped.  Nothing is done otherwise.
+ */
+void pcc_layout_invalidate(struct inode *inode)
+{
+       struct pcc_inode *pcci;
+
+       pcc_inode_lock(inode);
+       pcci = ll_i2pcci(inode);
+       if (pcci && pcc_inode_has_layout(pcci)) {
+               /*
+                * Remember the generation being invalidated: the field is
+                * reset to CL_LAYOUT_GEN_NONE below, so reading it afterwards
+                * would never log the generation that was actually dropped.
+                */
+               __u32 gen = pcci->pcci_layout_gen;
+
+               LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
+               __pcc_layout_invalidate(pcci);
+
+               CDEBUG(D_CACHE, "Invalidate "DFID" layout gen %d\n",
+                      PFID(&ll_i2info(inode)->lli_fid), gen);
+
+               pcc_inode_put(pcci);
+       }
+       pcc_inode_unlock(inode);
+}
+
+/*
+ * Unlink the file that @pcci refers to from its parent directory.
+ * A failure is logged with CWARN.  Returns the ll_vfs_unlink() result.
+ */
+static int pcc_inode_remove(struct pcc_inode *pcci)
+{
+       struct dentry *cached = pcci->pcci_path.dentry;
+       int rc = ll_vfs_unlink(cached->d_parent->d_inode, cached);
+
+       if (rc != 0)
+               CWARN("failed to unlink cached file, rc = %d\n", rc);
+
+       return rc;
+}
+
 /* Create directory under base if directory does not exist */
 static struct dentry *
 pcc_mkdir(struct dentry *base, const char *name, umode_t mode)
@@ -752,9 +1147,10 @@ pcc_mkdir_p(struct dentry *root, char *path, umode_t mode)
                *ptr = '\0';
                child = pcc_mkdir(parent, entry_name, mode);
                *ptr = '/';
+               dput(parent);
                if (IS_ERR(child))
                        break;
-               dput(parent);
+
                parent = child;
                ptr++;
                entry_name = ptr;
@@ -849,23 +1245,35 @@ int pcc_inode_create(struct pcc_dataset *dataset, struct lu_fid *fid,
+/*
+ * Finish the creation of a file in PCC.
+ *
+ * Under the PCC inode lock, a PCC inode is allocated for @inode,
+ * initialized, passed to pcc_inode_attach_init() together with @pcc_dentry
+ * and LU_PCC_READWRITE, and given layout generation 0.
+ *
+ * Returns 0 on success.  If the allocation fails, @pcc_dentry is unlinked
+ * and released with dput(), and -ENOMEM is returned.
+ */
 int pcc_inode_create_fini(struct pcc_dataset *dataset, struct inode *inode,
                           struct dentry *pcc_dentry)
 {
-       struct ll_inode_info *lli = ll_i2info(inode);
        struct pcc_inode *pcci;
+       int rc = 0;
 
        ENTRY;
 
+       pcc_inode_lock(inode);
        LASSERT(ll_i2pcci(inode) == NULL);
        OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
        if (pcci == NULL)
-               RETURN(-ENOMEM);
+               GOTO(out_unlock, rc = -ENOMEM);
 
-       pcc_inode_init(pcci);
-       pcc_inode_lock(inode);
+       pcc_inode_init(pcci, ll_i2info(inode));
        pcc_inode_attach_init(dataset, pcci, pcc_dentry, LU_PCC_READWRITE);
-       lli->lli_pcc_inode = pcci;
-       pcc_inode_unlock(inode);
+       /* Set the layout generation of the newly created file to 0 */
+       pcc_layout_gen_set(pcci, 0);
+
+out_unlock:
+       if (rc) {
+               int rc2;
 
-       RETURN(0);
+               rc2 = ll_vfs_unlink(pcc_dentry->d_parent->d_inode, pcc_dentry);
+               if (rc2)
+                       CWARN("failed to unlink PCC file, rc = %d\n", rc2);
+
+               dput(pcc_dentry);
+       }
+
+       pcc_inode_unlock(inode);
+       RETURN(rc);
 }
 
 static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
@@ -921,6 +1329,28 @@ out_fs:
        RETURN(rc);
 }
 
+/*
+ * Check whether an attach may be started on @inode and, if so, mark it as
+ * attaching.  The check and the update are done under the PCC inode lock.
+ *
+ * Returns -EBUSY when PCC_STATE_FL_ATTACHING is already set, -EEXIST when
+ * a PCC inode with a recorded layout generation exists, and 0 after
+ * setting PCC_STATE_FL_ATTACHING.
+ */
+static int pcc_attach_allowed_check(struct inode *inode)
+{
+       struct ll_inode_info *info = ll_i2info(inode);
+       struct pcc_inode *pcci;
+       int rc = 0;
+
+       ENTRY;
+
+       pcc_inode_lock(inode);
+       if ((info->lli_pcc_state & PCC_STATE_FL_ATTACHING) != 0)
+               GOTO(out_unlock, rc = -EBUSY);
+
+       pcci = ll_i2pcci(inode);
+       if (pcci != NULL && pcc_inode_has_layout(pcci))
+               GOTO(out_unlock, rc = -EEXIST);
+
+       info->lli_pcc_state |= PCC_STATE_FL_ATTACHING;
+out_unlock:
+       pcc_inode_unlock(inode);
+       RETURN(rc);
+}
+
 int pcc_readwrite_attach(struct file *file, struct inode *inode,
                         __u32 archive_id)
 {
@@ -934,26 +1364,14 @@ int pcc_readwrite_attach(struct file *file, struct inode *inode,
 
        ENTRY;
 
-       pcc_inode_lock(inode);
-       pcci = ll_i2pcci(inode);
-       if (pcci == NULL) {
-               OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
-               if (pcci == NULL) {
-                       pcc_inode_unlock(inode);
-                       RETURN(-ENOMEM);
-               }
-
-               pcc_inode_init(pcci);
-       } else if (atomic_read(&pcci->pcci_refcount) > 0) {
-               pcc_inode_unlock(inode);
-               RETURN(-EEXIST);
-       }
-       pcc_inode_unlock(inode);
+       rc = pcc_attach_allowed_check(inode);
+       if (rc)
+               RETURN(rc);
 
        dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super, 0,
                                  archive_id);
        if (dataset == NULL)
-               GOTO(out_free_pcci, rc = -ENOENT);
+               RETURN(-ENOENT);
 
        rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry);
        if (rc)
@@ -978,74 +1396,116 @@ int pcc_readwrite_attach(struct file *file, struct inode *inode,
        if (rc)
                GOTO(out_fput, rc);
 
+       /* Pause to allow for a race with concurrent HSM remove */
+       OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
+
        pcc_inode_lock(inode);
-       if (lli->lli_pcc_inode)
-               GOTO(out_unlock, rc = -EEXIST);
+       pcci = ll_i2pcci(inode);
+       LASSERT(!pcci);
+       OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
+       if (pcci == NULL)
+               GOTO(out_unlock, rc = -ENOMEM);
+
+       pcc_inode_init(pcci, lli);
        pcc_inode_attach_init(dataset, pcci, dentry, LU_PCC_READWRITE);
-       lli->lli_pcc_inode = pcci;
 out_unlock:
        pcc_inode_unlock(inode);
 out_fput:
        fput(pcc_filp);
 out_dentry:
-       if (rc)
+       if (rc) {
+               int rc2;
+
+               rc2 = ll_vfs_unlink(dentry->d_parent->d_inode, dentry);
+               if (rc2)
+                       CWARN("failed to unlink PCC file, rc = %d\n", rc2);
+
                dput(dentry);
+       }
 out_dataset_put:
        pcc_dataset_put(dataset);
-out_free_pcci:
-       if (rc)
-               OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
        RETURN(rc);
-
 }
 
+/*
+ * Complete a read-write attach of @inode.
+ *
+ * PCC_STATE_FL_ATTACHING is cleared under the PCC inode lock.  If @rc is
+ * non-zero or @lease_broken is set, the reference on the PCC inode is
+ * dropped when @attached and @rc is returned unchanged.  Otherwise the
+ * layout is refreshed and its generation compared with @gen: on a match
+ * @gen is recorded in the PCC inode; on a mismatch, or when the refresh
+ * fails, the cached file is removed and the PCC inode reference dropped.
+ *
+ * Returns 0 on success, -ESTALE when the PCC inode is gone or the layout
+ * generation changed, or the error of the failed step.
+ */
 int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
-                             bool lease_broken, int rc, bool attached)
+                             __u32 gen, bool lease_broken, int rc,
+                             bool attached)
 {
-       struct pcc_inode *pcci = ll_i2pcci(inode);
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct pcc_inode *pcci;
+       __u32 gen2;
 
        ENTRY;
 
-       if ((rc || lease_broken) && attached && pcci)
-               pcc_inode_put(pcci);
+       pcc_inode_lock(inode);
+       pcci = ll_i2pcci(inode);
+       lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
+       if ((rc || lease_broken)) {
+               if (attached && pcci)
+                       pcc_inode_put(pcci);
 
+               GOTO(out_unlock, rc);
+       }
+
+       /* PCC inode may be released due to layout lock revocation */
+       if (!pcci)
+               GOTO(out_unlock, rc = -ESTALE);
+
+       LASSERT(attached);
+       rc = ll_layout_refresh(inode, &gen2);
+       if (!rc) {
+               if (gen2 == gen) {
+                       pcc_layout_gen_set(pcci, gen);
+               } else {
+                       CDEBUG(D_CACHE,
+                              DFID" layout changed from %d to %d.\n",
+                              PFID(ll_inode2fid(inode)), gen, gen2);
+                       GOTO(out_put, rc = -ESTALE);
+               }
+       }
+
+out_put:
+       /* reached with rc != 0 on refresh failure or generation mismatch */
+       if (rc) {
+               pcc_inode_remove(pcci);
+               pcc_inode_put(pcci);
+       }
+out_unlock:
+       pcc_inode_unlock(inode);
        RETURN(rc);
 }
 
+/*
+ * Detach @inode from PCC.
+ *
+ * Under the PCC inode lock, nothing is done when there is no PCC inode,
+ * when PCC_STATE_FL_ATTACHING is set or when no layout generation is
+ * recorded.  Otherwise the layout is invalidated through
+ * __pcc_layout_invalidate() and one reference on the PCC inode is dropped.
+ * rc is never changed from 0 here, so 0 is returned in every case.
+ */
 int pcc_ioctl_detach(struct inode *inode)
 {
        struct ll_inode_info *lli = ll_i2info(inode);
-       struct pcc_inode *pcci = lli->lli_pcc_inode;
+       struct pcc_inode *pcci;
        int rc = 0;
-       int count;
 
        ENTRY;
 
        pcc_inode_lock(inode);
-       if (pcci == NULL)
-               GOTO(out_unlock, rc = 0);
-
-       count = atomic_read(&pcci->pcci_refcount);
-       if (count > 1)
-               GOTO(out_unlock, rc = -EBUSY);
-       else if (count == 0)
+       pcci = lli->lli_pcc_inode;
+       if (!pcci || lli->lli_pcc_state & PCC_STATE_FL_ATTACHING ||
+           !pcc_inode_has_layout(pcci))
                GOTO(out_unlock, rc = 0);
 
+       __pcc_layout_invalidate(pcci);
        pcc_inode_put(pcci);
-       lli->lli_pcc_inode = NULL;
+
 out_unlock:
        pcc_inode_unlock(inode);
-
        RETURN(rc);
 }
 
-int pcc_ioctl_state(struct inode *inode, struct lu_pcc_state *state)
+int pcc_ioctl_state(struct file *file, struct inode *inode,
+                   struct lu_pcc_state *state)
 {
        int rc = 0;
        int count;
        char *buf;
        char *path;
        int buf_len = sizeof(state->pccs_path);
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+       struct pcc_file *pccf = &fd->fd_pcc_file;
        struct pcc_inode *pcci;
 
        ENTRY;
@@ -1067,12 +1527,17 @@ int pcc_ioctl_state(struct inode *inode, struct lu_pcc_state *state)
        count = atomic_read(&pcci->pcci_refcount);
        if (count == 0) {
                state->pccs_type = LU_PCC_NONE;
+               state->pccs_open_count = 0;
                GOTO(out_unlock, rc = 0);
        }
+
+       if (pcc_inode_has_layout(pcci))
+               count--;
+       if (pccf->pccf_file != NULL)
+               count--;
        state->pccs_type = pcci->pcci_type;
-       state->pccs_open_count = count - 1;
-       state->pccs_flags = pcci->pcci_attr_valid ?
-                           PCC_STATE_FLAG_ATTR_VALID : 0;
+       state->pccs_open_count = count;
+       state->pccs_flags = ll_i2info(inode)->lli_pcc_state;
 #ifdef HAVE_DENTRY_PATH_RAW
        path = dentry_path_raw(pcci->pcci_path.dentry, buf, buf_len);
        if (IS_ERR(path))