X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fllite%2Fpcc.c;h=b80173e768ec9f76240f1ceab4df5cc69fab0b4d;hp=2de2715f6265f0767b8059e7e5db10731dbf45f6;hb=HEAD;hpb=f172b116885753d0f316549a2fb9d451e9b4bd2e diff --git a/lustre/llite/pcc.c b/lustre/llite/pcc.c index 2de2715..472325d 100644 --- a/lustre/llite/pcc.c +++ b/lustre/llite/pcc.c @@ -112,29 +112,610 @@ struct kmem_cache *pcc_inode_slab; -void pcc_super_init(struct pcc_super *super) +int pcc_super_init(struct pcc_super *super) { - spin_lock_init(&super->pccs_lock); + struct cred *cred; + + super->pccs_cred = cred = prepare_creds(); + if (!cred) + return -ENOMEM; + + /* Never override disk quota limits or use reserved space */ + cap_lower(cred->cap_effective, CAP_SYS_RESOURCE); + init_rwsem(&super->pccs_rw_sem); INIT_LIST_HEAD(&super->pccs_datasets); + super->pccs_generation = 1; + + return 0; +} + +/* Rule based auto caching */ +static void pcc_id_list_free(struct list_head *id_list) +{ + struct pcc_match_id *id, *n; + + list_for_each_entry_safe(id, n, id_list, pmi_linkage) { + list_del_init(&id->pmi_linkage); + OBD_FREE_PTR(id); + } +} + +static void pcc_fname_list_free(struct list_head *fname_list) +{ + struct pcc_match_fname *fname, *n; + + list_for_each_entry_safe(fname, n, fname_list, pmf_linkage) { + OBD_FREE(fname->pmf_name, strlen(fname->pmf_name) + 1); + list_del_init(&fname->pmf_linkage); + OBD_FREE_PTR(fname); + } +} + +static void pcc_expression_free(struct pcc_expression *expr) +{ + LASSERT(expr->pe_field >= PCC_FIELD_UID && + expr->pe_field < PCC_FIELD_MAX); + switch (expr->pe_field) { + case PCC_FIELD_UID: + case PCC_FIELD_GID: + case PCC_FIELD_PROJID: + pcc_id_list_free(&expr->pe_cond); + break; + case PCC_FIELD_FNAME: + pcc_fname_list_free(&expr->pe_cond); + break; + default: + LBUG(); + } + OBD_FREE_PTR(expr); +} + +static void pcc_conjunction_free(struct pcc_conjunction *conjunction) +{ + struct pcc_expression *expression, *n; + + LASSERT(list_empty(&conjunction->pc_linkage)); + list_for_each_entry_safe(expression, n, + &conjunction->pc_expressions, + pe_linkage) { + list_del_init(&expression->pe_linkage); + pcc_expression_free(expression); + } + OBD_FREE_PTR(conjunction); +} + +static void pcc_rule_conds_free(struct list_head *cond_list) +{ + struct pcc_conjunction *conjunction, *n; + + list_for_each_entry_safe(conjunction, n, cond_list, pc_linkage) { + list_del_init(&conjunction->pc_linkage); + pcc_conjunction_free(conjunction); + } +} + +static void pcc_cmd_fini(struct pcc_cmd *cmd) +{ + if (cmd->pccc_cmd == PCC_ADD_DATASET) { + if (!list_empty(&cmd->u.pccc_add.pccc_conds)) + pcc_rule_conds_free(&cmd->u.pccc_add.pccc_conds); + if (cmd->u.pccc_add.pccc_conds_str) + OBD_FREE(cmd->u.pccc_add.pccc_conds_str, + strlen(cmd->u.pccc_add.pccc_conds_str) + 1); + } +} + +#define PCC_DISJUNCTION_DELIM (",") +#define PCC_CONJUNCTION_DELIM ("&") +#define PCC_EXPRESSION_DELIM ("=") + +static int +pcc_fname_list_add(char *id, struct list_head *fname_list) +{ + struct pcc_match_fname *fname; + + OBD_ALLOC_PTR(fname); + if (fname == NULL) + return -ENOMEM; + + OBD_ALLOC(fname->pmf_name, strlen(id) + 1); + if (fname->pmf_name == NULL) { + OBD_FREE_PTR(fname); + return -ENOMEM; + } + + strcpy(fname->pmf_name, id); + list_add_tail(&fname->pmf_linkage, fname_list); + return 0; +} + +static int +pcc_fname_list_parse(char *str, struct list_head *fname_list) +{ + int rc = 0; + + ENTRY; + + INIT_LIST_HEAD(fname_list); + while (rc == 0 && str) { + char *fname = strsep(&str, " "); + + if (*fname) + rc = pcc_fname_list_add(fname, fname_list); + } + if (list_empty(fname_list)) + rc = -EINVAL; + if (rc) + pcc_fname_list_free(fname_list); + RETURN(rc); +} + +static int +pcc_id_list_parse(char *str, struct list_head *id_list, + enum pcc_field type) +{ + int rc = 0; + + ENTRY; + + if (type != PCC_FIELD_UID && type != PCC_FIELD_GID && + type != PCC_FIELD_PROJID) + RETURN(-EINVAL); + + INIT_LIST_HEAD(id_list); + while (str) { + char *num; + struct pcc_match_id *id; + unsigned long id_val; + + num = strsep(&str, " "); + if (!*num) + continue; + rc = kstrtoul(num, 0, &id_val); + if (rc) + GOTO(out, rc); + + OBD_ALLOC_PTR(id); + if (id == NULL) + GOTO(out, rc = -ENOMEM); + + id->pmi_id = id_val; + list_add_tail(&id->pmi_linkage, id_list); + } + if (list_empty(id_list)) + rc = -EINVAL; +out: + if (rc) + pcc_id_list_free(id_list); + RETURN(rc); +} + +static int +pcc_expression_parse(char *str, struct list_head *cond_list) +{ + struct pcc_expression *expr; + char *field; + int len; + int rc = 0; + + OBD_ALLOC_PTR(expr); + if (expr == NULL) + return -ENOMEM; + + field = strim(strsep(&str, PCC_EXPRESSION_DELIM)); + if (!*field || !str) + /* No LHS or no '=' */ + GOTO(out, rc = -EINVAL); + str = skip_spaces(str); + len = strlen(str); + if (str[0] != '{' || str[len - 1] != '}') + GOTO(out, rc = -EINVAL); + + /* Skip '{' and '}' */ + str[len - 1] = '\0'; + str += 1; + + if (strcmp(field, "uid") == 0) { + if (pcc_id_list_parse(str, + &expr->pe_cond, + PCC_FIELD_UID) < 0) + GOTO(out, rc = -EINVAL); + expr->pe_field = PCC_FIELD_UID; + } else if (strcmp(field, "gid") == 0) { + if (pcc_id_list_parse(str, + &expr->pe_cond, + PCC_FIELD_GID) < 0) + GOTO(out, rc = -EINVAL); + expr->pe_field = PCC_FIELD_GID; + } else if (strcmp(field, "projid") == 0) { + if (pcc_id_list_parse(str, + &expr->pe_cond, + PCC_FIELD_PROJID) < 0) + GOTO(out, rc = -EINVAL); + expr->pe_field = PCC_FIELD_PROJID; + } else if (strcmp(field, "fname") == 0) { + if (pcc_fname_list_parse(str, &expr->pe_cond) < 0) + GOTO(out, rc = -EINVAL); + expr->pe_field = PCC_FIELD_FNAME; + } else { + GOTO(out, rc = -EINVAL); + } + + list_add_tail(&expr->pe_linkage, cond_list); + return 0; +out: + OBD_FREE_PTR(expr); + return rc; +} + +static int +pcc_conjunction_parse(char *str, struct list_head *cond_list) +{ + struct pcc_conjunction *conjunction; + int rc = 0; + + OBD_ALLOC_PTR(conjunction); + if (conjunction == NULL) + return -ENOMEM; + + INIT_LIST_HEAD(&conjunction->pc_expressions); + list_add_tail(&conjunction->pc_linkage, cond_list); + + while (rc == 0 && str) { + char *expr = strsep(&str, PCC_CONJUNCTION_DELIM); + + rc = pcc_expression_parse(expr, &conjunction->pc_expressions); + } + return rc; +} + +static int pcc_conds_parse(char *orig, struct list_head *cond_list) +{ + char *str; + int rc = 0; + + orig = kstrdup(orig, GFP_KERNEL); + if (!orig) + return -ENOMEM; + str = orig; + + INIT_LIST_HEAD(cond_list); + while (rc == 0 && str) { + char *term = strsep(&str, PCC_DISJUNCTION_DELIM); + + rc = pcc_conjunction_parse(term, cond_list); + } + kfree(orig); + return rc; +} + +static int pcc_id_parse(struct pcc_cmd *cmd, const char *id) +{ + int rc; + + OBD_ALLOC(cmd->u.pccc_add.pccc_conds_str, strlen(id) + 1); + if (cmd->u.pccc_add.pccc_conds_str == NULL) + return -ENOMEM; + + memcpy(cmd->u.pccc_add.pccc_conds_str, id, strlen(id)); + + rc = pcc_conds_parse(cmd->u.pccc_add.pccc_conds_str, + &cmd->u.pccc_add.pccc_conds); + if (rc) + pcc_cmd_fini(cmd); + + return rc; +} + +static int +pcc_parse_value_pair(struct pcc_cmd *cmd, char *buffer) +{ + char *key, *val; + unsigned long id; + int rc; + + val = buffer; + key = strsep(&val, "="); + if (val == NULL || strlen(val) == 0) + return -EINVAL; + + /* Key of the value pair */ + if (strcmp(key, "rwid") == 0) { + rc = kstrtoul(val, 10, &id); + if (rc) + return rc; + if (id <= 0) + return -EINVAL; + cmd->u.pccc_add.pccc_rwid = id; + } else if (strcmp(key, "roid") == 0) { + rc = kstrtoul(val, 10, &id); + if (rc) + return rc; + if (id <= 0) + return -EINVAL; + cmd->u.pccc_add.pccc_roid = id; + } else if (strcmp(key, "auto_attach") == 0) { + rc = kstrtoul(val, 10, &id); + if (rc) + return rc; + if (id == 0) + cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_AUTO_ATTACH; + } else if (strcmp(key, "open_attach") == 0) { + rc = kstrtoul(val, 10, &id); + if (rc) + return rc; + if (id == 0) + cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_OPEN_ATTACH; + } else if (strcmp(key, "io_attach") == 0) { + rc = kstrtoul(val, 10, &id); + if (rc) + return rc; + if (id == 0) + cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_IO_ATTACH; + } else if (strcmp(key, "stat_attach") == 0) { + rc = kstrtoul(val, 10, &id); + if (rc) + return rc; + if (id == 0) + cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_STAT_ATTACH; + } else if (strcmp(key, "rwpcc") == 0) { + rc = kstrtoul(val, 10, &id); + if (rc) + return rc; + if (id > 0) + cmd->u.pccc_add.pccc_flags |= PCC_DATASET_RWPCC; + } else if (strcmp(key, "ropcc") == 0) { + rc = kstrtoul(val, 10, &id); + if (rc) + return rc; + if (id > 0) + cmd->u.pccc_add.pccc_flags |= PCC_DATASET_ROPCC; + } else { + return -EINVAL; + } + + return 0; +} + +static int +pcc_parse_value_pairs(struct pcc_cmd *cmd, char *buffer) +{ + char *val; + char *token; + int rc; + + switch (cmd->pccc_cmd) { + case PCC_ADD_DATASET: + /* Enable auto attach by default */ + cmd->u.pccc_add.pccc_flags |= PCC_DATASET_AUTO_ATTACH; + break; + case PCC_DEL_DATASET: + case PCC_CLEAR_ALL: + break; + default: + return -EINVAL; + } + + val = buffer; + while (val != NULL && strlen(val) != 0) { + token = strsep(&val, " "); + rc = pcc_parse_value_pair(cmd, token); + if (rc) + return rc; + } + + switch (cmd->pccc_cmd) { + case PCC_ADD_DATASET: + if (cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC && + cmd->u.pccc_add.pccc_flags & PCC_DATASET_ROPCC) + return -EINVAL; + /* + * By default, a PCC backend can provide caching service for + * both PCC-RW and PCC-RO. + */ + if ((cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCC_ALL) == 0) + cmd->u.pccc_add.pccc_flags |= PCC_DATASET_PCC_ALL; + + if (cmd->u.pccc_add.pccc_rwid == 0 && + cmd->u.pccc_add.pccc_roid == 0) + return -EINVAL; + + if (cmd->u.pccc_add.pccc_rwid == 0 && + cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC) + cmd->u.pccc_add.pccc_rwid = cmd->u.pccc_add.pccc_roid; + + if (cmd->u.pccc_add.pccc_roid == 0 && + cmd->u.pccc_add.pccc_flags & PCC_DATASET_ROPCC) + cmd->u.pccc_add.pccc_roid = cmd->u.pccc_add.pccc_rwid; + + break; + case PCC_DEL_DATASET: + case PCC_CLEAR_ALL: + break; + default: + return -EINVAL; + } + return 0; +} + +static void +pcc_dataset_rule_fini(struct pcc_match_rule *rule) +{ + if (!list_empty(&rule->pmr_conds)) + pcc_rule_conds_free(&rule->pmr_conds); + LASSERT(rule->pmr_conds_str != NULL); + OBD_FREE(rule->pmr_conds_str, strlen(rule->pmr_conds_str) + 1); +} + +static int +pcc_dataset_rule_init(struct pcc_match_rule *rule, struct pcc_cmd *cmd) +{ + int rc = 0; + + LASSERT(cmd->u.pccc_add.pccc_conds_str); + OBD_ALLOC(rule->pmr_conds_str, + strlen(cmd->u.pccc_add.pccc_conds_str) + 1); + if (rule->pmr_conds_str == NULL) + return -ENOMEM; + + memcpy(rule->pmr_conds_str, + cmd->u.pccc_add.pccc_conds_str, + strlen(cmd->u.pccc_add.pccc_conds_str)); + + INIT_LIST_HEAD(&rule->pmr_conds); + if (!list_empty(&cmd->u.pccc_add.pccc_conds)) + rc = pcc_conds_parse(rule->pmr_conds_str, + &rule->pmr_conds); + + if (rc) + pcc_dataset_rule_fini(rule); + + return rc; +} + +/* Rule Matching */ +static int +pcc_id_list_match(struct list_head *id_list, __u32 id_val) +{ + struct pcc_match_id *id; + + list_for_each_entry(id, id_list, pmi_linkage) { + if (id->pmi_id == id_val) + return 1; + } + return 0; +} + +static bool +cfs_match_wildcard(const char *pattern, const char *content) +{ + if (*pattern == '\0' && *content == '\0') + return true; + + if (*pattern == '*' && *(pattern + 1) != '\0' && *content == '\0') + return false; + + while (*pattern == *content) { + pattern++; + content++; + if (*pattern == '\0' && *content == '\0') + return true; + + if (*pattern == '*' && *(pattern + 1) != '\0' && + *content == '\0') + return false; + } + + if (*pattern == '*') + return (cfs_match_wildcard(pattern + 1, content) || + cfs_match_wildcard(pattern, content + 1)); + + return false; +} + +static int +pcc_fname_list_match(struct list_head *fname_list, const char *name) +{ + struct pcc_match_fname *fname; + + list_for_each_entry(fname, fname_list, pmf_linkage) { + if (cfs_match_wildcard(fname->pmf_name, name)) + return 1; + } + return 0; +} + +static int +pcc_expression_match(struct pcc_expression *expr, struct pcc_matcher *matcher) +{ + switch (expr->pe_field) { + case PCC_FIELD_UID: + return pcc_id_list_match(&expr->pe_cond, matcher->pm_uid); + case PCC_FIELD_GID: + return pcc_id_list_match(&expr->pe_cond, matcher->pm_gid); + case PCC_FIELD_PROJID: + return pcc_id_list_match(&expr->pe_cond, matcher->pm_projid); + case PCC_FIELD_FNAME: + return pcc_fname_list_match(&expr->pe_cond, + matcher->pm_name->name); + default: + return 0; + } +} + +static int +pcc_conjunction_match(struct pcc_conjunction *conjunction, + struct pcc_matcher *matcher) +{ + struct pcc_expression *expr; + int matched; + + list_for_each_entry(expr, &conjunction->pc_expressions, pe_linkage) { + matched = pcc_expression_match(expr, matcher); + if (!matched) + return 0; + } + + return 1; +} + +static int +pcc_cond_match(struct pcc_match_rule *rule, struct pcc_matcher *matcher) +{ + struct pcc_conjunction *conjunction; + int matched; + + list_for_each_entry(conjunction, &rule->pmr_conds, pc_linkage) { + matched = pcc_conjunction_match(conjunction, matcher); + if (matched) + return 1; + } + + return 0; +} + +struct pcc_dataset* +pcc_dataset_match_get(struct pcc_super *super, struct pcc_matcher *matcher) +{ + struct pcc_dataset *dataset; + struct pcc_dataset *selected = NULL; + + down_read(&super->pccs_rw_sem); + list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) { + if (!(dataset->pccd_flags & PCC_DATASET_RWPCC)) + continue; + + if (pcc_cond_match(&dataset->pccd_rule, matcher)) { + atomic_inc(&dataset->pccd_refcount); + selected = dataset; + break; + } + } + up_read(&super->pccs_rw_sem); + if (selected) + CDEBUG(D_CACHE, "PCC create, matched %s - %d:%d:%d:%s\n", + dataset->pccd_rule.pmr_conds_str, + matcher->pm_uid, matcher->pm_gid, + matcher->pm_projid, matcher->pm_name->name); + + return selected; } /** * pcc_dataset_add - Add a Cache policy to control which files need be * cached and where it will be cached. * - * @super: superblock of pcc - * @pathname: root path of pcc - * @id: HSM archive ID - * @projid: files with specified project ID will be cached. + * @super: superblock of pcc + * @cmd: pcc command */ static int -pcc_dataset_add(struct pcc_super *super, const char *pathname, - __u32 archive_id, __u32 projid) +pcc_dataset_add(struct pcc_super *super, struct pcc_cmd *cmd) { - int rc; + char *pathname = cmd->pccc_pathname; struct pcc_dataset *dataset; struct pcc_dataset *tmp; bool found = false; + int rc; OBD_ALLOC_PTR(dataset); if (dataset == NULL) @@ -146,20 +727,31 @@ pcc_dataset_add(struct pcc_super *super, const char *pathname, return rc; } strncpy(dataset->pccd_pathname, pathname, PATH_MAX); - dataset->pccd_id = archive_id; - dataset->pccd_projid = projid; + dataset->pccd_rwid = cmd->u.pccc_add.pccc_rwid; + dataset->pccd_roid = cmd->u.pccc_add.pccc_roid; + dataset->pccd_flags = cmd->u.pccc_add.pccc_flags; atomic_set(&dataset->pccd_refcount, 1); - spin_lock(&super->pccs_lock); + rc = pcc_dataset_rule_init(&dataset->pccd_rule, cmd); + if (rc) { + pcc_dataset_put(dataset); + return rc; + } + + down_write(&super->pccs_rw_sem); list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) { - if (tmp->pccd_id == archive_id) { + if (strcmp(tmp->pccd_pathname, pathname) == 0 || + (dataset->pccd_rwid != 0 && + dataset->pccd_rwid == tmp->pccd_rwid) || + (dataset->pccd_roid != 0 && + dataset->pccd_roid == tmp->pccd_roid)) { found = true; break; } } if (!found) list_add(&dataset->pccd_linkage, &super->pccs_datasets); - spin_unlock(&super->pccs_lock); + up_write(&super->pccs_rw_sem); if (found) { pcc_dataset_put(dataset); @@ -169,33 +761,35 @@ pcc_dataset_add(struct pcc_super *super, const char *pathname, return rc; } -struct pcc_dataset * -pcc_dataset_get(struct pcc_super *super, __u32 projid, __u32 archive_id) +static struct pcc_dataset * +pcc_dataset_get(struct pcc_super *super, enum lu_pcc_type type, __u32 id) { struct pcc_dataset *dataset; struct pcc_dataset *selected = NULL; - if (projid == 0 && archive_id == 0) + if (id == 0) return NULL; /* - * archive ID is unique in the list, projid might be duplicate, + * archive ID (read-write ID) or read-only ID is unique in the list, * we just return last added one as first priority. */ - spin_lock(&super->pccs_lock); + down_read(&super->pccs_rw_sem); list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) { - if (projid && dataset->pccd_projid != projid) + if (type == LU_PCC_READWRITE && (dataset->pccd_rwid != id || + !(dataset->pccd_flags & PCC_DATASET_RWPCC))) continue; - if (archive_id && dataset->pccd_id != archive_id) + if (type == LU_PCC_READONLY && (dataset->pccd_roid != id || + !(dataset->pccd_flags & PCC_DATASET_ROPCC))) continue; atomic_inc(&dataset->pccd_refcount); selected = dataset; break; } - spin_unlock(&super->pccs_lock); + up_read(&super->pccs_rw_sem); if (selected) - CDEBUG(D_CACHE, "matched projid %u, PCC create\n", - selected->pccd_projid); + CDEBUG(D_CACHE, "matched id %u, PCC mode %d\n", id, type); + return selected; } @@ -203,6 +797,7 @@ void pcc_dataset_put(struct pcc_dataset *dataset) { if (atomic_dec_and_test(&dataset->pccd_refcount)) { + pcc_dataset_rule_fini(&dataset->pccd_rule); path_put(&dataset->pccd_path); OBD_FREE_PTR(dataset); } @@ -215,17 +810,18 @@ pcc_dataset_del(struct pcc_super *super, char *pathname) struct pcc_dataset *dataset; int rc = -ENOENT; - spin_lock(&super->pccs_lock); + down_write(&super->pccs_rw_sem); list_for_each_safe(l, tmp, &super->pccs_datasets) { dataset = list_entry(l, struct pcc_dataset, pccd_linkage); if (strcmp(dataset->pccd_pathname, pathname) == 0) { - list_del(&dataset->pccd_linkage); + list_del_init(&dataset->pccd_linkage); pcc_dataset_put(dataset); + super->pccs_generation++; rc = 0; break; } } - spin_unlock(&super->pccs_lock); + up_write(&super->pccs_rw_sem); return rc; } @@ -233,8 +829,9 @@ static void pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m) { seq_printf(m, "%s:\n", dataset->pccd_pathname); - seq_printf(m, " rwid: %u\n", dataset->pccd_id); - seq_printf(m, " autocache: projid=%u\n", dataset->pccd_projid); + seq_printf(m, " rwid: %u\n", dataset->pccd_rwid); + seq_printf(m, " flags: %x\n", dataset->pccd_flags); + seq_printf(m, " autocache: %s\n", dataset->pccd_rule.pmr_conds_str); } int @@ -242,25 +839,33 @@ pcc_super_dump(struct pcc_super *super, struct seq_file *m) { struct pcc_dataset *dataset; - spin_lock(&super->pccs_lock); + down_read(&super->pccs_rw_sem); list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) { pcc_dataset_dump(dataset, m); } - spin_unlock(&super->pccs_lock); + up_read(&super->pccs_rw_sem); return 0; } -void pcc_super_fini(struct pcc_super *super) +static void pcc_remove_datasets(struct pcc_super *super) { struct pcc_dataset *dataset, *tmp; + down_write(&super->pccs_rw_sem); list_for_each_entry_safe(dataset, tmp, &super->pccs_datasets, pccd_linkage) { list_del(&dataset->pccd_linkage); pcc_dataset_put(dataset); } + super->pccs_generation++; + up_write(&super->pccs_rw_sem); } +void pcc_super_fini(struct pcc_super *super) +{ + pcc_remove_datasets(super); + put_cred(super->pccs_cred); +} static bool pathname_is_valid(const char *pathname) { @@ -277,7 +882,6 @@ pcc_cmd_parse(char *buffer, unsigned long count) static struct pcc_cmd *cmd; char *token; char *val; - unsigned long tmp; int rc = 0; OBD_ALLOC_PTR(cmd); @@ -311,28 +915,35 @@ pcc_cmd_parse(char *buffer, unsigned long count) cmd->pccc_pathname = token; if (cmd->pccc_cmd == PCC_ADD_DATASET) { - /* archive ID */ - token = strsep(&val, " "); - if (val == NULL) + /* List of ID */ + LASSERT(val); + token = val; + val = strrchr(token, '}'); + if (!val) GOTO(out_free_cmd, rc = -EINVAL); - rc = kstrtoul(token, 10, &tmp); - if (rc != 0) + /* Skip '}' */ + val++; + if (*val == '\0') { + val = NULL; + } else if (*val == ' ') { + *val = '\0'; + val++; + } else { GOTO(out_free_cmd, rc = -EINVAL); - if (tmp == 0) - GOTO(out_free_cmd, rc = -EINVAL); - cmd->u.pccc_add.pccc_id = tmp; + } - token = val; - rc = kstrtoul(token, 10, &tmp); - if (rc != 0) - GOTO(out_free_cmd, rc = -EINVAL); - if (tmp == 0) - GOTO(out_free_cmd, rc = -EINVAL); - cmd->u.pccc_add.pccc_projid = tmp; - } + rc = pcc_id_parse(cmd, token); + if (rc) + GOTO(out_free_cmd, rc); + rc = pcc_parse_value_pairs(cmd, val); + if (rc) + GOTO(out_cmd_fini, rc = -EINVAL); + } goto out; +out_cmd_fini: + pcc_cmd_fini(cmd); out_free_cmd: OBD_FREE_PTR(cmd); out: @@ -353,21 +964,20 @@ int pcc_cmd_handle(char *buffer, unsigned long count, switch (cmd->pccc_cmd) { case PCC_ADD_DATASET: - rc = pcc_dataset_add(super, cmd->pccc_pathname, - cmd->u.pccc_add.pccc_id, - cmd->u.pccc_add.pccc_projid); + rc = pcc_dataset_add(super, cmd); break; case PCC_DEL_DATASET: rc = pcc_dataset_del(super, cmd->pccc_pathname); break; case PCC_CLEAR_ALL: - pcc_super_fini(super); + pcc_remove_datasets(super); break; default: rc = -EINVAL; break; } + pcc_cmd_fini(cmd); OBD_FREE_PTR(cmd); return rc; } @@ -382,17 +992,25 @@ static inline void pcc_inode_unlock(struct inode *inode) mutex_unlock(&ll_i2info(inode)->lli_pcc_lock); } -static void pcc_inode_init(struct pcc_inode *pcci) +static void pcc_inode_init(struct pcc_inode *pcci, struct ll_inode_info *lli) { + pcci->pcci_lli = lli; + lli->lli_pcc_inode = pcci; atomic_set(&pcci->pcci_refcount, 0); pcci->pcci_type = LU_PCC_NONE; + pcci->pcci_layout_gen = CL_LAYOUT_GEN_NONE; + atomic_set(&pcci->pcci_active_ios, 0); + init_waitqueue_head(&pcci->pcci_waitq); } static void pcc_inode_fini(struct pcc_inode *pcci) { + struct ll_inode_info *lli = pcci->pcci_lli; + path_put(&pcci->pcci_path); pcci->pcci_type = LU_PCC_NONE; OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab); + lli->lli_pcc_inode = NULL; } static void pcc_inode_get(struct pcc_inode *pcci) @@ -408,13 +1026,11 @@ static void pcc_inode_put(struct pcc_inode *pcci) void pcc_inode_free(struct inode *inode) { - struct ll_inode_info *lli = ll_i2info(inode); - struct pcc_inode *pcci = lli->lli_pcc_inode; + struct pcc_inode *pcci = ll_i2pcci(inode); if (pcci) { WARN_ON(atomic_read(&pcci->pcci_refcount) > 1); pcc_inode_put(pcci); - lli->lli_pcc_inode = NULL; } } @@ -424,18 +1040,23 @@ void pcc_inode_free(struct inode *inode) * reduce overhead: * (fid->f_oid >> 16 & oxFFFF)/FID */ -#define MAX_PCC_DATABASE_PATH (6 * 5 + FID_NOBRACE_LEN + 1) +#define PCC_DATASET_MAX_PATH (6 * 5 + FID_NOBRACE_LEN + 1) static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid) { - return snprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/" - DFID_NOBRACE, - (fid)->f_oid & 0xFFFF, - (fid)->f_oid >> 16 & 0xFFFF, - (unsigned int)((fid)->f_seq & 0xFFFF), - (unsigned int)((fid)->f_seq >> 16 & 0xFFFF), - (unsigned int)((fid)->f_seq >> 32 & 0xFFFF), - (unsigned int)((fid)->f_seq >> 48 & 0xFFFF), - PFID(fid)); + return scnprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/" + DFID_NOBRACE, + (fid)->f_oid & 0xFFFF, + (fid)->f_oid >> 16 & 0xFFFF, + (unsigned int)((fid)->f_seq & 0xFFFF), + (unsigned int)((fid)->f_seq >> 16 & 0xFFFF), + (unsigned int)((fid)->f_seq >> 32 & 0xFFFF), + (unsigned int)((fid)->f_seq >> 48 & 0xFFFF), + PFID(fid)); +} + +static inline const struct cred *pcc_super_cred(struct super_block *sb) +{ + return ll_s2sbi(sb)->ll_pcc_super.pccs_cred; } void pcc_file_init(struct pcc_file *pccf) @@ -444,14 +1065,427 @@ void pcc_file_init(struct pcc_file *pccf) pccf->pccf_type = LU_PCC_NONE; } +static inline bool pcc_auto_attach_enabled(enum pcc_dataset_flags flags, + enum pcc_io_type iot) +{ + if (iot == PIT_OPEN) + return flags & PCC_DATASET_OPEN_ATTACH; + if (iot == PIT_GETATTR) + return flags & PCC_DATASET_STAT_ATTACH; + else + return flags & PCC_DATASET_AUTO_ATTACH; +} + +static const char pcc_xattr_layout[] = XATTR_USER_PREFIX "PCC.layout"; + +static int pcc_layout_xattr_set(struct pcc_inode *pcci, __u32 gen) +{ + struct dentry *pcc_dentry = pcci->pcci_path.dentry; + struct ll_inode_info *lli = pcci->pcci_lli; + int rc; + + ENTRY; + + if (!(lli->lli_pcc_dsflags & PCC_DATASET_AUTO_ATTACH)) + RETURN(0); + + rc = ll_vfs_setxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout, + &gen, sizeof(gen), 0); + + RETURN(rc); +} + +static int pcc_get_layout_info(struct inode *inode, struct cl_layout *clt) +{ + struct lu_env *env; + struct ll_inode_info *lli = ll_i2info(inode); + __u16 refcheck; + int rc; + + ENTRY; + + if (!lli->lli_clob) + RETURN(-EINVAL); + + env = cl_env_get(&refcheck); + if (IS_ERR(env)) + RETURN(PTR_ERR(env)); + + rc = cl_object_layout_get(env, lli->lli_clob, clt); + if (rc < 0) + CDEBUG(D_INODE, "Cannot get layout for "DFID"\n", + PFID(ll_inode2fid(inode))); + + cl_env_put(env, &refcheck); + RETURN(rc < 0 ? rc : 0); +} + +/* Must be called with pcci->pcci_lock held */ +static void pcc_inode_attach_init(struct pcc_dataset *dataset, + struct pcc_inode *pcci, + struct dentry *dentry, + enum lu_pcc_type type) +{ + pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt); + pcci->pcci_path.dentry = dentry; + LASSERT(atomic_read(&pcci->pcci_refcount) == 0); + atomic_set(&pcci->pcci_refcount, 1); + pcci->pcci_type = type; + pcci->pcci_attr_valid = false; +} + +static inline void pcc_inode_dsflags_set(struct ll_inode_info *lli, + struct pcc_dataset *dataset) +{ + lli->lli_pcc_generation = ll_info2pccs(lli)->pccs_generation; + lli->lli_pcc_dsflags = dataset->pccd_flags; +} + +static void pcc_inode_attach_set(struct pcc_super *super, + struct pcc_dataset *dataset, + struct ll_inode_info *lli, + struct pcc_inode *pcci, + struct dentry *dentry, + enum lu_pcc_type type) +{ + pcc_inode_init(pcci, lli); + pcc_inode_attach_init(dataset, pcci, dentry, type); + down_read(&super->pccs_rw_sem); + pcc_inode_dsflags_set(lli, dataset); + up_read(&super->pccs_rw_sem); +} + +static inline void pcc_layout_gen_set(struct pcc_inode *pcci, + __u32 gen) +{ + pcci->pcci_layout_gen = gen; +} + +static inline bool pcc_inode_has_layout(struct pcc_inode *pcci) +{ + return pcci->pcci_layout_gen != CL_LAYOUT_GEN_NONE; +} + +static struct dentry *pcc_lookup(struct dentry *base, char *pathname) +{ + char *ptr = NULL, *component; + struct dentry *parent; + struct dentry *child = ERR_PTR(-ENOENT); + + ptr = pathname; + + /* move past any initial '/' to the start of the first path component*/ + while (*ptr == '/') + ptr++; + + /* store the start of the first path component */ + component = ptr; + + parent = dget(base); + while (ptr) { + /* find the start of the next component - if we don't find it, + * the current component is the last component + */ + ptr = strchr(ptr, '/'); + /* put a NUL char in place of the '/' before the next compnent + * so we can treat this component as a string; note the full + * path string is NUL terminated to this is not needed for the + * last component + */ + if (ptr) + *ptr = '\0'; + + /* look up the current component */ + inode_lock(parent->d_inode); + child = lookup_one_len(component, parent, strlen(component)); + inode_unlock(parent->d_inode); + + /* repair the path string: put '/' back in place of the NUL */ + if (ptr) + *ptr = '/'; + + dput(parent); + + if (IS_ERR_OR_NULL(child)) + break; + + /* we may find a cached negative dentry */ + if (!d_is_positive(child)) { + dput(child); + child = NULL; + break; + } + + /* descend in to the next level of the path */ + parent = child; + + /* move the pointer past the '/' to the next component */ + if (ptr) + ptr++; + component = ptr; + } + + /* NULL child means we didn't find anything */ + if (!child) + child = ERR_PTR(-ENOENT); + + return child; +} + +static int pcc_try_dataset_attach(struct inode *inode, __u32 gen, + enum lu_pcc_type type, + struct pcc_dataset *dataset, + bool *cached) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct pcc_inode *pcci = lli->lli_pcc_inode; + const struct cred *old_cred; + struct dentry *pcc_dentry = NULL; + char pathname[PCC_DATASET_MAX_PATH]; + __u32 pcc_gen; + int rc; + + ENTRY; + + if (type == LU_PCC_READWRITE && + !(dataset->pccd_flags & PCC_DATASET_RWPCC)) + RETURN(0); + + if (type == LU_PCC_READONLY && + !(dataset->pccd_flags & PCC_DATASET_ROPCC)) + RETURN(0); + + rc = pcc_fid2dataset_path(pathname, PCC_DATASET_MAX_PATH, + &lli->lli_fid); + + old_cred = override_creds(pcc_super_cred(inode->i_sb)); + pcc_dentry = pcc_lookup(dataset->pccd_path.dentry, pathname); + if (IS_ERR(pcc_dentry)) { + rc = PTR_ERR(pcc_dentry); + CDEBUG(D_CACHE, "%s: path lookup error on "DFID":%s: rc = %d\n", + ll_i2sbi(inode)->ll_fsname, PFID(&lli->lli_fid), + pathname, rc); + /* ignore this error */ + GOTO(out, rc = 0); + } + + rc = ll_vfs_getxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout, + &pcc_gen, sizeof(pcc_gen)); + if (rc < 0) + /* ignore this error */ + GOTO(out_put_pcc_dentry, rc = 0); + + rc = 0; + /* The file is still valid cached in PCC, attach it immediately. */ + if (pcc_gen == gen) { + CDEBUG(D_CACHE, DFID" L.Gen (%d) consistent, auto attached.\n", + PFID(&lli->lli_fid), gen); + if (!pcci) { + OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS); + if (pcci == NULL) + GOTO(out_put_pcc_dentry, rc = -ENOMEM); + + pcc_inode_init(pcci, lli); + dget(pcc_dentry); + pcc_inode_attach_init(dataset, pcci, pcc_dentry, type); + } else { + /* + * This happened when a file was once attached into + * PCC, and some processes keep this file opened + * (pcci->refcount > 1) and corresponding PCC file + * without any I/O activity, and then this file was + * detached by the manual detach command or the + * revocation of the layout lock (i.e. cached LRU lock + * shrinking). + */ + pcc_inode_get(pcci); + pcci->pcci_type = type; + } + pcc_inode_dsflags_set(lli, dataset); + pcc_layout_gen_set(pcci, gen); + *cached = true; + } +out_put_pcc_dentry: + dput(pcc_dentry); +out: + revert_creds(old_cred); + RETURN(rc); +} + +static int pcc_try_datasets_attach(struct inode *inode, enum pcc_io_type iot, + __u32 gen, enum lu_pcc_type type, + bool *cached) +{ + struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super; + struct ll_inode_info *lli = ll_i2info(inode); + struct pcc_dataset *dataset = NULL, *tmp; + int rc = 0; + + ENTRY; + + down_read(&super->pccs_rw_sem); + list_for_each_entry_safe(dataset, tmp, + &super->pccs_datasets, pccd_linkage) { + if (!pcc_auto_attach_enabled(dataset->pccd_flags, iot)) + break; + + rc = pcc_try_dataset_attach(inode, gen, type, dataset, cached); + if (rc < 0 || (!rc && *cached)) + break; + } + + /* + * Update the saved dataset flags for the inode accordingly if failed. + */ + if (!rc && !*cached) { + /* + * Currently auto attach strategy for a PCC backend is + * unchangeable once once it was added into the PCC datasets on + * a client as the support to change auto attach strategy is + * not implemented yet. + */ + /* + * If tried to attach from one PCC backend: + * @lli_pcc_generation > 0: + * 1) The file was once attached into PCC, but now the + * corresponding PCC backend should be removed from the client; + * 2) The layout generation was changed, the data has been + * restored; + * 3) The corresponding PCC copy is not existed on PCC + * @lli_pcc_generation == 0: + * The file is never attached into PCC but in a HSM released + * state, or once attached into PCC but the inode was evicted + * from icache later. + * Set the saved dataset flags with PCC_DATASET_NONE. Then this + * file will skip from the candidates to try auto attach until + * the file is attached into PCC again. + * + * If the file was never attached into PCC, or once attached but + * its inode was evicted from icache (lli_pcc_generation == 0), + * or the corresponding dataset was removed from the client, + * set the saved dataset flags with PCC_DATASET_NONE. + * + * TODO: If the file was once attached into PCC but not try to + * auto attach due to the change of the configuration parameters + * for this dataset (i.e. change from auto attach enabled to + * auto attach disabled for this dataset), update the saved + * dataset flags with the found one. + */ + lli->lli_pcc_dsflags = PCC_DATASET_NONE; + } + up_read(&super->pccs_rw_sem); + + RETURN(rc); +} + +/* + * TODO: For RW-PCC, it is desirable to store HSM info as a layout (LU-10606). + * Thus the client can get archive ID from the layout directly. When try to + * attach the file automatically which is in HSM released state (according to + * LOV_PATTERN_F_RELEASED in the layout), it can determine whether the file is + * valid cached on PCC more precisely according to the @rwid (archive ID) in + * the PCC dataset and the archive ID in HSM attrs. + */ +static int pcc_try_auto_attach(struct inode *inode, bool *cached, + enum pcc_io_type iot) +{ + struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super; + struct cl_layout clt = { + .cl_layout_gen = 0, + .cl_is_released = false, + }; + struct ll_inode_info *lli = ll_i2info(inode); + __u32 gen; + int rc; + + ENTRY; + + /* + * Quick check whether there is PCC device. + */ + if (list_empty(&super->pccs_datasets)) + RETURN(0); + + /* + * The file layout lock was cancelled. And this open does not + * obtain valid layout lock from MDT (i.e. the file is being + * HSM restoring). + */ + if (iot == PIT_OPEN) { + if (ll_layout_version_get(lli) == CL_LAYOUT_GEN_NONE) + RETURN(0); + } else { + rc = ll_layout_refresh(inode, &gen); + if (rc) + RETURN(rc); + } + + rc = pcc_get_layout_info(inode, &clt); + if (rc) + RETURN(rc); + + if (iot != PIT_OPEN && gen != clt.cl_layout_gen) { + CDEBUG(D_CACHE, DFID" layout changed from %d to %d.\n", + PFID(ll_inode2fid(inode)), gen, clt.cl_layout_gen); + RETURN(-EINVAL); + } + + if (clt.cl_is_released) + rc = pcc_try_datasets_attach(inode, iot, clt.cl_layout_gen, + LU_PCC_READWRITE, cached); + else if (clt.cl_is_rdonly) + rc = pcc_try_datasets_attach(inode, iot, clt.cl_layout_gen, + LU_PCC_READONLY, cached); + + RETURN(rc); +} + +static inline bool pcc_may_auto_attach(struct inode *inode, + enum pcc_io_type iot) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct pcc_super *super = ll_i2pccs(inode); + + ENTRY; + + /* Known the file was not in any PCC backend. */ + if (lli->lli_pcc_dsflags & PCC_DATASET_NONE) + RETURN(false); + + /* + * lli_pcc_generation == 0 means that the file was never attached into + * PCC, or may be once attached into PCC but detached as the inode is + * evicted from icache (i.e. "echo 3 > /proc/sys/vm/drop_caches" or + * icache shrinking due to the memory pressure), which will cause the + * file detach from PCC when releasing the inode from icache. + * In either case, we still try to attach. + */ + /* lli_pcc_generation == 0, or the PCC setting was changed, + * or there is no PCC setup on the client and the try will return + * immediately in pcc_try_auto_attach(). + */ + if (super->pccs_generation != lli->lli_pcc_generation) + RETURN(true); + + /* The cached setting @lli_pcc_dsflags is valid */ + if (iot == PIT_OPEN) + RETURN(lli->lli_pcc_dsflags & PCC_DATASET_OPEN_ATTACH); + + if (iot == PIT_GETATTR) + RETURN(lli->lli_pcc_dsflags & PCC_DATASET_STAT_ATTACH); + + RETURN(lli->lli_pcc_dsflags & PCC_DATASET_IO_ATTACH); +} + int pcc_file_open(struct inode *inode, struct file *file) { struct pcc_inode *pcci; - struct ll_file_data *fd = LUSTRE_FPRIVATE(file); + struct ll_inode_info *lli = ll_i2info(inode); + struct ll_file_data *fd = file->private_data; struct pcc_file *pccf = &fd->fd_pcc_file; struct file *pcc_file; struct path *path; - struct qstr *dname; + bool cached = false; int rc = 0; ENTRY; @@ -459,27 +1493,34 @@ int pcc_file_open(struct inode *inode, struct file *file) if (!S_ISREG(inode->i_mode)) RETURN(0); + if (IS_ENCRYPTED(inode)) + RETURN(0); + pcc_inode_lock(inode); pcci = ll_i2pcci(inode); - if (!pcci) - GOTO(out_unlock, rc = 0); - if (atomic_read(&pcci->pcci_refcount) == 0) + if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING) GOTO(out_unlock, rc = 0); + if (!pcci || !pcc_inode_has_layout(pcci)) { + if (pcc_may_auto_attach(inode, PIT_OPEN)) + rc = pcc_try_auto_attach(inode, &cached, PIT_OPEN); + + if (rc < 0 || !cached) + GOTO(out_unlock, rc); + + if (!pcci) + pcci = ll_i2pcci(inode); + } + pcc_inode_get(pcci); WARN_ON(pccf->pccf_file); path = &pcci->pcci_path; - dname = &path->dentry->d_name; - CDEBUG(D_CACHE, "opening pcc file '%.*s'\n", dname->len, - dname->name); -#ifdef HAVE_DENTRY_OPEN_USE_PATH - pcc_file = dentry_open(path, file->f_flags, current_cred()); -#else - pcc_file = dentry_open(path->dentry, path->mnt, - file->f_flags, current_cred()); -#endif + CDEBUG(D_CACHE, "opening pcc file '%pd'\n", path->dentry); + + pcc_file = dentry_open(path, file->f_flags, + pcc_super_cred(inode->i_sb)); if (IS_ERR_OR_NULL(pcc_file)) { rc = pcc_file == NULL ? -EINVAL : PTR_ERR(pcc_file); pcc_inode_put(pcci); @@ -496,40 +1537,146 @@ out_unlock: void pcc_file_release(struct inode *inode, struct file *file) { struct pcc_inode *pcci; - struct ll_file_data *fd = LUSTRE_FPRIVATE(file); + struct ll_file_data *fd = file->private_data; struct pcc_file *pccf; struct path *path; - struct qstr *dname; - ENTRY; + ENTRY; + + if (!S_ISREG(inode->i_mode) || fd == NULL) + RETURN_EXIT; + + pccf = &fd->fd_pcc_file; + pcc_inode_lock(inode); + if (pccf->pccf_file == NULL) + goto out; + + pcci = ll_i2pcci(inode); + LASSERT(pcci); + path = &pcci->pcci_path; + CDEBUG(D_CACHE, "releasing pcc file \"%pd\"\n", path->dentry); + pcc_inode_put(pcci); + fput(pccf->pccf_file); + pccf->pccf_file = NULL; +out: + pcc_inode_unlock(inode); + RETURN_EXIT; +} + +/* Tolerate the IO failure on PCC and fall back to normal Lustre IO path */ +static bool pcc_io_tolerate(struct pcc_inode *pcci, + enum pcc_io_type iot, int rc) +{ + if (pcci->pcci_type == LU_PCC_READWRITE) { + if (iot == PIT_WRITE && (rc == -ENOSPC || rc == -EDQUOT)) + return false; + /* Handle the ->page_mkwrite failure tolerance separately + * in pcc_page_mkwrite(). + */ + } else if (pcci->pcci_type == LU_PCC_READONLY) { + if ((iot == PIT_READ || iot == PIT_GETATTR || + iot == PIT_SPLICE_READ) && rc < 0 && rc != -ENOMEM) + return false; + if (iot == PIT_FAULT && (rc & VM_FAULT_SIGBUS) && + !(rc & VM_FAULT_OOM)) + return false; + } + + return true; +} + +static void pcc_io_init(struct inode *inode, enum pcc_io_type iot, bool *cached) +{ + struct pcc_inode *pcci; + + pcc_inode_lock(inode); + pcci = ll_i2pcci(inode); + if (pcci && pcc_inode_has_layout(pcci)) { + LASSERT(atomic_read(&pcci->pcci_refcount) > 0); + if (pcci->pcci_type == LU_PCC_READONLY && + (iot == PIT_WRITE || iot == PIT_SETATTR || + iot == PIT_PAGE_MKWRITE)) { + /* Fall back to normal I/O path */ + *cached = false; + /* For mmap write, we need to detach the file from + * RO-PCC, release the page got from ->fault(), and + * then retry the memory fault handling (->fault() + * and ->page_mkwrite()). + * These are done in pcc_page_mkwrite(); + */ + } else { + atomic_inc(&pcci->pcci_active_ios); + *cached = true; + } + } else { + *cached = false; + if (pcc_may_auto_attach(inode, iot)) { + (void) pcc_try_auto_attach(inode, cached, iot); + if (*cached) { + pcci = ll_i2pcci(inode); + LASSERT(atomic_read(&pcci->pcci_refcount) > 0); + atomic_inc(&pcci->pcci_active_ios); + } + } + } + pcc_inode_unlock(inode); +} + +static void pcc_io_fini(struct inode *inode, enum pcc_io_type iot, + int rc, bool *cached) +{ + struct pcc_inode *pcci = ll_i2pcci(inode); + + LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0 && *cached); + + *cached = pcc_io_tolerate(pcci, iot, rc); + if (atomic_dec_and_test(&pcci->pcci_active_ios)) + wake_up(&pcci->pcci_waitq); +} + + +static ssize_t +__pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter) +{ + struct file *file = iocb->ki_filp; + +#ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER + return file->f_op->read_iter(iocb, iter); +#else + struct iovec iov; + struct iov_iter i; + ssize_t bytes = 0; - if (!S_ISREG(inode->i_mode) || fd == NULL) - RETURN_EXIT; + iov_for_each(iov, i, *iter) { + ssize_t res; - pccf = &fd->fd_pcc_file; - pcc_inode_lock(inode); - if (pccf->pccf_file == NULL) - goto out; + res = file->f_op->aio_read(iocb, &iov, 1, iocb->ki_pos); + if (-EIOCBQUEUED == res) + res = wait_on_sync_kiocb(iocb); + if (res <= 0) { + if (bytes == 0) + bytes = res; + break; + } - pcci = ll_i2pcci(inode); - LASSERT(pcci); - path = &pcci->pcci_path; - dname = &path->dentry->d_name; - CDEBUG(D_CACHE, "releasing pcc file \"%.*s\"\n", dname->len, - dname->name); - pcc_inode_put(pcci); - fput(pccf->pccf_file); - pccf->pccf_file = NULL; -out: - pcc_inode_unlock(inode); + bytes += res; + if (res < iov.iov_len) + break; + } + + if (bytes > 0) + iov_iter_advance(iter, bytes); + return bytes; +#endif } ssize_t pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter, bool *cached) { struct file *file = iocb->ki_filp; - struct ll_file_data *fd = LUSTRE_FPRIVATE(file); + struct ll_file_data *fd = file->private_data; struct pcc_file *pccf = &fd->fd_pcc_file; + struct inode *inode = file_inode(file); ssize_t result; ENTRY; @@ -538,12 +1685,24 @@ ssize_t pcc_file_read_iter(struct kiocb *iocb, *cached = false; RETURN(0); } - *cached = true; - iocb->ki_filp = pccf->pccf_file; - result = generic_file_read_iter(iocb, iter); - iocb->ki_filp = file; + pcc_io_init(inode, PIT_READ, cached); + if (!*cached) + RETURN(0); + /* Fake I/O error on RO-PCC */ + if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR)) + GOTO(out, result = -EIO); + + iocb->ki_filp = pccf->pccf_file; + /* generic_file_aio_read does not support ext4-dax, + * __pcc_file_read_iter uses ->aio_read hook directly + * to add support for ext4-dax. + */ + result = __pcc_file_read_iter(iocb, iter); + iocb->ki_filp = file; +out: + pcc_io_fini(inode, PIT_READ, result, cached); RETURN(result); } @@ -586,8 +1745,9 @@ ssize_t pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter, bool *cached) { struct file *file = iocb->ki_filp; - struct ll_file_data *fd = LUSTRE_FPRIVATE(file); + struct ll_file_data *fd = file->private_data; struct pcc_file *pccf = &fd->fd_pcc_file; + struct inode *inode = file_inode(file); ssize_t result; ENTRY; @@ -596,10 +1756,18 @@ ssize_t pcc_file_write_iter(struct kiocb *iocb, *cached = false; RETURN(0); } - *cached = true; - if (pccf->pccf_type != LU_PCC_READWRITE) - RETURN(-EWOULDBLOCK); + if (pccf->pccf_type != LU_PCC_READWRITE) { + *cached = false; + RETURN(-EAGAIN); + } + + pcc_io_init(inode, PIT_WRITE, cached); + if (!*cached) + RETURN(0); + + if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR)) + GOTO(out, result = -ENOSPC); iocb->ki_filp = pccf->pccf_file; @@ -609,6 +1777,8 @@ ssize_t pcc_file_write_iter(struct kiocb *iocb, */ result = __pcc_file_write_iter(iocb, iter); iocb->ki_filp = file; +out: + pcc_io_fini(inode, PIT_WRITE, result, cached); RETURN(result); } @@ -616,9 +1786,10 @@ int pcc_inode_setattr(struct inode *inode, struct iattr *attr, bool *cached) { int rc; - struct pcc_inode *pcci; + const struct cred *old_cred; struct iattr attr2 = *attr; struct dentry *pcc_dentry; + struct pcc_inode *pcci; ENTRY; @@ -627,28 +1798,35 @@ int pcc_inode_setattr(struct inode *inode, struct iattr *attr, RETURN(0); } - pcc_inode_lock(inode); - pcci = ll_i2pcci(inode); - if (pcci == NULL || atomic_read(&pcci->pcci_refcount) == 0) - GOTO(out_unlock, rc = 0); + pcc_io_init(inode, PIT_SETATTR, cached); + if (!*cached) + RETURN(0); - *cached = true; attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME | ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET | - ATTR_CTIME); + ATTR_CTIME | ATTR_UID | ATTR_GID); + pcci = ll_i2pcci(inode); pcc_dentry = pcci->pcci_path.dentry; inode_lock(pcc_dentry->d_inode); + old_cred = override_creds(pcc_super_cred(inode->i_sb)); +#ifdef HAVE_USER_NAMESPACE_ARG + rc = pcc_dentry->d_inode->i_op->setattr(&nop_mnt_idmap, pcc_dentry, + &attr2); +#else rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2); +#endif + revert_creds(old_cred); inode_unlock(pcc_dentry->d_inode); -out_unlock: - pcc_inode_unlock(inode); + + pcc_io_fini(inode, PIT_SETATTR, rc, cached); RETURN(rc); } -int pcc_inode_getattr(struct inode *inode, bool *cached) +int pcc_inode_getattr(struct inode *inode, u32 request_mask, + unsigned int flags, bool *cached) { struct ll_inode_info *lli = ll_i2info(inode); - struct pcc_inode *pcci; + const struct cred *old_cred; struct kstat stat; s64 atime; s64 mtime; @@ -662,28 +1840,28 @@ int pcc_inode_getattr(struct inode *inode, bool *cached) RETURN(0); } - pcc_inode_lock(inode); - pcci = ll_i2pcci(inode); - if (pcci == NULL || atomic_read(&pcci->pcci_refcount) == 0) - GOTO(out_unlock, rc = 0); + pcc_io_init(inode, PIT_GETATTR, cached); + if (!*cached) + RETURN(0); - *cached = true; - rc = ll_vfs_getattr(&pcci->pcci_path, &stat); + old_cred = override_creds(pcc_super_cred(inode->i_sb)); + rc = ll_vfs_getattr(&ll_i2pcci(inode)->pcci_path, &stat, request_mask, + flags); + revert_creds(old_cred); if (rc) - GOTO(out_unlock, rc); + GOTO(out, rc); ll_inode_size_lock(inode); - if (inode->i_atime.tv_sec < lli->lli_atime || - lli->lli_update_atime) { - inode->i_atime.tv_sec = lli->lli_atime; - lli->lli_update_atime = 0; - } - inode->i_mtime.tv_sec = lli->lli_mtime; - inode->i_ctime.tv_sec = lli->lli_ctime; + if (test_and_clear_bit(LLIF_UPDATE_ATIME, &lli->lli_flags) || + inode_get_atime_sec(inode) < lli->lli_atime) + inode_set_atime(inode, lli->lli_atime, 0); + + inode_set_mtime(inode, lli->lli_mtime, 0); + inode_set_ctime(inode, lli->lli_ctime, 0); - atime = inode->i_atime.tv_sec; - mtime = inode->i_mtime.tv_sec; - ctime = inode->i_ctime.tv_sec; + atime = inode_get_atime_sec(inode); + mtime = inode_get_mtime_sec(inode); + ctime = inode_get_ctime_sec(inode); if (atime < stat.atime.tv_sec) atime = stat.atime.tv_sec; @@ -697,17 +1875,350 @@ int pcc_inode_getattr(struct inode *inode, bool *cached) i_size_write(inode, stat.size); inode->i_blocks = stat.blocks; - inode->i_atime.tv_sec = atime; - inode->i_mtime.tv_sec = mtime; - inode->i_ctime.tv_sec = ctime; + inode_set_atime(inode, atime, 0); + inode_set_mtime(inode, mtime, 0); + inode_set_ctime(inode, ctime, 0); ll_inode_size_unlock(inode); +out: + pcc_io_fini(inode, PIT_GETATTR, rc, cached); + RETURN(rc); +} -out_unlock: +#ifdef HAVE_DEFAULT_FILE_SPLICE_READ_EXPORT +ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos, + struct pipe_inode_info *pipe, + size_t count, unsigned int flags) +{ + struct inode *inode = file_inode(in_file); + struct ll_file_data *fd = in_file->private_data; + struct file *pcc_file = fd->fd_pcc_file.pccf_file; + bool cached = false; + ssize_t result; + + ENTRY; + + if (!pcc_file) + RETURN(default_file_splice_read(in_file, ppos, pipe, + count, flags)); + + pcc_io_init(inode, PIT_SPLICE_READ, &cached); + if (!cached) + RETURN(default_file_splice_read(in_file, ppos, pipe, + count, flags)); + + result = default_file_splice_read(pcc_file, ppos, pipe, count, flags); + + pcc_io_fini(inode, PIT_SPLICE_READ, result, &cached); + RETURN(result); +} +#endif /* HAVE_DEFAULT_FILE_SPLICE_READ_EXPORT */ + +int pcc_fsync(struct file *file, loff_t start, loff_t end, + int datasync, bool *cached) +{ + struct inode *inode = file_inode(file); + struct ll_file_data *fd = file->private_data; + struct pcc_file *pccf = &fd->fd_pcc_file; + struct file *pcc_file = pccf->pccf_file; + int rc; + + ENTRY; + + if (!pcc_file) { + *cached = false; + RETURN(0); + } + + if (!S_ISREG(inode->i_mode)) { + *cached = false; + RETURN(0); + } + + /* + * After the file is attached into RO-PCC, its dirty pages on this + * client may not be flushed. So fsync() should fall back to normal + * Lustre I/O path flushing dirty data to OSTs. And flush on RO-PCC + * copy is meaningless. + */ + if (pccf->pccf_type == LU_PCC_READONLY) { + *cached = false; + RETURN(-EAGAIN); + } + + pcc_io_init(inode, PIT_FSYNC, cached); + if (!*cached) + RETURN(0); + + rc = file_inode(pcc_file)->i_fop->fsync(pcc_file, + start, end, datasync); + + pcc_io_fini(inode, PIT_FSYNC, rc, cached); + RETURN(rc); +} + +int pcc_file_mmap(struct file *file, struct vm_area_struct *vma, + bool *cached) +{ + struct inode *inode = file_inode(file); + struct ll_file_data *fd = file->private_data; + struct file *pcc_file = fd->fd_pcc_file.pccf_file; + struct pcc_inode *pcci; + int rc = 0; + + ENTRY; + + if (!pcc_file || !file_inode(pcc_file)->i_fop->mmap) { + *cached = false; + RETURN(0); + } + + pcc_inode_lock(inode); + pcci = ll_i2pcci(inode); + if (pcci && pcc_inode_has_layout(pcci)) { + LASSERT(atomic_read(&pcci->pcci_refcount) > 1); + *cached = true; + vma->vm_file = pcc_file; + rc = file_inode(pcc_file)->i_fop->mmap(pcc_file, vma); + vma->vm_file = file; + /* Save the vm ops of backend PCC */ + vma->vm_private_data = (void *)vma->vm_ops; + } else { + *cached = false; + } + pcc_inode_unlock(inode); + + RETURN(rc); +} + +void pcc_vm_open(struct vm_area_struct *vma) +{ + struct pcc_inode *pcci; + struct file *file = vma->vm_file; + struct inode *inode = file_inode(file); + struct ll_file_data *fd = file->private_data; + struct file *pcc_file = fd->fd_pcc_file.pccf_file; + struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data; + + ENTRY; + + if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->open) + RETURN_EXIT; + + pcc_inode_lock(inode); + pcci = ll_i2pcci(inode); + if (pcci && pcc_inode_has_layout(pcci)) { + vma->vm_file = pcc_file; + pcc_vm_ops->open(vma); + vma->vm_file = file; + } + pcc_inode_unlock(inode); + EXIT; +} + +void pcc_vm_close(struct vm_area_struct *vma) +{ + struct file *file = vma->vm_file; + struct inode *inode = file_inode(file); + struct ll_file_data *fd = file->private_data; + struct file *pcc_file = fd->fd_pcc_file.pccf_file; + struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data; + + ENTRY; + + if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->close) + RETURN_EXIT; + + pcc_inode_lock(inode); + /* Layout lock maybe revoked here */ + vma->vm_file = pcc_file; + pcc_vm_ops->close(vma); + vma->vm_file = file; pcc_inode_unlock(inode); + EXIT; +} + +int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf, + bool *cached) +{ + struct page *page = vmf->page; + struct mm_struct *mm = vma->vm_mm; + struct file *file = vma->vm_file; + struct inode *inode = file_inode(file); + struct ll_file_data *fd = file->private_data; + struct file *pcc_file = fd->fd_pcc_file.pccf_file; + struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data; + int rc; + + ENTRY; + + if (!pcc_file || !pcc_vm_ops) { + *cached = false; + RETURN(0); + } + + if (!pcc_vm_ops->page_mkwrite && + page->mapping == pcc_file->f_mapping) { + CDEBUG(D_MMAP, + "%s: PCC backend fs not support ->page_mkwrite()\n", + ll_i2sbi(inode)->ll_fsname); + pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE); + mmap_read_unlock(mm); + *cached = true; + RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE); + } + /* Pause to allow for a race with concurrent detach */ + CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val); + + pcc_io_init(inode, PIT_PAGE_MKWRITE, cached); + if (!*cached) { + /* This happens when the file is detached from PCC after got + * the fault page via ->fault() on the inode of the PCC copy. + * Here it can not simply fall back to normal Lustre I/O path. + * The reason is that the address space of fault page used by + * ->page_mkwrite() is still the one of PCC inode. In the + * normal Lustre ->page_mkwrite() I/O path, it will be wrongly + * handled as the address space of the fault page is not + * consistent with the one of the Lustre inode (though the + * fault page was truncated). + * As the file is detached from PCC, the fault page must + * be released frist, and retry the mmap write (->fault() and + * ->page_mkwrite). + * We use an ugly and tricky method by returning + * VM_FAULT_NOPAGE | VM_FAULT_RETRY to the caller + * __do_page_fault and retry the memory fault handling. + */ + if (page->mapping == pcc_file->f_mapping) { + pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE); + *cached = true; + mmap_read_unlock(mm); + RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE); + } + + RETURN(0); + } + + /* + * This fault injection can also be used to simulate -ENOSPC and + * -EDQUOT failure of underlying PCC backend fs. + */ + if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE)) + GOTO(out, rc = VM_FAULT_SIGBUS); + + vma->vm_file = pcc_file; +#ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY + rc = pcc_vm_ops->page_mkwrite(vmf); +#else + rc = pcc_vm_ops->page_mkwrite(vma, vmf); +#endif + vma->vm_file = file; + +out: + pcc_io_fini(inode, PIT_PAGE_MKWRITE, rc, cached); + + /* VM_FAULT_SIGBUG usually means that underlying PCC backend fs returns + * -EIO, -ENOSPC or -EDQUOT. Thus we can retry this IO from the normal + * Lustre I/O path. + */ + if (rc & VM_FAULT_SIGBUS) { + pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE); + mmap_read_unlock(mm); + RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE); + } + RETURN(rc); +} + +int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf, + bool *cached) +{ + struct file *file = vma->vm_file; + struct inode *inode = file_inode(file); + struct ll_file_data *fd = file->private_data; + struct file *pcc_file = fd->fd_pcc_file.pccf_file; + struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data; + int rc; + + ENTRY; + + if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->fault) { + *cached = false; + RETURN(0); + } + + if (!S_ISREG(inode->i_mode)) { + *cached = false; + RETURN(0); + } + + pcc_io_init(inode, PIT_FAULT, cached); + if (!*cached) + RETURN(0); + + /* Tolerate the mmap read failure for RO-PCC */ + if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR)) + GOTO(out, rc = VM_FAULT_SIGBUS); + + vma->vm_file = pcc_file; +#ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY + rc = pcc_vm_ops->fault(vmf); +#else + rc = pcc_vm_ops->fault(vma, vmf); +#endif + vma->vm_file = file; +out: + pcc_io_fini(inode, PIT_FAULT, rc, cached); RETURN(rc); } +static void __pcc_layout_invalidate(struct pcc_inode *pcci) +{ + pcci->pcci_type = LU_PCC_NONE; + pcc_layout_gen_set(pcci, CL_LAYOUT_GEN_NONE); + if (atomic_read(&pcci->pcci_active_ios) == 0) + return; + + CDEBUG(D_CACHE, "Waiting for IO completion: %d\n", + atomic_read(&pcci->pcci_active_ios)); + wait_event_idle(pcci->pcci_waitq, + atomic_read(&pcci->pcci_active_ios) == 0); +} + +void pcc_layout_invalidate(struct inode *inode) +{ + struct pcc_inode *pcci; + + ENTRY; + + pcc_inode_lock(inode); + pcci = ll_i2pcci(inode); + if (pcci && pcc_inode_has_layout(pcci)) { + LASSERT(atomic_read(&pcci->pcci_refcount) > 0); + __pcc_layout_invalidate(pcci); + + CDEBUG(D_CACHE, "Invalidate "DFID" layout gen %d\n", + PFID(&ll_i2info(inode)->lli_fid), pcci->pcci_layout_gen); + + pcc_inode_put(pcci); + } + pcc_inode_unlock(inode); + + EXIT; +} + +static int pcc_inode_remove(struct inode *inode, struct dentry *pcc_dentry) +{ + struct dentry *parent = dget_parent(pcc_dentry); + int rc; + + rc = vfs_unlink(&nop_mnt_idmap, d_inode(parent), pcc_dentry); + if (rc) + CWARN("%s: failed to unlink PCC file %pd, rc = %d\n", + ll_i2sbi(inode)->ll_fsname, pcc_dentry, rc); + + dput(parent); + return rc; +} + /* Create directory under base if directory does not exist */ static struct dentry * pcc_mkdir(struct dentry *base, const char *name, umode_t mode) @@ -724,7 +2235,7 @@ pcc_mkdir(struct dentry *base, const char *name, umode_t mode) if (d_is_positive(dentry)) goto out; - rc = vfs_mkdir(dir, dentry, mode); + rc = vfs_mkdir(&nop_mnt_idmap, dir, dentry, mode); if (rc) { dput(dentry); dentry = ERR_PTR(rc); @@ -752,9 +2263,10 @@ pcc_mkdir_p(struct dentry *root, char *path, umode_t mode) *ptr = '\0'; child = pcc_mkdir(parent, entry_name, mode); *ptr = '/'; + dput(parent); if (IS_ERR(child)) break; - dput(parent); + parent = child; ptr++; entry_name = ptr; @@ -779,29 +2291,15 @@ pcc_create(struct dentry *base, const char *name, umode_t mode) if (d_is_positive(dentry)) goto out; - rc = vfs_create(dir, dentry, mode, LL_VFS_CREATE_FALSE); + rc = vfs_create(&nop_mnt_idmap, dir, dentry, mode, false); if (rc) { dput(dentry); dentry = ERR_PTR(rc); - goto out; - } -out: - inode_unlock(dir); - return dentry; -} - -/* Must be called with pcci->pcci_lock held */ -static void pcc_inode_attach_init(struct pcc_dataset *dataset, - struct pcc_inode *pcci, - struct dentry *dentry, - enum lu_pcc_type type) -{ - pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt); - pcci->pcci_path.dentry = dentry; - LASSERT(atomic_read(&pcci->pcci_refcount) == 0); - atomic_set(&pcci->pcci_refcount, 1); - pcci->pcci_type = type; - pcci->pcci_attr_valid = false; + goto out; + } +out: + inode_unlock(dir); + return dentry; } static int __pcc_inode_create(struct pcc_dataset *dataset, @@ -813,20 +2311,20 @@ static int __pcc_inode_create(struct pcc_dataset *dataset, struct dentry *child; int rc = 0; - OBD_ALLOC(path, MAX_PCC_DATABASE_PATH); + OBD_ALLOC(path, PCC_DATASET_MAX_PATH); if (path == NULL) return -ENOMEM; - pcc_fid2dataset_path(path, MAX_PCC_DATABASE_PATH, fid); + pcc_fid2dataset_path(path, PCC_DATASET_MAX_PATH, fid); - base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0700); + base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0); if (IS_ERR(base)) { rc = PTR_ERR(base); GOTO(out, rc); } - snprintf(path, MAX_PCC_DATABASE_PATH, DFID_NOBRACE, PFID(fid)); - child = pcc_create(base, path, 0600); + snprintf(path, PCC_DATASET_MAX_PATH, DFID_NOBRACE, PFID(fid)); + child = pcc_create(base, path, 0); if (IS_ERR(child)) { rc = PTR_ERR(child); GOTO(out_base, rc); @@ -836,36 +2334,135 @@ static int __pcc_inode_create(struct pcc_dataset *dataset, out_base: dput(base); out: - OBD_FREE(path, MAX_PCC_DATABASE_PATH); + OBD_FREE(path, PCC_DATASET_MAX_PATH); return rc; } -int pcc_inode_create(struct pcc_dataset *dataset, struct lu_fid *fid, - struct dentry **pcc_dentry) +/* + * Reset uid, gid or size for the PCC copy masked by @valid. + * TODO: Set the project ID for PCC copy. + */ +static int pcc_inode_reset_iattr(struct dentry *dentry, unsigned int valid, + kuid_t uid, kgid_t gid, loff_t size) { - return __pcc_inode_create(dataset, fid, pcc_dentry); + struct inode *inode = dentry->d_inode; + struct iattr attr; + int rc; + + ENTRY; + + attr.ia_valid = valid; + attr.ia_uid = uid; + attr.ia_gid = gid; + attr.ia_size = size; + + inode_lock(inode); + rc = notify_change(&nop_mnt_idmap, dentry, &attr, NULL); + inode_unlock(inode); + + RETURN(rc); } -int pcc_inode_create_fini(struct pcc_dataset *dataset, struct inode *inode, - struct dentry *pcc_dentry) +int pcc_inode_create(struct super_block *sb, struct pcc_dataset *dataset, + struct lu_fid *fid, struct dentry **pcc_dentry) { - struct ll_inode_info *lli = ll_i2info(inode); + const struct cred *old_cred; + int rc; + + old_cred = override_creds(pcc_super_cred(sb)); + rc = __pcc_inode_create(dataset, fid, pcc_dentry); + revert_creds(old_cred); + return rc; +} + +int pcc_inode_create_fini(struct inode *inode, struct pcc_create_attach *pca) +{ + struct dentry *pcc_dentry = pca->pca_dentry; + const struct cred *old_cred; + struct pcc_super *super; struct pcc_inode *pcci; + int rc; ENTRY; + if (!pca->pca_dataset) + RETURN(0); + + if (!inode) + GOTO(out_dataset_put, rc = 0); + + super = ll_i2pccs(inode); + + LASSERT(pcc_dentry); + + old_cred = override_creds(super->pccs_cred); + pcc_inode_lock(inode); LASSERT(ll_i2pcci(inode) == NULL); OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS); if (pcci == NULL) - RETURN(-ENOMEM); + GOTO(out_put, rc = -ENOMEM); - pcc_inode_init(pcci); - pcc_inode_lock(inode); - pcc_inode_attach_init(dataset, pcci, pcc_dentry, LU_PCC_READWRITE); - lli->lli_pcc_inode = pcci; + rc = pcc_inode_reset_iattr(pcc_dentry, ATTR_UID | ATTR_GID, + old_cred->suid, old_cred->sgid, 0); + if (rc) + GOTO(out_put, rc); + + pcc_inode_attach_set(super, pca->pca_dataset, ll_i2info(inode), + pcci, pcc_dentry, LU_PCC_READWRITE); + + rc = pcc_layout_xattr_set(pcci, 0); + if (rc) { + if (!pcci->pcci_unlinked) + (void) pcc_inode_remove(inode, pcci->pcci_path.dentry); + pcc_inode_put(pcci); + GOTO(out_unlock, rc); + } + + /* Set the layout generation of newly created file with 0 */ + pcc_layout_gen_set(pcci, 0); + +out_put: + if (rc) { + (void) pcc_inode_remove(inode, pcc_dentry); + dput(pcc_dentry); + + if (pcci) + OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab); + } +out_unlock: pcc_inode_unlock(inode); + revert_creds(old_cred); +out_dataset_put: + pcc_dataset_put(pca->pca_dataset); + RETURN(rc); +} + +void pcc_create_attach_cleanup(struct super_block *sb, + struct pcc_create_attach *pca) +{ + if (!pca->pca_dataset) + return; + + if (pca->pca_dentry) { + struct dentry *parent; + struct inode *i_dir; + const struct cred *old_cred; + int rc; + + old_cred = override_creds(pcc_super_cred(sb)); + parent = dget_parent(pca->pca_dentry); + i_dir = d_inode(parent); + rc = vfs_unlink(&nop_mnt_idmap, i_dir, pca->pca_dentry); + dput(parent); + if (rc) + CWARN("%s: failed to unlink PCC file %pd: rc = %d\n", + ll_s2sbi(sb)->ll_fsname, pca->pca_dentry, rc); + /* ignore the unlink failure */ + revert_creds(old_cred); + dput(pca->pca_dentry); + } - RETURN(0); + pcc_dataset_put(pca->pca_dataset); } static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count, @@ -874,7 +2471,7 @@ static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count, while (count > 0) { ssize_t size; - size = vfs_write(filp, (const void __user *)buf, count, offset); + size = cfs_kernel_write(filp, buf, count, offset); if (size < 0) return size; count -= size; @@ -883,11 +2480,10 @@ static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count, return 0; } -static int pcc_copy_data(struct file *src, struct file *dst) +static ssize_t pcc_copy_data(struct file *src, struct file *dst) { - int rc = 0; + ssize_t rc = 0; ssize_t rc2; - mm_segment_t oldfs; loff_t pos, offset = 0; size_t buf_len = 1048576; void *buf; @@ -898,154 +2494,496 @@ static int pcc_copy_data(struct file *src, struct file *dst) if (buf == NULL) RETURN(-ENOMEM); - oldfs = get_fs(); - set_fs(KERNEL_DS); while (1) { + if (signal_pending(current)) + GOTO(out_free, rc = -EINTR); + pos = offset; - rc2 = vfs_read(src, (void __user *)buf, buf_len, &pos); + rc2 = cfs_kernel_read(src, buf, buf_len, &pos); if (rc2 < 0) - GOTO(out_fs, rc = rc2); + GOTO(out_free, rc = rc2); else if (rc2 == 0) break; pos = offset; rc = pcc_filp_write(dst, buf, rc2, &pos); if (rc < 0) - GOTO(out_fs, rc); + GOTO(out_free, rc); offset += rc2; } -out_fs: - set_fs(oldfs); + rc = offset; +out_free: OBD_FREE_LARGE(buf, buf_len); RETURN(rc); } -int pcc_readwrite_attach(struct file *file, struct inode *inode, - __u32 archive_id) +static int pcc_attach_allowed_check(struct inode *inode) { - struct pcc_dataset *dataset; struct ll_inode_info *lli = ll_i2info(inode); struct pcc_inode *pcci; - struct dentry *dentry; - struct file *pcc_filp; - struct path path; - int rc; + int rc = 0; ENTRY; pcc_inode_lock(inode); + if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING) + GOTO(out_unlock, rc = -EBUSY); + pcci = ll_i2pcci(inode); - if (pcci == NULL) { - OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS); - if (pcci == NULL) { - pcc_inode_unlock(inode); - RETURN(-ENOMEM); - } + if (pcci && pcc_inode_has_layout(pcci)) + GOTO(out_unlock, rc = -EEXIST); - pcc_inode_init(pcci); - } else if (atomic_read(&pcci->pcci_refcount) > 0) { - pcc_inode_unlock(inode); - RETURN(-EEXIST); - } + lli->lli_pcc_state |= PCC_STATE_FL_ATTACHING; +out_unlock: pcc_inode_unlock(inode); + RETURN(rc); +} - dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super, 0, - archive_id); - if (dataset == NULL) - GOTO(out_free_pcci, rc = -ENOENT); +static int pcc_attach_data_archive(struct file *file, struct inode *inode, + struct pcc_dataset *dataset, + struct dentry **dentry) +{ + const struct cred *old_cred; + struct file *pcc_filp; + struct path path; + ssize_t ret; + int rc; - rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry); + ENTRY; + + old_cred = override_creds(pcc_super_cred(inode->i_sb)); + rc = __pcc_inode_create(dataset, &ll_i2info(inode)->lli_fid, dentry); if (rc) - GOTO(out_dataset_put, rc); + GOTO(out_cred, rc); path.mnt = dataset->pccd_path.mnt; - path.dentry = dentry; -#ifdef HAVE_DENTRY_OPEN_USE_PATH - pcc_filp = dentry_open(&path, O_TRUNC | O_WRONLY | O_LARGEFILE, - current_cred()); -#else - pcc_filp = dentry_open(path.dentry, path.mnt, - O_TRUNC | O_WRONLY | O_LARGEFILE, - current_cred()); -#endif + path.dentry = *dentry; + pcc_filp = dentry_open(&path, O_WRONLY | O_LARGEFILE, current_cred()); if (IS_ERR_OR_NULL(pcc_filp)) { rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp); GOTO(out_dentry, rc); } - rc = pcc_copy_data(file, pcc_filp); + rc = pcc_inode_reset_iattr(*dentry, ATTR_UID | ATTR_GID, + old_cred->uid, old_cred->gid, 0); if (rc) GOTO(out_fput, rc); - pcc_inode_lock(inode); - if (lli->lli_pcc_inode) - GOTO(out_unlock, rc = -EEXIST); - pcc_inode_attach_init(dataset, pcci, dentry, LU_PCC_READWRITE); - lli->lli_pcc_inode = pcci; -out_unlock: - pcc_inode_unlock(inode); + ret = pcc_copy_data(file, pcc_filp); + if (ret < 0) + GOTO(out_fput, rc = ret); + + /* + * It must to truncate the PCC copy to the same size of the Lustre + * copy after copy data. Otherwise, it may get wrong file size after + * re-attach a file. See LU-13023 for details. + */ + rc = pcc_inode_reset_iattr(*dentry, ATTR_SIZE, KUIDT_INIT(0), + KGIDT_INIT(0), ret); out_fput: fput(pcc_filp); out_dentry: + if (rc) { + pcc_inode_remove(inode, *dentry); + dput(*dentry); + } +out_cred: + revert_creds(old_cred); + RETURN(rc); +} + +int pcc_readwrite_attach(struct file *file, struct inode *inode, + __u32 archive_id) +{ + struct pcc_dataset *dataset; + struct ll_inode_info *lli = ll_i2info(inode); + struct pcc_super *super = ll_i2pccs(inode); + struct pcc_inode *pcci; + struct dentry *dentry; + int rc; + + ENTRY; + + rc = pcc_attach_allowed_check(inode); + if (rc) + RETURN(rc); + + dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super, + LU_PCC_READWRITE, archive_id); + if (dataset == NULL) + RETURN(-ENOENT); + + rc = pcc_attach_data_archive(file, inode, dataset, &dentry); if (rc) + GOTO(out_dataset_put, rc); + + /* Pause to allow for a race with concurrent HSM remove */ + CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val); + + pcc_inode_lock(inode); + pcci = ll_i2pcci(inode); + LASSERT(!pcci); + OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS); + if (pcci == NULL) + GOTO(out_unlock, rc = -ENOMEM); + + pcc_inode_attach_set(super, dataset, lli, pcci, + dentry, LU_PCC_READWRITE); +out_unlock: + pcc_inode_unlock(inode); + if (rc) { + const struct cred *old_cred; + + old_cred = override_creds(pcc_super_cred(inode->i_sb)); + (void) pcc_inode_remove(inode, dentry); + revert_creds(old_cred); dput(dentry); + } out_dataset_put: pcc_dataset_put(dataset); -out_free_pcci: - if (rc) - OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab); - RETURN(rc); + RETURN(rc); } int pcc_readwrite_attach_fini(struct file *file, struct inode *inode, - bool lease_broken, int rc, bool attached) + __u32 gen, bool lease_broken, int rc, + bool attached) { - struct pcc_inode *pcci = ll_i2pcci(inode); + struct ll_inode_info *lli = ll_i2info(inode); + const struct cred *old_cred; + struct pcc_inode *pcci; + __u32 gen2; ENTRY; - if ((rc || lease_broken) && attached && pcci) + old_cred = override_creds(pcc_super_cred(inode->i_sb)); + pcc_inode_lock(inode); + pcci = ll_i2pcci(inode); + if (rc || lease_broken) { + if (attached && pcci) + pcc_inode_put(pcci); + + GOTO(out_unlock, rc); + } + + /* PCC inode may be released due to layout lock revocatioin */ + if (!pcci) + GOTO(out_unlock, rc = -ESTALE); + + LASSERT(attached); + rc = pcc_layout_xattr_set(pcci, gen); + if (rc) + GOTO(out_put, rc); + + LASSERT(lli->lli_pcc_state & PCC_STATE_FL_ATTACHING); + rc = ll_layout_refresh(inode, &gen2); + if (!rc) { + if (gen2 == gen) { + pcc_layout_gen_set(pcci, gen); + } else { + CDEBUG(D_CACHE, + DFID" layout changed from %d to %d.\n", + PFID(ll_inode2fid(inode)), gen, gen2); + GOTO(out_put, rc = -ESTALE); + } + } + +out_put: + if (rc) { + if (!pcci->pcci_unlinked) + (void) pcc_inode_remove(inode, pcci->pcci_path.dentry); pcc_inode_put(pcci); + } +out_unlock: + lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING; + pcc_inode_unlock(inode); + revert_creds(old_cred); + RETURN(rc); +} + +static int pcc_layout_rdonly_set(struct inode *inode, __u32 *gen) + +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct lu_extent ext = { + .e_start = 0, + .e_end = OBD_OBJECT_EOF, + }; + struct cl_layout clt = { + .cl_layout_gen = 0, + .cl_is_released = false, + .cl_is_rdonly = false, + }; + int retries = 0; + int rc; + + ENTRY; + +repeat: + rc = pcc_get_layout_info(inode, &clt); + if (rc) + RETURN(rc); + + /* + * For the HSM released file, restore the data first. + */ + if (clt.cl_is_released) { + retries++; + if (retries > 2) + RETURN(-EBUSY); + + if (ll_layout_version_get(lli) != CL_LAYOUT_GEN_NONE) { + rc = ll_layout_restore(inode, 0, OBD_OBJECT_EOF); + if (rc) { + CDEBUG(D_CACHE, DFID" RESTORE failure: %d\n", + PFID(&lli->lli_fid), rc); + RETURN(rc); + } + } + rc = ll_layout_refresh(inode, gen); + if (rc) + RETURN(rc); + + goto repeat; + } + + + if (!clt.cl_is_rdonly) { + rc = ll_layout_write_intent(inode, LAYOUT_INTENT_PCCRO_SET, + &ext); + if (rc) + RETURN(rc); + + rc = ll_layout_refresh(inode, gen); + if (rc) + RETURN(rc); + } else { /* Readonly layout */ + *gen = clt.cl_layout_gen; + } RETURN(rc); } -int pcc_ioctl_detach(struct inode *inode) +static int pcc_readonly_ioctl_attach(struct file *file, + struct inode *inode, + struct lu_pcc_attach *attach) { + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct pcc_super *super = ll_i2pccs(inode); struct ll_inode_info *lli = ll_i2info(inode); - struct pcc_inode *pcci = lli->lli_pcc_inode; + const struct cred *old_cred; + struct pcc_dataset *dataset; + struct pcc_inode *pcci; + struct dentry *dentry; + bool attached = false; + bool unlinked = false; + __u32 gen; + int rc; + + ENTRY; + + if (!test_bit(LL_SBI_LAYOUT_LOCK, sbi->ll_flags)) + RETURN(-EOPNOTSUPP); + + rc = pcc_attach_allowed_check(inode); + if (rc) + RETURN(rc); + + rc = pcc_layout_rdonly_set(inode, &gen); + if (rc) + RETURN(rc); + + dataset = pcc_dataset_get(&ll_s2sbi(inode->i_sb)->ll_pcc_super, + LU_PCC_READONLY, attach->pcca_id); + if (dataset == NULL) + RETURN(-ENOENT); + + rc = pcc_attach_data_archive(file, inode, dataset, &dentry); + if (rc) + GOTO(out_dataset_put, rc); + + mutex_lock(&lli->lli_layout_mutex); + pcc_inode_lock(inode); + old_cred = override_creds(super->pccs_cred); + lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING; + if (gen != ll_layout_version_get(lli)) + GOTO(out_put_unlock, rc = -ESTALE); + + pcci = ll_i2pcci(inode); + if (!pcci) { + OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS); + if (pcci == NULL) + GOTO(out_put_unlock, rc = -ENOMEM); + + pcc_inode_attach_set(super, dataset, lli, pcci, + dentry, LU_PCC_READONLY); + } else { + atomic_inc(&pcci->pcci_refcount); + path_put(&pcci->pcci_path); + pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt); + pcci->pcci_path.dentry = dentry; + pcci->pcci_type = LU_PCC_READONLY; + } + attached = true; + rc = pcc_layout_xattr_set(pcci, gen); + if (rc) { + pcci->pcci_type = LU_PCC_NONE; + unlinked = pcci->pcci_unlinked; + GOTO(out_put_unlock, rc); + } + + pcc_layout_gen_set(pcci, gen); +out_put_unlock: + if (rc) { + if (!unlinked) + (void) pcc_inode_remove(inode, dentry); + if (attached) + pcc_inode_put(pcci); + else + dput(dentry); + } + revert_creds(old_cred); + pcc_inode_unlock(inode); + mutex_unlock(&lli->lli_layout_mutex); +out_dataset_put: + pcc_dataset_put(dataset); + + RETURN(rc); +} + +int pcc_ioctl_attach(struct file *file, struct inode *inode, + struct lu_pcc_attach *attach) +{ + int rc = 0; + + ENTRY; + + switch (attach->pcca_type) { + case LU_PCC_READWRITE: + rc = -EOPNOTSUPP; + break; + case LU_PCC_READONLY: + rc = pcc_readonly_ioctl_attach(file, inode, attach); + break; + default: + rc = -EINVAL; + break; + } + + RETURN(rc); +} + +static int pcc_hsm_remove(struct inode *inode) +{ + struct hsm_user_request *hur; + __u32 gen; + int len; + int rc; + + ENTRY; + + rc = ll_layout_restore(inode, 0, OBD_OBJECT_EOF); + if (rc) { + CDEBUG(D_CACHE, DFID" RESTORE failure: %d\n", + PFID(&ll_i2info(inode)->lli_fid), rc); + RETURN(rc); + } + + ll_layout_refresh(inode, &gen); + + len = sizeof(struct hsm_user_request) + + sizeof(struct hsm_user_item); + OBD_ALLOC(hur, len); + if (hur == NULL) + RETURN(-ENOMEM); + + hur->hur_request.hr_action = HUA_REMOVE; + hur->hur_request.hr_archive_id = 0; + hur->hur_request.hr_flags = 0; + memcpy(&hur->hur_user_item[0].hui_fid, &ll_i2info(inode)->lli_fid, + sizeof(hur->hur_user_item[0].hui_fid)); + hur->hur_user_item[0].hui_extent.offset = 0; + hur->hur_user_item[0].hui_extent.length = OBD_OBJECT_EOF; + hur->hur_request.hr_itemcount = 1; + rc = obd_iocontrol(LL_IOC_HSM_REQUEST, ll_i2sbi(inode)->ll_md_exp, + len, hur, NULL); + if (rc) + CDEBUG(D_CACHE, DFID" HSM REMOVE failure: %d\n", + PFID(&ll_i2info(inode)->lli_fid), rc); + + OBD_FREE(hur, len); + RETURN(rc); +} + +int pcc_ioctl_detach(struct inode *inode, __u32 opt) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct pcc_inode *pcci; + const struct cred *old_cred; + bool hsm_remove = false; int rc = 0; - int count; ENTRY; pcc_inode_lock(inode); - if (pcci == NULL) + pcci = lli->lli_pcc_inode; + if (!pcci || lli->lli_pcc_state & PCC_STATE_FL_ATTACHING || + !pcc_inode_has_layout(pcci)) GOTO(out_unlock, rc = 0); - count = atomic_read(&pcci->pcci_refcount); - if (count > 1) - GOTO(out_unlock, rc = -EBUSY); - else if (count == 0) - GOTO(out_unlock, rc = 0); + LASSERT(atomic_read(&pcci->pcci_refcount) > 0); + + if (pcci->pcci_type == LU_PCC_READWRITE) { + if (opt == PCC_DETACH_OPT_UNCACHE) { + hsm_remove = true; + /* + * The file will be removed from PCC, set the flags + * with PCC_DATASET_NONE even the later removal of the + * PCC copy fails. + */ + lli->lli_pcc_dsflags = PCC_DATASET_NONE; + } + + __pcc_layout_invalidate(pcci); + pcc_inode_put(pcci); + } else if (pcci->pcci_type == LU_PCC_READONLY) { + __pcc_layout_invalidate(pcci); + + if (opt == PCC_DETACH_OPT_UNCACHE && !pcci->pcci_unlinked) { + old_cred = override_creds(pcc_super_cred(inode->i_sb)); + rc = pcc_inode_remove(inode, pcci->pcci_path.dentry); + revert_creds(old_cred); + if (!rc) + pcci->pcci_unlinked = true; + } + + pcc_inode_put(pcci); + } else { + rc = -EOPNOTSUPP; + } - pcc_inode_put(pcci); - lli->lli_pcc_inode = NULL; out_unlock: pcc_inode_unlock(inode); + if (hsm_remove) { + old_cred = override_creds(pcc_super_cred(inode->i_sb)); + rc = pcc_hsm_remove(inode); + revert_creds(old_cred); + } RETURN(rc); } -int pcc_ioctl_state(struct inode *inode, struct lu_pcc_state *state) +int pcc_ioctl_state(struct file *file, struct inode *inode, + struct lu_pcc_state *state) { int rc = 0; int count; char *buf; char *path; int buf_len = sizeof(state->pccs_path); + struct ll_file_data *fd = file->private_data; + struct pcc_file *pccf = &fd->fd_pcc_file; struct pcc_inode *pcci; ENTRY; @@ -1067,21 +3005,22 @@ int pcc_ioctl_state(struct inode *inode, struct lu_pcc_state *state) count = atomic_read(&pcci->pcci_refcount); if (count == 0) { state->pccs_type = LU_PCC_NONE; + state->pccs_open_count = 0; GOTO(out_unlock, rc = 0); } + + if (pcc_inode_has_layout(pcci)) + count--; + if (pccf->pccf_file != NULL) + count--; state->pccs_type = pcci->pcci_type; - state->pccs_open_count = count - 1; - state->pccs_flags = pcci->pcci_attr_valid ? - PCC_STATE_FLAG_ATTR_VALID : 0; -#ifdef HAVE_DENTRY_PATH_RAW + state->pccs_open_count = count; + state->pccs_flags = ll_i2info(inode)->lli_pcc_state; path = dentry_path_raw(pcci->pcci_path.dentry, buf, buf_len); if (IS_ERR(path)) GOTO(out_unlock, rc = PTR_ERR(path)); -#else - path = "UNKNOWN"; -#endif - if (strlcpy(state->pccs_path, path, buf_len) >= buf_len) + if (strscpy(state->pccs_path, path, buf_len) < 0) GOTO(out_unlock, rc = -ENAMETOOLONG); out_unlock: