Whamcloud - gitweb
LU-10092 pcc: auto attach during open for valid cache 87/33787/21
authorQian Yingjin <qian@ddn.com>
Wed, 5 Dec 2018 03:22:11 +0000 (11:22 +0800)
committerOleg Drokin <green@whamcloud.com>
Thu, 13 Jun 2019 04:35:06 +0000 (04:35 +0000)
In the current PCC implementation, all PCC state information is
stored in an in-memory data structure named pcc_inode (a member
of the data structure ll_inode_info). Once the file inode is reclaimed
due to memory pressure or memory shrinking, the corresponding
in-memory pcc_inode is released too, and the PCC-cached file
is detached automatically. The revocation of the layout lock
also triggers the detach of the PCC-cached file. As a result,
a still-valid PCC-cached file can no longer be used.

To solve this problem, we introduce an auto-attach mechanism
during open. During PCC attach, the layout generation (L.Gen) is
stored as an extended attribute of the local copy file on the PCC
device. When the in-memory inode is reclaimed or the layout lock is
revoked, and the file is opened again, open checks whether the L.Gen
stored on the PCC copy is the same as the Lustre file's current L.Gen
on the MDT. If they are consistent, the cached copy on the PCC device
is still valid, and we can continue to use it after auto-attach.

Test-Parameters: testlist=sanity-pcc,sanity-pcc,sanity-pcc
Signed-off-by: Qian Yingjin <qian@ddn.com>
Change-Id: I63be96f8d83816529983d0f97af0aaca81703fed
Reviewed-on: https://review.whamcloud.com/33787
Tested-by: Jenkins
Reviewed-by: Li Xi <lixi@ddn.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Patrick Farrell <pfarrell@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/cl_object.h
lustre/include/uapi/linux/lustre/lustre_user.h
lustre/llite/pcc.c
lustre/llite/pcc.h
lustre/lov/lov_object.c
lustre/tests/sanity-pcc.sh

index 413c5f9..b4e5571 100644 (file)
@@ -296,6 +296,8 @@ struct cl_layout {
        u32             cl_layout_gen;
        /** whether layout is a composite one */
        bool            cl_is_composite;
+       /** Whether layout is a HSM released one */
+       bool            cl_is_released;
 };
 
 /**
index ccccdac..710aca8 100644 (file)
@@ -2335,6 +2335,8 @@ enum lu_pcc_state_flags {
        PCC_STATE_FL_ATTR_VALID         = 0x01,
        /* The file is being attached into PCC */
        PCC_STATE_FL_ATTACHING          = 0x02,
+       /* Allow to auto attach at open */
+       PCC_STATE_FL_OPEN_ATTACH        = 0x04,
 };
 
 struct lu_pcc_state {
index 32e5f65..3a1f132 100644 (file)
@@ -122,7 +122,7 @@ int pcc_super_init(struct pcc_super *super)
 
        /* Never override disk quota limits or use reserved space */
        cap_lower(cred->cap_effective, CAP_SYS_RESOURCE);
-       spin_lock_init(&super->pccs_lock);
+       init_rwsem(&super->pccs_rw_sem);
        INIT_LIST_HEAD(&super->pccs_datasets);
 
        return 0;
@@ -459,6 +459,24 @@ pcc_parse_value_pair(struct pcc_cmd *cmd, char *buffer)
                if (id <= 0)
                        return -EINVAL;
                cmd->u.pccc_add.pccc_roid = id;
+       } else if (strcmp(key, "open_attach") == 0) {
+               rc = kstrtoul(val, 10, &id);
+               if (rc)
+                       return rc;
+               if (id > 0)
+                       cmd->u.pccc_add.pccc_flags |= PCC_DATASET_OPEN_ATTACH;
+       } else if (strcmp(key, "rwpcc") == 0) {
+               rc = kstrtoul(val, 10, &id);
+               if (rc)
+                       return rc;
+               if (id > 0)
+                       cmd->u.pccc_add.pccc_flags |= PCC_DATASET_RWPCC;
+       } else if (strcmp(key, "ropcc") == 0) {
+               rc = kstrtoul(val, 10, &id);
+               if (rc)
+                       return rc;
+               if (id > 0)
+                       cmd->u.pccc_add.pccc_flags |= PCC_DATASET_ROPCC;
        } else {
                return -EINVAL;
        }
@@ -481,6 +499,24 @@ pcc_parse_value_pairs(struct pcc_cmd *cmd, char *buffer)
                        return rc;
        }
 
+       switch (cmd->pccc_cmd) {
+       case PCC_ADD_DATASET:
+               if (cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC &&
+                   cmd->u.pccc_add.pccc_flags & PCC_DATASET_ROPCC)
+                       return -EINVAL;
+               /*
+                * By default, a PCC backend can provide caching service for
+                * both RW-PCC and RO-PCC.
+                */
+               if ((cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCC_ALL) == 0)
+                       cmd->u.pccc_add.pccc_flags |= PCC_DATASET_PCC_ALL;
+               break;
+       case PCC_DEL_DATASET:
+       case PCC_CLEAR_ALL:
+               break;
+       default:
+               return -EINVAL;
+       }
        return 0;
 }
 
@@ -627,15 +663,18 @@ pcc_dataset_match_get(struct pcc_super *super, struct pcc_matcher *matcher)
        struct pcc_dataset *dataset;
        struct pcc_dataset *selected = NULL;
 
-       spin_lock(&super->pccs_lock);
+       down_read(&super->pccs_rw_sem);
        list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
+               if (!(dataset->pccd_flags & PCC_DATASET_RWPCC))
+                       continue;
+
                if (pcc_cond_match(&dataset->pccd_rule, matcher)) {
                        atomic_inc(&dataset->pccd_refcount);
                        selected = dataset;
                        break;
                }
        }
-       spin_unlock(&super->pccs_lock);
+       up_read(&super->pccs_rw_sem);
        if (selected)
                CDEBUG(D_CACHE, "PCC create, matched %s - %d:%d:%d:%s\n",
                       dataset->pccd_rule.pmr_conds_str,
@@ -673,6 +712,7 @@ pcc_dataset_add(struct pcc_super *super, struct pcc_cmd *cmd)
        strncpy(dataset->pccd_pathname, pathname, PATH_MAX);
        dataset->pccd_rwid = cmd->u.pccc_add.pccc_rwid;
        dataset->pccd_roid = cmd->u.pccc_add.pccc_roid;
+       dataset->pccd_flags = cmd->u.pccc_add.pccc_flags;
        atomic_set(&dataset->pccd_refcount, 1);
 
        rc = pcc_dataset_rule_init(&dataset->pccd_rule, cmd);
@@ -681,7 +721,7 @@ pcc_dataset_add(struct pcc_super *super, struct pcc_cmd *cmd)
                return rc;
        }
 
-       spin_lock(&super->pccs_lock);
+       down_write(&super->pccs_rw_sem);
        list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) {
                if (strcmp(tmp->pccd_pathname, pathname) == 0 ||
                    (dataset->pccd_rwid != 0 &&
@@ -694,7 +734,7 @@ pcc_dataset_add(struct pcc_super *super, struct pcc_cmd *cmd)
        }
        if (!found)
                list_add(&dataset->pccd_linkage, &super->pccs_datasets);
-       spin_unlock(&super->pccs_lock);
+       up_write(&super->pccs_rw_sem);
 
        if (found) {
                pcc_dataset_put(dataset);
@@ -717,15 +757,16 @@ pcc_dataset_get(struct pcc_super *super, enum lu_pcc_type type, __u32 id)
         * archive ID (read-write ID) or read-only ID is unique in the list,
         * we just return last added one as first priority.
         */
-       spin_lock(&super->pccs_lock);
+       down_read(&super->pccs_rw_sem);
        list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
-               if (type == LU_PCC_READWRITE && dataset->pccd_rwid != id)
+               if (type == LU_PCC_READWRITE && (dataset->pccd_rwid != id ||
+                   !(dataset->pccd_flags & PCC_DATASET_RWPCC)))
                        continue;
                atomic_inc(&dataset->pccd_refcount);
                selected = dataset;
                break;
        }
-       spin_unlock(&super->pccs_lock);
+       up_read(&super->pccs_rw_sem);
        if (selected)
                CDEBUG(D_CACHE, "matched id %u, PCC mode %d\n", id, type);
 
@@ -749,17 +790,17 @@ pcc_dataset_del(struct pcc_super *super, char *pathname)
        struct pcc_dataset *dataset;
        int rc = -ENOENT;
 
-       spin_lock(&super->pccs_lock);
+       down_write(&super->pccs_rw_sem);
        list_for_each_safe(l, tmp, &super->pccs_datasets) {
                dataset = list_entry(l, struct pcc_dataset, pccd_linkage);
                if (strcmp(dataset->pccd_pathname, pathname) == 0) {
-                       list_del(&dataset->pccd_linkage);
+                       list_del_init(&dataset->pccd_linkage);
                        pcc_dataset_put(dataset);
                        rc = 0;
                        break;
                }
        }
-       spin_unlock(&super->pccs_lock);
+       up_write(&super->pccs_rw_sem);
        return rc;
 }
 
@@ -768,6 +809,7 @@ pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
 {
        seq_printf(m, "%s:\n", dataset->pccd_pathname);
        seq_printf(m, "  rwid: %u\n", dataset->pccd_rwid);
+       seq_printf(m, "  flags: %x\n", dataset->pccd_flags);
        seq_printf(m, "  autocache: %s\n", dataset->pccd_rule.pmr_conds_str);
 }
 
@@ -776,11 +818,11 @@ pcc_super_dump(struct pcc_super *super, struct seq_file *m)
 {
        struct pcc_dataset *dataset;
 
-       spin_lock(&super->pccs_lock);
+       down_read(&super->pccs_rw_sem);
        list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
                pcc_dataset_dump(dataset, m);
        }
-       spin_unlock(&super->pccs_lock);
+       up_read(&super->pccs_rw_sem);
        return 0;
 }
 
@@ -788,11 +830,13 @@ static void pcc_remove_datasets(struct pcc_super *super)
 {
        struct pcc_dataset *dataset, *tmp;
 
+       down_write(&super->pccs_rw_sem);
        list_for_each_entry_safe(dataset, tmp,
                                 &super->pccs_datasets, pccd_linkage) {
                list_del(&dataset->pccd_linkage);
                pcc_dataset_put(dataset);
        }
+       up_write(&super->pccs_rw_sem);
 }
 
 void pcc_super_fini(struct pcc_super *super)
@@ -999,19 +1043,263 @@ void pcc_file_init(struct pcc_file *pccf)
        pccf->pccf_type = LU_PCC_NONE;
 }
 
