Whamcloud - gitweb
LU-12526 pcc: Auto attach for PCC during IO
[fs/lustre-release.git] / lustre / llite / pcc.c
index b99c702..e6f9d8f 100644 (file)
@@ -459,12 +459,30 @@ pcc_parse_value_pair(struct pcc_cmd *cmd, char *buffer)
                if (id <= 0)
                        return -EINVAL;
                cmd->u.pccc_add.pccc_roid = id;
+       } else if (strcmp(key, "auto_attach") == 0) {
+               rc = kstrtoul(val, 10, &id);
+               if (rc)
+                       return rc;
+               if (id == 0)
+                       cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_AUTO_ATTACH;
        } else if (strcmp(key, "open_attach") == 0) {
                rc = kstrtoul(val, 10, &id);
                if (rc)
                        return rc;
-               if (id > 0)
-                       cmd->u.pccc_add.pccc_flags |= PCC_DATASET_OPEN_ATTACH;
+               if (id == 0)
+                       cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_OPEN_ATTACH;
+       } else if (strcmp(key, "io_attach") == 0) {
+               rc = kstrtoul(val, 10, &id);
+               if (rc)
+                       return rc;
+               if (id == 0)
+                       cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_IO_ATTACH;
+       } else if (strcmp(key, "stat_attach") == 0) {
+               rc = kstrtoul(val, 10, &id);
+               if (rc)
+                       return rc;
+               if (id == 0)
+                       cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_STAT_ATTACH;
        } else if (strcmp(key, "rwpcc") == 0) {
                rc = kstrtoul(val, 10, &id);
                if (rc)
@@ -491,6 +509,18 @@ pcc_parse_value_pairs(struct pcc_cmd *cmd, char *buffer)
        char *token;
        int rc;
 
+       switch (cmd->pccc_cmd) {
+       case PCC_ADD_DATASET:
+               /* Enable auto attach by default */
+               cmd->u.pccc_add.pccc_flags |= PCC_DATASET_AUTO_ATTACH;
+               break;
+       case PCC_DEL_DATASET:
+       case PCC_CLEAR_ALL:
+               break;
+       default:
+               return -EINVAL;
+       }
+
        val = buffer;
        while (val != NULL && strlen(val) != 0) {
                token = strsep(&val, " ");
@@ -974,7 +1004,6 @@ static void pcc_inode_init(struct pcc_inode *pcci, struct ll_inode_info *lli)
 {
        pcci->pcci_lli = lli;
        lli->lli_pcc_inode = pcci;
-       lli->lli_pcc_state = PCC_STATE_FL_NONE;
        atomic_set(&pcci->pcci_refcount, 0);
        pcci->pcci_type = LU_PCC_NONE;
        pcci->pcci_layout_gen = CL_LAYOUT_GEN_NONE;
@@ -1044,9 +1073,9 @@ void pcc_file_init(struct pcc_file *pccf)
        pccf->pccf_type = LU_PCC_NONE;
 }
 
-static inline bool pcc_open_attach_enabled(struct pcc_dataset *dataset)
+static inline bool pcc_auto_attach_enabled(struct pcc_dataset *dataset)
 {
-       return dataset->pccd_flags & PCC_DATASET_OPEN_ATTACH;
+       return dataset->pccd_flags & PCC_DATASET_AUTO_ATTACH;
 }
 
 static const char pcc_xattr_layout[] = XATTR_USER_PREFIX "PCC.layout";
@@ -1059,7 +1088,7 @@ static int pcc_layout_xattr_set(struct pcc_inode *pcci, __u32 gen)
 
        ENTRY;
 
-       if (!(lli->lli_pcc_state & PCC_STATE_FL_OPEN_ATTACH))
+       if (!(lli->lli_pcc_state & PCC_STATE_FL_AUTO_ATTACH))
                RETURN(0);
 
 #ifndef HAVE_VFS_SETXATTR
@@ -1121,6 +1150,8 @@ static void pcc_inode_attach_init(struct pcc_dataset *dataset,
                                  struct dentry *dentry,
                                  enum lu_pcc_type type)
 {
+       struct ll_inode_info *lli = pcci->pcci_lli;
+
        pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
        pcci->pcci_path.dentry = dentry;
        LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
@@ -1128,11 +1159,12 @@ static void pcc_inode_attach_init(struct pcc_dataset *dataset,
        pcci->pcci_type = type;
        pcci->pcci_attr_valid = false;
 
-       if (pcc_open_attach_enabled(dataset)) {
-               struct ll_inode_info *lli = pcci->pcci_lli;
-
+       if (dataset->pccd_flags & PCC_DATASET_OPEN_ATTACH)
                lli->lli_pcc_state |= PCC_STATE_FL_OPEN_ATTACH;
-       }
+       if (dataset->pccd_flags & PCC_DATASET_IO_ATTACH)
+               lli->lli_pcc_state |= PCC_STATE_FL_IO_ATTACH;
+       if (dataset->pccd_flags & PCC_DATASET_STAT_ATTACH)
+               lli->lli_pcc_state |= PCC_STATE_FL_STAT_ATTACH;
 }
 
 static inline void pcc_layout_gen_set(struct pcc_inode *pcci,
@@ -1244,7 +1276,7 @@ static int pcc_try_datasets_attach(struct inode *inode, __u32 gen,
        down_read(&super->pccs_rw_sem);
        list_for_each_entry_safe(dataset, tmp,
                                 &super->pccs_datasets, pccd_linkage) {
-               if (!pcc_open_attach_enabled(dataset))
+               if (!pcc_auto_attach_enabled(dataset))
                        continue;
                rc = pcc_try_dataset_attach(inode, gen, type, dataset, cached);
                if (rc < 0 || (!rc && *cached))
@@ -1255,13 +1287,15 @@ static int pcc_try_datasets_attach(struct inode *inode, __u32 gen,
        RETURN(rc);
 }
 
-static int pcc_try_open_attach(struct inode *inode, bool *cached)
+static int pcc_try_auto_attach(struct inode *inode, bool *cached, bool is_open)
 {
        struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
        struct cl_layout clt = {
                .cl_layout_gen = 0,
                .cl_is_released = false,
        };
+       struct ll_inode_info *lli = ll_i2info(inode);
+       __u32 gen;
        int rc;
 
        ENTRY;
@@ -1277,13 +1311,25 @@ static int pcc_try_open_attach(struct inode *inode, bool *cached)
         * obtain valid layout lock from MDT (i.e. the file is being
         * HSM restoring).
         */
-       if (ll_layout_version_get(ll_i2info(inode)) == CL_LAYOUT_GEN_NONE)
-               RETURN(0);
+       if (is_open) {
+               if (ll_layout_version_get(lli) == CL_LAYOUT_GEN_NONE)
+                       RETURN(0);
+       } else {
+               rc = ll_layout_refresh(inode, &gen);
+               if (rc)
+                       RETURN(rc);
+       }
 
        rc = pcc_get_layout_info(inode, &clt);
        if (rc)
                RETURN(rc);
 
+       if (!is_open && gen != clt.cl_layout_gen) {
+               CDEBUG(D_CACHE, DFID" layout changed from %d to %d.\n",
+                      PFID(ll_inode2fid(inode)), gen, clt.cl_layout_gen);
+               RETURN(-EINVAL);
+       }
+
        if (clt.cl_is_released)
                rc = pcc_try_datasets_attach(inode, clt.cl_layout_gen,
                                             LU_PCC_READWRITE, cached);
@@ -1315,7 +1361,9 @@ int pcc_file_open(struct inode *inode, struct file *file)
                GOTO(out_unlock, rc = 0);
 
        if (!pcci || !pcc_inode_has_layout(pcci)) {
-               rc = pcc_try_open_attach(inode, &cached);
+               if (lli->lli_pcc_state & PCC_STATE_FL_OPEN_ATTACH)
+                       rc = pcc_try_auto_attach(inode, &cached, true);
+
                if (rc < 0 || !cached)
                        GOTO(out_unlock, rc);
 
@@ -1383,8 +1431,9 @@ out:
        RETURN_EXIT;
 }
 
-static void pcc_io_init(struct inode *inode, bool *cached)
+static void pcc_io_init(struct inode *inode, enum pcc_io_type iot, bool *cached)
 {
+       struct ll_inode_info *lli = ll_i2info(inode);
        struct pcc_inode *pcci;
 
        pcc_inode_lock(inode);
@@ -1395,6 +1444,17 @@ static void pcc_io_init(struct inode *inode, bool *cached)
                *cached = true;
        } else {
                *cached = false;
+               if ((lli->lli_pcc_state & PCC_STATE_FL_IO_ATTACH &&
+                    iot != PIT_GETATTR) ||
+                   (iot == PIT_GETATTR &&
+                    lli->lli_pcc_state & PCC_STATE_FL_STAT_ATTACH)) {
+                       (void) pcc_try_auto_attach(inode, cached, false);
+                       if (*cached) {
+                               pcci = ll_i2pcci(inode);
+                               LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
+                               atomic_inc(&pcci->pcci_active_ios);
+                       }
+               }
        }
        pcc_inode_unlock(inode);
 }
@@ -1460,7 +1520,7 @@ ssize_t pcc_file_read_iter(struct kiocb *iocb,
                RETURN(0);
        }
 
-       pcc_io_init(inode, cached);
+       pcc_io_init(inode, PIT_READ, cached);
        if (!*cached)
                RETURN(0);
 
@@ -1532,7 +1592,7 @@ ssize_t pcc_file_write_iter(struct kiocb *iocb,
                RETURN(-EAGAIN);
        }
 
-       pcc_io_init(inode, cached);
+       pcc_io_init(inode, PIT_WRITE, cached);
        if (!*cached)
                RETURN(0);
 
@@ -1568,7 +1628,7 @@ int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
                RETURN(0);
        }
 
-       pcc_io_init(inode, cached);
+       pcc_io_init(inode, PIT_SETATTR, cached);
        if (!*cached)
                RETURN(0);
 
@@ -1604,7 +1664,7 @@ int pcc_inode_getattr(struct inode *inode, bool *cached)
                RETURN(0);
        }
 
-       pcc_io_init(inode, cached);
+       pcc_io_init(inode, PIT_GETATTR, cached);
        if (!*cached)
                RETURN(0);
 
@@ -1668,7 +1728,7 @@ ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos,
        if (!file_inode(pcc_file)->i_fop->splice_read)
                RETURN(-ENOTSUPP);
 
-       pcc_io_init(inode, cached);
+       pcc_io_init(inode, PIT_SPLICE_READ, cached);
        if (!*cached)
                RETURN(0);
 
@@ -1695,7 +1755,7 @@ int pcc_fsync(struct file *file, loff_t start, loff_t end,
                RETURN(0);
        }
 
-       pcc_io_init(inode, cached);
+       pcc_io_init(inode, PIT_FSYNC, cached);
        if (!*cached)
                RETURN(0);
 
@@ -1811,7 +1871,7 @@ int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
                CDEBUG(D_MMAP,
                       "%s: PCC backend fs not support ->page_mkwrite()\n",
                       ll_i2sbi(inode)->ll_fsname);
-               pcc_ioctl_detach(inode, PCC_DETACH_OPT_NONE);
+               pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
                up_read(&mm->mmap_sem);
                *cached = true;
                RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
@@ -1819,7 +1879,7 @@ int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
        /* Pause to allow for a race with concurrent detach */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
 
-       pcc_io_init(inode, cached);
+       pcc_io_init(inode, PIT_PAGE_MKWRITE, cached);
        if (!*cached) {
                /* This happens when the file is detached from PCC after got
                 * the fault page via ->fault() on the inode of the PCC copy.
@@ -1852,7 +1912,7 @@ int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
         */
        if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE)) {
                pcc_io_fini(inode);
-               pcc_ioctl_detach(inode, PCC_DETACH_OPT_NONE);
+               pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
                up_read(&mm->mmap_sem);
                RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
        }
@@ -1886,7 +1946,7 @@ int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
                RETURN(0);
        }
 
-       pcc_io_init(inode, cached);
+       pcc_io_init(inode, PIT_FAULT, cached);
        if (!*cached)
                RETURN(0);
 
@@ -2107,15 +2167,23 @@ int pcc_inode_create(struct super_block *sb, struct pcc_dataset *dataset,
        return rc;
 }
 
-int pcc_inode_create_fini(struct pcc_dataset *dataset, struct inode *inode,
-                         struct dentry *pcc_dentry)
+int pcc_inode_create_fini(struct inode *inode, struct pcc_create_attach *pca)
 {
+       struct dentry *pcc_dentry = pca->pca_dentry;
        const struct cred *old_cred;
        struct pcc_inode *pcci;
-       int rc = 0;
+       int rc;
 
        ENTRY;
 
+       if (!pca->pca_dataset)
+               RETURN(0);
+
+       if (!inode)
+               GOTO(out_dataset_put, rc = 0);
+
+       LASSERT(pcc_dentry);
+
        old_cred = override_creds(pcc_super_cred(inode->i_sb));
        pcc_inode_lock(inode);
        LASSERT(ll_i2pcci(inode) == NULL);
@@ -2129,7 +2197,8 @@ int pcc_inode_create_fini(struct pcc_dataset *dataset, struct inode *inode,
                GOTO(out_put, rc);
 
        pcc_inode_init(pcci, ll_i2info(inode));
-       pcc_inode_attach_init(dataset, pcci, pcc_dentry, LU_PCC_READWRITE);
+       pcc_inode_attach_init(pca->pca_dataset, pcci, pcc_dentry,
+                             LU_PCC_READWRITE);
 
        rc = pcc_layout_xattr_set(pcci, 0);
        if (rc) {
@@ -2152,9 +2221,36 @@ out_put:
 out_unlock:
        pcc_inode_unlock(inode);
        revert_creds(old_cred);
+out_dataset_put:
+       pcc_dataset_put(pca->pca_dataset);
        RETURN(rc);
 }
 
+void pcc_create_attach_cleanup(struct super_block *sb,
+                              struct pcc_create_attach *pca)
+{
+       if (!pca->pca_dataset)
+               return;
+
+       if (pca->pca_dentry) {
+               const struct cred *old_cred;
+               int rc;
+
+               old_cred = override_creds(pcc_super_cred(sb));
+               rc = ll_vfs_unlink(pca->pca_dentry->d_parent->d_inode,
+                                  pca->pca_dentry);
+               if (rc)
+                       CWARN("failed to unlink PCC file %.*s, rc = %d\n",
+                             pca->pca_dentry->d_name.len,
+                             pca->pca_dentry->d_name.name, rc);
+               /* ignore the unlink failure */
+               revert_creds(old_cred);
+               dput(pca->pca_dentry);
+       }
+
+       pcc_dataset_put(pca->pca_dataset);
+}
+
 static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
                          loff_t *offset)
 {
@@ -2323,7 +2419,6 @@ int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
        old_cred = override_creds(pcc_super_cred(inode->i_sb));
        pcc_inode_lock(inode);
        pcci = ll_i2pcci(inode);
-       lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
        if (rc || lease_broken) {
                if (attached && pcci)
                        pcc_inode_put(pcci);
@@ -2340,6 +2435,7 @@ int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
        if (rc)
                GOTO(out_put, rc);
 
+       LASSERT(lli->lli_pcc_state & PCC_STATE_FL_ATTACHING);
        rc = ll_layout_refresh(inode, &gen2);
        if (!rc) {
                if (gen2 == gen) {
@@ -2358,6 +2454,7 @@ out_put:
                pcc_inode_put(pcci);
        }
 out_unlock:
+       lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
        pcc_inode_unlock(inode);
        revert_creds(old_cred);
        RETURN(rc);