}
}
-#define PCC_DISJUNCTION_DELIM (',')
-#define PCC_CONJUNCTION_DELIM ('&')
-#define PCC_EXPRESSION_DELIM ('=')
+#define PCC_DISJUNCTION_DELIM (",")
+#define PCC_CONJUNCTION_DELIM ("&")
+#define PCC_EXPRESSION_DELIM ("=")
static int
-pcc_fname_list_add(struct cfs_lstr *id, struct list_head *fname_list)
+pcc_fname_list_add(char *id, struct list_head *fname_list)
{
struct pcc_match_fname *fname;
if (fname == NULL)
return -ENOMEM;
- OBD_ALLOC(fname->pmf_name, id->ls_len + 1);
+ OBD_ALLOC(fname->pmf_name, strlen(id) + 1);
if (fname->pmf_name == NULL) {
OBD_FREE_PTR(fname);
return -ENOMEM;
}
- memcpy(fname->pmf_name, id->ls_str, id->ls_len);
+ strcpy(fname->pmf_name, id);
list_add_tail(&fname->pmf_linkage, fname_list);
return 0;
}
+/*
+ * Split the space-separated file names in @str and append one entry per
+ * non-empty name to @fname_list (initialised here).
+ *
+ * @str is modified in place by strsep(). Runs of spaces yield empty tokens,
+ * which are skipped.
+ *
+ * Returns 0 on success, -EINVAL when @str contains no name at all, or the
+ * error returned by pcc_fname_list_add(). On any error the partially built
+ * list is released with pcc_fname_list_free().
+ */
static int
-pcc_fname_list_parse(char *str, int len, struct list_head *fname_list)
+pcc_fname_list_parse(char *str, struct list_head *fname_list)
{
- struct cfs_lstr src;
- struct cfs_lstr res;
int rc = 0;
ENTRY;
- src.ls_str = str;
- src.ls_len = len;
INIT_LIST_HEAD(fname_list);
- while (src.ls_str) {
- rc = cfs_gettok(&src, ' ', &res);
- if (rc == 0) {
- rc = -EINVAL;
- break;
- }
- rc = pcc_fname_list_add(&res, fname_list);
- if (rc)
- break;
+ while (rc == 0 && str) {
+ char *fname = strsep(&str, " ");
+
+ if (*fname)
+ rc = pcc_fname_list_add(fname, fname_list);
}
+ /* Only report "no names given" when nothing else went wrong, so that an
+ * allocation failure on the first name is not rewritten to -EINVAL.
+ */
+ if (rc == 0 && list_empty(fname_list))
+ rc = -EINVAL;
if (rc)
pcc_fname_list_free(fname_list);
RETURN(rc);
}
+/*
+ * Split the space-separated numeric ids in @str and append one
+ * struct pcc_match_id per id to @id_list (initialised here).
+ *
+ * @str is modified in place by strsep(); empty tokens are skipped.
+ * Each id must fit in 32 bits, as the removed cfs_str2num_check() call
+ * enforced with its (u32)~0U upper bound.
+ *
+ * Returns 0 on success, -EINVAL when no id was given, or the kstrtouint()
+ * error for a malformed or out-of-range id. On error the partially built
+ * list is released with pcc_id_list_free().
+ */
static int
-pcc_id_list_parse(char *str, int len, struct list_head *id_list,
+pcc_id_list_parse(char *str, struct list_head *id_list,
enum pcc_field type)
{
- struct cfs_lstr src;
- struct cfs_lstr res;
int rc = 0;
ENTRY;
type != PCC_FIELD_PROJID)
RETURN(-EINVAL);
- src.ls_str = str;
- src.ls_len = len;
INIT_LIST_HEAD(id_list);
- while (src.ls_str) {
+ while (str) {
+ char *num;
struct pcc_match_id *id;
- __u32 id_val;
-
- if (cfs_gettok(&src, ' ', &res) == 0)
- GOTO(out, rc = -EINVAL);
+ unsigned int id_val;
- if (!cfs_str2num_check(res.ls_str, res.ls_len,
- &id_val, 0, (u32)~0U))
- GOTO(out, rc = -EINVAL);
+ num = strsep(&str, " ");
+ if (!*num)
+ continue;
+ /* Parse as a 32-bit value: kstrtoul() would accept anything up to
+ * ULONG_MAX and the store into pmi_id below would then silently
+ * truncate it (e.g. 4294967296 becoming id 0).
+ */
+ rc = kstrtouint(num, 0, &id_val);
+ if (rc)
+ GOTO(out, rc);
OBD_ALLOC_PTR(id);
if (id == NULL)
id->pmi_id = id_val;
list_add_tail(&id->pmi_linkage, id_list);
}
+ if (list_empty(id_list))
+ rc = -EINVAL;
out:
if (rc)
pcc_id_list_free(id_list);
RETURN(rc);
}
-static inline bool
-pcc_check_field(struct cfs_lstr *field, char *str)
-{
- int len = strlen(str);
-
- return (field->ls_len == len &&
- strncmp(field->ls_str, str, len) == 0);
-}
-
static int
-pcc_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
+pcc_expression_parse(char *str, struct list_head *cond_list)
{
struct pcc_expression *expr;
- struct cfs_lstr field;
+ char *field;
+ int len;
int rc = 0;
OBD_ALLOC_PTR(expr);
if (expr == NULL)
return -ENOMEM;
- rc = cfs_gettok(src, PCC_EXPRESSION_DELIM, &field);
- if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
- src->ls_str[src->ls_len - 1] != '}')
+ field = strim(strsep(&str, PCC_EXPRESSION_DELIM));
+ if (!*field || !str)
+ /* No LHS or no '=' */
+ GOTO(out, rc = -EINVAL);
+ str = skip_spaces(str);
+ len = strlen(str);
+ if (str[0] != '{' || str[len - 1] != '}')
GOTO(out, rc = -EINVAL);
/* Skip '{' and '}' */
- src->ls_str++;
- src->ls_len -= 2;
+ str[len - 1] = '\0';
+ str += 1;
- if (pcc_check_field(&field, "uid")) {
- if (pcc_id_list_parse(src->ls_str,
- src->ls_len,
+ if (strcmp(field, "uid") == 0) {
+ if (pcc_id_list_parse(str,
&expr->pe_cond,
PCC_FIELD_UID) < 0)
GOTO(out, rc = -EINVAL);
expr->pe_field = PCC_FIELD_UID;
- } else if (pcc_check_field(&field, "gid")) {
- if (pcc_id_list_parse(src->ls_str,
- src->ls_len,
+ } else if (strcmp(field, "gid") == 0) {
+ if (pcc_id_list_parse(str,
&expr->pe_cond,
PCC_FIELD_GID) < 0)
GOTO(out, rc = -EINVAL);
expr->pe_field = PCC_FIELD_GID;
- } else if (pcc_check_field(&field, "projid")) {
- if (pcc_id_list_parse(src->ls_str,
- src->ls_len,
+ } else if (strcmp(field, "projid") == 0) {
+ if (pcc_id_list_parse(str,
&expr->pe_cond,
PCC_FIELD_PROJID) < 0)
GOTO(out, rc = -EINVAL);
expr->pe_field = PCC_FIELD_PROJID;
- } else if (pcc_check_field(&field, "fname")) {
- if (pcc_fname_list_parse(src->ls_str,
- src->ls_len,
- &expr->pe_cond) < 0)
+ } else if (strcmp(field, "fname") == 0) {
+ if (pcc_fname_list_parse(str, &expr->pe_cond) < 0)
GOTO(out, rc = -EINVAL);
expr->pe_field = PCC_FIELD_FNAME;
} else {
}
static int
-pcc_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
+pcc_conjunction_parse(char *str, struct list_head *cond_list)
{
struct pcc_conjunction *conjunction;
- struct cfs_lstr expr;
int rc = 0;
OBD_ALLOC_PTR(conjunction);
INIT_LIST_HEAD(&conjunction->pc_expressions);
list_add_tail(&conjunction->pc_linkage, cond_list);
- while (src->ls_str) {
- rc = cfs_gettok(src, PCC_CONJUNCTION_DELIM, &expr);
- if (rc == 0) {
- rc = -EINVAL;
- break;
- }
- rc = pcc_expression_parse(&expr,
- &conjunction->pc_expressions);
- if (rc)
- break;
+ while (rc == 0 && str) {
+ char *expr = strsep(&str, PCC_CONJUNCTION_DELIM);
+
+ rc = pcc_expression_parse(expr, &conjunction->pc_expressions);
}
return rc;
}
+/*
+ * Parse a rule string into @cond_list (initialised here).
+ *
+ * The string is split on PCC_DISJUNCTION_DELIM and every term is handed to
+ * pcc_conjunction_parse(). Parsing stops at the first term that fails.
+ *
+ * Returns 0 on success, -ENOMEM if the working copy cannot be allocated,
+ * otherwise the error from pcc_conjunction_parse(). Nothing is removed from
+ * @cond_list here on failure.
+ */
-static int pcc_conds_parse(char *str, int len, struct list_head *cond_list)
+static int pcc_conds_parse(char *orig, struct list_head *cond_list)
{
- struct cfs_lstr src;
- struct cfs_lstr res;
+ char *str;
int rc = 0;
- src.ls_str = str;
- src.ls_len = len;
+ /* strsep() writes NUL bytes into the string it tokenises, so work on a
+ * private copy and leave the caller's string untouched.
+ */
+ orig = kstrdup(orig, GFP_KERNEL);
+ if (!orig)
+ return -ENOMEM;
+ str = orig;
+
INIT_LIST_HEAD(cond_list);
- while (src.ls_str) {
- rc = cfs_gettok(&src, PCC_DISJUNCTION_DELIM, &res);
- if (rc == 0) {
- rc = -EINVAL;
- break;
- }
- rc = pcc_conjunction_parse(&res, cond_list);
- if (rc)
- break;
+ while (rc == 0 && str) {
+ char *term = strsep(&str, PCC_DISJUNCTION_DELIM);
+
+ rc = pcc_conjunction_parse(term, cond_list);
}
+ /* @orig now points at the copy made above, not at the caller's buffer */
+ kfree(orig);
return rc;
}
memcpy(cmd->u.pccc_add.pccc_conds_str, id, strlen(id));
rc = pcc_conds_parse(cmd->u.pccc_add.pccc_conds_str,
- strlen(cmd->u.pccc_add.pccc_conds_str),
&cmd->u.pccc_add.pccc_conds);
if (rc)
pcc_cmd_fini(cmd);
return -EINVAL;
/*
* By default, a PCC backend can provide caching service for
- * both RW-PCC and RO-PCC.
+ * both PCC-RW and PCC-RO.
*/
if ((cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCC_ALL) == 0)
cmd->u.pccc_add.pccc_flags |= PCC_DATASET_PCC_ALL;
- /* For RW-PCC, the value of @rwid must be non zero. */
- if (cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC &&
- cmd->u.pccc_add.pccc_rwid == 0)
+ if (cmd->u.pccc_add.pccc_rwid == 0 &&
+ cmd->u.pccc_add.pccc_roid == 0)
return -EINVAL;
+ if (cmd->u.pccc_add.pccc_rwid == 0 &&
+ cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC)
+ cmd->u.pccc_add.pccc_rwid = cmd->u.pccc_add.pccc_roid;
+
+ if (cmd->u.pccc_add.pccc_roid == 0 &&
+ cmd->u.pccc_add.pccc_flags & PCC_DATASET_ROPCC)
+ cmd->u.pccc_add.pccc_roid = cmd->u.pccc_add.pccc_rwid;
+
break;
case PCC_DEL_DATASET:
case PCC_CLEAR_ALL:
INIT_LIST_HEAD(&rule->pmr_conds);
if (!list_empty(&cmd->u.pccc_add.pccc_conds))
rc = pcc_conds_parse(rule->pmr_conds_str,
- strlen(rule->pmr_conds_str),
- &rule->pmr_conds);
+ &rule->pmr_conds);
if (rc)
pcc_dataset_rule_fini(rule);
return rc;
}
-struct pcc_dataset *
+static struct pcc_dataset *
pcc_dataset_get(struct pcc_super *super, enum lu_pcc_type type, __u32 id)
{
struct pcc_dataset *dataset;
if (type == LU_PCC_READWRITE && (dataset->pccd_rwid != id ||
!(dataset->pccd_flags & PCC_DATASET_RWPCC)))
continue;
+ if (type == LU_PCC_READONLY && (dataset->pccd_roid != id ||
+ !(dataset->pccd_flags & PCC_DATASET_ROPCC)))
+ continue;
atomic_inc(&dataset->pccd_refcount);
selected = dataset;
break;
* reduce overhead:
* (fid->f_oid >> 16 & oxFFFF)/FID
*/
-#define MAX_PCC_DATABASE_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
+#define PCC_DATASET_MAX_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
+/*
+ * Format into @buf (of size @sz) the path of @fid relative to a dataset
+ * root: six four-hex-digit directory levels taken from the 16-bit slices of
+ * f_oid and f_seq, followed by the FID printed with DFID_NOBRACE.
+ *
+ * Returns the scnprintf() result.
+ */
static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid)
{
- return snprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
- DFID_NOBRACE,
- (fid)->f_oid & 0xFFFF,
- (fid)->f_oid >> 16 & 0xFFFF,
- (unsigned int)((fid)->f_seq & 0xFFFF),
- (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
- (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
- (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
- PFID(fid));
+ /* scnprintf() reports what was actually stored in @buf, whereas
+ * snprintf() reports the length the untruncated output would have had.
+ */
+ return scnprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
+ DFID_NOBRACE,
+ (fid)->f_oid & 0xFFFF,
+ (fid)->f_oid >> 16 & 0xFFFF,
+ (unsigned int)((fid)->f_seq & 0xFFFF),
+ (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
+ (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
+ (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
+ PFID(fid));
}
static inline const struct cred *pcc_super_cred(struct super_block *sb)
RETURN(PTR_ERR(env));
rc = cl_object_layout_get(env, lli->lli_clob, clt);
- if (rc)
+ if (rc < 0)
CDEBUG(D_INODE, "Cannot get layout for "DFID"\n",
PFID(ll_inode2fid(inode)));
cl_env_put(env, &refcheck);
- RETURN(rc);
-}
-
-static int pcc_fid2dataset_fullpath(char *buf, int sz, struct lu_fid *fid,
- struct pcc_dataset *dataset)
-{
- return snprintf(buf, sz, "%s/%04x/%04x/%04x/%04x/%04x/%04x/"
- DFID_NOBRACE,
- dataset->pccd_pathname,
- (fid)->f_oid & 0xFFFF,
- (fid)->f_oid >> 16 & 0xFFFF,
- (unsigned int)((fid)->f_seq & 0xFFFF),
- (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
- (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
- (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
- PFID(fid));
+ RETURN(rc < 0 ? rc : 0);
}
/* Must be called with pcci->pcci_lock held */
return pcci->pcci_layout_gen != CL_LAYOUT_GEN_NONE;
}
+/*
+ * Resolve the '/'-separated @pathname relative to @base, one component at a
+ * time, with lookup_one_len() under the parent's inode lock.
+ *
+ * @pathname must be writable: each '/' is temporarily overwritten with a NUL
+ * while its component is looked up and is restored afterwards.
+ *
+ * Returns the dentry of the last component with a reference held (the caller
+ * drops it with dput()), ERR_PTR(-ENOENT) if a component is missing or is a
+ * negative dentry, or the ERR_PTR() returned by lookup_one_len().
+ */
+static struct dentry *pcc_lookup(struct dentry *base, char *pathname)
+{
+ char *ptr = NULL, *component;
+ struct dentry *parent;
+ struct dentry *child = ERR_PTR(-ENOENT);
+
+ ptr = pathname;
+
+ /* move past any initial '/' to the start of the first path component */
+ while (*ptr == '/')
+ ptr++;
+
+ /* store the start of the first path component */
+ component = ptr;
+
+ /* hold our own reference on @base; every pass drops the parent's */
+ parent = dget(base);
+ while (ptr) {
+ /* find the start of the next component - if we don't find it,
+ * the current component is the last component
+ */
+ ptr = strchr(ptr, '/');
+ /* put a NUL char in place of the '/' before the next component
+ * so we can treat this component as a string; note the full
+ * path string is NUL terminated so this is not needed for the
+ * last component
+ */
+ if (ptr)
+ *ptr = '\0';
+
+ /* look up the current component */
+ inode_lock(parent->d_inode);
+ child = lookup_one_len(component, parent, strlen(component));
+ inode_unlock(parent->d_inode);
+
+ /* repair the path string: put '/' back in place of the NUL */
+ if (ptr)
+ *ptr = '/';
+
+ dput(parent);
+
+ /* an error pointer is handed back to the caller unchanged */
+ if (IS_ERR_OR_NULL(child))
+ break;
+
+ /* we may find a cached negative dentry */
+ if (!d_is_positive(child)) {
+ dput(child);
+ child = NULL;
+ break;
+ }
+
+ /* descend into the next level of the path */
+ parent = child;
+
+ /* move the pointer past the '/' to the next component */
+ if (ptr)
+ ptr++;
+ component = ptr;
+ }
+
+ /* NULL child means we didn't find anything */
+ if (!child)
+ child = ERR_PTR(-ENOENT);
+
+ return child;
+}
+
static int pcc_try_dataset_attach(struct inode *inode, __u32 gen,
enum lu_pcc_type type,
struct pcc_dataset *dataset,
struct ll_inode_info *lli = ll_i2info(inode);
struct pcc_inode *pcci = lli->lli_pcc_inode;
const struct cred *old_cred;
- struct dentry *pcc_dentry;
- struct path path;
- char *pathname;
+ struct dentry *pcc_dentry = NULL;
+ char pathname[PCC_DATASET_MAX_PATH];
__u32 pcc_gen;
int rc;
!(dataset->pccd_flags & PCC_DATASET_RWPCC))
RETURN(0);
- OBD_ALLOC(pathname, PATH_MAX);
- if (pathname == NULL)
- RETURN(-ENOMEM);
+ if (type == LU_PCC_READONLY &&
+ !(dataset->pccd_flags & PCC_DATASET_ROPCC))
+ RETURN(0);
- pcc_fid2dataset_fullpath(pathname, PATH_MAX, &lli->lli_fid, dataset);
+ rc = pcc_fid2dataset_path(pathname, PCC_DATASET_MAX_PATH,
+ &lli->lli_fid);
old_cred = override_creds(pcc_super_cred(inode->i_sb));
- rc = kern_path(pathname, LOOKUP_FOLLOW, &path);
- if (rc)
+ pcc_dentry = pcc_lookup(dataset->pccd_path.dentry, pathname);
+ if (IS_ERR(pcc_dentry)) {
+ rc = PTR_ERR(pcc_dentry);
+ CDEBUG(D_CACHE, "%s: path lookup error on "DFID":%s: rc = %d\n",
+ ll_i2sbi(inode)->ll_fsname, PFID(&lli->lli_fid),
+ pathname, rc);
/* ignore this error */
GOTO(out, rc = 0);
+ }
- pcc_dentry = path.dentry;
rc = ll_vfs_getxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
&pcc_gen, sizeof(pcc_gen));
if (rc < 0)
/* ignore this error */
- GOTO(out_put_path, rc = 0);
+ GOTO(out_put_pcc_dentry, rc = 0);
rc = 0;
/* The file is still valid cached in PCC, attach it immediately. */
if (!pcci) {
OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
if (pcci == NULL)
- GOTO(out_put_path, rc = -ENOMEM);
+ GOTO(out_put_pcc_dentry, rc = -ENOMEM);
pcc_inode_init(pcci, lli);
dget(pcc_dentry);
pcc_layout_gen_set(pcci, gen);
*cached = true;
}
-out_put_path:
- path_put(&path);
+out_put_pcc_dentry:
+ dput(pcc_dentry);
out:
revert_creds(old_cred);
- OBD_FREE(pathname, PATH_MAX);
RETURN(rc);
}
if (clt.cl_is_released)
rc = pcc_try_datasets_attach(inode, iot, clt.cl_layout_gen,
LU_PCC_READWRITE, cached);
+ else if (clt.cl_is_rdonly)
+ rc = pcc_try_datasets_attach(inode, iot, clt.cl_layout_gen,
+ LU_PCC_READONLY, cached);
RETURN(rc);
}
struct ll_inode_info *lli = ll_i2info(inode);
struct pcc_super *super = ll_i2pccs(inode);
+ ENTRY;
+
/* Known the file was not in any PCC backend. */
if (lli->lli_pcc_dsflags & PCC_DATASET_NONE)
- return false;
+ RETURN(false);
/*
* lli_pcc_generation == 0 means that the file was never attached into
* immediately in pcc_try_auto_attach().
*/
if (super->pccs_generation != lli->lli_pcc_generation)
- return true;
+ RETURN(true);
/* The cached setting @lli_pcc_dsflags is valid */
if (iot == PIT_OPEN)
- return lli->lli_pcc_dsflags & PCC_DATASET_OPEN_ATTACH;
+ RETURN(lli->lli_pcc_dsflags & PCC_DATASET_OPEN_ATTACH);
if (iot == PIT_GETATTR)
- return lli->lli_pcc_dsflags & PCC_DATASET_STAT_ATTACH;
+ RETURN(lli->lli_pcc_dsflags & PCC_DATASET_STAT_ATTACH);
- return lli->lli_pcc_dsflags & PCC_DATASET_IO_ATTACH;
+ RETURN(lli->lli_pcc_dsflags & PCC_DATASET_IO_ATTACH);
}
int pcc_file_open(struct inode *inode, struct file *file)
if (!S_ISREG(inode->i_mode))
RETURN(0);
+ if (IS_ENCRYPTED(inode))
+ RETURN(0);
+
pcc_inode_lock(inode);
pcci = ll_i2pcci(inode);
RETURN_EXIT;
}
+/*
+ * Decide, for an I/O of type @iot that finished on @pcci with result @rc,
+ * whether the PCC result stands or whether the failure is tolerated and the
+ * I/O falls back to the normal Lustre I/O path.
+ *
+ * Returns false when the failure is tolerated (fall back), true otherwise.
+ */
+static bool pcc_io_tolerate(struct pcc_inode *pcci,
+			    enum pcc_io_type iot, int rc)
+{
+	bool fall_back = false;
+
+	switch (pcci->pcci_type) {
+	case LU_PCC_READWRITE:
+		/* The ->page_mkwrite failure tolerance is handled separately
+		 * in pcc_page_mkwrite().
+		 */
+		fall_back = iot == PIT_WRITE &&
+			    (rc == -ENOSPC || rc == -EDQUOT);
+		break;
+	case LU_PCC_READONLY:
+		switch (iot) {
+		case PIT_READ:
+		case PIT_GETATTR:
+		case PIT_SPLICE_READ:
+			fall_back = rc < 0 && rc != -ENOMEM;
+			break;
+		case PIT_FAULT:
+			fall_back = (rc & VM_FAULT_SIGBUS) &&
+				    !(rc & VM_FAULT_OOM);
+			break;
+		default:
+			break;
+		}
+		break;
+	default:
+		break;
+	}
+
+	return !fall_back;
+}
+
static void pcc_io_init(struct inode *inode, enum pcc_io_type iot, bool *cached)
{
struct pcc_inode *pcci;
pcci = ll_i2pcci(inode);
if (pcci && pcc_inode_has_layout(pcci)) {
LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
- atomic_inc(&pcci->pcci_active_ios);
- *cached = true;
+ if (pcci->pcci_type == LU_PCC_READONLY &&
+ (iot == PIT_WRITE || iot == PIT_SETATTR ||
+ iot == PIT_PAGE_MKWRITE)) {
+ /* Fall back to normal I/O path */
+ *cached = false;
+ /* For mmap write, we need to detach the file from
+ * RO-PCC, release the page got from ->fault(), and
+ * then retry the memory fault handling (->fault()
+ * and ->page_mkwrite()).
+ * These are done in pcc_page_mkwrite();
+ */
+ } else {
+ atomic_inc(&pcci->pcci_active_ios);
+ *cached = true;
+ }
} else {
*cached = false;
if (pcc_may_auto_attach(inode, iot)) {
pcc_inode_unlock(inode);
}
+/*
+ * Finish a PCC I/O of type @iot that completed with result @rc.
+ *
+ * *@cached must be true on entry. On return it holds the verdict of
+ * pcc_io_tolerate(). The inode's active I/O count is dropped, and
+ * pcci_waitq is woken when the count reaches zero.
+ */
-static void pcc_io_fini(struct inode *inode)
+static void pcc_io_fini(struct inode *inode, enum pcc_io_type iot,
+ int rc, bool *cached)
{
struct pcc_inode *pcci = ll_i2pcci(inode);
- LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0);
+ LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0 && *cached);
+
+ /* false tells the caller to retry through the normal Lustre I/O path */
+ *cached = pcc_io_tolerate(pcci, iot, rc);
if (atomic_dec_and_test(&pcci->pcci_active_ios))
- wake_up_all(&pcci->pcci_waitq);
+ wake_up(&pcci->pcci_waitq);
}
if (!*cached)
RETURN(0);
+ /* Fake I/O error on RO-PCC */
+ if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
+ GOTO(out, result = -EIO);
+
iocb->ki_filp = pccf->pccf_file;
/* generic_file_aio_read does not support ext4-dax,
* __pcc_file_read_iter uses ->aio_read hook directly
*/
result = __pcc_file_read_iter(iocb, iter);
iocb->ki_filp = file;
-
- pcc_io_fini(inode);
+out:
+ pcc_io_fini(inode, PIT_READ, result, cached);
RETURN(result);
}
if (!*cached)
RETURN(0);
- if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
+ if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
GOTO(out, result = -ENOSPC);
iocb->ki_filp = pccf->pccf_file;
result = __pcc_file_write_iter(iocb, iter);
iocb->ki_filp = file;
out:
- pcc_io_fini(inode);
+ pcc_io_fini(inode, PIT_WRITE, result, cached);
RETURN(result);
}
pcc_dentry = pcci->pcci_path.dentry;
inode_lock(pcc_dentry->d_inode);
old_cred = override_creds(pcc_super_cred(inode->i_sb));
+#ifdef HAVE_USER_NAMESPACE_ARG
+ rc = pcc_dentry->d_inode->i_op->setattr(&nop_mnt_idmap, pcc_dentry,
+ &attr2);
+#else
rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
+#endif
revert_creds(old_cred);
inode_unlock(pcc_dentry->d_inode);
- pcc_io_fini(inode);
+ pcc_io_fini(inode, PIT_SETATTR, rc, cached);
RETURN(rc);
}
GOTO(out, rc);
ll_inode_size_lock(inode);
- if (ll_file_test_and_clear_flag(lli, LLIF_UPDATE_ATIME) ||
- inode->i_atime.tv_sec < lli->lli_atime)
- inode->i_atime.tv_sec = lli->lli_atime;
+ if (test_and_clear_bit(LLIF_UPDATE_ATIME, &lli->lli_flags) ||
+ inode_get_atime_sec(inode) < lli->lli_atime)
+ inode_set_atime(inode, lli->lli_atime, 0);
- inode->i_mtime.tv_sec = lli->lli_mtime;
- inode->i_ctime.tv_sec = lli->lli_ctime;
+ inode_set_mtime(inode, lli->lli_mtime, 0);
+ inode_set_ctime(inode, lli->lli_ctime, 0);
- atime = inode->i_atime.tv_sec;
- mtime = inode->i_mtime.tv_sec;
- ctime = inode->i_ctime.tv_sec;
+ atime = inode_get_atime_sec(inode);
+ mtime = inode_get_mtime_sec(inode);
+ ctime = inode_get_ctime_sec(inode);
if (atime < stat.atime.tv_sec)
atime = stat.atime.tv_sec;
i_size_write(inode, stat.size);
inode->i_blocks = stat.blocks;
- inode->i_atime.tv_sec = atime;
- inode->i_mtime.tv_sec = mtime;
- inode->i_ctime.tv_sec = ctime;
+ inode_set_atime(inode, atime, 0);
+ inode_set_mtime(inode, mtime, 0);
+ inode_set_ctime(inode, ctime, 0);
ll_inode_size_unlock(inode);
out:
- pcc_io_fini(inode);
+ pcc_io_fini(inode, PIT_GETATTR, rc, cached);
RETURN(rc);
}
+#ifdef HAVE_DEFAULT_FILE_SPLICE_READ_EXPORT
+/*
+ * Splice-read from the PCC copy of @in_file when there is one and
+ * pcc_io_init() reports the file as cached; otherwise splice from @in_file
+ * itself. Either way the data is read with default_file_splice_read().
+ *
+ * Returns the default_file_splice_read() result.
+ */
ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos,
struct pipe_inode_info *pipe,
- size_t count, unsigned int flags,
- bool *cached)
+ size_t count, unsigned int flags)
{
struct inode *inode = file_inode(in_file);
struct ll_file_data *fd = in_file->private_data;
struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+ bool cached = false;
ssize_t result;
ENTRY;
- *cached = false;
if (!pcc_file)
- RETURN(0);
+ RETURN(default_file_splice_read(in_file, ppos, pipe,
+ count, flags));
- if (!file_inode(pcc_file)->i_fop->splice_read)
- RETURN(-ENOTSUPP);
+ pcc_io_init(inode, PIT_SPLICE_READ, &cached);
+ if (!cached)
+ RETURN(default_file_splice_read(in_file, ppos, pipe,
+ count, flags));
- pcc_io_init(inode, PIT_SPLICE_READ, cached);
- if (!*cached)
- RETURN(0);
+ result = default_file_splice_read(pcc_file, ppos, pipe, count, flags);
- result = file_inode(pcc_file)->i_fop->splice_read(pcc_file,
- ppos, pipe, count,
- flags);
-
- pcc_io_fini(inode);
+ /* NOTE(review): pcc_io_fini() may clear @cached to ask for a fallback,
+ * but @result is returned as is and the splice is not retried on
+ * @in_file - confirm this is intended.
+ */
+ pcc_io_fini(inode, PIT_SPLICE_READ, result, &cached);
RETURN(result);
}
+#endif /* HAVE_DEFAULT_FILE_SPLICE_READ_EXPORT */
int pcc_fsync(struct file *file, loff_t start, loff_t end,
int datasync, bool *cached)
{
struct inode *inode = file_inode(file);
struct ll_file_data *fd = file->private_data;
- struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+ struct pcc_file *pccf = &fd->fd_pcc_file;
+ struct file *pcc_file = pccf->pccf_file;
int rc;
ENTRY;
RETURN(0);
}
+ if (!S_ISREG(inode->i_mode)) {
+ *cached = false;
+ RETURN(0);
+ }
+
+ /*
+ * After the file is attached into RO-PCC, its dirty pages on this
+ * client may not be flushed. So fsync() should fall back to normal
+ * Lustre I/O path flushing dirty data to OSTs. And flush on RO-PCC
+ * copy is meaningless.
+ */
+ if (pccf->pccf_type == LU_PCC_READONLY) {
+ *cached = false;
+ RETURN(-EAGAIN);
+ }
+
pcc_io_init(inode, PIT_FSYNC, cached);
if (!*cached)
RETURN(0);
rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
start, end, datasync);
- pcc_io_fini(inode);
+ pcc_io_fini(inode, PIT_FSYNC, rc, cached);
RETURN(rc);
}
"%s: PCC backend fs not support ->page_mkwrite()\n",
ll_i2sbi(inode)->ll_fsname);
pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
- up_read(&mm->mmap_sem);
+ mmap_read_unlock(mm);
*cached = true;
RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
}
/* Pause to allow for a race with concurrent detach */
- OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
+ CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
pcc_io_init(inode, PIT_PAGE_MKWRITE, cached);
if (!*cached) {
* __do_page_fault and retry the memory fault handling.
*/
if (page->mapping == pcc_file->f_mapping) {
+ pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
*cached = true;
- up_read(&mm->mmap_sem);
+ mmap_read_unlock(mm);
RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
}
* This fault injection can also be used to simulate -ENOSPC and
* -EDQUOT failure of underlying PCC backend fs.
*/
- if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE)) {
- pcc_io_fini(inode);
- pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
- up_read(&mm->mmap_sem);
- RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
- }
+ if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE))
+ GOTO(out, rc = VM_FAULT_SIGBUS);
vma->vm_file = pcc_file;
#ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
#endif
vma->vm_file = file;
- pcc_io_fini(inode);
+out:
+ pcc_io_fini(inode, PIT_PAGE_MKWRITE, rc, cached);
+
+ /* VM_FAULT_SIGBUS usually means that underlying PCC backend fs returns
+ * -EIO, -ENOSPC or -EDQUOT. Thus we can retry this IO from the normal
+ * Lustre I/O path.
+ */
+ if (rc & VM_FAULT_SIGBUS) {
+ pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
+ mmap_read_unlock(mm);
+ RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
+ }
RETURN(rc);
}
RETURN(0);
}
+ if (!S_ISREG(inode->i_mode)) {
+ *cached = false;
+ RETURN(0);
+ }
+
pcc_io_init(inode, PIT_FAULT, cached);
if (!*cached)
RETURN(0);
+ /* Tolerate the mmap read failure for RO-PCC */
+ if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
+ GOTO(out, rc = VM_FAULT_SIGBUS);
+
vma->vm_file = pcc_file;
#ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
rc = pcc_vm_ops->fault(vmf);
rc = pcc_vm_ops->fault(vma, vmf);
#endif
vma->vm_file = file;
-
- pcc_io_fini(inode);
+out:
+ pcc_io_fini(inode, PIT_FAULT, rc, cached);
RETURN(rc);
}
+/*
+ * Unlink the PCC copy @pcc_dentry from its parent directory.
+ *
+ * @inode is the Lustre inode and is used only to name the filesystem in the
+ * warning. Returns the vfs_unlink() result; a failure is also logged.
+ *
+ * NOTE(review): no directory lock is taken here, while mainline vfs_unlink()
+ * expects the caller to hold the parent's inode lock. The mainline prototype
+ * also carries a trailing delegated_inode argument that is absent here,
+ * presumably supplied by a compat wrapper - confirm both against the callers
+ * and the compat headers.
+ */
static int pcc_inode_remove(struct inode *inode, struct dentry *pcc_dentry)
{
+ /* pin the parent so its inode stays valid across the unlink */
+ struct dentry *parent = dget_parent(pcc_dentry);
int rc;
- rc = ll_vfs_unlink(pcc_dentry->d_parent->d_inode, pcc_dentry);
+ rc = vfs_unlink(&nop_mnt_idmap, d_inode(parent), pcc_dentry);
if (rc)
CWARN("%s: failed to unlink PCC file %pd, rc = %d\n",
ll_i2sbi(inode)->ll_fsname, pcc_dentry, rc);
+ dput(parent);
return rc;
}
if (d_is_positive(dentry))
goto out;
- rc = vfs_mkdir(dir, dentry, mode);
+ rc = vfs_mkdir(&nop_mnt_idmap, dir, dentry, mode);
if (rc) {
dput(dentry);
dentry = ERR_PTR(rc);
if (d_is_positive(dentry))
goto out;
- rc = vfs_create(dir, dentry, mode, false);
+ rc = vfs_create(&nop_mnt_idmap, dir, dentry, mode, false);
if (rc) {
dput(dentry);
dentry = ERR_PTR(rc);
struct dentry *child;
int rc = 0;
- OBD_ALLOC(path, MAX_PCC_DATABASE_PATH);
+ OBD_ALLOC(path, PCC_DATASET_MAX_PATH);
if (path == NULL)
return -ENOMEM;
- pcc_fid2dataset_path(path, MAX_PCC_DATABASE_PATH, fid);
+ pcc_fid2dataset_path(path, PCC_DATASET_MAX_PATH, fid);
base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0);
if (IS_ERR(base)) {
GOTO(out, rc);
}
- snprintf(path, MAX_PCC_DATABASE_PATH, DFID_NOBRACE, PFID(fid));
+ snprintf(path, PCC_DATASET_MAX_PATH, DFID_NOBRACE, PFID(fid));
child = pcc_create(base, path, 0);
if (IS_ERR(child)) {
rc = PTR_ERR(child);
out_base:
dput(base);
out:
- OBD_FREE(path, MAX_PCC_DATABASE_PATH);
+ OBD_FREE(path, PCC_DATASET_MAX_PATH);
return rc;
}
* Reset uid, gid or size for the PCC copy masked by @valid.
* TODO: Set the project ID for PCC copy.
*/
-int pcc_inode_reset_iattr(struct dentry *dentry, unsigned int valid,
- kuid_t uid, kgid_t gid, loff_t size)
+static int pcc_inode_reset_iattr(struct dentry *dentry, unsigned int valid,
+ kuid_t uid, kgid_t gid, loff_t size)
{
struct inode *inode = dentry->d_inode;
struct iattr attr;
attr.ia_size = size;
inode_lock(inode);
- rc = notify_change(dentry, &attr, NULL);
+ rc = notify_change(&nop_mnt_idmap, dentry, &attr, NULL);
inode_unlock(inode);
RETURN(rc);
int pcc_inode_create_fini(struct inode *inode, struct pcc_create_attach *pca)
{
struct dentry *pcc_dentry = pca->pca_dentry;
- struct pcc_super *super = ll_i2pccs(inode);
const struct cred *old_cred;
+ struct pcc_super *super;
struct pcc_inode *pcci;
int rc;
if (!inode)
GOTO(out_dataset_put, rc = 0);
+ super = ll_i2pccs(inode);
+
LASSERT(pcc_dentry);
old_cred = override_creds(super->pccs_cred);
rc = pcc_layout_xattr_set(pcci, 0);
if (rc) {
- (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
+ if (!pcci->pcci_unlinked)
+ (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
pcc_inode_put(pcci);
GOTO(out_unlock, rc);
}
return;
if (pca->pca_dentry) {
+ struct dentry *parent;
+ struct inode *i_dir;
const struct cred *old_cred;
int rc;
old_cred = override_creds(pcc_super_cred(sb));
- rc = ll_vfs_unlink(pca->pca_dentry->d_parent->d_inode,
- pca->pca_dentry);
+ parent = dget_parent(pca->pca_dentry);
+ i_dir = d_inode(parent);
+ rc = vfs_unlink(&nop_mnt_idmap, i_dir, pca->pca_dentry);
+ dput(parent);
if (rc)
CWARN("%s: failed to unlink PCC file %pd: rc = %d\n",
ll_s2sbi(sb)->ll_fsname, pca->pca_dentry, rc);
RETURN(rc);
}
-int pcc_readwrite_attach(struct file *file, struct inode *inode,
- __u32 archive_id)
+static int pcc_attach_data_archive(struct file *file, struct inode *inode,
+ struct pcc_dataset *dataset,
+ struct dentry **dentry)
{
- struct pcc_dataset *dataset;
- struct ll_inode_info *lli = ll_i2info(inode);
- struct pcc_super *super = ll_i2pccs(inode);
- struct pcc_inode *pcci;
const struct cred *old_cred;
- struct dentry *dentry;
struct file *pcc_filp;
struct path path;
ssize_t ret;
ENTRY;
- rc = pcc_attach_allowed_check(inode);
- if (rc)
- RETURN(rc);
-
- dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super,
- LU_PCC_READWRITE, archive_id);
- if (dataset == NULL)
- RETURN(-ENOENT);
-
- old_cred = override_creds(super->pccs_cred);
- rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry);
+ old_cred = override_creds(pcc_super_cred(inode->i_sb));
+ rc = __pcc_inode_create(dataset, &ll_i2info(inode)->lli_fid, dentry);
if (rc)
- GOTO(out_dataset_put, rc);
+ GOTO(out_cred, rc);
path.mnt = dataset->pccd_path.mnt;
- path.dentry = dentry;
+ path.dentry = *dentry;
pcc_filp = dentry_open(&path, O_WRONLY | O_LARGEFILE, current_cred());
if (IS_ERR_OR_NULL(pcc_filp)) {
rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
GOTO(out_dentry, rc);
}
- rc = pcc_inode_reset_iattr(dentry, ATTR_UID | ATTR_GID,
+ rc = pcc_inode_reset_iattr(*dentry, ATTR_UID | ATTR_GID,
old_cred->uid, old_cred->gid, 0);
if (rc)
GOTO(out_fput, rc);
* copy after copy data. Otherwise, it may get wrong file size after
* re-attach a file. See LU-13023 for details.
*/
- rc = pcc_inode_reset_iattr(dentry, ATTR_SIZE, KUIDT_INIT(0),
+ rc = pcc_inode_reset_iattr(*dentry, ATTR_SIZE, KUIDT_INIT(0),
KGIDT_INIT(0), ret);
+out_fput:
+ fput(pcc_filp);
+out_dentry:
+ if (rc) {
+ pcc_inode_remove(inode, *dentry);
+ dput(*dentry);
+ }
+out_cred:
+ revert_creds(old_cred);
+ RETURN(rc);
+}
+
+int pcc_readwrite_attach(struct file *file, struct inode *inode,
+ __u32 archive_id)
+{
+ struct pcc_dataset *dataset;
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct pcc_super *super = ll_i2pccs(inode);
+ struct pcc_inode *pcci;
+ struct dentry *dentry;
+ int rc;
+
+ ENTRY;
+
+ rc = pcc_attach_allowed_check(inode);
if (rc)
- GOTO(out_fput, rc);
+ RETURN(rc);
+
+ dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super,
+ LU_PCC_READWRITE, archive_id);
+ if (dataset == NULL)
+ RETURN(-ENOENT);
+
+ rc = pcc_attach_data_archive(file, inode, dataset, &dentry);
+ if (rc)
+ GOTO(out_dataset_put, rc);
/* Pause to allow for a race with concurrent HSM remove */
- OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
+ CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
pcc_inode_lock(inode);
pcci = ll_i2pcci(inode);
dentry, LU_PCC_READWRITE);
out_unlock:
pcc_inode_unlock(inode);
-out_fput:
- fput(pcc_filp);
-out_dentry:
if (rc) {
+ const struct cred *old_cred;
+
+ old_cred = override_creds(pcc_super_cred(inode->i_sb));
(void) pcc_inode_remove(inode, dentry);
+ revert_creds(old_cred);
dput(dentry);
}
out_dataset_put:
pcc_dataset_put(dataset);
- revert_creds(old_cred);
RETURN(rc);
}
out_put:
if (rc) {
- (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
+ if (!pcci->pcci_unlinked)
+ (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
pcc_inode_put(pcci);
}
out_unlock:
RETURN(rc);
}
+/*
+ * Make sure @inode has a read-only layout and store its generation in @gen.
+ *
+ * A file whose layout is HSM released is restored first and the layout
+ * re-read; this is attempted at most twice before giving up with -EBUSY.
+ * If the layout is not yet read-only, a LAYOUT_INTENT_PCCRO_SET write intent
+ * covering the whole file is sent and the layout refreshed.
+ *
+ * Returns 0 on success or a negative errno from the failing step.
+ */
+static int pcc_layout_rdonly_set(struct inode *inode, __u32 *gen)
+{
+	struct ll_inode_info *lli = ll_i2info(inode);
+	struct lu_extent ext = {
+		.e_start = 0,
+		.e_end = OBD_OBJECT_EOF,
+	};
+	struct cl_layout clt = {
+		.cl_layout_gen = 0,
+		.cl_is_released = false,
+		.cl_is_rdonly = false,
+	};
+	int retries = 0;
+	int rc;
+
+	ENTRY;
+
+	for (;;) {
+		rc = pcc_get_layout_info(inode, &clt);
+		if (rc)
+			RETURN(rc);
+
+		if (!clt.cl_is_released)
+			break;
+
+		/* For the HSM released file, restore the data first. */
+		if (++retries > 2)
+			RETURN(-EBUSY);
+
+		if (ll_layout_version_get(lli) != CL_LAYOUT_GEN_NONE) {
+			rc = ll_layout_restore(inode, 0, OBD_OBJECT_EOF);
+			if (rc) {
+				CDEBUG(D_CACHE, DFID" RESTORE failure: %d\n",
+				       PFID(&lli->lli_fid), rc);
+				RETURN(rc);
+			}
+		}
+
+		rc = ll_layout_refresh(inode, gen);
+		if (rc)
+			RETURN(rc);
+	}
+
+	if (clt.cl_is_rdonly) {
+		/* Readonly layout */
+		*gen = clt.cl_layout_gen;
+		RETURN(rc);
+	}
+
+	rc = ll_layout_write_intent(inode, LAYOUT_INTENT_PCCRO_SET, &ext);
+	if (rc)
+		RETURN(rc);
+
+	rc = ll_layout_refresh(inode, gen);
+	RETURN(rc);
+}
+
+/*
+ * Attach @inode to the read-only PCC dataset selected by @attach->pcca_id.
+ *
+ * Steps: check that attaching is allowed, switch the file to a read-only
+ * layout, create and fill the PCC copy via pcc_attach_data_archive(), then,
+ * under lli_layout_mutex and the PCC inode lock, bind the copy to the inode
+ * and record the layout generation in the copy's xattr.
+ *
+ * Returns 0 on success, -EOPNOTSUPP without LL_SBI_LAYOUT_LOCK, -ENOENT if
+ * no matching dataset exists, -ESTALE if the layout generation changed while
+ * the copy was being made, -ENOMEM, or the error of the failing step.
+ */
+static int pcc_readonly_ioctl_attach(struct file *file,
+ struct inode *inode,
+ struct lu_pcc_attach *attach)
+{
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct pcc_super *super = ll_i2pccs(inode);
+ struct ll_inode_info *lli = ll_i2info(inode);
+ const struct cred *old_cred;
+ struct pcc_dataset *dataset;
+ struct pcc_inode *pcci;
+ struct dentry *dentry;
+ bool attached = false;
+ bool unlinked = false;
+ __u32 gen;
+ int rc;
+
+ ENTRY;
+
+ if (!test_bit(LL_SBI_LAYOUT_LOCK, sbi->ll_flags))
+ RETURN(-EOPNOTSUPP);
+
+ rc = pcc_attach_allowed_check(inode);
+ if (rc)
+ RETURN(rc);
+
+ rc = pcc_layout_rdonly_set(inode, &gen);
+ if (rc)
+ RETURN(rc);
+
+ dataset = pcc_dataset_get(&ll_s2sbi(inode->i_sb)->ll_pcc_super,
+ LU_PCC_READONLY, attach->pcca_id);
+ if (dataset == NULL)
+ RETURN(-ENOENT);
+
+ rc = pcc_attach_data_archive(file, inode, dataset, &dentry);
+ if (rc)
+ GOTO(out_dataset_put, rc);
+
+ mutex_lock(&lli->lli_layout_mutex);
+ pcc_inode_lock(inode);
+ old_cred = override_creds(super->pccs_cred);
+ /* NOTE(review): PCC_STATE_FL_ATTACHING is cleared only from here on;
+ * the earlier error returns do not touch it - confirm where the flag
+ * is set and whether those paths need to clear it as well.
+ */
+ lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
+ /* the layout changed while the data was being copied: copy is stale */
+ if (gen != ll_layout_version_get(lli))
+ GOTO(out_put_unlock, rc = -ESTALE);
+
+ pcci = ll_i2pcci(inode);
+ if (!pcci) {
+ OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
+ if (pcci == NULL)
+ GOTO(out_put_unlock, rc = -ENOMEM);
+
+ pcc_inode_attach_set(super, dataset, lli, pcci,
+ dentry, LU_PCC_READONLY);
+ } else {
+ /* reuse the existing PCC inode: take a reference and replace
+ * its path with the freshly created copy
+ */
+ atomic_inc(&pcci->pcci_refcount);
+ path_put(&pcci->pcci_path);
+ pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
+ pcci->pcci_path.dentry = dentry;
+ pcci->pcci_type = LU_PCC_READONLY;
+ }
+ /* from here on @pcci owns @dentry; cleanup goes through pcc_inode_put() */
+ attached = true;
+ rc = pcc_layout_xattr_set(pcci, gen);
+ if (rc) {
+ pcci->pcci_type = LU_PCC_NONE;
+ unlinked = pcci->pcci_unlinked;
+ GOTO(out_put_unlock, rc);
+ }
+
+ pcc_layout_gen_set(pcci, gen);
+out_put_unlock:
+ if (rc) {
+ /* remove the PCC copy unless it is already marked unlinked */
+ if (!unlinked)
+ (void) pcc_inode_remove(inode, dentry);
+ if (attached)
+ pcc_inode_put(pcci);
+ else
+ dput(dentry);
+ }
+ revert_creds(old_cred);
+ pcc_inode_unlock(inode);
+ mutex_unlock(&lli->lli_layout_mutex);
+out_dataset_put:
+ pcc_dataset_put(dataset);
+
+ RETURN(rc);
+}
+
+/*
+ * Entry point for the PCC attach ioctl: dispatch on @attach->pcca_type.
+ *
+ * LU_PCC_READONLY is handled by pcc_readonly_ioctl_attach(). LU_PCC_READWRITE
+ * is rejected here with -EOPNOTSUPP and any other type with -EINVAL.
+ */
+int pcc_ioctl_attach(struct file *file, struct inode *inode,
+		     struct lu_pcc_attach *attach)
+{
+	ENTRY;
+
+	if (attach->pcca_type == LU_PCC_READONLY)
+		RETURN(pcc_readonly_ioctl_attach(file, inode, attach));
+
+	if (attach->pcca_type == LU_PCC_READWRITE)
+		RETURN(-EOPNOTSUPP);
+
+	RETURN(-EINVAL);
+}
+
static int pcc_hsm_remove(struct inode *inode)
{
struct hsm_user_request *hur;
{
struct ll_inode_info *lli = ll_i2info(inode);
struct pcc_inode *pcci;
+ const struct cred *old_cred;
bool hsm_remove = false;
int rc = 0;
__pcc_layout_invalidate(pcci);
pcc_inode_put(pcci);
+ } else if (pcci->pcci_type == LU_PCC_READONLY) {
+ __pcc_layout_invalidate(pcci);
+
+ if (opt == PCC_DETACH_OPT_UNCACHE && !pcci->pcci_unlinked) {
+ old_cred = override_creds(pcc_super_cred(inode->i_sb));
+ rc = pcc_inode_remove(inode, pcci->pcci_path.dentry);
+ revert_creds(old_cred);
+ if (!rc)
+ pcci->pcci_unlinked = true;
+ }
+
+ pcc_inode_put(pcci);
+ } else {
+ rc = -EOPNOTSUPP;
}
out_unlock:
pcc_inode_unlock(inode);
if (hsm_remove) {
- const struct cred *old_cred;
-
old_cred = override_creds(pcc_super_cred(inode->i_sb));
rc = pcc_hsm_remove(inode);
revert_creds(old_cred);