+static inline bool pcc_open_attach_enabled(struct pcc_dataset *dataset)
+{
+       return dataset->pccd_flags & PCC_DATASET_OPEN_ATTACH;
+}
+
+static const char pcc_xattr_layout[] = XATTR_USER_PREFIX "PCC.layout";
+
+static int pcc_layout_xattr_set(struct pcc_inode *pcci, __u32 gen)
+{
+       struct dentry *pcc_dentry = pcci->pcci_path.dentry;
+       struct ll_inode_info *lli = pcci->pcci_lli;
+       int rc;
+
+       ENTRY;
+
+       if (!(lli->lli_pcc_state & PCC_STATE_FL_OPEN_ATTACH))
+               RETURN(0);
+
+#ifndef HAVE_VFS_SETXATTR
+       if (!pcc_dentry->d_inode->i_op->setxattr)
+               RETURN(-ENOTSUPP);
+
+       rc = pcc_dentry->d_inode->i_op->setxattr(pcc_dentry, pcc_xattr_layout,
+                                                &gen, sizeof(gen), 0);
+#else
+       rc = __vfs_setxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
+                           &gen, sizeof(gen), 0);
+#endif
+       RETURN(rc);
+}
+
+static int pcc_get_layout_info(struct inode *inode, struct cl_layout *clt)
+{
+       struct lu_env *env;
+       struct ll_inode_info *lli = ll_i2info(inode);
+       __u16 refcheck;
+       int rc;
+
+       ENTRY;
+
+       if (!lli->lli_clob)
+               RETURN(-EINVAL);
+
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               RETURN(PTR_ERR(env));
+
+       rc = cl_object_layout_get(env, lli->lli_clob, clt);
+       if (rc)
+               CDEBUG(D_INODE, "Cannot get layout for "DFID"\n",
+                      PFID(ll_inode2fid(inode)));
+
+       cl_env_put(env, &refcheck);
+       RETURN(rc);
+}
+
+static int pcc_fid2dataset_fullpath(char *buf, int sz, struct lu_fid *fid,
+                                   struct pcc_dataset *dataset)
+{
+       return snprintf(buf, sz, "%s/%04x/%04x/%04x/%04x/%04x/%04x/"
+                       DFID_NOBRACE,
+                       dataset->pccd_pathname,
+                       (fid)->f_oid       & 0xFFFF,
+                       (fid)->f_oid >> 16 & 0xFFFF,
+                       (unsigned int)((fid)->f_seq       & 0xFFFF),
+                       (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
+                       (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
+                       (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
+                       PFID(fid));
+}
+
+/* Must be called with pcci->pcci_lock held */
+static void pcc_inode_attach_init(struct pcc_dataset *dataset,
+                                 struct pcc_inode *pcci,
+                                 struct dentry *dentry,
+                                 enum lu_pcc_type type)
+{
+       pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
+       pcci->pcci_path.dentry = dentry;
+       LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
+       atomic_set(&pcci->pcci_refcount, 1);
+       pcci->pcci_type = type;
+       pcci->pcci_attr_valid = false;
+
+       if (pcc_open_attach_enabled(dataset)) {
+               struct ll_inode_info *lli = pcci->pcci_lli;
+
+               lli->lli_pcc_state |= PCC_STATE_FL_OPEN_ATTACH;
+       }
+}
+
+static inline void pcc_layout_gen_set(struct pcc_inode *pcci,
+                                     __u32 gen)
+{
+       pcci->pcci_layout_gen = gen;
+}
+
 static inline bool pcc_inode_has_layout(struct pcc_inode *pcci)
 {
        return pcci->pcci_layout_gen != CL_LAYOUT_GEN_NONE;
 }
 
+static int pcc_try_dataset_attach(struct inode *inode, __u32 gen,
+                                 enum lu_pcc_type type,
+                                 struct pcc_dataset *dataset,
+                                 bool *cached)
+{
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct pcc_inode *pcci = lli->lli_pcc_inode;
+       const struct cred *old_cred;
+       struct dentry *pcc_dentry;
+       struct path path;
+       char *pathname;
+       __u32 pcc_gen;
+       int rc;
+
+       ENTRY;
+
+       if (type == LU_PCC_READWRITE &&
+           !(dataset->pccd_flags & PCC_DATASET_RWPCC))
+               RETURN(0);
+
+       OBD_ALLOC(pathname, PATH_MAX);
+       if (pathname == NULL)
+               RETURN(-ENOMEM);
+
+       pcc_fid2dataset_fullpath(pathname, PATH_MAX, &lli->lli_fid, dataset);
+
+       old_cred = override_creds(pcc_super_cred(inode->i_sb));
+       rc = kern_path(pathname, LOOKUP_FOLLOW, &path);
+       if (rc)
+               /* ignore this error */
+               GOTO(out, rc = 0);
+
+       pcc_dentry = path.dentry;
+#ifndef HAVE_VFS_SETXATTR
+       if (!pcc_dentry->d_inode->i_op->getxattr)
+               /* ignore this error */
+               GOTO(out_put_path, rc = 0);
+
+       rc = pcc_dentry->d_inode->i_op->getxattr(pcc_dentry, pcc_xattr_layout,
+                                                &pcc_gen, sizeof(pcc_gen));
+#else
+       rc = __vfs_getxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
+                           &pcc_gen, sizeof(pcc_gen));
+#endif
+
+       if (rc < 0)
+               /* ignore this error */
+               GOTO(out_put_path, rc = 0);
+
+       rc = 0;
+       /* The file is still valid cached in PCC, attach it immediately. */
+       if (pcc_gen == gen) {
+               CDEBUG(D_CACHE, DFID" L.Gen (%d) consistent, auto attached.\n",
+                      PFID(&lli->lli_fid), gen);
+               if (!pcci) {
+                       OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
+                       if (pcci == NULL)
+                               GOTO(out_put_path, rc = -ENOMEM);
+
+                       pcc_inode_init(pcci, lli);
+                       dget(pcc_dentry);
+                       pcc_inode_attach_init(dataset, pcci, pcc_dentry, type);
+               } else {
+                       /*
+                        * This happened when a file was once attached into
+                        * PCC, and some processes keep this file opened
+                        * (pcci->refcount > 1) and corresponding PCC file
+                        * without any I/O activity, and then this file was
+                        * detached by the manual detach command or the
+                        * revocation of the layout lock (i.e. cached LRU lock
+                        * shrinking).
+                        */
+                       pcc_inode_get(pcci);
+                       pcci->pcci_type = type;
+               }
+               pcc_layout_gen_set(pcci, gen);
+               *cached = true;
+       }
+out_put_path:
+       path_put(&path);
+out:
+       revert_creds(old_cred);
+       OBD_FREE(pathname, PATH_MAX);
+       RETURN(rc);
+}
+
+static int pcc_try_datasets_attach(struct inode *inode, __u32 gen,
+                                  enum lu_pcc_type type, bool *cached)
+{
+       struct pcc_dataset *dataset, *tmp;
+       struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
+       int rc = 0;
+
+       ENTRY;
+
+       down_read(&super->pccs_rw_sem);
+       list_for_each_entry_safe(dataset, tmp,
+                                &super->pccs_datasets, pccd_linkage) {
+               if (!pcc_open_attach_enabled(dataset))
+                       continue;
+               rc = pcc_try_dataset_attach(inode, gen, type, dataset, cached);
+               if (rc < 0 || (!rc && *cached))
+                       break;
+       }
+       up_read(&super->pccs_rw_sem);
+
+       RETURN(rc);
+}
+
+static int pcc_try_open_attach(struct inode *inode, bool *cached)
+{
+       struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
+       struct cl_layout clt = {
+               .cl_layout_gen = 0,
+               .cl_is_released = false,
+       };
+       int rc;
+
+       ENTRY;
+
+       /*
+        * Quick check whether there is PCC device.
+        */
+       if (list_empty(&super->pccs_datasets))
+               RETURN(0);
+
+       /*
+        * The file layout lock was cancelled. And this open does not
+        * obtain valid layout lock from MDT (i.e. the file is being
+        * HSM restoring).
+        */
+       if (ll_layout_version_get(ll_i2info(inode)) == CL_LAYOUT_GEN_NONE)
+               RETURN(0);
+
+       rc = pcc_get_layout_info(inode, &clt);
+       if (rc)
+               RETURN(rc);
+
+       if (clt.cl_is_released)
+               rc = pcc_try_datasets_attach(inode, clt.cl_layout_gen,
+                                            LU_PCC_READWRITE, cached);
+
+       RETURN(rc);
+}
+
 int pcc_file_open(struct inode *inode, struct file *file)
 {
        struct pcc_inode *pcci;
+       struct ll_inode_info *lli = ll_i2info(inode);
        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
        struct pcc_file *pccf = &fd->fd_pcc_file;
        struct file *pcc_file;
        struct path *path;
        struct qstr *dname;
+       bool cached = false;
        int rc = 0;
 
        ENTRY;
@@ -1021,13 +1309,19 @@ int pcc_file_open(struct inode *inode, struct file *file)
 
        pcc_inode_lock(inode);
        pcci = ll_i2pcci(inode);
-       if (!pcci)
-               GOTO(out_unlock, rc = 0);
 
-       if (atomic_read(&pcci->pcci_refcount) == 0 ||
-           !pcc_inode_has_layout(pcci))
+       if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
                GOTO(out_unlock, rc = 0);
 
+       if (!pcci || !pcc_inode_has_layout(pcci)) {
+               rc = pcc_try_open_attach(inode, &cached);
+               if (rc < 0 || !cached)
+                       GOTO(out_unlock, rc);
+
+               if (!pcci)
+                       pcci = ll_i2pcci(inode);
+       }
+
        pcc_inode_get(pcci);
        WARN_ON(pccf->pccf_file);
 
@@ -1088,12 +1382,6 @@ out:
        RETURN_EXIT;
 }
 
-static inline void pcc_layout_gen_set(struct pcc_inode *pcci,
-                                     __u32 gen)
-{
-       pcci->pcci_layout_gen = gen;
-}
-
 static void pcc_io_init(struct inode *inode, bool *cached)
 {
        struct pcc_inode *pcci;
@@ -1519,11 +1807,20 @@ int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
 
        ENTRY;
 
-       if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->page_mkwrite) {
+       if (!pcc_file || !pcc_vm_ops) {
                *cached = false;
                RETURN(0);
        }
 
+       if (!pcc_vm_ops->page_mkwrite &&
+           page->mapping == pcc_file->f_mapping) {
+               CDEBUG(D_MMAP,
+                      "%s: PCC backend fs not support ->page_mkwrite()\n",
+                      ll_i2sbi(inode)->ll_fsname);
+               pcc_ioctl_detach(inode);
+               up_read(&mm->mmap_sem);
+               RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
+       }
        /* Pause to allow for a race with concurrent detach */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
 
@@ -1545,7 +1842,7 @@ int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
                 * VM_FAULT_NOPAGE | VM_FAULT_RETRY to the caller
                 * __do_page_fault and retry the memory fault handling.
                 */
-               if (page->mapping == file_inode(pcc_file)->i_mapping) {
+               if (page->mapping == pcc_file->f_mapping) {
                        *cached = true;
                        up_read(&mm->mmap_sem);
                        RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
@@ -1647,16 +1944,15 @@ void pcc_layout_invalidate(struct inode *inode)
        pcc_inode_unlock(inode);
 }
 
-static int pcc_inode_remove(struct pcc_inode *pcci)
+static int pcc_inode_remove(struct inode *inode, struct dentry *pcc_dentry)
 {
-       struct dentry *dentry;
        int rc;
 
-       dentry = pcci->pcci_path.dentry;
-       rc = ll_vfs_unlink(dentry->d_parent->d_inode, dentry);
+       rc = ll_vfs_unlink(pcc_dentry->d_parent->d_inode, pcc_dentry);
        if (rc)
-               CWARN("failed to unlink PCC file %.*s, rc = %d\n",
-                     dentry->d_name.len, dentry->d_name.name, rc);
+               CWARN("%s: failed to unlink PCC file %.*s, rc = %d\n",
+                     ll_i2sbi(inode)->ll_fsname, pcc_dentry->d_name.len,
+                     pcc_dentry->d_name.name, rc);
 
        return rc;
 }
@@ -1744,20 +2040,6 @@ out:
        return dentry;
 }
 
-/* Must be called with pcci->pcci_lock held */
-static void pcc_inode_attach_init(struct pcc_dataset *dataset,
-                                 struct pcc_inode *pcci,
-                                 struct dentry *dentry,
-                                 enum lu_pcc_type type)
-{
-       pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
-       pcci->pcci_path.dentry = dentry;
-       LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
-       atomic_set(&pcci->pcci_refcount, 1);
-       pcci->pcci_type = type;
-       pcci->pcci_attr_valid = false;
-}
-
 static int __pcc_inode_create(struct pcc_dataset *dataset,
                              struct lu_fid *fid,
                              struct dentry **dentry)
@@ -1840,37 +2122,37 @@ int pcc_inode_create_fini(struct pcc_dataset *dataset, struct inode *inode,
        LASSERT(ll_i2pcci(inode) == NULL);
        OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
        if (pcci == NULL)
-               GOTO(out_unlock, rc = -ENOMEM);
+               GOTO(out_put, rc = -ENOMEM);
 
        rc = pcc_inode_store_ugpid(pcc_dentry, old_cred->suid,
                                   old_cred->sgid);
        if (rc)
-               GOTO(out_unlock, rc);
+               GOTO(out_put, rc);
 
        pcc_inode_init(pcci, ll_i2info(inode));
        pcc_inode_attach_init(dataset, pcci, pcc_dentry, LU_PCC_READWRITE);
-       /* Set the layout generation of newly created file with 0 */
-       pcc_layout_gen_set(pcci, 0);
 
-out_unlock:
+       rc = pcc_layout_xattr_set(pcci, 0);
        if (rc) {
-               int rc2;
+               (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
+               pcc_inode_put(pcci);
+               GOTO(out_unlock, rc);
+       }
 
-               rc2 = ll_vfs_unlink(pcc_dentry->d_parent->d_inode, pcc_dentry);
-               if (rc2)
-                       CWARN("%s: failed to unlink PCC file %.*s, rc = %d\n",
-                             ll_i2sbi(inode)->ll_fsname,
-                             pcc_dentry->d_name.len, pcc_dentry->d_name.name,
-                             rc2);
+       /* Set the layout generation of newly created file with 0 */
+       pcc_layout_gen_set(pcci, 0);
 
+out_put:
+       if (rc) {
+               (void) pcc_inode_remove(inode, pcc_dentry);
                dput(pcc_dentry);
-       }
 
+               if (pcci)
+                       OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
+       }
+out_unlock:
        pcc_inode_unlock(inode);
        revert_creds(old_cred);
-       if (rc && pcci)
-               OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
-
        RETURN(rc);
 }
 
@@ -2022,16 +2304,9 @@ out_fput:
        fput(pcc_filp);
 out_dentry:
        if (rc) {
-               int rc2;
-
                old_cred = override_creds(pcc_super_cred(inode->i_sb));
-               rc2 = ll_vfs_unlink(dentry->d_parent->d_inode, dentry);
+               (void) pcc_inode_remove(inode, dentry);
                revert_creds(old_cred);
-               if (rc2)
-                       CWARN("%s: failed to unlink PCC file %.*s, rc = %d\n",
-                             ll_i2sbi(inode)->ll_fsname, dentry->d_name.len,
-                             dentry->d_name.name, rc2);
-
                dput(dentry);
        }
 out_dataset_put:
@@ -2050,6 +2325,7 @@ int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
 
        ENTRY;
 
+       old_cred = override_creds(pcc_super_cred(inode->i_sb));
        pcc_inode_lock(inode);
        pcci = ll_i2pcci(inode);
        lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
@@ -2065,6 +2341,10 @@ int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
                GOTO(out_unlock, rc = -ESTALE);
 
        LASSERT(attached);
+       rc = pcc_layout_xattr_set(pcci, gen);
+       if (rc)
+               GOTO(out_put, rc);
+
        rc = ll_layout_refresh(inode, &gen2);
        if (!rc) {
                if (gen2 == gen) {
@@ -2079,13 +2359,12 @@ int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
 
 out_put:
        if (rc) {
-               old_cred = override_creds(pcc_super_cred(inode->i_sb));
-               pcc_inode_remove(pcci);
-               revert_creds(old_cred);
+               (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
                pcc_inode_put(pcci);
        }
 out_unlock:
        pcc_inode_unlock(inode);
+       revert_creds(old_cred);
        RETURN(rc);
 }
 
index 0840a0b..da3a6cb 100644 (file)
@@ -90,12 +90,23 @@ struct pcc_matcher {
        struct qstr     *pm_name;
 };
 
+enum pcc_dataset_flags {
+       PCC_DATASET_NONE        = 0x0,
+       /* Try auto attach at open, disabled by default */
+       PCC_DATASET_OPEN_ATTACH = 0x1,
+       /* PCC backend is only used for RW-PCC */
+       PCC_DATASET_RWPCC       = 0x2,
+       /* PCC backend is only used for RO-PCC */
+       PCC_DATASET_ROPCC       = 0x4,
+       /* PCC backend provides caching services for both RW-PCC and RO-PCC */
+       PCC_DATASET_PCC_ALL     = PCC_DATASET_RWPCC | PCC_DATASET_ROPCC,
+};
+
 struct pcc_dataset {
        __u32                   pccd_rwid;       /* Archive ID */
        __u32                   pccd_roid;       /* Readonly ID */
        struct pcc_match_rule   pccd_rule;       /* Match rule */
-       __u32                   pccd_rwonly:1, /* Only use as RW-PCC */
-                               pccd_roonly:1; /* Only use as RO-PCC */
+       enum pcc_dataset_flags  pccd_flags;      /* flags of PCC backend */
        char                    pccd_pathname[PATH_MAX]; /* full path */
        struct path             pccd_path;       /* Root path */
        struct list_head        pccd_linkage;  /* Linked to pccs_datasets */
@@ -104,7 +115,7 @@ struct pcc_dataset {
 
 struct pcc_super {
        /* Protect pccs_datasets */
-       spinlock_t               pccs_lock;
+       struct rw_semaphore      pccs_rw_sem;
        /* List of datasets */
        struct list_head         pccs_datasets;
        /* creds of process who forced instantiation of super block */
@@ -157,6 +168,7 @@ struct pcc_cmd {
                        __u32                    pccc_roid;
                        struct list_head         pccc_conds;
                        char                    *pccc_conds_str;
+                       enum pcc_dataset_flags   pccc_flags;
                } pccc_add;
                struct pcc_cmd_del {
                        __u32                    pccc_pad;
index 08e9dd1..ae535c0 100644 (file)
@@ -2044,6 +2044,7 @@ static int lov_object_layout_get(const struct lu_env *env,
        cl->cl_size = lov_comp_md_size(lsm);
        cl->cl_layout_gen = lsm->lsm_layout_gen;
        cl->cl_dom_comp_size = 0;
+       cl->cl_is_released = lsm->lsm_is_released;
        if (lsm_is_composite(lsm->lsm_magic)) {
                struct lov_stripe_md_entry *lsme = lsm->lsm_entries[0];
 
index 7517005..d3dc578 100644 (file)
@@ -135,7 +135,8 @@ check_lpcc_state()
        local lustre_path="$1"
        local expected_state="$2"
        local facet=${3:-$SINGLEAGT}
-       local state=$(do_facet $facet $LFS pcc state $lustre_path |
+       local myRUNAS="$4"
+       local state=$(do_facet $facet $myRUNAS $LFS pcc state $lustre_path |
                        awk -F 'type: ' '{print $2}' | awk -F ',' '{print $1}')
 
        [[ "x$state" == "x$expected_state" ]] || error \
@@ -230,6 +231,7 @@ lpcc_rw_test() {
        check_hsm_flags $file "0x0000000d"
        check_lpcc_data $SINGLEAGT $lpcc_path $file "file_data"
 
+       echo "Restore testing..."
        if [ $CLIENTCOUNT -lt 2 -o $restore ]; then
                $LFS hsm_restore $file || error \
                        "failed to restore $file"
@@ -299,6 +301,7 @@ test_1e() {
 
        copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
        setup_pcc_mapping
+       $LCTL pcc list $MOUNT
        mkdir $DIR/$tdir || error "mkdir $DIR/$tdir failed"
        chmod 777 $DIR/$tdir || error "chmod 777 $DIR/$tdir failed"
 
@@ -640,12 +643,11 @@ test_4() {
 run_test 4 "Auto cache test for mmap"
 
 test_5() {
-       local file=$DIR/$tdir/$tfile
+       local file=$DIR/$tfile
 
        copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
        setup_pcc_mapping
 
-       mkdir -p $DIR/$tdir || error "mkdir $DIR/$tdir failed"
        do_facet $SINGLEAGT "echo -n attach_mmap_data > $file" ||
                error "echo $file failed"
 
@@ -668,15 +670,35 @@ test_5() {
 }
 run_test 5 "Mmap & cat a RW-PCC cached file"
 
+setup_loopdev() {
+       local facet=$1
+       local file=$2
+       local mntpt=$3
+       local size=${4:-50}
+
+       do_facet $facet mkdir -p $mntpt || error "mkdir -p $hsm_root failed"
+       stack_trap "do_facet $facet rm -rf $mntpt" EXIT
+       do_facet $facet dd if=/dev/zero of=$file bs=1M count=$size
+       stack_trap "do_facet $facet rm -f $file" EXIT
+       do_facet $facet mkfs.ext4 $file ||
+               error "mkfs.ext4 $file failed"
+       do_facet $facet file $file
+       do_facet $facet mount -t ext4 -o loop,usrquota,grpquota $file $mntpt ||
+               error "mount -o loop,usrquota,grpquota $file $mntpt failed"
+       stack_trap "do_facet $facet $UMOUNT $mntpt" EXIT
+}
+
 test_6() {
-       local file=$DIR/$tdir/$tfile
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
        local content
 
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
        copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
        setup_pcc_mapping
 
-       mkdir -p $DIR/$tdir || error "mkdir $DIR/$tdir failed"
-
        echo -n mmap_write_data > $file || error "echo write $file failed"
        do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
                error "failed to attach file $file"
@@ -702,13 +724,16 @@ test_6() {
 run_test 6 "Test mmap write on RW-PCC "
 
 test_7a() {
-       local file=$DIR/$tdir/$tfile
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
        local content
 
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
        copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
        setup_pcc_mapping
 
-       mkdir -p $DIR/$tdir || error "mkdir $DIR/$tdir failed"
        echo "QQQQQ" > $file
        do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
                error "failed to attach file $file"
@@ -729,14 +754,17 @@ test_7a() {
 run_test 7a "Fake file detached between fault() and page_mkwrite() for RW-PCC"
 
 test_7b() {
-       local file=$DIR/$tdir/$tfile
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
        local content
        local pid
 
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
        copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
        setup_pcc_mapping
 
-       mkdir -p $DIR/$tdir || error "mkdir $DIR/$tdir failed"
        echo "QQQQQ" > $file
        do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
                error "failed to attach file $file"
@@ -762,13 +790,11 @@ test_7b() {
 run_test 7b "Test the race with concurrent mkwrite and detach"
 
 test_8() {
-       local file=$DIR/$tdir/$tfile
+       local file=$DIR/$tfile
 
        copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
        setup_pcc_mapping
 
-       mkdir -p $DIR/$tdir || error "mkdir $DIR/$tdir failed"
-
        echo "QQQQQ" > $file
        do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER $file ||
                error "failed to attach file $file"
@@ -785,24 +811,6 @@ test_8() {
 }
 run_test 8 "Test fake -ENOSPC tolerance for RW-PCC"
 
-setup_loopdev() {
-       local facet=$1
-       local file=$2
-       local mntpt=$3
-       local size=${4:-50}
-
-       do_facet $facet mkdir -p $mntpt || error "mkdir -p $hsm_root failed"
-       stack_trap "do_facet $facet rm -rf $mntpt" EXIT
-       do_facet $facet dd if=/dev/zero of=$file bs=1M count=$size
-       stack_trap "do_facet $facet rm -f $file" EXIT
-       do_facet $facet mkfs.ext4 $file ||
-               error "mkfs.ext4 $file failed"
-       do_facet $facet file $file
-       do_facet $facet mount -t ext4 -o loop,usrquota,grpquota $file $mntpt ||
-               error "mount -o loop,usrquota,grpquota $file $mntpt failed"
-       stack_trap "do_facet $facet $UMOUNT $mntpt" EXIT
-}
-
 test_9() {
        local loopfile="$TMP/$tfile"
        local mntpt="/mnt/pcc.9a"
@@ -889,16 +897,17 @@ test_10b() {
 run_test 10b "Test RW-PCC with group quota on loop PCC device"
 
 test_11() {
-       local file=$DIR/$tdir/$tfile
-       local hsm_root=$(hsm_root)
-       local file=$DIR/$tdir/$tfile
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tfile
        local -a lpcc_path
        local lpcc_dir
 
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
        copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
        setup_pcc_mapping
 
-       mkdir $DIR/$tdir || error "mkdir $DIR/$tdir failed"
        do_facet $SINGLEAGT "echo -n QQQQQ > $file"
        lpcc_path=$(lpcc_fid2path $hsm_root $file)
        lpcc_dir=$(dirname $lpcc_path)
@@ -995,8 +1004,6 @@ test_rule_id() {
        do_facet $SINGLEAGT $myRUNAS $LFS pcc detach $file ||
                error "failed to detach file $file"
        check_lpcc_state $file "none"
-
-       cleanup_pcc_mapping
 }
 
 test_13a() {
@@ -1049,8 +1056,6 @@ test_13b() {
                error "failed to dd write to $file"
        check_lpcc_state $file "none"
        rm $file || error "rm $file failed"
-
-       cleanup_pcc_mapping
 }
 run_test 13b "Test auto RW-PCC create caching for file name with wildcard"
 
@@ -1112,11 +1117,95 @@ test_13c() {
        do_facet $SINGLEAGT $LFS pcc detach $file ||
                error "failed to detach $file"
        rm $file || error "rm $file failed"
-
-       cleanup_pcc_mapping
 }
 run_test 13c "Check auto RW-PCC create caching for UID/GID/ProjID/fname rule"
 
+# test_14: attach a file to RW-PCC, revoke the layout lock and check that
+# the file content is still readable while the PCC state becomes "none".
+# setup_pcc_mapping is called without arguments here, i.e. no explicit
+# open_attach rule is passed by this test.
+test_14() {
+       local file=$DIR/$tdir/$tfile
+
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping
+
+       mkdir -p $DIR/$tdir || error "mkdir -p $DIR/$tdir failed"
+       # Create the file on the agent and attach it to PCC using the
+       # archive ID $HSM_ARCHIVE_NUMBER.
+       do_facet $SINGLEAGT "echo -n autodetach_data > $file"
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER \
+               $file || error "PCC attach $file failed"
+       check_lpcc_state $file "readwrite"
+
+       # Revoke the layout lock, the PCC-cached file will be
+       # detached automatically.
+       do_facet $SINGLEAGT $LCTL \
+               set_param ldlm.namespaces.*mdc*.lru_size=clear
+       # The data written before the attach must still be readable, and
+       # the file must no longer be reported as PCC-cached.
+       check_file_data $SINGLEAGT $file "autodetach_data"
+       check_lpcc_state $file "none"
+}
+run_test 14 "Revocation of the layout lock should detach the file automatically"
+
+# test_15: check that a file whose PCC copy is still valid keeps being
+# reported as "readwrite" after a layout lock revocation and after an
+# explicit detach, when the PCC mapping is set up with open_attach=1.
+# The scenario is run first as $RUNAS and then as root; the root pass also
+# checks the HSM flags and that an HSM restore ends the PCC caching.
+test_15() {
+       local loopfile="$TMP/$tfile"
+       local mntpt="/mnt/pcc.$tdir"
+       # NOTE(review): hsm_root is not referenced below; presumably it is
+       # read by a helper (e.g. setup_pcc_mapping) through bash dynamic
+       # scoping - confirm before renaming or removing it.
+       local hsm_root="$mntpt/$tdir"
+       local file=$DIR/$tdir/$tfile
+
+       setup_loopdev $SINGLEAGT $loopfile $mntpt 50
+       copytool setup -m "$MOUNT" -a "$HSM_ARCHIVE_NUMBER"
+       setup_pcc_mapping $SINGLEAGT \
+               "projid={100}\ rwid=$HSM_ARCHIVE_NUMBER\ open_attach=1"
+
+       mkdir $DIR/$tdir || error "mkdir $DIR/$tdir failed"
+       chmod 777 $DIR/$tdir || error "chmod 777 $DIR/$tdir failed"
+
+       echo "Check open attach for non-root user"
+       do_facet $SINGLEAGT $RUNAS dd if=/dev/zero of=$file bs=1024 count=1 ||
+               error "failed to dd write to $file"
+       do_facet $SINGLEAGT $RUNAS $LFS pcc attach -i $HSM_ARCHIVE_NUMBER \
+               $file || error "failed to attach file $file"
+       # Diagnostic only: log the PCC state as seen by the non-root user.
+       do_facet $SINGLEAGT $RUNAS $LFS pcc state $file
+       check_lpcc_state $file "readwrite" $SINGLEAGT "$RUNAS"
+       # Revoke the layout lock. With open_attach=1 the file is expected
+       # to be reported as "readwrite" again afterwards.
+       do_facet $SINGLEAGT $LCTL \
+               set_param ldlm.namespaces.*mdc*.lru_size=clear
+       check_lpcc_state $file "readwrite" $SINGLEAGT "$RUNAS"
+       # Detach the file explicitly. The layout generation has not
+       # changed, so the copy in PCC is still valid and the file is
+       # expected to be reported as "readwrite" again.
+       do_facet $SINGLEAGT $RUNAS $LFS pcc detach $file ||
+               error "PCC detach $file failed"
+       check_lpcc_state $file "readwrite" $SINGLEAGT "$RUNAS"
+       # Detach once more before removing the file.
+       do_facet $SINGLEAGT $RUNAS $LFS pcc detach $file ||
+               error "PCC detach $file failed"
+       rm $file || error "rm $file failed"
+
+       echo "check open attach for root user"
+       do_facet $SINGLEAGT "echo -n autoattach_data > $file"
+       do_facet $SINGLEAGT $LFS pcc attach -i $HSM_ARCHIVE_NUMBER \
+               $file || error "PCC attach $file failed"
+       check_lpcc_state $file "readwrite"
+
+       # Revoke the layout lock. With open_attach=1 the data must stay
+       # readable and the file is expected to be "readwrite" afterwards.
+       do_facet $SINGLEAGT $LCTL \
+               set_param ldlm.namespaces.*mdc*.lru_size=clear
+       check_file_data $SINGLEAGT $file "autoattach_data"
+       check_lpcc_state $file "readwrite"
+
+       # Detach the file explicitly. The layout generation has not
+       # changed, so the copy in PCC is still valid and the file is
+       # expected to be reported as "readwrite" again.
+       do_facet $SINGLEAGT $LFS pcc detach $file ||
+               error "PCC detach $file failed"
+       check_lpcc_state $file "readwrite"
+       # Expected HSM flags: 0x0000000d (per the original note: the
+       # released, exists and archived states).
+       check_hsm_flags $file "0x0000000d"
+       check_file_data $SINGLEAGT $file "autoattach_data"
+
+       # After a successful HSM restore the file must no longer be
+       # reported as PCC-cached.
+       $LFS hsm_restore $file || error "failed to restore $file"
+       wait_request_state $(path2fid $file) RESTORE SUCCEED
+       check_lpcc_state $file "none"
+}
+run_test 15 "Test auto attach at open when file is still valid cached"
+
 complete $SECONDS
 check_and_cleanup_lustre
 exit_status