4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2017, DDN Storage Corporation.
26 * Persistent Client Cache
28 * PCC is a new framework which provides a group of local caches on Lustre
29 * client side. It works in two modes: RW-PCC enables a read-write cache on the
30 * local SSDs of a single client; RO-PCC provides a read-only cache on the
31 * local SSDs of multiple clients. Less overhead is visible to the applications
32 * and network latencies and lock conflicts can be significantly reduced.
34 * For RW-PCC, no global namespace will be provided. Each client uses its own
35 * local storage as a cache for itself. Local file system is used to manage
36 * the data on local caches. Cached I/O is directed to local file system while
37 * normal I/O is directed to OSTs. RW-PCC uses HSM for data synchronization.
38 * It uses HSM copytool to restore file from local caches to Lustre OSTs. Each
39 * PCC has a copytool instance running with unique archive number. Any remote
40 * access from another Lustre client would trigger the data synchronization. If
41 * a client with RW-PCC goes offline, the cached data becomes inaccessible for
42 * other clients temporarily. And after the RW-PCC client reboots and the
43 * copytool restarts, the data will be accessible again.
45 * Following is what will happen in different conditions for RW-PCC:
47 * > When file is being created on RW-PCC
49 * A normal HSM released file is created on MDT;
50 * An empty mirror file is created on local cache;
51 * The HSM status of the Lustre file will be set to archived and released;
52 * The archive number will be set to the proper value.
54 * > When file is being prefetched to RW-PCC
56 * A file is copied to the local cache;
57 * The HSM status of the Lustre file will be set to archived and released;
58 * The archive number will be set to the proper value.
60 * > When file is being accessed from PCC
62 * Data will be read directly from local cache;
63 * Metadata will be read from MDT, except file size;
64 * File size will be obtained from local cache.
66 * > When PCC cached file is being accessed on another client
68 * RW-PCC cached files are automatically restored when a process on another
69 * client tries to read or modify them. The corresponding I/O will block
70 * waiting for the released file to be restored. This is transparent to the
73 * For RW-PCC, when a file is being created, a rule-based policy is used to
74 * determine whether it will be cached. Rule-based caching of newly created
75 * files can determine which file can use a cache on PCC directly without any
78 * RW-PCC design can accelerate I/O intensive applications with one-to-one
79 * mappings between files and accessing clients. However, in several use cases,
80 * files will never be updated, but need to be read simultaneously from many
81 * clients. RO-PCC implements a read-only caching on Lustre clients using
82 * SSDs. RO-PCC is based on the same framework as RW-PCC, except
83 * that no HSM mechanism is used.
85 * The main advantages to use this SSD cache on the Lustre clients via PCC
87 * - The I/O stack becomes much simpler for the cached data, as there is no
88 * interference with I/Os from other clients, which enables easier
89 * performance optimizations;
90 * - The requirements on the HW inside the client nodes are small, any kind of
91 * SSDs or even HDDs can be used as cache devices;
92 * - Caching reduces the pressure on the object storage targets (OSTs), as
93 * small or random I/Os can be regularized to big sequential I/Os and
94 * temporary files do not even need to be flushed to OSTs.
96 * PCC can accelerate applications with certain I/O patterns:
97 * - small-sized random writes (< 1MB) from a single client
98 * - repeated read of data that is larger than RAM
99 * - clients with high network latency
101 * Author: Li Xi <lixi@ddn.com>
102 * Author: Qian Yingjin <qian@ddn.com>
105 #define DEBUG_SUBSYSTEM S_LLITE
108 #include <linux/namei.h>
109 #include <linux/file.h>
110 #include <lustre_compat.h>
111 #include "llite_internal.h"
113 struct kmem_cache *pcc_inode_slab;
115 int pcc_super_init(struct pcc_super *super)
119 super->pccs_cred = cred = prepare_creds();
123 /* Never override disk quota limits or use reserved space */
124 cap_lower(cred->cap_effective, CAP_SYS_RESOURCE);
125 init_rwsem(&super->pccs_rw_sem);
126 INIT_LIST_HEAD(&super->pccs_datasets);
127 super->pccs_generation = 1;
132 /* Rule based auto caching */
133 static void pcc_id_list_free(struct list_head *id_list)
135 struct pcc_match_id *id, *n;
137 list_for_each_entry_safe(id, n, id_list, pmi_linkage) {
138 list_del_init(&id->pmi_linkage);
143 static void pcc_fname_list_free(struct list_head *fname_list)
145 struct pcc_match_fname *fname, *n;
147 list_for_each_entry_safe(fname, n, fname_list, pmf_linkage) {
148 OBD_FREE(fname->pmf_name, strlen(fname->pmf_name) + 1);
149 list_del_init(&fname->pmf_linkage);
154 static void pcc_expression_free(struct pcc_expression *expr)
156 LASSERT(expr->pe_field >= PCC_FIELD_UID &&
157 expr->pe_field < PCC_FIELD_MAX);
158 switch (expr->pe_field) {
161 case PCC_FIELD_PROJID:
162 pcc_id_list_free(&expr->pe_cond);
164 case PCC_FIELD_FNAME:
165 pcc_fname_list_free(&expr->pe_cond);
173 static void pcc_conjunction_free(struct pcc_conjunction *conjunction)
175 struct pcc_expression *expression, *n;
177 LASSERT(list_empty(&conjunction->pc_linkage));
178 list_for_each_entry_safe(expression, n,
179 &conjunction->pc_expressions,
181 list_del_init(&expression->pe_linkage);
182 pcc_expression_free(expression);
184 OBD_FREE_PTR(conjunction);
187 static void pcc_rule_conds_free(struct list_head *cond_list)
189 struct pcc_conjunction *conjunction, *n;
191 list_for_each_entry_safe(conjunction, n, cond_list, pc_linkage) {
192 list_del_init(&conjunction->pc_linkage);
193 pcc_conjunction_free(conjunction);
197 static void pcc_cmd_fini(struct pcc_cmd *cmd)
199 if (cmd->pccc_cmd == PCC_ADD_DATASET) {
200 if (!list_empty(&cmd->u.pccc_add.pccc_conds))
201 pcc_rule_conds_free(&cmd->u.pccc_add.pccc_conds);
202 if (cmd->u.pccc_add.pccc_conds_str)
203 OBD_FREE(cmd->u.pccc_add.pccc_conds_str,
204 strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
208 #define PCC_DISJUNCTION_DELIM (",")
209 #define PCC_CONJUNCTION_DELIM ("&")
210 #define PCC_EXPRESSION_DELIM ("=")
213 pcc_fname_list_add(char *id, struct list_head *fname_list)
215 struct pcc_match_fname *fname;
217 OBD_ALLOC_PTR(fname);
221 OBD_ALLOC(fname->pmf_name, strlen(id) + 1);
222 if (fname->pmf_name == NULL) {
227 strcpy(fname->pmf_name, id);
228 list_add_tail(&fname->pmf_linkage, fname_list);
233 pcc_fname_list_parse(char *str, struct list_head *fname_list)
239 INIT_LIST_HEAD(fname_list);
240 while (rc == 0 && str) {
241 char *fname = strsep(&str, " ");
244 rc = pcc_fname_list_add(fname, fname_list);
246 if (list_empty(fname_list))
249 pcc_fname_list_free(fname_list);
254 pcc_id_list_parse(char *str, struct list_head *id_list,
261 if (type != PCC_FIELD_UID && type != PCC_FIELD_GID &&
262 type != PCC_FIELD_PROJID)
265 INIT_LIST_HEAD(id_list);
268 struct pcc_match_id *id;
269 unsigned long id_val;
271 num = strsep(&str, " ");
274 rc = kstrtoul(num, 0, &id_val);
280 GOTO(out, rc = -ENOMEM);
283 list_add_tail(&id->pmi_linkage, id_list);
285 if (list_empty(id_list))
289 pcc_id_list_free(id_list);
294 pcc_expression_parse(char *str, struct list_head *cond_list)
296 struct pcc_expression *expr;
305 field = strim(strsep(&str, PCC_EXPRESSION_DELIM));
307 /* No LHS or no '=' */
308 GOTO(out, rc = -EINVAL);
309 str = skip_spaces(str);
311 if (str[0] != '{' || str[len - 1] != '}')
312 GOTO(out, rc = -EINVAL);
314 /* Skip '{' and '}' */
318 if (strcmp(field, "uid") == 0) {
319 if (pcc_id_list_parse(str,
322 GOTO(out, rc = -EINVAL);
323 expr->pe_field = PCC_FIELD_UID;
324 } else if (strcmp(field, "gid") == 0) {
325 if (pcc_id_list_parse(str,
328 GOTO(out, rc = -EINVAL);
329 expr->pe_field = PCC_FIELD_GID;
330 } else if (strcmp(field, "projid") == 0) {
331 if (pcc_id_list_parse(str,
333 PCC_FIELD_PROJID) < 0)
334 GOTO(out, rc = -EINVAL);
335 expr->pe_field = PCC_FIELD_PROJID;
336 } else if (strcmp(field, "fname") == 0) {
337 if (pcc_fname_list_parse(str, &expr->pe_cond) < 0)
338 GOTO(out, rc = -EINVAL);
339 expr->pe_field = PCC_FIELD_FNAME;
341 GOTO(out, rc = -EINVAL);
344 list_add_tail(&expr->pe_linkage, cond_list);
352 pcc_conjunction_parse(char *str, struct list_head *cond_list)
354 struct pcc_conjunction *conjunction;
357 OBD_ALLOC_PTR(conjunction);
358 if (conjunction == NULL)
361 INIT_LIST_HEAD(&conjunction->pc_expressions);
362 list_add_tail(&conjunction->pc_linkage, cond_list);
364 while (rc == 0 && str) {
365 char *expr = strsep(&str, PCC_CONJUNCTION_DELIM);
367 rc = pcc_expression_parse(expr, &conjunction->pc_expressions);
372 static int pcc_conds_parse(char *orig, struct list_head *cond_list)
377 orig = kstrdup(orig, GFP_KERNEL);
382 INIT_LIST_HEAD(cond_list);
383 while (rc == 0 && str) {
384 char *term = strsep(&str, PCC_DISJUNCTION_DELIM);
386 rc = pcc_conjunction_parse(term, cond_list);
392 static int pcc_id_parse(struct pcc_cmd *cmd, const char *id)
396 OBD_ALLOC(cmd->u.pccc_add.pccc_conds_str, strlen(id) + 1);
397 if (cmd->u.pccc_add.pccc_conds_str == NULL)
400 memcpy(cmd->u.pccc_add.pccc_conds_str, id, strlen(id));
402 rc = pcc_conds_parse(cmd->u.pccc_add.pccc_conds_str,
403 &cmd->u.pccc_add.pccc_conds);
411 pcc_parse_value_pair(struct pcc_cmd *cmd, char *buffer)
418 key = strsep(&val, "=");
419 if (val == NULL || strlen(val) == 0)
422 /* Key of the value pair */
423 if (strcmp(key, "rwid") == 0) {
424 rc = kstrtoul(val, 10, &id);
429 cmd->u.pccc_add.pccc_rwid = id;
430 } else if (strcmp(key, "roid") == 0) {
431 rc = kstrtoul(val, 10, &id);
436 cmd->u.pccc_add.pccc_roid = id;
437 } else if (strcmp(key, "auto_attach") == 0) {
438 rc = kstrtoul(val, 10, &id);
442 cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_AUTO_ATTACH;
443 } else if (strcmp(key, "open_attach") == 0) {
444 rc = kstrtoul(val, 10, &id);
448 cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_OPEN_ATTACH;
449 } else if (strcmp(key, "io_attach") == 0) {
450 rc = kstrtoul(val, 10, &id);
454 cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_IO_ATTACH;
455 } else if (strcmp(key, "stat_attach") == 0) {
456 rc = kstrtoul(val, 10, &id);
460 cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_STAT_ATTACH;
461 } else if (strcmp(key, "rwpcc") == 0 || strcmp(key, "pccrw") == 0) {
462 rc = kstrtoul(val, 10, &id);
466 cmd->u.pccc_add.pccc_flags |= PCC_DATASET_PCCRW;
467 } else if (strcmp(key, "ropcc") == 0 || strcmp(key, "pccro") == 0) {
468 rc = kstrtoul(val, 10, &id);
472 cmd->u.pccc_add.pccc_flags |= PCC_DATASET_PCCRO;
481 pcc_parse_value_pairs(struct pcc_cmd *cmd, char *buffer)
487 switch (cmd->pccc_cmd) {
488 case PCC_ADD_DATASET:
489 /* Enable auto attach by default */
490 cmd->u.pccc_add.pccc_flags |= PCC_DATASET_AUTO_ATTACH;
492 case PCC_DEL_DATASET:
500 while (val != NULL && strlen(val) != 0) {
501 token = strsep(&val, " ");
502 rc = pcc_parse_value_pair(cmd, token);
511 pcc_dataset_rule_fini(struct pcc_match_rule *rule)
513 if (!list_empty(&rule->pmr_conds))
514 pcc_rule_conds_free(&rule->pmr_conds);
515 LASSERT(rule->pmr_conds_str != NULL);
516 OBD_FREE(rule->pmr_conds_str, strlen(rule->pmr_conds_str) + 1);
520 pcc_dataset_rule_init(struct pcc_match_rule *rule, struct pcc_cmd *cmd)
524 LASSERT(cmd->u.pccc_add.pccc_conds_str);
525 OBD_ALLOC(rule->pmr_conds_str,
526 strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
527 if (rule->pmr_conds_str == NULL)
530 memcpy(rule->pmr_conds_str,
531 cmd->u.pccc_add.pccc_conds_str,
532 strlen(cmd->u.pccc_add.pccc_conds_str));
534 INIT_LIST_HEAD(&rule->pmr_conds);
535 if (!list_empty(&cmd->u.pccc_add.pccc_conds))
536 rc = pcc_conds_parse(rule->pmr_conds_str,
540 pcc_dataset_rule_fini(rule);
547 pcc_id_list_match(struct list_head *id_list, __u32 id_val)
549 struct pcc_match_id *id;
551 list_for_each_entry(id, id_list, pmi_linkage) {
552 if (id->pmi_id == id_val)
559 cfs_match_wildcard(const char *pattern, const char *content)
561 if (*pattern == '\0' && *content == '\0')
564 if (*pattern == '*' && *(pattern + 1) != '\0' && *content == '\0')
567 while (*pattern == *content) {
570 if (*pattern == '\0' && *content == '\0')
573 if (*pattern == '*' && *(pattern + 1) != '\0' &&
579 return (cfs_match_wildcard(pattern + 1, content) ||
580 cfs_match_wildcard(pattern, content + 1));
586 pcc_fname_list_match(struct list_head *fname_list, const char *name)
588 struct pcc_match_fname *fname;
590 list_for_each_entry(fname, fname_list, pmf_linkage) {
591 if (cfs_match_wildcard(fname->pmf_name, name))
598 pcc_expression_match(struct pcc_expression *expr, struct pcc_matcher *matcher)
600 switch (expr->pe_field) {
602 return pcc_id_list_match(&expr->pe_cond, matcher->pm_uid);
604 return pcc_id_list_match(&expr->pe_cond, matcher->pm_gid);
605 case PCC_FIELD_PROJID:
606 return pcc_id_list_match(&expr->pe_cond, matcher->pm_projid);
607 case PCC_FIELD_FNAME:
608 return pcc_fname_list_match(&expr->pe_cond,
609 matcher->pm_name->name);
616 pcc_conjunction_match(struct pcc_conjunction *conjunction,
617 struct pcc_matcher *matcher)
619 struct pcc_expression *expr;
622 list_for_each_entry(expr, &conjunction->pc_expressions, pe_linkage) {
623 matched = pcc_expression_match(expr, matcher);
632 pcc_cond_match(struct pcc_match_rule *rule, struct pcc_matcher *matcher)
634 struct pcc_conjunction *conjunction;
637 list_for_each_entry(conjunction, &rule->pmr_conds, pc_linkage) {
638 matched = pcc_conjunction_match(conjunction, matcher);
647 pcc_dataset_attach_allowed(struct pcc_dataset *dataset, enum lu_pcc_type type)
649 if (type == LU_PCC_READWRITE && dataset->pccd_flags & PCC_DATASET_PCCRW)
652 if (type == LU_PCC_READONLY && dataset->pccd_flags & PCC_DATASET_PCCRO)
659 pcc_dataset_match_get(struct pcc_super *super, enum lu_pcc_type type,
660 struct pcc_matcher *matcher)
662 struct pcc_dataset *dataset;
663 struct pcc_dataset *selected = NULL;
665 down_read(&super->pccs_rw_sem);
666 list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
667 if (pcc_dataset_attach_allowed(dataset, type) &&
668 pcc_cond_match(&dataset->pccd_rule, matcher)) {
669 atomic_inc(&dataset->pccd_refcount);
674 up_read(&super->pccs_rw_sem);
676 CDEBUG(D_CACHE, "PCC create, matched %s - %d:%d:%d:%s\n",
677 dataset->pccd_rule.pmr_conds_str,
678 matcher->pm_uid, matcher->pm_gid,
679 matcher->pm_projid, matcher->pm_name->name);
685 pcc_dataset_flags_check(struct pcc_super *super, struct pcc_cmd *cmd)
687 struct ll_sb_info *sbi;
689 sbi = container_of(super, struct ll_sb_info, ll_pcc_super);
692 * A PCC backend can provide caching service for both PCC-RW and PCC-RO.
693 * It defaults to readonly PCC as long as the server supports it.
695 if (!(exp_connect_flags2(sbi->ll_md_exp) & OBD_CONNECT2_PCCRO)) {
696 if (cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCCRO ||
697 !(cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCCRW))
699 } else if ((cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCC_ALL) == 0) {
700 cmd->u.pccc_add.pccc_flags |= PCC_DATASET_PCC_DEFAULT;
701 } /* else RWPCC or ROPCC must have been given */
703 if (cmd->u.pccc_add.pccc_rwid == 0 &&
704 cmd->u.pccc_add.pccc_roid == 0)
707 if (cmd->u.pccc_add.pccc_rwid == 0 &&
708 cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCCRW)
709 cmd->u.pccc_add.pccc_rwid = cmd->u.pccc_add.pccc_roid;
711 if (cmd->u.pccc_add.pccc_roid == 0 &&
712 cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCCRO)
713 cmd->u.pccc_add.pccc_roid = cmd->u.pccc_add.pccc_rwid;
719 * pcc_dataset_add - Add a Cache policy to control which files need be
720 * cached and where it will be cached.
722 * @super: superblock of pcc
726 pcc_dataset_add(struct pcc_super *super, struct pcc_cmd *cmd)
728 char *pathname = cmd->pccc_pathname;
729 struct pcc_dataset *dataset;
730 struct pcc_dataset *tmp;
734 rc = pcc_dataset_flags_check(super, cmd);
738 OBD_ALLOC_PTR(dataset);
742 rc = kern_path(pathname, LOOKUP_DIRECTORY, &dataset->pccd_path);
744 OBD_FREE_PTR(dataset);
747 strncpy(dataset->pccd_pathname, pathname, PATH_MAX);
748 dataset->pccd_rwid = cmd->u.pccc_add.pccc_rwid;
749 dataset->pccd_roid = cmd->u.pccc_add.pccc_roid;
750 dataset->pccd_flags = cmd->u.pccc_add.pccc_flags;
751 atomic_set(&dataset->pccd_refcount, 1);
753 rc = pcc_dataset_rule_init(&dataset->pccd_rule, cmd);
755 pcc_dataset_put(dataset);
759 down_write(&super->pccs_rw_sem);
760 list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) {
761 if (strcmp(tmp->pccd_pathname, pathname) == 0 ||
762 (dataset->pccd_rwid != 0 &&
763 dataset->pccd_rwid == tmp->pccd_rwid) ||
764 (dataset->pccd_roid != 0 &&
765 dataset->pccd_roid == tmp->pccd_roid)) {
771 list_add(&dataset->pccd_linkage, &super->pccs_datasets);
772 up_write(&super->pccs_rw_sem);
775 pcc_dataset_put(dataset);
782 static struct pcc_dataset *
783 pcc_dataset_get(struct pcc_super *super, enum lu_pcc_type type, __u32 id)
785 struct pcc_dataset *dataset;
786 struct pcc_dataset *selected = NULL;
792 * archive ID (read-write ID) or read-only ID is unique in the list,
793 * we just return last added one as first priority.
795 down_read(&super->pccs_rw_sem);
796 list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
797 if (type == LU_PCC_READWRITE && (dataset->pccd_rwid != id ||
798 !(dataset->pccd_flags & PCC_DATASET_PCCRW)))
800 if (type == LU_PCC_READONLY && (dataset->pccd_roid != id ||
801 !(dataset->pccd_flags & PCC_DATASET_PCCRO)))
803 atomic_inc(&dataset->pccd_refcount);
807 up_read(&super->pccs_rw_sem);
809 CDEBUG(D_CACHE, "matched id %u, PCC mode %d\n", id, type);
815 pcc_dataset_put(struct pcc_dataset *dataset)
817 if (atomic_dec_and_test(&dataset->pccd_refcount)) {
818 pcc_dataset_rule_fini(&dataset->pccd_rule);
819 path_put(&dataset->pccd_path);
820 OBD_FREE_PTR(dataset);
825 pcc_dataset_del(struct pcc_super *super, char *pathname)
827 struct list_head *l, *tmp;
828 struct pcc_dataset *dataset;
831 down_write(&super->pccs_rw_sem);
832 list_for_each_safe(l, tmp, &super->pccs_datasets) {
833 dataset = list_entry(l, struct pcc_dataset, pccd_linkage);
834 if (strcmp(dataset->pccd_pathname, pathname) == 0) {
835 list_del_init(&dataset->pccd_linkage);
836 pcc_dataset_put(dataset);
837 super->pccs_generation++;
842 up_write(&super->pccs_rw_sem);
847 pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
849 seq_printf(m, "%s:\n", dataset->pccd_pathname);
850 seq_printf(m, " rwid: %u\n", dataset->pccd_rwid);
851 seq_printf(m, " flags: %x\n", dataset->pccd_flags);
852 seq_printf(m, " autocache: %s\n", dataset->pccd_rule.pmr_conds_str);
856 pcc_super_dump(struct pcc_super *super, struct seq_file *m)
858 struct pcc_dataset *dataset;
860 down_read(&super->pccs_rw_sem);
861 list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
862 pcc_dataset_dump(dataset, m);
864 up_read(&super->pccs_rw_sem);
868 static void pcc_remove_datasets(struct pcc_super *super)
870 struct pcc_dataset *dataset, *tmp;
872 down_write(&super->pccs_rw_sem);
873 list_for_each_entry_safe(dataset, tmp,
874 &super->pccs_datasets, pccd_linkage) {
875 list_del(&dataset->pccd_linkage);
876 pcc_dataset_put(dataset);
878 super->pccs_generation++;
879 up_write(&super->pccs_rw_sem);
882 void pcc_super_fini(struct pcc_super *super)
884 pcc_remove_datasets(super);
885 put_cred(super->pccs_cred);
888 static bool pathname_is_valid(const char *pathname)
890 /* Needs to be absolute path */
891 if (pathname == NULL || strlen(pathname) == 0 ||
892 strlen(pathname) >= PATH_MAX || pathname[0] != '/')
897 static struct pcc_cmd *
898 pcc_cmd_parse(char *buffer, unsigned long count)
900 static struct pcc_cmd *cmd;
907 GOTO(out, rc = -ENOMEM);
909 /* clear all setting */
910 if (strncmp(buffer, "clear", 5) == 0) {
911 cmd->pccc_cmd = PCC_CLEAR_ALL;
916 token = strsep(&val, " ");
917 if (val == NULL || strlen(val) == 0)
918 GOTO(out_free_cmd, rc = -EINVAL);
920 /* Type of the command */
921 if (strcmp(token, "add") == 0)
922 cmd->pccc_cmd = PCC_ADD_DATASET;
923 else if (strcmp(token, "del") == 0)
924 cmd->pccc_cmd = PCC_DEL_DATASET;
926 GOTO(out_free_cmd, rc = -EINVAL);
928 /* Pathname of the dataset */
929 token = strsep(&val, " ");
930 if ((val == NULL && cmd->pccc_cmd != PCC_DEL_DATASET) ||
931 !pathname_is_valid(token))
932 GOTO(out_free_cmd, rc = -EINVAL);
933 cmd->pccc_pathname = token;
935 if (cmd->pccc_cmd == PCC_ADD_DATASET) {
939 val = strrchr(token, '}');
941 GOTO(out_free_cmd, rc = -EINVAL);
947 } else if (*val == ' ') {
951 GOTO(out_free_cmd, rc = -EINVAL);
954 rc = pcc_id_parse(cmd, token);
956 GOTO(out_free_cmd, rc);
958 rc = pcc_parse_value_pairs(cmd, val);
960 GOTO(out_cmd_fini, rc = -EINVAL);
973 int pcc_cmd_handle(char *buffer, unsigned long count,
974 struct pcc_super *super)
979 cmd = pcc_cmd_parse(buffer, count);
983 switch (cmd->pccc_cmd) {
984 case PCC_ADD_DATASET:
985 rc = pcc_dataset_add(super, cmd);
987 case PCC_DEL_DATASET:
988 rc = pcc_dataset_del(super, cmd->pccc_pathname);
991 pcc_remove_datasets(super);
1003 static inline void pcc_inode_lock(struct inode *inode)
1005 mutex_lock(&ll_i2info(inode)->lli_pcc_lock);
1008 static inline void pcc_inode_unlock(struct inode *inode)
1010 mutex_unlock(&ll_i2info(inode)->lli_pcc_lock);
1013 static void pcc_inode_init(struct pcc_inode *pcci, struct ll_inode_info *lli)
1015 pcci->pcci_lli = lli;
1016 lli->lli_pcc_inode = pcci;
1017 atomic_set(&pcci->pcci_refcount, 0);
1018 pcci->pcci_type = LU_PCC_NONE;
1019 pcci->pcci_layout_gen = CL_LAYOUT_GEN_NONE;
1020 atomic_set(&pcci->pcci_active_ios, 0);
1021 init_waitqueue_head(&pcci->pcci_waitq);
1024 static void pcc_inode_fini(struct pcc_inode *pcci)
1026 struct ll_inode_info *lli = pcci->pcci_lli;
1028 path_put(&pcci->pcci_path);
1029 pcci->pcci_type = LU_PCC_NONE;
1030 OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
1031 lli->lli_pcc_inode = NULL;
1034 static void pcc_inode_get(struct pcc_inode *pcci)
1036 atomic_inc(&pcci->pcci_refcount);
1039 static void pcc_inode_put(struct pcc_inode *pcci)
1041 if (atomic_dec_and_test(&pcci->pcci_refcount))
1042 pcc_inode_fini(pcci);
1045 void pcc_inode_free(struct inode *inode)
1047 struct pcc_inode *pcci = ll_i2pcci(inode);
1050 WARN_ON(atomic_read(&pcci->pcci_refcount) > 1);
1051 pcc_inode_put(pcci);
1057 * As Andreas suggested, we'd better use new layout to
1059 * (fid->f_oid >> 16 & 0xFFFF)/FID
1061 #define PCC_DATASET_MAX_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
1062 static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid)
1064 return scnprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
1066 (fid)->f_oid & 0xFFFF,
1067 (fid)->f_oid >> 16 & 0xFFFF,
1068 (unsigned int)((fid)->f_seq & 0xFFFF),
1069 (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
1070 (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
1071 (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
1075 static inline const struct cred *pcc_super_cred(struct super_block *sb)
1077 return ll_s2sbi(sb)->ll_pcc_super.pccs_cred;
1080 void pcc_file_init(struct pcc_file *pccf)
1082 pccf->pccf_file = NULL;
1083 pccf->pccf_type = LU_PCC_NONE;
1086 static inline bool pcc_auto_attach_enabled(struct pcc_dataset *dataset,
1087 enum lu_pcc_type type,
1088 enum pcc_io_type iot)
1090 if (pcc_dataset_attach_allowed(dataset, type)) {
1091 if (iot == PIT_OPEN)
1092 return dataset->pccd_flags & PCC_DATASET_OPEN_ATTACH;
1093 if (iot == PIT_GETATTR)
1094 return dataset->pccd_flags & PCC_DATASET_STAT_ATTACH;
1096 return dataset->pccd_flags & PCC_DATASET_AUTO_ATTACH;
1102 static const char pcc_xattr_layout[] = XATTR_USER_PREFIX "PCC.layout";
1104 static int pcc_layout_xattr_set(struct pcc_inode *pcci, __u32 gen)
1106 struct dentry *pcc_dentry = pcci->pcci_path.dentry;
1107 struct ll_inode_info *lli = pcci->pcci_lli;
1112 if (!(lli->lli_pcc_dsflags & PCC_DATASET_AUTO_ATTACH))
1115 rc = ll_vfs_setxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
1116 &gen, sizeof(gen), 0);
1121 static int pcc_get_layout_info(struct inode *inode, struct cl_layout *clt)
1124 struct ll_inode_info *lli = ll_i2info(inode);
1133 env = cl_env_get(&refcheck);
1135 RETURN(PTR_ERR(env));
1137 rc = cl_object_layout_get(env, lli->lli_clob, clt);
1139 CDEBUG(D_INODE, "Cannot get layout for "DFID"\n",
1140 PFID(ll_inode2fid(inode)));
1142 cl_env_put(env, &refcheck);
1143 RETURN(rc < 0 ? rc : 0);
1146 /* Must be called with pcci->pcci_lock held */
1147 static void pcc_inode_attach_init(struct pcc_dataset *dataset,
1148 struct pcc_inode *pcci,
1149 struct dentry *dentry,
1150 enum lu_pcc_type type)
1152 pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
1153 pcci->pcci_path.dentry = dentry;
1154 LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
1155 atomic_set(&pcci->pcci_refcount, 1);
1156 pcci->pcci_type = type;
1157 pcci->pcci_attr_valid = false;
1160 static inline void pcc_inode_dsflags_set(struct ll_inode_info *lli,
1161 struct pcc_dataset *dataset)
1163 lli->lli_pcc_generation = ll_info2pccs(lli)->pccs_generation;
1164 lli->lli_pcc_dsflags = dataset->pccd_flags;
1167 static void pcc_inode_attach_set(struct pcc_super *super,
1168 struct pcc_dataset *dataset,
1169 struct ll_inode_info *lli,
1170 struct pcc_inode *pcci,
1171 struct dentry *dentry,
1172 enum lu_pcc_type type)
1174 pcc_inode_init(pcci, lli);
1175 pcc_inode_attach_init(dataset, pcci, dentry, type);
1176 down_read(&super->pccs_rw_sem);
1177 pcc_inode_dsflags_set(lli, dataset);
1178 up_read(&super->pccs_rw_sem);
1181 static inline void pcc_layout_gen_set(struct pcc_inode *pcci,
1184 pcci->pcci_layout_gen = gen;
1187 static inline bool pcc_inode_has_layout(struct pcc_inode *pcci)
1189 return pcci->pcci_layout_gen != CL_LAYOUT_GEN_NONE;
1192 static struct dentry *pcc_lookup(struct dentry *base, char *pathname)
1194 char *ptr = NULL, *component;
1195 struct dentry *parent;
1196 struct dentry *child = ERR_PTR(-ENOENT);
1200 /* move past any initial '/' to the start of the first path component*/
1204 /* store the start of the first path component */
1207 parent = dget(base);
1209 /* find the start of the next component - if we don't find it,
1210 * the current component is the last component
1212 ptr = strchr(ptr, '/');
1213 /* put a NUL char in place of the '/' before the next component
1214 * so we can treat this component as a string; note the full
1215 * path string is NUL terminated so this is not needed for the
1221 /* look up the current component */
1222 inode_lock(parent->d_inode);
1223 child = lookup_one_len(component, parent, strlen(component));
1224 inode_unlock(parent->d_inode);
1226 /* repair the path string: put '/' back in place of the NUL */
1232 if (IS_ERR_OR_NULL(child))
1235 /* we may find a cached negative dentry */
1236 if (!d_is_positive(child)) {
1242 /* descend in to the next level of the path */
1245 /* move the pointer past the '/' to the next component */
1251 /* NULL child means we didn't find anything */
1253 child = ERR_PTR(-ENOENT);
1258 static int pcc_try_dataset_attach(struct inode *inode, __u32 gen,
1259 enum lu_pcc_type type,
1260 struct pcc_dataset *dataset,
1263 struct ll_inode_info *lli = ll_i2info(inode);
1264 struct pcc_inode *pcci = lli->lli_pcc_inode;
1265 const struct cred *old_cred;
1266 struct dentry *pcc_dentry = NULL;
1267 char pathname[PCC_DATASET_MAX_PATH];
1273 if (type == LU_PCC_READWRITE &&
1274 !(dataset->pccd_flags & PCC_DATASET_PCCRW))
1277 if (type == LU_PCC_READONLY &&
1278 !(dataset->pccd_flags & PCC_DATASET_PCCRO))
1281 rc = pcc_fid2dataset_path(pathname, PCC_DATASET_MAX_PATH,
1284 old_cred = override_creds(pcc_super_cred(inode->i_sb));
1285 pcc_dentry = pcc_lookup(dataset->pccd_path.dentry, pathname);
1286 if (IS_ERR(pcc_dentry)) {
1287 rc = PTR_ERR(pcc_dentry);
1288 CDEBUG(D_CACHE, "%s: path lookup error on "DFID":%s: rc = %d\n",
1289 ll_i2sbi(inode)->ll_fsname, PFID(&lli->lli_fid),
1291 /* ignore this error */
1295 rc = ll_vfs_getxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
1296 &pcc_gen, sizeof(pcc_gen));
1298 /* ignore this error */
1299 GOTO(out_put_pcc_dentry, rc = 0);
1302 /* The file is still valid cached in PCC, attach it immediately. */
1303 if (pcc_gen == gen) {
1304 CDEBUG(D_CACHE, DFID" L.Gen (%d) consistent, auto attached.\n",
1305 PFID(&lli->lli_fid), gen);
1307 OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
1309 GOTO(out_put_pcc_dentry, rc = -ENOMEM);
1311 pcc_inode_init(pcci, lli);
1313 pcc_inode_attach_init(dataset, pcci, pcc_dentry, type);
1316 * This happened when a file was once attached into
1317 * PCC, and some processes keep this file opened
1318 * (pcci->refcount > 1) and corresponding PCC file
1319 * without any I/O activity, and then this file was
1320 * detached by the manual detach command or the
1321 * revocation of the layout lock (i.e. cached LRU lock
1324 pcc_inode_get(pcci);
1325 pcci->pcci_type = type;
1327 pcc_inode_dsflags_set(lli, dataset);
1328 pcc_layout_gen_set(pcci, gen);
1334 revert_creds(old_cred);
1338 static int pcc_try_datasets_attach(struct inode *inode, enum pcc_io_type iot,
1339 __u32 gen, enum lu_pcc_type type,
1342 struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
1343 struct ll_inode_info *lli = ll_i2info(inode);
1344 struct pcc_dataset *dataset = NULL, *tmp;
1349 down_read(&super->pccs_rw_sem);
1350 list_for_each_entry_safe(dataset, tmp,
1351 &super->pccs_datasets, pccd_linkage) {
1352 if (!pcc_auto_attach_enabled(dataset, type, iot))
1355 rc = pcc_try_dataset_attach(inode, gen, type, dataset, cached);
1356 if (rc < 0 || (!rc && *cached))
1361 * Update the saved dataset flags for the inode accordingly if failed.
1363 if (!rc && !*cached) {
1365 * Currently auto attach strategy for a PCC backend is
1366 * unchangeable once it was added into the PCC datasets on
1367 * a client as the support to change auto attach strategy is
1368 * not implemented yet.
1371 * If tried to attach from one PCC backend:
1372 * @lli_pcc_generation > 0:
1373 * 1) The file was once attached into PCC, but now the
1374 * corresponding PCC backend should be removed from the client;
1375 * 2) The layout generation was changed, the data has been
1377 * 3) The corresponding PCC copy does not exist on PCC
1378 * @lli_pcc_generation == 0:
1379 * The file is never attached into PCC but in a HSM released
1380 * state, or once attached into PCC but the inode was evicted
1381 * from icache later.
1382 * Set the saved dataset flags with PCC_DATASET_NONE. Then this
1383 * file will skip from the candidates to try auto attach until
1384 * the file is attached into PCC again.
1386 * If the file was never attached into PCC, or once attached but
1387 * its inode was evicted from icache (lli_pcc_generation == 0),
1388 * or the corresponding dataset was removed from the client,
1389 * set the saved dataset flags with PCC_DATASET_NONE.
1391 * TODO: If the file was once attached into PCC but not try to
1392 * auto attach due to the change of the configuration parameters
1393 * for this dataset (i.e. change from auto attach enabled to
1394 * auto attach disabled for this dataset), update the saved
1395 * dataset flags with the found one.
1397 lli->lli_pcc_dsflags = PCC_DATASET_NONE;
1399 up_read(&super->pccs_rw_sem);
1404 static int pcc_readonly_ioctl_attach(struct file *file, struct inode *inode,
1407 /* Call with pcci_mutex held */
/*
 * Try to attach the file being opened into a PCC-RO (read-only) dataset.
 *
 * Only opens whose access mode is O_RDONLY are considered. A matcher is
 * built from the uid and gid of the caller, the project ID of the inode and
 * the dentry name, and is looked up among the LU_PCC_READONLY datasets.
 * When the PCC flags of the matched dataset are exactly PCC_DATASET_PCCRO,
 * pcc_readonly_ioctl_attach() is called with the read-only ID of that
 * dataset. The dataset reference is released with pcc_dataset_put().
 * NOTE(review): how *cached and the return value are set is assumed from
 * the check of pcc_inode_has_layout() after the attach -- confirm.
 */
1408 static int pcc_try_readonly_open_attach(struct inode *inode, struct file *file,
1411 struct dentry *dentry = file->f_path.dentry;
1412 struct pcc_dataset *dataset;
1413 struct pcc_matcher item;
1414 struct pcc_inode *pcci;
1419 if (!((file->f_flags & O_ACCMODE) == O_RDONLY))
1422 item.pm_uid = from_kuid(&init_user_ns, current_uid());
1423 item.pm_gid = from_kgid(&init_user_ns, current_gid());
1424 item.pm_projid = ll_i2info(inode)->lli_projid;
1425 item.pm_name = &dentry->d_name;
1426 dataset = pcc_dataset_match_get(&ll_i2sbi(inode)->ll_pcc_super,
1427 LU_PCC_READONLY, &item);
1428 if (dataset == NULL)
1431 if ((dataset->pccd_flags & PCC_DATASET_PCC_ALL) == PCC_DATASET_PCCRO) {
/*
 * The attach runs with the PCC inode lock dropped; the lock is taken
 * again afterwards and the PCC inode is looked up anew.
 */
1432 pcc_inode_unlock(inode);
1433 rc = pcc_readonly_ioctl_attach(file, inode, dataset->pccd_roid);
1434 pcc_inode_lock(inode);
1435 pcci = ll_i2pcci(inode);
1436 if (pcci && pcc_inode_has_layout(pcci))
1440 "Failed to try PCC-RO attach "DFID", rc = %d\n",
1441 PFID(&ll_i2info(inode)->lli_fid), rc);
1442 /* ignore the error during auto PCC-RO attach. */
1447 pcc_dataset_put(dataset);
1452 * TODO: For RW-PCC, it is desirable to store HSM info as a layout (LU-10606).
1453 * Thus the client can get archive ID from the layout directly. When try to
1454 * attach the file automatically which is in HSM released state (according to
1455 * LOV_PATTERN_F_RELEASED in the layout), it can determine whether the file is
1456 * valid cached on PCC more precisely according to the @rwid (archive ID) in
1457 * the PCC dataset and the archive ID in HSM attrs.
/*
 * Try to attach @inode automatically to one of the PCC datasets of this
 * client for an I/O of type @iot. @cached is handed through to
 * pcc_try_datasets_attach().
 *
 * The dataset list of the PCC super is checked for emptiness first. For
 * PIT_OPEN the layout version is compared with CL_LAYOUT_GEN_NONE.
 * ll_layout_refresh() and pcc_get_layout_info() supply the layout
 * generation and the layout flags; for I/O other than open, a difference
 * between the two generations is logged. A released layout is tried
 * against the LU_PCC_READWRITE datasets, a read-only layout against the
 * LU_PCC_READONLY datasets.
 */
1459 static int pcc_try_auto_attach(struct inode *inode, bool *cached,
1460 enum pcc_io_type iot)
1462 struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
1463 struct cl_layout clt = {
1465 .cl_is_released = false,
1467 struct ll_inode_info *lli = ll_i2info(inode);
1474 * Quick check whether there is PCC device.
1476 if (list_empty(&super->pccs_datasets))
1480 * The file layout lock was cancelled. And this open does not
1481 * obtain valid layout lock from MDT (i.e. the file is being
1484 if (iot == PIT_OPEN) {
1485 if (ll_layout_version_get(lli) == CL_LAYOUT_GEN_NONE)
1488 rc = ll_layout_refresh(inode, &gen);
1493 rc = pcc_get_layout_info(inode, &clt);
1497 if (iot != PIT_OPEN && gen != clt.cl_layout_gen) {
1498 CDEBUG(D_CACHE, DFID" layout changed from %d to %d.\n",
1499 PFID(ll_inode2fid(inode)), gen, clt.cl_layout_gen);
1503 if (clt.cl_is_released)
1504 rc = pcc_try_datasets_attach(inode, iot, clt.cl_layout_gen,
1505 LU_PCC_READWRITE, cached);
1506 else if (clt.cl_is_rdonly)
1507 rc = pcc_try_datasets_attach(inode, iot, clt.cl_layout_gen,
1508 LU_PCC_READONLY, cached);
/*
 * Decide whether an automatic PCC attach should be tried for @inode on an
 * I/O of type @iot.
 *
 * The saved dataset flags (lli_pcc_dsflags) are consulted: the
 * PCC_DATASET_NONE bit marks a file known not to be in any PCC backend.
 * The PCC generation of the super is compared with lli_pcc_generation.
 * When the saved flags are valid, the answer is the OPEN_ATTACH bit for
 * PIT_OPEN, the STAT_ATTACH bit for PIT_GETATTR and the IO_ATTACH bit for
 * every other I/O type.
 */
1513 static inline bool pcc_may_auto_attach(struct inode *inode,
1514 enum pcc_io_type iot)
1516 struct ll_inode_info *lli = ll_i2info(inode);
1517 struct pcc_super *super = ll_i2pccs(inode);
1521 /* Known the file was not in any PCC backend. */
1522 if (lli->lli_pcc_dsflags & PCC_DATASET_NONE)
1526 * lli_pcc_generation == 0 means that the file was never attached into
1527 * PCC, or may be once attached into PCC but detached as the inode is
1528 * evicted from icache (i.e. "echo 3 > /proc/sys/vm/drop_caches" or
1529 * icache shrinking due to the memory pressure), which will cause the
1530 * file detach from PCC when releasing the inode from icache.
1531 * In either case, we still try to attach.
1533 /* lli_pcc_generation == 0, or the PCC setting was changed,
1534 * or there is no PCC setup on the client and the try will return
1535 * immediately in pcc_try_auto_attach().
1537 if (super->pccs_generation != lli->lli_pcc_generation)
1540 /* The cached setting @lli_pcc_dsflags is valid */
1541 if (iot == PIT_OPEN)
1542 RETURN(lli->lli_pcc_dsflags & PCC_DATASET_OPEN_ATTACH);
1544 if (iot == PIT_GETATTR)
1545 RETURN(lli->lli_pcc_dsflags & PCC_DATASET_STAT_ATTACH);
1547 RETURN(lli->lli_pcc_dsflags & PCC_DATASET_IO_ATTACH);
/*
 * PCC part of opening a Lustre file.
 *
 * Regular-file and encryption checks come first. Under the PCC inode lock:
 * if an attach is in progress (PCC_STATE_FL_ATTACHING) the function leaves
 * with rc = 0. When the inode has no PCC inode or no valid PCC layout, the
 * auto attach and then the PCC-RO open attach are tried, and the function
 * leaves if an error occurred or the file is still not cached. Otherwise a
 * reference on the PCC inode is taken and the PCC copy is opened with
 * dentry_open(), using the open flags of @file and the PCC super
 * credentials. The opened file and the PCC type are saved in the per-open
 * pcc_file. The PCC inode reference is dropped again when the open fails.
 */
1550 int pcc_file_open(struct inode *inode, struct file *file)
1552 struct pcc_inode *pcci;
1553 struct ll_inode_info *lli = ll_i2info(inode);
1554 struct ll_file_data *fd = file->private_data;
1555 struct pcc_file *pccf = &fd->fd_pcc_file;
1556 struct file *pcc_file;
1558 bool cached = false;
1563 if (!S_ISREG(inode->i_mode))
1566 if (IS_ENCRYPTED(inode))
1569 pcc_inode_lock(inode);
1570 pcci = ll_i2pcci(inode);
1572 if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
1573 GOTO(out_unlock, rc = 0);
1575 if (!pcci || !pcc_inode_has_layout(pcci)) {
1576 if (pcc_may_auto_attach(inode, PIT_OPEN))
1577 rc = pcc_try_auto_attach(inode, &cached, PIT_OPEN);
1579 if (rc == 0 && !cached)
1580 rc = pcc_try_readonly_open_attach(inode, file, &cached);
1582 if (rc < 0 || !cached)
1583 GOTO(out_unlock, rc);
1586 pcci = ll_i2pcci(inode);
/* This reference is dropped in pcc_file_release(), or below on failure. */
1589 pcc_inode_get(pcci);
1590 WARN_ON(pccf->pccf_file);
1592 path = &pcci->pcci_path;
1593 CDEBUG(D_CACHE, "opening pcc file '%pd'\n", path->dentry);
1595 pcc_file = dentry_open(path, file->f_flags,
1596 pcc_super_cred(inode->i_sb));
1597 if (IS_ERR_OR_NULL(pcc_file)) {
1598 rc = pcc_file == NULL ? -EINVAL : PTR_ERR(pcc_file);
1599 pcc_inode_put(pcci);
1601 pccf->pccf_file = pcc_file;
1602 pccf->pccf_type = pcci->pcci_type;
1606 pcc_inode_unlock(inode);
/*
 * Release the PCC file saved in the per-open pcc_file, if there is one.
 *
 * Non-regular files and opens without private data are checked for up
 * front. Under the PCC inode lock, the PCC inode reference is dropped, the
 * PCC file is released with fput() and pccf_file is cleared.
 */
1610 void pcc_file_release(struct inode *inode, struct file *file)
1612 struct pcc_inode *pcci;
1613 struct ll_file_data *fd = file->private_data;
1614 struct pcc_file *pccf;
1619 if (!S_ISREG(inode->i_mode) || fd == NULL)
1622 pccf = &fd->fd_pcc_file;
1623 pcc_inode_lock(inode);
1624 if (pccf->pccf_file == NULL)
1627 pcci = ll_i2pcci(inode);
1629 path = &pcci->pcci_path;
1630 CDEBUG(D_CACHE, "releasing pcc file \"%pd\"\n", path->dentry);
1631 pcc_inode_put(pcci);
1632 fput(pccf->pccf_file);
1633 pccf->pccf_file = NULL;
1635 pcc_inode_unlock(inode);
1639 /* Tolerate the IO failure on PCC and fall back to normal Lustre IO path */
/*
 * Classify the result @rc of a PCC I/O of type @iot by the PCC type of
 * @pcci.
 *
 * LU_PCC_READWRITE: the visible check covers PIT_WRITE failing with
 * -ENOSPC or -EDQUOT.
 * LU_PCC_READONLY: the checks cover PIT_READ, PIT_GETATTR and
 * PIT_SPLICE_READ failing with a negative error other than -ENOMEM,
 * -EAGAIN and -EIOCBQUEUED, and PIT_FAULT with VM_FAULT_SIGBUS set while
 * VM_FAULT_OOM is clear.
 * NOTE(review): the value returned for each case is not visible here; it
 * is stored into *cached by pcc_io_fini() -- confirm the polarity.
 */
1640 static bool pcc_io_tolerate(struct pcc_inode *pcci,
1641 enum pcc_io_type iot, int rc)
1643 if (pcci->pcci_type == LU_PCC_READWRITE) {
1644 if (iot == PIT_WRITE && (rc == -ENOSPC || rc == -EDQUOT))
1646 /* Handle the ->page_mkwrite failure tolerance separately
1647 * in pcc_page_mkwrite().
1649 } else if (pcci->pcci_type == LU_PCC_READONLY) {
1651 * For async I/O engine such as libaio and io_uring, PCC read
1652 * should not tolerate -EAGAIN/-EIOCBQUEUED errors, return
1653 * the error code to the caller directly.
1655 if ((iot == PIT_READ || iot == PIT_GETATTR ||
1656 iot == PIT_SPLICE_READ) && rc < 0 && rc != -ENOMEM &&
1657 rc != -EAGAIN && rc != -EIOCBQUEUED)
1659 if (iot == PIT_FAULT && (rc & VM_FAULT_SIGBUS) &&
1660 !(rc & VM_FAULT_OOM))
/*
 * Start a PCC I/O of type @iot on @inode, under the PCC inode lock.
 *
 * When the inode already has a valid PCC layout, pcci_active_ios is
 * incremented, with a separate branch for write-type I/O (PIT_WRITE,
 * PIT_SETATTR, PIT_PAGE_MKWRITE) on an LU_PCC_READONLY file, which falls
 * back to the normal I/O path. Otherwise, when pcc_may_auto_attach() allows
 * it, pcc_try_auto_attach() is called with its return value ignored, and
 * pcci_active_ios is incremented on the PCC inode found afterwards.
 * NOTE(review): presumably *cached tells the caller whether the PCC path is
 * to be used -- confirm against the callers.
 */
1667 static void pcc_io_init(struct inode *inode, enum pcc_io_type iot, bool *cached)
1669 struct pcc_inode *pcci;
1671 pcc_inode_lock(inode);
1672 pcci = ll_i2pcci(inode);
1673 if (pcci && pcc_inode_has_layout(pcci)) {
1674 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1675 if (pcci->pcci_type == LU_PCC_READONLY &&
1676 (iot == PIT_WRITE || iot == PIT_SETATTR ||
1677 iot == PIT_PAGE_MKWRITE)) {
1678 /* Fall back to normal I/O path */
1680 /* For mmap write, we need to detach the file from
1681 * RO-PCC, release the page got from ->fault(), and
1682 * then retry the memory fault handling (->fault()
1683 * and ->page_mkwrite()).
1684 * These are done in pcc_page_mkwrite();
1687 atomic_inc(&pcci->pcci_active_ios);
1692 if (pcc_may_auto_attach(inode, iot)) {
1693 (void) pcc_try_auto_attach(inode, cached, iot);
1695 pcci = ll_i2pcci(inode);
1696 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1697 atomic_inc(&pcci->pcci_active_ios);
1701 pcc_inode_unlock(inode);
/*
 * Finish a PCC I/O of type @iot whose result is @rc.
 *
 * Asserts that the PCC inode exists, that it has active I/O and that
 * *cached is set. *cached is then replaced by the result of
 * pcc_io_tolerate(). pcci_active_ios is decremented and the waiters on
 * pcci_waitq are woken when it reaches zero.
 */
1704 static void pcc_io_fini(struct inode *inode, enum pcc_io_type iot,
1705 int rc, bool *cached)
1707 struct pcc_inode *pcci = ll_i2pcci(inode);
1709 LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0 && *cached);
1711 *cached = pcc_io_tolerate(pcci, iot, rc);
1712 if (atomic_dec_and_test(&pcci->pcci_active_ios))
1713 wake_up(&pcci->pcci_waitq);
/*
 * Read from the file in iocb->ki_filp through its own file operations.
 *
 * With HAVE_FILE_OPERATIONS_READ_WRITE_ITER this is a direct call to
 * ->read_iter(). The other variant walks the iovecs of @iter, calls
 * ->aio_read() for each one, waits for completion when -EIOCBQUEUED is
 * returned, checks for a result shorter than the iovec length, and finally
 * advances @iter.
 */
1718 __pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
1720 struct file *file = iocb->ki_filp;
1722 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1723 return file->f_op->read_iter(iocb, iter);
1729 iov_for_each(iov, i, *iter) {
1732 res = file->f_op->aio_read(iocb, &iov, 1, iocb->ki_pos);
1733 if (-EIOCBQUEUED == res)
1734 res = wait_on_sync_kiocb(iocb);
1742 if (res < iov.iov_len)
1747 iov_iter_advance(iter, bytes);
/*
 * Read path for a file that may be cached on PCC.
 *
 * Opens without a PCC file are checked for first. Otherwise pcc_io_init()
 * is called for PIT_READ, the kiocb is pointed at the PCC file for the
 * duration of __pcc_file_read_iter() and pointed back at @file afterwards,
 * and pcc_io_fini() is called with the result.
 * OBD_FAIL_LLITE_PCC_FAKE_ERROR injects -EIO as the result.
 */
1752 ssize_t pcc_file_read_iter(struct kiocb *iocb,
1753 struct iov_iter *iter, bool *cached)
1755 struct file *file = iocb->ki_filp;
1756 struct ll_file_data *fd = file->private_data;
1757 struct pcc_file *pccf = &fd->fd_pcc_file;
1758 struct inode *inode = file_inode(file);
1763 if (pccf->pccf_file == NULL) {
1768 pcc_io_init(inode, PIT_READ, cached);
1772 /* Fake I/O error on RO-PCC */
1773 if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
1774 GOTO(out, result = -EIO);
1776 iocb->ki_filp = pccf->pccf_file;
1777 /* generic_file_aio_read does not support ext4-dax,
1778 * __pcc_file_read_iter uses ->aio_read hook directly
1779 * to add support for ext4-dax.
1781 result = __pcc_file_read_iter(iocb, iter);
1782 iocb->ki_filp = file;
1784 pcc_io_fini(inode, PIT_READ, result, cached);
/*
 * Write to the file in iocb->ki_filp through its own file operations.
 *
 * With HAVE_FILE_OPERATIONS_READ_WRITE_ITER this is a direct call to
 * ->write_iter(). The other variant walks the iovecs of @iter, calls
 * ->aio_write() for each one, waits for completion when -EIOCBQUEUED is
 * returned, checks for a result shorter than the iovec length, and finally
 * advances @iter.
 */
1789 __pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
1791 struct file *file = iocb->ki_filp;
1793 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1794 return file->f_op->write_iter(iocb, iter);
1800 iov_for_each(iov, i, *iter) {
1803 res = file->f_op->aio_write(iocb, &iov, 1, iocb->ki_pos);
1804 if (-EIOCBQUEUED == res)
1805 res = wait_on_sync_kiocb(iocb);
1813 if (res < iov.iov_len)
1818 iov_iter_advance(iter, bytes);
/*
 * Write path for a file that may be cached on PCC.
 *
 * Opens without a PCC file, and opens whose PCC type is not
 * LU_PCC_READWRITE, are checked for first. Otherwise pcc_io_init() is
 * called for PIT_WRITE, the kiocb is pointed at the PCC file for the
 * duration of __pcc_file_write_iter() and pointed back at @file afterwards,
 * and pcc_io_fini() is called with the result.
 * OBD_FAIL_LLITE_PCC_FAKE_ERROR injects -ENOSPC as the result.
 */
1823 ssize_t pcc_file_write_iter(struct kiocb *iocb,
1824 struct iov_iter *iter, bool *cached)
1826 struct file *file = iocb->ki_filp;
1827 struct ll_file_data *fd = file->private_data;
1828 struct pcc_file *pccf = &fd->fd_pcc_file;
1829 struct inode *inode = file_inode(file);
1834 if (pccf->pccf_file == NULL) {
1839 if (pccf->pccf_type != LU_PCC_READWRITE) {
1844 pcc_io_init(inode, PIT_WRITE, cached);
1848 if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
1849 GOTO(out, result = -ENOSPC);
1851 iocb->ki_filp = pccf->pccf_file;
1853 /* Since __pcc_file_write_iter makes write calls via
1854 * the normal vfs interface to the local PCC file system,
1855 * the inode lock is not needed.
1857 result = __pcc_file_write_iter(iocb, iter);
1858 iocb->ki_filp = file;
1860 pcc_io_fini(inode, PIT_WRITE, result, cached);
/*
 * Apply the attribute changes in @attr to the PCC copy of @inode.
 *
 * After pcc_io_init() for PIT_SETATTR, a copy of @attr whose ia_valid is
 * restricted to the size, atime, mtime, ctime, uid and gid bits is passed
 * to ->setattr() of the inode of the PCC copy. The call is made with that
 * inode locked and the PCC super credentials in effect. pcc_io_fini() is
 * called with the result.
 */
1864 int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
1868 const struct cred *old_cred;
1869 struct iattr attr2 = *attr;
1870 struct dentry *pcc_dentry;
1871 struct pcc_inode *pcci;
1875 if (!S_ISREG(inode->i_mode)) {
1880 pcc_io_init(inode, PIT_SETATTR, cached);
1884 attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME |
1885 ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET |
1886 ATTR_CTIME | ATTR_UID | ATTR_GID);
1887 pcci = ll_i2pcci(inode);
1888 pcc_dentry = pcci->pcci_path.dentry;
1889 inode_lock(pcc_dentry->d_inode);
1890 old_cred = override_creds(pcc_super_cred(inode->i_sb));
1891 #ifdef HAVE_USER_NAMESPACE_ARG
1892 rc = pcc_dentry->d_inode->i_op->setattr(&nop_mnt_idmap, pcc_dentry,
1895 rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
1897 revert_creds(old_cred);
1898 inode_unlock(pcc_dentry->d_inode);
1900 pcc_io_fini(inode, PIT_SETATTR, rc, cached);
/*
 * Refresh size, blocks and timestamps of @inode from its PCC copy.
 *
 * After pcc_io_init() for PIT_GETATTR, the attributes of the PCC copy are
 * fetched with ll_vfs_getattr() under the PCC super credentials. Under the
 * inode size lock, mtime and ctime are set from the values saved in the
 * ll_inode_info, and atime as well when LLIF_UPDATE_ATIME was set or the
 * saved atime is newer. Each of atime, mtime and ctime is then raised to
 * the value of the PCC copy when that value is newer. i_size and i_blocks
 * are taken from the PCC copy. pcc_io_fini() is called with the result.
 */
1904 int pcc_inode_getattr(struct inode *inode, u32 request_mask,
1905 unsigned int flags, bool *cached)
1907 struct ll_inode_info *lli = ll_i2info(inode);
1908 const struct cred *old_cred;
1917 if (!S_ISREG(inode->i_mode)) {
1922 pcc_io_init(inode, PIT_GETATTR, cached);
1926 old_cred = override_creds(pcc_super_cred(inode->i_sb));
1927 rc = ll_vfs_getattr(&ll_i2pcci(inode)->pcci_path, &stat, request_mask,
1929 revert_creds(old_cred);
1933 ll_inode_size_lock(inode);
1934 if (test_and_clear_bit(LLIF_UPDATE_ATIME, &lli->lli_flags) ||
1935 inode_get_atime_sec(inode) < lli->lli_atime)
1936 inode_set_atime(inode, lli->lli_atime, 0);
1938 inode_set_mtime(inode, lli->lli_mtime, 0);
1939 inode_set_ctime(inode, lli->lli_ctime, 0);
1941 atime = inode_get_atime_sec(inode);
1942 mtime = inode_get_mtime_sec(inode);
1943 ctime = inode_get_ctime_sec(inode);
1945 if (atime < stat.atime.tv_sec)
1946 atime = stat.atime.tv_sec;
1948 if (ctime < stat.ctime.tv_sec)
1949 ctime = stat.ctime.tv_sec;
1951 if (mtime < stat.mtime.tv_sec)
1952 mtime = stat.mtime.tv_sec;
1954 i_size_write(inode, stat.size);
1955 inode->i_blocks = stat.blocks;
1957 inode_set_atime(inode, atime, 0);
1958 inode_set_mtime(inode, mtime, 0);
1959 inode_set_ctime(inode, ctime, 0);
1961 ll_inode_size_unlock(inode);
1963 pcc_io_fini(inode, PIT_GETATTR, rc, cached);
/*
 * splice_read for a file that may be cached on PCC; built only with
 * HAVE_DEFAULT_FILE_SPLICE_READ_EXPORT.
 *
 * default_file_splice_read() is run on the PCC file between pcc_io_init()
 * and pcc_io_fini() for PIT_SPLICE_READ. The two other calls run it on the
 * Lustre file @in_file instead.
 */
1967 #ifdef HAVE_DEFAULT_FILE_SPLICE_READ_EXPORT
1968 ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos,
1969 struct pipe_inode_info *pipe,
1970 size_t count, unsigned int flags)
1972 struct inode *inode = file_inode(in_file);
1973 struct ll_file_data *fd = in_file->private_data;
1974 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1975 bool cached = false;
1981 RETURN(default_file_splice_read(in_file, ppos, pipe,
1984 pcc_io_init(inode, PIT_SPLICE_READ, &cached);
1986 RETURN(default_file_splice_read(in_file, ppos, pipe,
1989 result = default_file_splice_read(pcc_file, ppos, pipe, count, flags);
1991 pcc_io_fini(inode, PIT_SPLICE_READ, result, &cached);
1994 #endif /* HAVE_DEFAULT_FILE_SPLICE_READ_EXPORT */
/*
 * fsync for a file that may be cached on PCC.
 *
 * Non-regular files and opens whose PCC type is LU_PCC_READONLY are
 * checked for first. Otherwise ->fsync() of the PCC file is called for the
 * range [@start, @end] between pcc_io_init() and pcc_io_fini() for
 * PIT_FSYNC.
 */
1996 int pcc_fsync(struct file *file, loff_t start, loff_t end,
1997 int datasync, bool *cached)
1999 struct inode *inode = file_inode(file);
2000 struct ll_file_data *fd = file->private_data;
2001 struct pcc_file *pccf = &fd->fd_pcc_file;
2002 struct file *pcc_file = pccf->pccf_file;
2012 if (!S_ISREG(inode->i_mode)) {
2018 * After the file is attached into RO-PCC, its dirty pages on this
2019 * client may not be flushed. So fsync() should fall back to normal
2020 * Lustre I/O path flushing dirty data to OSTs. And flush on RO-PCC
2021 * copy is meaningless.
2023 if (pccf->pccf_type == LU_PCC_READONLY) {
2028 pcc_io_init(inode, PIT_FSYNC, cached);
2032 rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
2033 start, end, datasync);
2035 pcc_io_fini(inode, PIT_FSYNC, rc, cached);
/*
 * mmap for a file that may be cached on PCC.
 *
 * Opens without a PCC file, or whose PCC file has no ->mmap() file
 * operation, are checked for first. Under the PCC inode lock, and only
 * while the inode has a valid PCC layout, ->mmap() of the PCC file is
 * called with vma->vm_file switched to the PCC file for the duration of
 * the call. The vm_ops found in the vma afterwards are saved in
 * vma->vm_private_data.
 */
2039 int pcc_file_mmap(struct file *file, struct vm_area_struct *vma,
2042 struct inode *inode = file_inode(file);
2043 struct ll_file_data *fd = file->private_data;
2044 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
2045 struct pcc_inode *pcci;
2050 if (!pcc_file || !file_inode(pcc_file)->i_fop->mmap) {
2055 pcc_inode_lock(inode);
2056 pcci = ll_i2pcci(inode);
2057 if (pcci && pcc_inode_has_layout(pcci)) {
2058 LASSERT(atomic_read(&pcci->pcci_refcount) > 1);
2060 vma->vm_file = pcc_file;
2061 rc = file_inode(pcc_file)->i_fop->mmap(pcc_file, vma);
2062 vma->vm_file = file;
2063 /* Save the vm ops of backend PCC */
2064 vma->vm_private_data = (void *)vma->vm_ops;
2068 pcc_inode_unlock(inode);
/*
 * Forward a vm open to the PCC backend.
 *
 * Applies only when this open has a PCC file and the vm_ops saved in
 * vma->vm_private_data provide ->open(). Under the PCC inode lock, and only
 * while the inode has a valid PCC layout, that ->open() is called with
 * vma->vm_file switched to the PCC file for the duration of the call.
 */
2073 void pcc_vm_open(struct vm_area_struct *vma)
2075 struct pcc_inode *pcci;
2076 struct file *file = vma->vm_file;
2077 struct inode *inode = file_inode(file);
2078 struct ll_file_data *fd = file->private_data;
2079 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
2080 struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
2084 if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->open)
2087 pcc_inode_lock(inode);
2088 pcci = ll_i2pcci(inode);
2089 if (pcci && pcc_inode_has_layout(pcci)) {
2090 vma->vm_file = pcc_file;
2091 pcc_vm_ops->open(vma);
2092 vma->vm_file = file;
2094 pcc_inode_unlock(inode);
/*
 * Forward a vm close to the PCC backend.
 *
 * Applies only when this open has a PCC file and the vm_ops saved in
 * vma->vm_private_data provide ->close(). Under the PCC inode lock, that
 * ->close() is called with vma->vm_file switched to the PCC file for the
 * duration of the call. Unlike pcc_vm_open(), no check of the PCC layout is
 * made before the call.
 */
2098 void pcc_vm_close(struct vm_area_struct *vma)
2100 struct file *file = vma->vm_file;
2101 struct inode *inode = file_inode(file);
2102 struct ll_file_data *fd = file->private_data;
2103 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
2104 struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
2108 if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->close)
2111 pcc_inode_lock(inode);
2112 /* Layout lock maybe revoked here */
2113 vma->vm_file = pcc_file;
2114 pcc_vm_ops->close(vma);
2115 vma->vm_file = file;
2116 pcc_inode_unlock(inode);
/*
 * ->page_mkwrite() for a mapping that may be backed by a PCC copy.
 *
 * Opens without a PCC file or without saved PCC vm_ops are checked for
 * first. In three situations the file is detached from PCC with
 * PCC_DETACH_OPT_UNCACHE, the mmap read lock is released and
 * VM_FAULT_RETRY | VM_FAULT_NOPAGE is returned:
 *  - the backend provides no ->page_mkwrite() and the page belongs to the
 *    mapping of the PCC file;
 *  - the page belongs to the mapping of the PCC file at the check made
 *    after pcc_io_init();
 *  - the result has VM_FAULT_SIGBUS set.
 * Otherwise ->page_mkwrite() of the backend is called with vma->vm_file
 * switched to the PCC file for the duration of the call, and pcc_io_fini()
 * is called with the result. OBD_FAIL_LLITE_PCC_DETACH_MKWRITE injects
 * VM_FAULT_SIGBUS as the result.
 */
2120 int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
2123 struct page *page = vmf->page;
2124 struct mm_struct *mm = vma->vm_mm;
2125 struct file *file = vma->vm_file;
2126 struct inode *inode = file_inode(file);
2127 struct ll_file_data *fd = file->private_data;
2128 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
2129 struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
2134 if (!pcc_file || !pcc_vm_ops) {
2139 if (!pcc_vm_ops->page_mkwrite &&
2140 page->mapping == pcc_file->f_mapping) {
2142 "%s: PCC backend fs not support ->page_mkwrite()\n",
2143 ll_i2sbi(inode)->ll_fsname);
2144 pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
2145 mmap_read_unlock(mm);
2147 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
2149 /* Pause to allow for a race with concurrent detach */
2150 CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
2152 pcc_io_init(inode, PIT_PAGE_MKWRITE, cached);
2154 /* This happens when the file is detached from PCC after got
2155 * the fault page via ->fault() on the inode of the PCC copy.
2156 * Here it can not simply fall back to normal Lustre I/O path.
2157 * The reason is that the address space of fault page used by
2158 * ->page_mkwrite() is still the one of PCC inode. In the
2159 * normal Lustre ->page_mkwrite() I/O path, it will be wrongly
2160 * handled as the address space of the fault page is not
2161 * consistent with the one of the Lustre inode (though the
2162 * fault page was truncated).
2163 * As the file is detached from PCC, the fault page must
2164 * be released frist, and retry the mmap write (->fault() and
2166 * We use an ugly and tricky method by returning
2167 * VM_FAULT_NOPAGE | VM_FAULT_RETRY to the caller
2168 * __do_page_fault and retry the memory fault handling.
2170 if (page->mapping == pcc_file->f_mapping) {
2171 pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
2173 mmap_read_unlock(mm);
2174 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
2181 * This fault injection can also be used to simulate -ENOSPC and
2182 * -EDQUOT failure of underlying PCC backend fs.
2184 if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE))
2185 GOTO(out, rc = VM_FAULT_SIGBUS);
2187 vma->vm_file = pcc_file;
2188 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
2189 rc = pcc_vm_ops->page_mkwrite(vmf);
2191 rc = pcc_vm_ops->page_mkwrite(vma, vmf);
2193 vma->vm_file = file;
2196 pcc_io_fini(inode, PIT_PAGE_MKWRITE, rc, cached);
2198 /* VM_FAULT_SIGBUG usually means that underlying PCC backend fs returns
2199 * -EIO, -ENOSPC or -EDQUOT. Thus we can retry this IO from the normal
2202 if (rc & VM_FAULT_SIGBUS) {
2203 pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
2204 mmap_read_unlock(mm);
2205 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
/*
 * ->fault() for a mapping that may be backed by a PCC copy.
 *
 * Opens without a PCC file, without saved PCC vm_ops or without a backend
 * ->fault(), and non-regular files, are checked for first. Otherwise
 * ->fault() of the backend is called between pcc_io_init() and
 * pcc_io_fini() for PIT_FAULT, with vma->vm_file switched to the PCC file
 * for the duration of the call. OBD_FAIL_LLITE_PCC_FAKE_ERROR injects
 * VM_FAULT_SIGBUS as the result.
 */
2210 int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
2213 struct file *file = vma->vm_file;
2214 struct inode *inode = file_inode(file);
2215 struct ll_file_data *fd = file->private_data;
2216 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
2217 struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
2222 if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->fault) {
2227 if (!S_ISREG(inode->i_mode)) {
2232 pcc_io_init(inode, PIT_FAULT, cached);
2236 /* Tolerate the mmap read failure for RO-PCC */
2237 if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
2238 GOTO(out, rc = VM_FAULT_SIGBUS);
2240 vma->vm_file = pcc_file;
2241 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
2242 rc = pcc_vm_ops->fault(vmf);
2244 rc = pcc_vm_ops->fault(vma, vmf);
2246 vma->vm_file = file;
2248 pcc_io_fini(inode, PIT_FAULT, rc, cached);
/*
 * Invalidate the PCC state of @pcci: set the PCC type to LU_PCC_NONE and
 * the layout generation to CL_LAYOUT_GEN_NONE, then, unless no PCC I/O is
 * active, wait on pcci_waitq until pcci_active_ios drops to zero.
 */
2252 static void __pcc_layout_invalidate(struct pcc_inode *pcci)
2254 pcci->pcci_type = LU_PCC_NONE;
2255 pcc_layout_gen_set(pcci, CL_LAYOUT_GEN_NONE);
2256 if (atomic_read(&pcci->pcci_active_ios) == 0)
2259 CDEBUG(D_CACHE, "Waiting for IO completion: %d\n",
2260 atomic_read(&pcci->pcci_active_ios));
2261 wait_event_idle(pcci->pcci_waitq,
2262 atomic_read(&pcci->pcci_active_ios) == 0);
/*
 * Invalidate the PCC layout of @inode.
 *
 * Under the PCC inode lock, and only when the inode has a PCC inode with a
 * valid PCC layout: run __pcc_layout_invalidate(), log the FID and layout
 * generation, and drop one reference on the PCC inode.
 */
2265 void pcc_layout_invalidate(struct inode *inode)
2267 struct pcc_inode *pcci;
2271 pcc_inode_lock(inode);
2272 pcci = ll_i2pcci(inode);
2273 if (pcci && pcc_inode_has_layout(pcci)) {
2274 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
2275 __pcc_layout_invalidate(pcci);
2277 CDEBUG(D_CACHE, "Invalidate "DFID" layout gen %d\n",
2278 PFID(&ll_i2info(inode)->lli_fid), pcci->pcci_layout_gen);
2280 pcc_inode_put(pcci);
2282 pcc_inode_unlock(inode);
/*
 * Unlink the PCC copy @pcc_dentry from its parent directory with
 * vfs_unlink(). A warning naming the file system of @inode, the PCC file
 * and the error code is printed when the unlink fails.
 */
2287 static int pcc_inode_remove(struct inode *inode, struct dentry *pcc_dentry)
2289 struct dentry *parent = dget_parent(pcc_dentry);
2292 rc = vfs_unlink(&nop_mnt_idmap, d_inode(parent), pcc_dentry);
2294 CWARN("%s: failed to unlink PCC file %pd, rc = %d\n",
2295 ll_i2sbi(inode)->ll_fsname, pcc_dentry, rc);
2301 /* Create directory under base if directory does not exist */
/*
 * Look up @name under @base with lookup_one_len(). A positive dentry is
 * checked for before vfs_mkdir() is called with @mode on the directory
 * inode of @base. An error from vfs_mkdir() is turned into an ERR_PTR()
 * value.
 */
2302 static struct dentry *
2303 pcc_mkdir(struct dentry *base, const char *name, umode_t mode)
2306 struct dentry *dentry;
2307 struct inode *dir = base->d_inode;
2310 dentry = lookup_one_len(name, base, strlen(name));
2314 if (d_is_positive(dentry))
2317 rc = vfs_mkdir(&nop_mnt_idmap, dir, dentry, mode);
2320 dentry = ERR_PTR(rc);
/*
 * Create the directory components of @path below @root.
 *
 * Starting with a reference on @root as the parent, @path is scanned for
 * slash separators with strchr() and pcc_mkdir() is called for each entry
 * name with @mode. The child starts out as ERR_PTR(-EINVAL).
 * NOTE(review): @path is not const; presumably it is modified in place
 * while being split -- confirm.
 */
2328 static struct dentry *
2329 pcc_mkdir_p(struct dentry *root, char *path, umode_t mode)
2331 char *ptr, *entry_name;
2332 struct dentry *parent;
2333 struct dentry *child = ERR_PTR(-EINVAL);
2340 parent = dget(root);
2341 while ((ptr = strchr(ptr, '/')) != NULL) {
2343 child = pcc_mkdir(parent, entry_name, mode);
2357 /* Create file under base. If file already exists, return failure */
/*
 * Look up @name under @base with lookup_one_len(). A positive dentry is
 * checked for before vfs_create() is called with @mode on the directory
 * inode of @base. An error from vfs_create() is turned into an ERR_PTR()
 * value.
 */
2358 static struct dentry *
2359 pcc_create(struct dentry *base, const char *name, umode_t mode)
2362 struct dentry *dentry;
2363 struct inode *dir = base->d_inode;
2366 dentry = lookup_one_len(name, base, strlen(name));
2370 if (d_is_positive(dentry))
2373 rc = vfs_create(&nop_mnt_idmap, dir, dentry, mode, false);
2376 dentry = ERR_PTR(rc);
/*
 * Create the PCC copy for a FID inside @dataset.
 *
 * A path buffer of PCC_DATASET_MAX_PATH bytes is allocated and filled by
 * pcc_fid2dataset_path(). The directories of that path are created below
 * the dataset root with pcc_mkdir_p() and mode 0. The buffer is then reused
 * for the file name, the FID printed with DFID_NOBRACE, which is created
 * with pcc_create() and mode 0. The buffer is freed before returning.
 */
2384 static int __pcc_inode_create(struct pcc_dataset *dataset,
2386 struct dentry **dentry)
2389 struct dentry *base;
2390 struct dentry *child;
2393 OBD_ALLOC(path, PCC_DATASET_MAX_PATH);
2397 pcc_fid2dataset_path(path, PCC_DATASET_MAX_PATH, fid);
2399 base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0);
2405 snprintf(path, PCC_DATASET_MAX_PATH, DFID_NOBRACE, PFID(fid));
2406 child = pcc_create(base, path, 0);
2407 if (IS_ERR(child)) {
2408 rc = PTR_ERR(child);
2416 OBD_FREE(path, PCC_DATASET_MAX_PATH);
2421 * Reset uid, gid or size for the PCC copy masked by @valid.
2422 * TODO: Set the project ID for PCC copy.
/*
 * Build an iattr with ia_valid = @valid and ia_size = @size and apply it
 * to @dentry with notify_change(); the inode of @dentry is unlocked
 * afterwards.
 * NOTE(review): the lines that copy @uid and @gid into the iattr and that
 * take the inode lock are assumed -- confirm.
 */
2424 static int pcc_inode_reset_iattr(struct dentry *dentry, unsigned int valid,
2425 kuid_t uid, kgid_t gid, loff_t size)
2427 struct inode *inode = dentry->d_inode;
2433 attr.ia_valid = valid;
2436 attr.ia_size = size;
2439 rc = notify_change(&nop_mnt_idmap, dentry, &attr, NULL);
2440 inode_unlock(inode);
/*
 * Run __pcc_inode_create() for @fid in @dataset with the PCC super
 * credentials of @sb in effect; the previous credentials are restored
 * afterwards. The new dentry is handed back through @pcc_dentry.
 */
2445 int pcc_inode_create(struct super_block *sb, struct pcc_dataset *dataset,
2446 struct lu_fid *fid, struct dentry **pcc_dentry)
2448 const struct cred *old_cred;
2451 old_cred = override_creds(pcc_super_cred(sb));
2452 rc = __pcc_inode_create(dataset, fid, pcc_dentry);
2453 revert_creds(old_cred);
/*
 * Finish attaching a newly created file to PCC as LU_PCC_READWRITE.
 *
 * A @pca without a dataset is checked for first. With the PCC super
 * credentials in effect and the PCC inode lock held (the inode must not
 * have a PCC inode yet): a PCC inode is allocated (-ENOMEM on failure), the
 * PCC copy gets uid and gid from the suid and sgid of the previous
 * credentials, the attach state is recorded with pcc_inode_attach_set(),
 * and the layout xattr is set with generation 0. When setting the xattr
 * fails, the PCC copy is removed unless already unlinked and the PCC inode
 * reference is dropped. The dataset reference is released at the end.
 */
2457 int pcc_inode_create_fini(struct inode *inode, struct pcc_create_attach *pca)
2459 struct dentry *pcc_dentry = pca->pca_dentry;
2460 const struct cred *old_cred;
2461 struct pcc_super *super;
2462 struct pcc_inode *pcci;
2467 if (!pca->pca_dataset)
2471 GOTO(out_dataset_put, rc = 0);
2473 super = ll_i2pccs(inode);
2475 LASSERT(pcc_dentry);
2477 old_cred = override_creds(super->pccs_cred);
2478 pcc_inode_lock(inode);
2479 LASSERT(ll_i2pcci(inode) == NULL);
2480 OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2482 GOTO(out_put, rc = -ENOMEM);
2484 rc = pcc_inode_reset_iattr(pcc_dentry, ATTR_UID | ATTR_GID,
2485 old_cred->suid, old_cred->sgid, 0);
2489 pcc_inode_attach_set(super, pca->pca_dataset, ll_i2info(inode),
2490 pcci, pcc_dentry, LU_PCC_READWRITE);
2492 rc = pcc_layout_xattr_set(pcci, 0);
2494 if (!pcci->pcci_unlinked)
2495 (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
2496 pcc_inode_put(pcci);
2497 GOTO(out_unlock, rc);
2500 /* Set the layout generation of newly created file with 0 */
2501 pcc_layout_gen_set(pcci, 0);
2505 (void) pcc_inode_remove(inode, pcc_dentry);
2509 OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
2512 pcc_inode_unlock(inode);
2513 revert_creds(old_cred);
2515 pcc_dataset_put(pca->pca_dataset);
/*
 * Undo a create-attach described by @pca.
 *
 * A @pca without a dataset is checked for first. When a PCC dentry is
 * present, it is unlinked from its parent directory under the PCC super
 * credentials of @sb; a failed unlink is only warned about. The dentry
 * reference is then dropped. The dataset reference is released at the end.
 */
2519 void pcc_create_attach_cleanup(struct super_block *sb,
2520 struct pcc_create_attach *pca)
2522 if (!pca->pca_dataset)
2525 if (pca->pca_dentry) {
2526 struct dentry *parent;
2527 struct inode *i_dir;
2528 const struct cred *old_cred;
2531 old_cred = override_creds(pcc_super_cred(sb));
2532 parent = dget_parent(pca->pca_dentry);
2533 i_dir = d_inode(parent);
2534 rc = vfs_unlink(&nop_mnt_idmap, i_dir, pca->pca_dentry);
2537 CWARN("%s: failed to unlink PCC file %pd: rc = %d\n",
2538 ll_s2sbi(sb)->ll_fsname, pca->pca_dentry, rc);
2539 /* ignore the unlink failure */
2540 revert_creds(old_cred);
2541 dput(pca->pca_dentry);
2544 pcc_dataset_put(pca->pca_dataset);
/*
 * Write @count bytes from @buf to @filp with cfs_kernel_write().
 * NOTE(review): how a short write or an error in the returned size is
 * handled is not visible here -- confirm.
 */
2547 static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
2553 size = cfs_kernel_write(filp, buf, count, offset);
/*
 * Copy the data of @src into @dst through a 1 MiB bounce buffer.
 *
 * Where FMODE_CAN_READ exists, it is set on @src when the file is open for
 * reading and has a ->read or ->read_iter operation. Data is read with
 * cfs_kernel_read() and written with pcc_filp_write(). A pending signal
 * ends the copy with -EINTR, and a read error ends it with that error. The
 * buffer is freed before returning.
 */
2562 static ssize_t pcc_copy_data(struct file *src, struct file *dst)
2566 loff_t pos, offset = 0;
2567 size_t buf_len = 1048576;
2572 #ifdef FMODE_CAN_READ
2573 /* Need to add FMODE_CAN_READ flags here, otherwise the check in
2574 * kernel_read() during open() for auto PCC-RO attach will fail.
2576 if ((src->f_mode & FMODE_READ) &&
2577 likely(src->f_op->read || src->f_op->read_iter))
2578 src->f_mode |= FMODE_CAN_READ;
2581 OBD_ALLOC_LARGE(buf, buf_len);
2586 if (signal_pending(current))
2587 GOTO(out_free, rc = -EINTR);
2590 rc2 = cfs_kernel_read(src, buf, buf_len, &pos);
2592 GOTO(out_free, rc = rc2);
2597 rc = pcc_filp_write(dst, buf, rc2, &pos);
2605 OBD_FREE_LARGE(buf, buf_len);
/*
 * Check, under the PCC inode lock, whether a new attach of @inode may
 * start: -EBUSY when an attach is already in progress
 * (PCC_STATE_FL_ATTACHING), -EEXIST when the inode already has a valid PCC
 * layout. Otherwise PCC_STATE_FL_ATTACHING is set in lli_pcc_state.
 */
2609 static int pcc_attach_allowed_check(struct inode *inode)
2611 struct ll_inode_info *lli = ll_i2info(inode);
2612 struct pcc_inode *pcci;
2617 pcc_inode_lock(inode);
2618 if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
2619 GOTO(out_unlock, rc = -EBUSY);
2621 pcci = ll_i2pcci(inode);
2622 if (pcci && pcc_inode_has_layout(pcci))
2623 GOTO(out_unlock, rc = -EEXIST);
2625 lli->lli_pcc_state |= PCC_STATE_FL_ATTACHING;
2627 pcc_inode_unlock(inode);
/*
 * Create the PCC copy of @inode in @dataset and fill it with the data of
 * @file. The new dentry is handed back through @dentry.
 *
 * With the PCC super credentials in effect: the PCC copy is created with
 * __pcc_inode_create() and opened write-only with O_LARGEFILE; its uid and
 * gid are set from the previous credentials; the data is copied with
 * pcc_copy_data(), with O_DIRECT cleared on @file for the duration of the
 * copy and set again afterwards; finally the PCC copy is truncated to the
 * number of bytes copied. The PCC copy is removed on the error path.
 */
2631 static int pcc_attach_data_archive(struct file *file, struct inode *inode,
2632 struct pcc_dataset *dataset,
2633 struct dentry **dentry)
2635 const struct cred *old_cred;
2636 struct file *pcc_filp;
2637 bool direct = false;
2644 old_cred = override_creds(pcc_super_cred(inode->i_sb));
2645 rc = __pcc_inode_create(dataset, &ll_i2info(inode)->lli_fid, dentry);
2649 path.mnt = dataset->pccd_path.mnt;
2650 path.dentry = *dentry;
2651 pcc_filp = dentry_open(&path, O_WRONLY | O_LARGEFILE, current_cred());
2652 if (IS_ERR_OR_NULL(pcc_filp)) {
2653 rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
2654 GOTO(out_dentry, rc);
2657 rc = pcc_inode_reset_iattr(*dentry, ATTR_UID | ATTR_GID,
2658 old_cred->uid, old_cred->gid, 0);
2663 * When attach a file at file open() time with direct I/O mode, the
2664 * data copy from Lustre OSTs to PCC copy in kernel will report
2665 * -EFAULT error as the buffer is allocated in the kernel space, not
2666 * from the user space.
2667 * Thus it needs to unmask O_DIRECT flag from the file handle during
2668 * data copy. After finished data copying, restore the flag in the
2671 if (file->f_flags & O_DIRECT) {
2672 file->f_flags &= ~O_DIRECT;
2676 ret = pcc_copy_data(file, pcc_filp);
2678 file->f_flags |= O_DIRECT;
2680 GOTO(out_fput, rc = ret);
2683 * It must to truncate the PCC copy to the same size of the Lustre
2684 * copy after copy data. Otherwise, it may get wrong file size after
2685 * re-attach a file. See LU-13023 for details.
2687 rc = pcc_inode_reset_iattr(*dentry, ATTR_SIZE, KUIDT_INIT(0),
2688 KGIDT_INIT(0), ret);
2693 pcc_inode_remove(inode, *dentry);
2697 revert_creds(old_cred);
/*
 * First stage of attaching @inode to a PCC dataset as LU_PCC_READWRITE.
 *
 * After pcc_attach_allowed_check(), the LU_PCC_READWRITE dataset matching
 * the archive ID is looked up and the data is copied into a new PCC copy
 * with pcc_attach_data_archive(). Under the PCC inode lock, a PCC inode is
 * allocated (-ENOMEM on failure) and the attach state is recorded with
 * pcc_inode_attach_set(). On the failure path the PCC copy is removed under
 * the PCC super credentials. The dataset reference is released at the end.
 */
2701 int pcc_readwrite_attach(struct file *file, struct inode *inode,
2704 struct pcc_dataset *dataset;
2705 struct ll_inode_info *lli = ll_i2info(inode);
2706 struct pcc_super *super = ll_i2pccs(inode);
2707 struct pcc_inode *pcci;
2708 struct dentry *dentry;
2713 rc = pcc_attach_allowed_check(inode);
2717 dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super,
2718 LU_PCC_READWRITE, archive_id);
2719 if (dataset == NULL)
2722 rc = pcc_attach_data_archive(file, inode, dataset, &dentry);
2724 GOTO(out_dataset_put, rc);
2726 /* Pause to allow for a race with concurrent HSM remove */
2727 CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
2729 pcc_inode_lock(inode);
2730 pcci = ll_i2pcci(inode);
2732 OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2734 GOTO(out_unlock, rc = -ENOMEM);
2736 pcc_inode_attach_set(super, dataset, lli, pcci,
2737 dentry, LU_PCC_READWRITE);
2739 pcc_inode_unlock(inode);
2741 const struct cred *old_cred;
2743 old_cred = override_creds(pcc_super_cred(inode->i_sb));
2744 (void) pcc_inode_remove(inode, dentry);
2745 revert_creds(old_cred);
2749 pcc_dataset_put(dataset);
/*
 * Final stage of an LU_PCC_READWRITE attach.
 *
 * Runs with the PCC super credentials in effect and the PCC inode lock
 * held. When @rc is set or the lease was broken, the PCC inode reference is
 * dropped if the file was attached, and @rc is returned. -ESTALE covers a
 * PCC inode that is gone. Otherwise the layout xattr is set with @gen, the
 * layout is refreshed and the PCC layout generation is set to @gen; a
 * layout generation that changed in between is logged and gives -ESTALE.
 * On the failure path the PCC copy is removed unless already unlinked and
 * the PCC inode reference is dropped. PCC_STATE_FL_ATTACHING is cleared
 * before leaving.
 */
2753 int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
2754 __u32 gen, bool lease_broken, int rc,
2757 struct ll_inode_info *lli = ll_i2info(inode);
2758 const struct cred *old_cred;
2759 struct pcc_inode *pcci;
2764 old_cred = override_creds(pcc_super_cred(inode->i_sb));
2765 pcc_inode_lock(inode);
2766 pcci = ll_i2pcci(inode);
2767 if (rc || lease_broken) {
2768 if (attached && pcci)
2769 pcc_inode_put(pcci);
2771 GOTO(out_unlock, rc);
2774 /* PCC inode may be released due to layout lock revocation */
2776 GOTO(out_unlock, rc = -ESTALE);
2779 rc = pcc_layout_xattr_set(pcci, gen);
2783 LASSERT(lli->lli_pcc_state & PCC_STATE_FL_ATTACHING);
2784 rc = ll_layout_refresh(inode, &gen2);
2787 pcc_layout_gen_set(pcci, gen);
2790 DFID" layout changed from %d to %d.\n",
2791 PFID(ll_inode2fid(inode)), gen, gen2);
2792 GOTO(out_put, rc = -ESTALE);
2798 if (!pcci->pcci_unlinked)
2799 (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
2800 pcc_inode_put(pcci);
2803 lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
2804 pcc_inode_unlock(inode);
2805 revert_creds(old_cred);
/*
 * Bring the layout of @inode into the read-only state needed for PCC-RO
 * and hand the resulting layout generation back through @gen.
 *
 * The current layout info is fetched with pcc_get_layout_info(). A released
 * layout is restored first with ll_layout_restore() when the layout version
 * is not CL_LAYOUT_GEN_NONE, followed by a layout refresh. A layout that is
 * not read-only gets a LAYOUT_INTENT_PCCRO_SET write intent followed by a
 * layout refresh. A layout that is already read-only just reports its
 * current generation.
 */
2809 static int pcc_layout_rdonly_set(struct inode *inode, __u32 *gen)
2812 struct ll_inode_info *lli = ll_i2info(inode);
2813 struct lu_extent ext = {
2815 .e_end = OBD_OBJECT_EOF,
2817 struct cl_layout clt = {
2819 .cl_is_released = false,
2820 .cl_is_rdonly = false,
2828 rc = pcc_get_layout_info(inode, &clt);
2833 * For the HSM released file, restore the data first.
2835 if (clt.cl_is_released) {
2840 if (ll_layout_version_get(lli) != CL_LAYOUT_GEN_NONE) {
2841 rc = ll_layout_restore(inode, 0, OBD_OBJECT_EOF);
2843 CDEBUG(D_CACHE, DFID" RESTORE failure: %d\n",
2844 PFID(&lli->lli_fid), rc);
2848 rc = ll_layout_refresh(inode, gen);
2856 if (!clt.cl_is_rdonly) {
2857 rc = ll_layout_write_intent(inode, LAYOUT_INTENT_PCCRO_SET,
2862 rc = ll_layout_refresh(inode, gen);
2865 } else { /* Readonly layout */
2866 *gen = clt.cl_layout_gen;
/*
 * Clear PCC_STATE_FL_ATTACHING in the PCC state of @inode under the PCC
 * inode lock.
 */
2872 static void pcc_readonly_attach_fini(struct inode *inode)
2874 pcc_inode_lock(inode);
2875 ll_i2info(inode)->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
2876 pcc_inode_unlock(inode);
/*
 * Attach @inode to the LU_PCC_READONLY dataset selected by the read-only
 * ID.
 *
 * -EOPNOTSUPP is returned when LL_SBI_LAYOUT_LOCK is not set for the
 * mount. After pcc_attach_allowed_check(), the layout is made read-only
 * with pcc_layout_rdonly_set(), the dataset is looked up (-ENOENT when
 * missing) and the data is copied into a new PCC copy with
 * pcc_attach_data_archive().
 *
 * Then, with the layout mutex and the PCC inode lock held and the PCC super
 * credentials in effect: PCC_STATE_FL_ATTACHING is cleared, and -ESTALE is
 * returned when the layout version no longer matches the generation
 * obtained above. The PCC inode is either allocated (-ENOMEM on failure)
 * and set up with pcc_inode_attach_set(), or -- presumably when one already
 * exists, confirm -- gets one more reference, the new path and the
 * LU_PCC_READONLY type. The layout xattr is set; when that fails the PCC
 * type is reset to LU_PCC_NONE. On success the PCC layout generation is
 * set.
 *
 * The cleanup path removes the PCC copy, drops the PCC inode reference,
 * releases the dataset reference and calls pcc_readonly_attach_fini().
 */
2879 static int pcc_readonly_ioctl_attach(struct file *file,
2880 struct inode *inode,
2883 struct ll_sb_info *sbi = ll_i2sbi(inode);
2884 struct pcc_super *super = ll_i2pccs(inode);
2885 struct ll_inode_info *lli = ll_i2info(inode);
2886 const struct cred *old_cred;
2887 struct pcc_dataset *dataset;
2888 struct pcc_inode *pcci;
2889 struct dentry *dentry;
2890 bool attached = false;
2891 bool unlinked = false;
2897 if (!test_bit(LL_SBI_LAYOUT_LOCK, sbi->ll_flags))
2898 RETURN(-EOPNOTSUPP);
2900 rc = pcc_attach_allowed_check(inode);
2904 rc = pcc_layout_rdonly_set(inode, &gen);
2908 dataset = pcc_dataset_get(&ll_s2sbi(inode->i_sb)->ll_pcc_super,
2909 LU_PCC_READONLY, roid);
2910 if (dataset == NULL)
2911 GOTO(out_fini, rc = -ENOENT);
2913 rc = pcc_attach_data_archive(file, inode, dataset, &dentry);
2915 GOTO(out_dataset_put, rc);
2917 mutex_lock(&lli->lli_layout_mutex);
2918 pcc_inode_lock(inode);
2919 old_cred = override_creds(super->pccs_cred);
2920 lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
2921 if (gen != ll_layout_version_get(lli))
2922 GOTO(out_put_unlock, rc = -ESTALE);
2924 pcci = ll_i2pcci(inode);
2926 OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2928 GOTO(out_put_unlock, rc = -ENOMEM);
2930 pcc_inode_attach_set(super, dataset, lli, pcci,
2931 dentry, LU_PCC_READONLY);
2933 atomic_inc(&pcci->pcci_refcount);
2934 path_put(&pcci->pcci_path);
2935 pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
2936 pcci->pcci_path.dentry = dentry;
2937 pcci->pcci_type = LU_PCC_READONLY;
2940 rc = pcc_layout_xattr_set(pcci, gen);
2942 pcci->pcci_type = LU_PCC_NONE;
2943 unlinked = pcci->pcci_unlinked;
2944 GOTO(out_put_unlock, rc);
2947 pcc_layout_gen_set(pcci, gen);
2951 (void) pcc_inode_remove(inode, dentry);
2953 pcc_inode_put(pcci);
2957 revert_creds(old_cred);
2958 pcc_inode_unlock(inode);
2959 mutex_unlock(&lli->lli_layout_mutex);
2961 pcc_dataset_put(dataset);
2963 pcc_readonly_attach_fini(inode);
/*
 * ioctl entry point for attaching @inode to PCC.
 *
 * Dispatches on attach->pcca_type. The LU_PCC_READONLY case calls
 * pcc_readonly_ioctl_attach(). The body of the LU_PCC_READWRITE case, any
 * default case and the trailing arguments of the call are not visible in
 * this excerpt.
 */
2968 int pcc_ioctl_attach(struct file *file, struct inode *inode,
2969 struct lu_pcc_attach *attach)
2975 switch (attach->pcca_type) {
2976 case LU_PCC_READWRITE:
2979 case LU_PCC_READONLY:
2980 rc = pcc_readonly_ioctl_attach(file, inode,
/*
 * Restore @inode over the range 0..OBD_OBJECT_EOF, then submit an HSM
 * REMOVE request for its FID.
 *
 * Visible flow: ll_layout_restore() the file, refresh the layout, build a
 * one-item hsm_user_request with action HUA_REMOVE and send it with
 * obd_iocontrol(LL_IOC_HSM_REQUEST) on the ll_md_exp export of the
 * superblock info.
 *
 * NOTE(review): the checks on rc, the allocation failure check, the release
 * of hur and the return are not visible in this excerpt -- confirm that hur
 * is freed on every path.
 */
2991 static int pcc_hsm_remove(struct inode *inode)
2993 struct hsm_user_request *hur;
3000 rc = ll_layout_restore(inode, 0, OBD_OBJECT_EOF);
3002 CDEBUG(D_CACHE, DFID" RESTORE failure: %d\n",
3003 PFID(&ll_i2info(inode)->lli_fid), rc);
/*
 * NOTE(review): the result of this refresh is not assigned on this line;
 * looks like a deliberate best-effort call -- confirm.
 */
3007 ll_layout_refresh(inode, &gen);
/* Request header plus exactly one item (hr_itemcount is set to 1 below). */
3009 len = sizeof(struct hsm_user_request) +
3010 sizeof(struct hsm_user_item);
3011 OBD_ALLOC(hur, len);
3015 hur->hur_request.hr_action = HUA_REMOVE;
3016 hur->hur_request.hr_archive_id = 0;
3017 hur->hur_request.hr_flags = 0;
/* The single item names this inode's FID and the extent 0..OBD_OBJECT_EOF. */
3018 memcpy(&hur->hur_user_item[0].hui_fid, &ll_i2info(inode)->lli_fid,
3019 sizeof(hur->hur_user_item[0].hui_fid));
3020 hur->hur_user_item[0].hui_extent.offset = 0;
3021 hur->hur_user_item[0].hui_extent.length = OBD_OBJECT_EOF;
3022 hur->hur_request.hr_itemcount = 1;
3023 rc = obd_iocontrol(LL_IOC_HSM_REQUEST, ll_i2sbi(inode)->ll_md_exp,
3026 CDEBUG(D_CACHE, DFID" HSM REMOVE failure: %d\n",
3027 PFID(&ll_i2info(inode)->lli_fid), rc);
/*
 * ioctl entry point for detaching @inode from PCC.
 *
 * @opt selects the detach behaviour; the visible code only tests it against
 * PCC_DETACH_OPT_UNCACHE.
 *
 * Visible flow: under the PCC inode lock, jump to out_unlock with rc = 0 if
 * there is no PCC inode, an attach is in progress, or the PCC inode has no
 * layout. Otherwise invalidate the PCC layout and drop a reference, with
 * extra handling per PCC type. After the lock is released, pcc_hsm_remove()
 * is called under the PCC super credentials.
 *
 * NOTE(review): several conditions and labels are not visible in this
 * excerpt, in particular the guard around the HSM removal (presumably the
 * hsm_remove flag) and the guard around pcci_unlinked = true -- confirm.
 */
3033 int pcc_ioctl_detach(struct inode *inode, __u32 opt)
3035 struct ll_inode_info *lli = ll_i2info(inode);
3036 struct pcc_inode *pcci;
3037 const struct cred *old_cred;
3038 bool hsm_remove = false;
3043 pcc_inode_lock(inode);
3044 pcci = lli->lli_pcc_inode;
/* These three cases jump to out_unlock with rc = 0. */
3045 if (!pcci || lli->lli_pcc_state & PCC_STATE_FL_ATTACHING ||
3046 !pcc_inode_has_layout(pcci))
3047 GOTO(out_unlock, rc = 0);
3049 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
3051 if (pcci->pcci_type == LU_PCC_READWRITE) {
3052 if (opt == PCC_DETACH_OPT_UNCACHE) {
3055 * The file will be removed from PCC, set the flags
3056 * with PCC_DATASET_NONE even the later removal of the
3059 lli->lli_pcc_dsflags = PCC_DATASET_NONE;
3062 __pcc_layout_invalidate(pcci);
3063 pcc_inode_put(pcci);
3064 } else if (pcci->pcci_type == LU_PCC_READONLY) {
3065 __pcc_layout_invalidate(pcci);
/*
 * Uncache request on a copy not yet unlinked: remove the PCC copy while
 * running under the PCC super credentials.
 */
3067 if (opt == PCC_DETACH_OPT_UNCACHE && !pcci->pcci_unlinked) {
3068 old_cred = override_creds(pcc_super_cred(inode->i_sb));
3069 rc = pcc_inode_remove(inode, pcci->pcci_path.dentry);
3070 revert_creds(old_cred);
3072 pcci->pcci_unlinked = true;
3075 pcc_inode_put(pcci);
3081 pcc_inode_unlock(inode);
/* Issued after pcc_inode_unlock(), under the PCC super credentials. */
3083 old_cred = override_creds(pcc_super_cred(inode->i_sb));
3084 rc = pcc_hsm_remove(inode);
3085 revert_creds(old_cred);
/*
 * ioctl entry point reporting the PCC state of @inode into @state.
 *
 * Fills in pccs_type, pccs_open_count, pccs_flags and pccs_path. A scratch
 * buffer the size of state->pccs_path is allocated for the path lookup and
 * freed before the function ends.
 *
 * NOTE(review): the conditions guarding the two LU_PCC_NONE paths, the
 * adjustments made after the pcc_inode_has_layout() and pccf_file tests,
 * the IS_ERR check on path and the return are not visible in this excerpt.
 */
3091 int pcc_ioctl_state(struct file *file, struct inode *inode,
3092 struct lu_pcc_state *state)
3098 int buf_len = sizeof(state->pccs_path);
3099 struct ll_file_data *fd = file->private_data;
3100 struct pcc_file *pccf = &fd->fd_pcc_file;
3101 struct pcc_inode *pcci;
3108 OBD_ALLOC(buf, buf_len);
3112 pcc_inode_lock(inode);
3113 pcci = ll_i2pcci(inode);
/* Reported as LU_PCC_NONE with rc = 0 (guarding condition not visible). */
3115 state->pccs_type = LU_PCC_NONE;
3116 GOTO(out_unlock, rc = 0);
3119 count = atomic_read(&pcci->pcci_refcount);
/* Second LU_PCC_NONE path, also zeroing the open count (guard not visible). */
3121 state->pccs_type = LU_PCC_NONE;
3122 state->pccs_open_count = 0;
3123 GOTO(out_unlock, rc = 0);
/*
 * The statements executed under these two tests are not visible;
 * presumably they adjust count before it is reported -- TODO confirm.
 */
3126 if (pcc_inode_has_layout(pcci))
3128 if (pccf->pccf_file != NULL)
3130 state->pccs_type = pcci->pcci_type;
3131 state->pccs_open_count = count;
3132 state->pccs_flags = ll_i2info(inode)->lli_pcc_state;
/* Build the PCC copy's path in buf, then copy it out to state->pccs_path. */
3133 path = dentry_path_raw(pcci->pcci_path.dentry, buf, buf_len);
3135 GOTO(out_unlock, rc = PTR_ERR(path));
/* A negative strscpy() result is reported as -ENAMETOOLONG. */
3137 if (strscpy(state->pccs_path, path, buf_len) < 0)
3138 GOTO(out_unlock, rc = -ENAMETOOLONG);
3141 pcc_inode_unlock(inode);
3142 OBD_FREE(buf, buf_len);