4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2017, DDN Storage Corporation.
26 * Persistent Client Cache
28 * PCC is a new framework which provides a group of local caches on Lustre
29 * client side. It works in two modes: RW-PCC enables a read-write cache on the
30 * local SSDs of a single client; RO-PCC provides a read-only cache on the
31 * local SSDs of multiple clients. Less overhead is visible to the applications
32 * and network latencies and lock conflicts can be significantly reduced.
34 * For RW-PCC, no global namespace will be provided. Each client uses its own
35 * local storage as a cache for itself. Local file system is used to manage
36 * the data on local caches. Cached I/O is directed to local file system while
37 * normal I/O is directed to OSTs. RW-PCC uses HSM for data synchronization.
38 * It uses HSM copytool to restore file from local caches to Lustre OSTs. Each
39 * PCC has a copytool instance running with unique archive number. Any remote
40 * access from another Lustre client would trigger the data synchronization. If
41 * a client with RW-PCC goes offline, the cached data becomes inaccessible for
42 * other client temporarily. And after the RW-PCC client reboots and the
43 * copytool restarts, the data will be accessible again.
45 * Following is what will happen in different conditions for RW-PCC:
47 * > When file is being created on RW-PCC
49 * A normal HSM released file is created on MDT;
50 * An empty mirror file is created on local cache;
51 * The HSM status of the Lustre file will be set to archived and released;
52 * The archive number will be set to the proper value.
54 * > When file is being prefetched to RW-PCC
56 * A file is copied to the local cache;
57 * The HSM status of the Lustre file will be set to archived and released;
58 * The archive number will be set to the proper value.
60 * > When file is being accessed from PCC
62 * Data will be read directly from local cache;
63 * Metadata will be read from MDT, except file size;
64 * File size is obtained from the local cache.
66 * > When PCC cached file is being accessed on another client
68 * RW-PCC cached files are automatically restored when a process on another
69 * client tries to read or modify them. The corresponding I/O will block
70 * waiting for the released file to be restored. This is transparent to the
73 * For RW-PCC, when a file is being created, a rule-based policy is used to
74 * determine whether it will be cached. Rule-based caching of newly created
75 * files can determine which file can use a cache on PCC directly without any
78 * RW-PCC design can accelerate I/O intensive applications with one-to-one
79 * mappings between files and accessing clients. However, in several use cases,
80 * files will never be updated, but need to be read simultaneously from many
81 * clients. RO-PCC implements a read-only caching on Lustre clients using
82 * SSDs. RO-PCC is based on the same framework as RW-PCC, except
83 * that no HSM mechanism is used.
85 * The main advantages to use this SSD cache on the Lustre clients via PCC
87 * - The I/O stack becomes much simpler for the cached data, as there is no
88 * interference with I/Os from other clients, which enables easier
89 * performance optimizations;
90 * - The requirements on the HW inside the client nodes are small, any kind of
91 * SSDs or even HDDs can be used as cache devices;
92 * - Caching reduces the pressure on the object storage targets (OSTs), as
93 * small or random I/Os can be regularized to big sequential I/Os and
94 * temporary files do not even need to be flushed to OSTs.
96 * PCC can accelerate applications with certain I/O patterns:
97 * - small-sized random writes (< 1MB) from a single client
98 * - repeated read of data that is larger than RAM
99 * - clients with high network latency
101 * Author: Li Xi <lixi@ddn.com>
102 * Author: Qian Yingjin <qian@ddn.com>
105 #define DEBUG_SUBSYSTEM S_LLITE
108 #include <linux/namei.h>
109 #include <linux/file.h>
110 #include <lustre_compat.h>
111 #include "llite_internal.h"
113 struct kmem_cache *pcc_inode_slab;
/*
 * pcc_super_init - set up the PCC state embedded in a superblock.
 *
 * Stores a credential obtained from prepare_creds() in super->pccs_cred,
 * removes CAP_SYS_RESOURCE from that credential's effective set, and
 * initializes the dataset list and the rw-semaphore taken around it by the
 * other dataset helpers in this file.
 */
115 int pcc_super_init(struct pcc_super *super)
119 super->pccs_cred = cred = prepare_creds();
123 /* Never override disk quota limits or use reserved space */
124 cap_lower(cred->cap_effective, CAP_SYS_RESOURCE);
125 init_rwsem(&super->pccs_rw_sem);
126 INIT_LIST_HEAD(&super->pccs_datasets);
131 /* Rule based auto caching */
/*
 * Tear down a list of pcc_match_id entries linked through pmi_linkage.
 * The _safe iterator is used because entries are unlinked while walking.
 * NOTE(review): the statement releasing each entry is not visible here.
 */
132 static void pcc_id_list_free(struct list_head *id_list)
134 struct pcc_match_id *id, *n;
136 list_for_each_entry_safe(id, n, id_list, pmi_linkage) {
137 list_del_init(&id->pmi_linkage);
/*
 * Tear down a list of pcc_match_fname entries linked through pmf_linkage.
 * For every entry the name string is freed (sized as strlen + 1, matching
 * how pcc_fname_list_add() allocates it) and the entry is unlinked.
 * NOTE(review): the statement releasing the entry itself is not visible.
 */
142 static void pcc_fname_list_free(struct list_head *fname_list)
144 struct pcc_match_fname *fname, *n;
146 list_for_each_entry_safe(fname, n, fname_list, pmf_linkage) {
147 OBD_FREE(fname->pmf_name, strlen(fname->pmf_name) + 1);
148 list_del_init(&fname->pmf_linkage);
/*
 * Release the condition list held by one rule expression.
 * The field type selects the element type of expr->pe_cond: the id-list
 * helper is used for PCC_FIELD_PROJID (and presumably the other id fields
 * whose case labels are not visible), the fname-list helper for
 * PCC_FIELD_FNAME.  An out-of-range pe_field trips the LASSERT.
 */
153 static void pcc_expression_free(struct pcc_expression *expr)
155 LASSERT(expr->pe_field >= PCC_FIELD_UID &&
156 expr->pe_field < PCC_FIELD_MAX);
157 switch (expr->pe_field) {
160 case PCC_FIELD_PROJID:
161 pcc_id_list_free(&expr->pe_cond);
163 case PCC_FIELD_FNAME:
164 pcc_fname_list_free(&expr->pe_cond);
/*
 * Free one conjunction and every expression on its pc_expressions list.
 * The caller must already have unlinked the conjunction from its parent
 * list; this is asserted on entry.
 */
172 static void pcc_conjunction_free(struct pcc_conjunction *conjunction)
174 struct pcc_expression *expression, *n;
176 LASSERT(list_empty(&conjunction->pc_linkage));
177 list_for_each_entry_safe(expression, n,
178 &conjunction->pc_expressions,
180 list_del_init(&expression->pe_linkage);
181 pcc_expression_free(expression);
183 OBD_FREE_PTR(conjunction);
/*
 * Free every conjunction on @cond_list.  Each one is unlinked first because
 * pcc_conjunction_free() asserts that its pc_linkage is empty.
 */
186 static void pcc_rule_conds_free(struct list_head *cond_list)
188 struct pcc_conjunction *conjunction, *n;
190 list_for_each_entry_safe(conjunction, n, cond_list, pc_linkage) {
191 list_del_init(&conjunction->pc_linkage);
192 pcc_conjunction_free(conjunction);
/*
 * Release resources attached to a parsed command.  Only PCC_ADD_DATASET
 * commands own anything here: the parsed condition list and the copy of
 * the condition string (freed with the strlen + 1 size it was allocated
 * with in pcc_id_parse()).
 */
196 static void pcc_cmd_fini(struct pcc_cmd *cmd)
198 if (cmd->pccc_cmd == PCC_ADD_DATASET) {
199 if (!list_empty(&cmd->u.pccc_add.pccc_conds))
200 pcc_rule_conds_free(&cmd->u.pccc_add.pccc_conds);
201 if (cmd->u.pccc_add.pccc_conds_str)
202 OBD_FREE(cmd->u.pccc_add.pccc_conds_str,
203 strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
/*
 * Delimiters of the rule string: ',' splits the string into conjunctions
 * (pcc_conds_parse), '&' splits a conjunction into expressions
 * (pcc_conjunction_parse) and '=' splits an expression into its field name
 * and its brace-enclosed value list (pcc_expression_parse).
 */
207 #define PCC_DISJUNCTION_DELIM (',')
208 #define PCC_CONJUNCTION_DELIM ('&')
209 #define PCC_EXPRESSION_DELIM ('=')
/*
 * Append a copy of the token @id to @fname_list as a pcc_match_fname.
 * The name buffer is ls_len + 1 bytes and only ls_len bytes are copied;
 * NUL termination presumably relies on OBD_ALLOC returning zeroed memory
 * -- TODO confirm.  If the name allocation fails the entry is freed again.
 */
212 pcc_fname_list_add(struct cfs_lstr *id, struct list_head *fname_list)
214 struct pcc_match_fname *fname;
216 OBD_ALLOC(fname, sizeof(struct pcc_match_fname));
220 OBD_ALLOC(fname->pmf_name, id->ls_len + 1);
221 if (fname->pmf_name == NULL) {
222 OBD_FREE(fname, sizeof(struct pcc_match_fname));
226 memcpy(fname->pmf_name, id->ls_str, id->ls_len);
227 list_add_tail(&fname->pmf_linkage, fname_list);
/*
 * Build @fname_list from a space-separated list of file name patterns.
 * The list head is (re)initialized first; tokens are taken with
 * cfs_gettok() and added one by one.  pcc_fname_list_free() is called on
 * the cleanup path (the condition guarding it is not visible here).
 */
232 pcc_fname_list_parse(char *str, int len, struct list_head *fname_list)
242 INIT_LIST_HEAD(fname_list);
244 rc = cfs_gettok(&src, ' ', &res);
249 rc = pcc_fname_list_add(&res, fname_list);
254 pcc_fname_list_free(fname_list);
/*
 * Build @id_list from a space-separated list of numeric ids.
 * @type must be one of PCC_FIELD_UID, PCC_FIELD_GID or PCC_FIELD_PROJID.
 * Each token must parse as a number in [0, (u32)~0U]; a missing or
 * malformed token yields -EINVAL and an allocation failure -ENOMEM.
 * pcc_id_list_free() is called on the "out" path (guard not visible here).
 */
259 pcc_id_list_parse(char *str, int len, struct list_head *id_list,
268 if (type != PCC_FIELD_UID && type != PCC_FIELD_GID &&
269 type != PCC_FIELD_PROJID)
274 INIT_LIST_HEAD(id_list);
276 struct pcc_match_id *id;
279 if (cfs_gettok(&src, ' ', &res) == 0)
280 GOTO(out, rc = -EINVAL);
282 if (!cfs_str2num_check(res.ls_str, res.ls_len,
283 &id_val, 0, (u32)~0U))
284 GOTO(out, rc = -EINVAL);
288 GOTO(out, rc = -ENOMEM);
291 list_add_tail(&id->pmi_linkage, id_list);
295 pcc_id_list_free(id_list);
/*
 * Return non-zero when the (not necessarily NUL-terminated) token @field
 * is exactly the C string @str: lengths must be equal and the first
 * strlen(str) bytes must compare equal.
 */
300 pcc_check_field(struct cfs_lstr *field, char *str)
302 int len = strlen(str);
304 return (field->ls_len == len &&
305 strncmp(field->ls_str, str, len) == 0);
/*
 * Parse one "field={values}" expression and append it to @cond_list.
 *
 * The text before '=' is the field name; what remains in @src must be
 * longer than two characters and enclosed in '{' '}', otherwise -EINVAL.
 * Recognised fields are "uid", "gid", "projid" (id lists) and "fname"
 * (file name list); any other field, or a value list that fails to parse,
 * yields -EINVAL.
 */
309 pcc_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
311 struct pcc_expression *expr;
312 struct cfs_lstr field;
315 OBD_ALLOC(expr, sizeof(struct pcc_expression));
319 rc = cfs_gettok(src, PCC_EXPRESSION_DELIM, &field);
320 if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
321 src->ls_str[src->ls_len - 1] != '}')
322 GOTO(out, rc = -EINVAL);
324 /* Skip '{' and '}' */
328 if (pcc_check_field(&field, "uid")) {
329 if (pcc_id_list_parse(src->ls_str,
333 GOTO(out, rc = -EINVAL);
334 expr->pe_field = PCC_FIELD_UID;
335 } else if (pcc_check_field(&field, "gid")) {
336 if (pcc_id_list_parse(src->ls_str,
340 GOTO(out, rc = -EINVAL);
341 expr->pe_field = PCC_FIELD_GID;
342 } else if (pcc_check_field(&field, "projid")) {
343 if (pcc_id_list_parse(src->ls_str,
346 PCC_FIELD_PROJID) < 0)
347 GOTO(out, rc = -EINVAL);
348 expr->pe_field = PCC_FIELD_PROJID;
349 } else if (pcc_check_field(&field, "fname")) {
350 if (pcc_fname_list_parse(src->ls_str,
353 GOTO(out, rc = -EINVAL);
354 expr->pe_field = PCC_FIELD_FNAME;
356 GOTO(out, rc = -EINVAL);
359 list_add_tail(&expr->pe_linkage, cond_list);
/*
 * Parse one conjunction ('&'-separated expressions) from @src.
 * The new conjunction is linked onto @cond_list before its expressions are
 * parsed, so a caller that frees @cond_list also frees a partially built
 * conjunction.
 */
367 pcc_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
369 struct pcc_conjunction *conjunction;
370 struct cfs_lstr expr;
373 OBD_ALLOC(conjunction, sizeof(struct pcc_conjunction));
374 if (conjunction == NULL)
377 INIT_LIST_HEAD(&conjunction->pc_expressions);
378 list_add_tail(&conjunction->pc_linkage, cond_list);
380 while (src->ls_str) {
381 rc = cfs_gettok(src, PCC_CONJUNCTION_DELIM, &expr);
386 rc = pcc_expression_parse(&expr,
387 &conjunction->pc_expressions);
/*
 * Parse a whole rule string into @cond_list: the string is split on ','
 * and every piece is handed to pcc_conjunction_parse().  @cond_list is
 * (re)initialized first.
 */
394 static int pcc_conds_parse(char *str, int len, struct list_head *cond_list)
402 INIT_LIST_HEAD(cond_list);
404 rc = cfs_gettok(&src, PCC_DISJUNCTION_DELIM, &res);
409 rc = pcc_conjunction_parse(&res, cond_list);
/*
 * Keep a private copy of the rule string @id in the add-command and parse
 * that copy into cmd->u.pccc_add.pccc_conds.  strlen(id) bytes are copied
 * into a strlen(id) + 1 buffer; termination presumably relies on OBD_ALLOC
 * zeroing the allocation -- TODO confirm.
 */
416 static int pcc_id_parse(struct pcc_cmd *cmd, const char *id)
420 OBD_ALLOC(cmd->u.pccc_add.pccc_conds_str, strlen(id) + 1);
421 if (cmd->u.pccc_add.pccc_conds_str == NULL)
424 memcpy(cmd->u.pccc_add.pccc_conds_str, id, strlen(id));
426 rc = pcc_conds_parse(cmd->u.pccc_add.pccc_conds_str,
427 strlen(cmd->u.pccc_add.pccc_conds_str),
428 &cmd->u.pccc_add.pccc_conds);
/*
 * Parse a single "key=value" option of an add command.
 *
 * The buffer is split in place at the first '='; an absent or empty value
 * is rejected.  Every value is parsed as a decimal unsigned long.
 *   rwid / roid       - stored as the dataset's read-write / read-only id
 *   auto_attach, open_attach, io_attach, stat_attach
 *                     - may clear the corresponding PCC_DATASET_* flag
 *   rwpcc / ropcc     - may set PCC_DATASET_RWPCC / PCC_DATASET_ROPCC
 * NOTE(review): the tests on the parsed value that guard each flag update
 * are on lines not visible here.
 */
436 pcc_parse_value_pair(struct pcc_cmd *cmd, char *buffer)
443 key = strsep(&val, "=");
444 if (val == NULL || strlen(val) == 0)
447 /* Key of the value pair */
448 if (strcmp(key, "rwid") == 0) {
449 rc = kstrtoul(val, 10, &id);
454 cmd->u.pccc_add.pccc_rwid = id;
455 } else if (strcmp(key, "roid") == 0) {
456 rc = kstrtoul(val, 10, &id);
461 cmd->u.pccc_add.pccc_roid = id;
462 } else if (strcmp(key, "auto_attach") == 0) {
463 rc = kstrtoul(val, 10, &id);
467 cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_AUTO_ATTACH;
468 } else if (strcmp(key, "open_attach") == 0) {
469 rc = kstrtoul(val, 10, &id);
473 cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_OPEN_ATTACH;
474 } else if (strcmp(key, "io_attach") == 0) {
475 rc = kstrtoul(val, 10, &id);
479 cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_IO_ATTACH;
480 } else if (strcmp(key, "stat_attach") == 0) {
481 rc = kstrtoul(val, 10, &id);
485 cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_STAT_ATTACH;
486 } else if (strcmp(key, "rwpcc") == 0) {
487 rc = kstrtoul(val, 10, &id);
491 cmd->u.pccc_add.pccc_flags |= PCC_DATASET_RWPCC;
492 } else if (strcmp(key, "ropcc") == 0) {
493 rc = kstrtoul(val, 10, &id);
497 cmd->u.pccc_add.pccc_flags |= PCC_DATASET_ROPCC;
/*
 * Parse the space-separated "key=value" options that follow the rule in a
 * command line.
 *
 * For PCC_ADD_DATASET the auto-attach flag is switched on before parsing so
 * that an explicit option can turn it off.  After parsing, an add command
 * is checked for having both RWPCC and ROPCC set (the action taken is not
 * visible here), and a command that selected no PCC mode at all gets
 * PCC_DATASET_PCC_ALL.
 */
506 pcc_parse_value_pairs(struct pcc_cmd *cmd, char *buffer)
512 switch (cmd->pccc_cmd) {
513 case PCC_ADD_DATASET:
514 /* Enable auto attach by default */
515 cmd->u.pccc_add.pccc_flags |= PCC_DATASET_AUTO_ATTACH;
517 case PCC_DEL_DATASET:
525 while (val != NULL && strlen(val) != 0) {
526 token = strsep(&val, " ");
527 rc = pcc_parse_value_pair(cmd, token);
532 switch (cmd->pccc_cmd) {
533 case PCC_ADD_DATASET:
534 if (cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC &&
535 cmd->u.pccc_add.pccc_flags & PCC_DATASET_ROPCC)
538 * By default, a PCC backend can provide caching service for
539 * both RW-PCC and RO-PCC.
541 if ((cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCC_ALL) == 0)
542 cmd->u.pccc_add.pccc_flags |= PCC_DATASET_PCC_ALL;
544 case PCC_DEL_DATASET:
/*
 * Release a dataset's match rule: the parsed condition list (if any) and
 * the rule string.  The rule string must be non-NULL (asserted), so this
 * must not be called on a rule whose string was never allocated or has
 * already been freed.
 */
554 pcc_dataset_rule_fini(struct pcc_match_rule *rule)
556 if (!list_empty(&rule->pmr_conds))
557 pcc_rule_conds_free(&rule->pmr_conds);
558 LASSERT(rule->pmr_conds_str != NULL);
559 OBD_FREE(rule->pmr_conds_str, strlen(rule->pmr_conds_str) + 1);
/*
 * Initialize a dataset's match rule from a parsed add command.
 * The command's rule string is duplicated into rule->pmr_conds_str and,
 * when the command carried conditions, re-parsed from that private copy
 * into rule->pmr_conds, so the rule does not share list entries with @cmd.
 * pcc_dataset_rule_fini() is called on the failure path (guard not
 * visible here).
 */
563 pcc_dataset_rule_init(struct pcc_match_rule *rule, struct pcc_cmd *cmd)
567 LASSERT(cmd->u.pccc_add.pccc_conds_str);
568 OBD_ALLOC(rule->pmr_conds_str,
569 strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
570 if (rule->pmr_conds_str == NULL)
573 memcpy(rule->pmr_conds_str,
574 cmd->u.pccc_add.pccc_conds_str,
575 strlen(cmd->u.pccc_add.pccc_conds_str));
577 INIT_LIST_HEAD(&rule->pmr_conds);
578 if (!list_empty(&cmd->u.pccc_add.pccc_conds))
579 rc = pcc_conds_parse(rule->pmr_conds_str,
580 strlen(rule->pmr_conds_str),
584 pcc_dataset_rule_fini(rule);
/*
 * Search @id_list for an entry whose pmi_id equals @id_val.
 */
591 pcc_id_list_match(struct list_head *id_list, __u32 id_val)
593 struct pcc_match_id *id;
595 list_for_each_entry(id, id_list, pmi_linkage) {
596 if (id->pmi_id == id_val)
/**
 * cfs_match_wildcard - match a string against a pattern containing '*'.
 * @pattern: NUL-terminated pattern; '*' matches any run of characters
 *           (including an empty one), every other byte matches itself
 * @content: NUL-terminated string to test
 *
 * Iterative matcher with single-star backtracking: it remembers the most
 * recent '*' and, on a mismatch, lets that star absorb one more character
 * of @content.  This keeps the work bounded by
 * strlen(pattern) * strlen(content) instead of the exponential cost of
 * recursing twice per '*', and it accepts consecutive '*' at the end of
 * the pattern (e.g. "**" against ""), which the recursive form rejected.
 *
 * Return: true when the whole of @content matches @pattern.
 */
static bool
cfs_match_wildcard(const char *pattern, const char *content)
{
	const char *star = NULL;	/* last '*' seen in the pattern */
	const char *resume = NULL;	/* content position that star covers up to */

	while (*content != '\0') {
		if (*pattern == '*') {
			/* Tentatively let the star match nothing. */
			star = pattern++;
			resume = content;
		} else if (*pattern == *content) {
			pattern++;
			content++;
		} else if (star != NULL) {
			/* Mismatch: widen the last star by one character. */
			pattern = star + 1;
			content = ++resume;
		} else {
			return false;
		}
	}

	/* Content is exhausted; only stars may remain in the pattern. */
	while (*pattern == '*')
		pattern++;

	return *pattern == '\0';
}
/*
 * Test @name against every pattern stored in @fname_list using
 * cfs_match_wildcard().
 */
630 pcc_fname_list_match(struct list_head *fname_list, const char *name)
632 struct pcc_match_fname *fname;
634 list_for_each_entry(fname, fname_list, pmf_linkage) {
635 if (cfs_match_wildcard(fname->pmf_name, name))
/*
 * Evaluate one expression against @matcher.  The expression's field picks
 * which attribute of the matcher is compared: pm_uid, pm_gid and pm_projid
 * go through the id-list matcher, the file name (pm_name->name) through
 * the wildcard name matcher.
 */
642 pcc_expression_match(struct pcc_expression *expr, struct pcc_matcher *matcher)
644 switch (expr->pe_field) {
646 return pcc_id_list_match(&expr->pe_cond, matcher->pm_uid);
648 return pcc_id_list_match(&expr->pe_cond, matcher->pm_gid);
649 case PCC_FIELD_PROJID:
650 return pcc_id_list_match(&expr->pe_cond, matcher->pm_projid);
651 case PCC_FIELD_FNAME:
652 return pcc_fname_list_match(&expr->pe_cond,
653 matcher->pm_name->name);
/*
 * Evaluate the expressions of one conjunction against @matcher, one
 * pcc_expression_match() call per expression.
 * NOTE(review): how the per-expression results are combined is on lines
 * not visible here; the name suggests a logical AND.
 */
660 pcc_conjunction_match(struct pcc_conjunction *conjunction,
661 struct pcc_matcher *matcher)
663 struct pcc_expression *expr;
666 list_for_each_entry(expr, &conjunction->pc_expressions, pe_linkage) {
667 matched = pcc_expression_match(expr, matcher);
/*
 * Evaluate a rule against @matcher by trying each of its conjunctions.
 * NOTE(review): how the per-conjunction results are combined is on lines
 * not visible here; the rule syntax suggests a logical OR.
 */
676 pcc_cond_match(struct pcc_match_rule *rule, struct pcc_matcher *matcher)
678 struct pcc_conjunction *conjunction;
681 list_for_each_entry(conjunction, &rule->pmr_conds, pc_linkage) {
682 matched = pcc_conjunction_match(conjunction, matcher);
/*
 * Find a dataset whose rule matches @matcher and take a reference on it.
 * The dataset list is walked under the read side of pccs_rw_sem; datasets
 * are tested for the PCC_DATASET_RWPCC flag before their rule is
 * evaluated.  The caller is expected to drop the reference with
 * pcc_dataset_put().
 */
691 pcc_dataset_match_get(struct pcc_super *super, struct pcc_matcher *matcher)
693 struct pcc_dataset *dataset;
694 struct pcc_dataset *selected = NULL;
696 down_read(&super->pccs_rw_sem);
697 list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
698 if (!(dataset->pccd_flags & PCC_DATASET_RWPCC))
701 if (pcc_cond_match(&dataset->pccd_rule, matcher)) {
702 atomic_inc(&dataset->pccd_refcount);
707 up_read(&super->pccs_rw_sem);
709 CDEBUG(D_CACHE, "PCC create, matched %s - %d:%d:%d:%s\n",
710 dataset->pccd_rule.pmr_conds_str,
711 matcher->pm_uid, matcher->pm_gid,
712 matcher->pm_projid, matcher->pm_name->name);
718 * pcc_dataset_add - Add a cache policy that controls which files need to
719 * be cached and where they will be cached.
721 * @super: superblock of pcc
/*
 * Create a dataset from an add command and publish it on the super's list.
 *
 * The backend directory is resolved with kern_path(LOOKUP_DIRECTORY) and
 * the resulting path is kept in the dataset.  Under the write side of
 * pccs_rw_sem the new dataset is compared with every existing one: the
 * same pathname, or the same non-zero rwid or roid, is a conflict.  The
 * dataset starts with one reference, owned by the list.
 *
 * NOTE(review): pcc_dataset_rule_init() calls pcc_dataset_rule_fini() on
 * its failure path, and the pcc_dataset_put() issued here after a failed
 * rule init reaches pcc_dataset_rule_fini() again via the final put -- this
 * looks like a double free / LASSERT on pmr_conds_str; confirm against the
 * lines not visible here.
 */
725 pcc_dataset_add(struct pcc_super *super, struct pcc_cmd *cmd)
727 char *pathname = cmd->pccc_pathname;
728 struct pcc_dataset *dataset;
729 struct pcc_dataset *tmp;
733 OBD_ALLOC_PTR(dataset);
737 rc = kern_path(pathname, LOOKUP_DIRECTORY, &dataset->pccd_path);
739 OBD_FREE_PTR(dataset);
/* pathname_is_valid() has already bounded strlen(pathname) < PATH_MAX */
742 strncpy(dataset->pccd_pathname, pathname, PATH_MAX);
743 dataset->pccd_rwid = cmd->u.pccc_add.pccc_rwid;
744 dataset->pccd_roid = cmd->u.pccc_add.pccc_roid;
745 dataset->pccd_flags = cmd->u.pccc_add.pccc_flags;
746 atomic_set(&dataset->pccd_refcount, 1);
748 rc = pcc_dataset_rule_init(&dataset->pccd_rule, cmd);
750 pcc_dataset_put(dataset);
754 down_write(&super->pccs_rw_sem);
755 list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) {
756 if (strcmp(tmp->pccd_pathname, pathname) == 0 ||
757 (dataset->pccd_rwid != 0 &&
758 dataset->pccd_rwid == tmp->pccd_rwid) ||
759 (dataset->pccd_roid != 0 &&
760 dataset->pccd_roid == tmp->pccd_roid)) {
766 list_add(&dataset->pccd_linkage, &super->pccs_datasets);
767 up_write(&super->pccs_rw_sem);
770 pcc_dataset_put(dataset);
/*
 * Look up a dataset by PCC type and id and take a reference on it.
 * For LU_PCC_READWRITE a dataset qualifies only when its rwid equals @id
 * and it has PCC_DATASET_RWPCC set.  The list is walked under the read
 * side of pccs_rw_sem.
 */
778 pcc_dataset_get(struct pcc_super *super, enum lu_pcc_type type, __u32 id)
780 struct pcc_dataset *dataset;
781 struct pcc_dataset *selected = NULL;
787 * archive ID (read-write ID) or read-only ID is unique in the list,
788 * we just return last added one as first priority.
790 down_read(&super->pccs_rw_sem);
791 list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
792 if (type == LU_PCC_READWRITE && (dataset->pccd_rwid != id ||
793 !(dataset->pccd_flags & PCC_DATASET_RWPCC)))
795 atomic_inc(&dataset->pccd_refcount);
799 up_read(&super->pccs_rw_sem);
801 CDEBUG(D_CACHE, "matched id %u, PCC mode %d\n", id, type);
/*
 * Drop one reference on @dataset.  The last reference tears down the match
 * rule, releases the backend path taken by kern_path() and frees the
 * dataset.
 */
807 pcc_dataset_put(struct pcc_dataset *dataset)
809 if (atomic_dec_and_test(&dataset->pccd_refcount)) {
810 pcc_dataset_rule_fini(&dataset->pccd_rule);
811 path_put(&dataset->pccd_path);
812 OBD_FREE_PTR(dataset);
/*
 * Remove the dataset whose pathname equals @pathname from the super's list
 * and drop the list's reference on it.  Runs under the write side of
 * pccs_rw_sem.  Users still holding a reference keep the dataset alive
 * until their own pcc_dataset_put().
 */
817 pcc_dataset_del(struct pcc_super *super, char *pathname)
819 struct list_head *l, *tmp;
820 struct pcc_dataset *dataset;
823 down_write(&super->pccs_rw_sem);
824 list_for_each_safe(l, tmp, &super->pccs_datasets) {
825 dataset = list_entry(l, struct pcc_dataset, pccd_linkage);
826 if (strcmp(dataset->pccd_pathname, pathname) == 0) {
827 list_del_init(&dataset->pccd_linkage);
828 pcc_dataset_put(dataset);
833 up_write(&super->pccs_rw_sem);
/*
 * Print one dataset to the seq_file: pathname, rwid, flags (hex) and the
 * rule string.
 * NOTE(review): pccd_roid is not printed by the visible lines.
 */
838 pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
840 seq_printf(m, "%s:\n", dataset->pccd_pathname);
841 seq_printf(m, " rwid: %u\n", dataset->pccd_rwid);
842 seq_printf(m, " flags: %x\n", dataset->pccd_flags);
843 seq_printf(m, " autocache: %s\n", dataset->pccd_rule.pmr_conds_str);
/*
 * Dump every dataset of @super to the seq_file while holding the read side
 * of pccs_rw_sem.
 */
847 pcc_super_dump(struct pcc_super *super, struct seq_file *m)
849 struct pcc_dataset *dataset;
851 down_read(&super->pccs_rw_sem);
852 list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
853 pcc_dataset_dump(dataset, m);
855 up_read(&super->pccs_rw_sem);
/*
 * Unlink every dataset from @super and drop the list's reference on each,
 * under the write side of pccs_rw_sem.
 */
859 static void pcc_remove_datasets(struct pcc_super *super)
861 struct pcc_dataset *dataset, *tmp;
863 down_write(&super->pccs_rw_sem);
864 list_for_each_entry_safe(dataset, tmp,
865 &super->pccs_datasets, pccd_linkage) {
866 list_del(&dataset->pccd_linkage);
867 pcc_dataset_put(dataset);
869 up_write(&super->pccs_rw_sem);
/*
 * Counterpart of pcc_super_init(): drop all datasets, then release the
 * credential stored in super->pccs_cred.
 */
872 void pcc_super_fini(struct pcc_super *super)
874 pcc_remove_datasets(super);
875 put_cred(super->pccs_cred);
/*
 * Validate a dataset pathname: it must be non-NULL, non-empty, shorter
 * than PATH_MAX and start with '/'.
 */
878 static bool pathname_is_valid(const char *pathname)
880 /* Needs to be absolute path */
881 if (pathname == NULL || strlen(pathname) == 0 ||
882 strlen(pathname) >= PATH_MAX || pathname[0] != '/')
/*
 * Parse a textual PCC command into a newly allocated pcc_cmd.
 *
 * Accepted forms, as far as the visible code shows:
 *   "clear..."                       -> PCC_CLEAR_ALL (prefix match on 5 chars)
 *   "add <abs-path> <rule>} [k=v]*"  -> PCC_ADD_DATASET
 *   "del <abs-path>"                 -> PCC_DEL_DATASET
 * @buffer is tokenized in place with strsep(), and cmd->pccc_pathname
 * points into @buffer, so the buffer must outlive the command.
 * For "add", the rule text ends at the last '}' found with strrchr();
 * anything after it is parsed as space-separated key=value options.
 *
 * NOTE(review): the local "cmd" is declared static, so the pointer is
 * shared by every invocation; concurrent callers would race on it.  Confirm
 * whether callers are serialized or whether "static" is unintended.
 */
887 static struct pcc_cmd *
888 pcc_cmd_parse(char *buffer, unsigned long count)
890 static struct pcc_cmd *cmd;
897 GOTO(out, rc = -ENOMEM);
899 /* clear all setting */
900 if (strncmp(buffer, "clear", 5) == 0) {
901 cmd->pccc_cmd = PCC_CLEAR_ALL;
906 token = strsep(&val, " ");
907 if (val == NULL || strlen(val) == 0)
908 GOTO(out_free_cmd, rc = -EINVAL);
910 /* Type of the command */
911 if (strcmp(token, "add") == 0)
912 cmd->pccc_cmd = PCC_ADD_DATASET;
913 else if (strcmp(token, "del") == 0)
914 cmd->pccc_cmd = PCC_DEL_DATASET;
916 GOTO(out_free_cmd, rc = -EINVAL);
918 /* Pathname of the dataset */
919 token = strsep(&val, " ");
920 if ((val == NULL && cmd->pccc_cmd != PCC_DEL_DATASET) ||
921 !pathname_is_valid(token))
922 GOTO(out_free_cmd, rc = -EINVAL);
923 cmd->pccc_pathname = token;
925 if (cmd->pccc_cmd == PCC_ADD_DATASET) {
929 val = strrchr(token, '}');
931 GOTO(out_free_cmd, rc = -EINVAL);
937 } else if (*val == ' ') {
941 GOTO(out_free_cmd, rc = -EINVAL);
944 rc = pcc_id_parse(cmd, token);
946 GOTO(out_free_cmd, rc);
948 rc = pcc_parse_value_pairs(cmd, val);
950 GOTO(out_cmd_fini, rc = -EINVAL);
/*
 * Parse a PCC command string and apply it to @super: add a dataset, delete
 * the dataset with the given pathname, or remove all datasets.
 */
963 int pcc_cmd_handle(char *buffer, unsigned long count,
964 struct pcc_super *super)
969 cmd = pcc_cmd_parse(buffer, count);
973 switch (cmd->pccc_cmd) {
974 case PCC_ADD_DATASET:
975 rc = pcc_dataset_add(super, cmd);
977 case PCC_DEL_DATASET:
978 rc = pcc_dataset_del(super, cmd->pccc_pathname);
981 pcc_remove_datasets(super);
/* Take the per-inode PCC mutex (lli_pcc_lock) of @inode. */
993 static inline void pcc_inode_lock(struct inode *inode)
995 mutex_lock(&ll_i2info(inode)->lli_pcc_lock);
/* Release the per-inode PCC mutex taken by pcc_inode_lock(). */
998 static inline void pcc_inode_unlock(struct inode *inode)
1000 mutex_unlock(&ll_i2info(inode)->lli_pcc_lock);
/*
 * Bind a fresh pcc_inode to its Lustre inode info and reset its state:
 * zero references, no PCC type, no layout generation, no active I/O, and
 * an initialized wait queue (woken by pcc_io_fini()).
 */
1003 static void pcc_inode_init(struct pcc_inode *pcci, struct ll_inode_info *lli)
1005 pcci->pcci_lli = lli;
1006 lli->lli_pcc_inode = pcci;
1007 atomic_set(&pcci->pcci_refcount, 0);
1008 pcci->pcci_type = LU_PCC_NONE;
1009 pcci->pcci_layout_gen = CL_LAYOUT_GEN_NONE;
1010 atomic_set(&pcci->pcci_active_ios, 0);
1011 init_waitqueue_head(&pcci->pcci_waitq);
/*
 * Destroy a pcc_inode: release its cache path, return it to the slab and
 * clear the back pointer in the Lustre inode info.  @lli is read before
 * the slab free so it is still valid for the final assignment.
 */
1014 static void pcc_inode_fini(struct pcc_inode *pcci)
1016 struct ll_inode_info *lli = pcci->pcci_lli;
1018 path_put(&pcci->pcci_path);
1019 pcci->pcci_type = LU_PCC_NONE;
1020 OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
1021 lli->lli_pcc_inode = NULL;
/* Take one reference on @pcci. */
1024 static void pcc_inode_get(struct pcc_inode *pcci)
1026 atomic_inc(&pcci->pcci_refcount);
/* Drop one reference on @pcci; the last reference destroys it. */
1029 static void pcc_inode_put(struct pcc_inode *pcci)
1031 if (atomic_dec_and_test(&pcci->pcci_refcount))
1032 pcc_inode_fini(pcci);
/*
 * Drop a reference on the PCC inode attached to @inode.  More than one
 * outstanding reference at this point is unexpected and triggers a
 * warning.
 */
1035 void pcc_inode_free(struct inode *inode)
1037 struct pcc_inode *pcci = ll_i2pcci(inode);
1040 WARN_ON(atomic_read(&pcci->pcci_refcount) > 1);
1041 pcc_inode_put(pcci);
1047 * As Andreas suggested, we'd better use new layout to
1049 * (fid->f_oid >> 16 & 0xFFFF)/FID
/*
 * Format the dataset-relative cache path of @fid into @buf.
 * The path starts with six directory levels, each a 4-digit hex value
 * taken from a 16-bit slice of the FID: f_oid bits 0-15 and 16-31, then
 * f_seq bits 0-15, 16-31, 32-47 and 48-63.  The trailing component is on
 * lines not visible here.  Returns the snprintf() result.
 * MAX_PCC_DATABASE_PATH presumably sizes a buffer for six "xxxx/"
 * components plus the FID string and a terminator -- TODO confirm.
 */
1051 #define MAX_PCC_DATABASE_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
1052 static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid)
1054 return snprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
1056 (fid)->f_oid & 0xFFFF,
1057 (fid)->f_oid >> 16 & 0xFFFF,
1058 (unsigned int)((fid)->f_seq & 0xFFFF),
1059 (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
1060 (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
1061 (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
/* Return the PCC credential stored in the Lustre superblock info of @sb. */
1065 static inline const struct cred *pcc_super_cred(struct super_block *sb)
1067 return ll_s2sbi(sb)->ll_pcc_super.pccs_cred;
/* Reset per-open PCC state: no backing cache file, no PCC type. */
1070 void pcc_file_init(struct pcc_file *pccf)
1072 pccf->pccf_file = NULL;
1073 pccf->pccf_type = LU_PCC_NONE;
/* True when @dataset has PCC_DATASET_AUTO_ATTACH set in its flags. */
1076 static inline bool pcc_auto_attach_enabled(struct pcc_dataset *dataset)
1078 return dataset->pccd_flags & PCC_DATASET_AUTO_ATTACH;
/*
 * Name of the extended attribute on the cache file that records the Lustre
 * layout generation (written by pcc_layout_xattr_set(), read back in
 * pcc_try_dataset_attach()).
 */
1081 static const char pcc_xattr_layout[] = XATTR_USER_PREFIX "PCC.layout";
/*
 * Record the layout generation @gen in the pcc_xattr_layout xattr of the
 * cache file behind @pcci.  The inode's PCC_STATE_FL_AUTO_ATTACH state is
 * checked first.  Depending on HAVE_VFS_SETXATTR the inode's ->setxattr
 * hook is called directly (after checking it exists) or __vfs_setxattr()
 * is used.
 */
1083 static int pcc_layout_xattr_set(struct pcc_inode *pcci, __u32 gen)
1085 struct dentry *pcc_dentry = pcci->pcci_path.dentry;
1086 struct ll_inode_info *lli = pcci->pcci_lli;
1091 if (!(lli->lli_pcc_state & PCC_STATE_FL_AUTO_ATTACH))
1094 #ifndef HAVE_VFS_SETXATTR
1095 if (!pcc_dentry->d_inode->i_op->setxattr)
1098 rc = pcc_dentry->d_inode->i_op->setxattr(pcc_dentry, pcc_xattr_layout,
1099 &gen, sizeof(gen), 0);
1101 rc = __vfs_setxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
1102 &gen, sizeof(gen), 0);
/*
 * Fetch the current layout of @inode into @clt through
 * cl_object_layout_get().  A cl environment is acquired for the call and
 * released afterwards; failure to obtain it is returned as PTR_ERR(env).
 * A failed layout fetch is logged at D_INODE level.
 */
1107 static int pcc_get_layout_info(struct inode *inode, struct cl_layout *clt)
1110 struct ll_inode_info *lli = ll_i2info(inode);
1119 env = cl_env_get(&refcheck);
1121 RETURN(PTR_ERR(env));
1123 rc = cl_object_layout_get(env, lli->lli_clob, clt);
1125 CDEBUG(D_INODE, "Cannot get layout for "DFID"\n",
1126 PFID(ll_inode2fid(inode)));
1128 cl_env_put(env, &refcheck);
/*
 * Like pcc_fid2dataset_path(), but prefixed with the dataset's backend
 * pathname: "<pccd_pathname>/" followed by six 4-digit hex directory
 * levels built from 16-bit slices of f_oid and f_seq.  The trailing
 * component is on lines not visible here.  Returns the snprintf() result.
 */
1132 static int pcc_fid2dataset_fullpath(char *buf, int sz, struct lu_fid *fid,
1133 struct pcc_dataset *dataset)
1135 return snprintf(buf, sz, "%s/%04x/%04x/%04x/%04x/%04x/%04x/"
1137 dataset->pccd_pathname,
1138 (fid)->f_oid & 0xFFFF,
1139 (fid)->f_oid >> 16 & 0xFFFF,
1140 (unsigned int)((fid)->f_seq & 0xFFFF),
1141 (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
1142 (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
1143 (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
/*
 * Attach @pcci to the cache file @dentry inside @dataset.
 * Takes a mount reference from the dataset's path, stores @dentry as the
 * cache path, sets the reference count from 0 (asserted) to 1, records the
 * PCC type, invalidates cached attributes, and copies the dataset's
 * OPEN/IO/STAT attach flags into the inode's lli_pcc_state.
 */
1147 /* Must be called with pcci->pcci_lock held */
1148 static void pcc_inode_attach_init(struct pcc_dataset *dataset,
1149 struct pcc_inode *pcci,
1150 struct dentry *dentry,
1151 enum lu_pcc_type type)
1153 struct ll_inode_info *lli = pcci->pcci_lli;
1155 pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
1156 pcci->pcci_path.dentry = dentry;
1157 LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
1158 atomic_set(&pcci->pcci_refcount, 1);
1159 pcci->pcci_type = type;
1160 pcci->pcci_attr_valid = false;
1162 if (dataset->pccd_flags & PCC_DATASET_OPEN_ATTACH)
1163 lli->lli_pcc_state |= PCC_STATE_FL_OPEN_ATTACH;
1164 if (dataset->pccd_flags & PCC_DATASET_IO_ATTACH)
1165 lli->lli_pcc_state |= PCC_STATE_FL_IO_ATTACH;
1166 if (dataset->pccd_flags & PCC_DATASET_STAT_ATTACH)
1167 lli->lli_pcc_state |= PCC_STATE_FL_STAT_ATTACH;
/* Record the layout generation under which @pcci is considered valid. */
1170 static inline void pcc_layout_gen_set(struct pcc_inode *pcci,
1173 pcci->pcci_layout_gen = gen;
/* True when @pcci carries a layout generation, i.e. not CL_LAYOUT_GEN_NONE. */
1176 static inline bool pcc_inode_has_layout(struct pcc_inode *pcci)
1178 return pcci->pcci_layout_gen != CL_LAYOUT_GEN_NONE;
/*
 * Try to re-attach @inode to an existing cache file in @dataset.
 *
 * The FID-derived full path is looked up with the PCC credential
 * installed via override_creds().  The layout generation stored in the
 * cache file's pcc_xattr_layout xattr is read and compared with @gen;
 * when they are equal the file is attached, either through a newly
 * allocated pcc_inode or by re-taking a reference on the existing one, and
 * @gen is recorded on the pcc_inode.  Lookup and xattr failures are
 * deliberately treated as "not cached" (rc = 0) rather than as errors.
 * The temporary pathname buffer and the overridden credential are
 * released before returning.
 */
1181 static int pcc_try_dataset_attach(struct inode *inode, __u32 gen,
1182 enum lu_pcc_type type,
1183 struct pcc_dataset *dataset,
1186 struct ll_inode_info *lli = ll_i2info(inode);
1187 struct pcc_inode *pcci = lli->lli_pcc_inode;
1188 const struct cred *old_cred;
1189 struct dentry *pcc_dentry;
1197 if (type == LU_PCC_READWRITE &&
1198 !(dataset->pccd_flags & PCC_DATASET_RWPCC))
1201 OBD_ALLOC(pathname, PATH_MAX);
1202 if (pathname == NULL)
1205 pcc_fid2dataset_fullpath(pathname, PATH_MAX, &lli->lli_fid, dataset);
1207 old_cred = override_creds(pcc_super_cred(inode->i_sb));
1208 rc = kern_path(pathname, LOOKUP_FOLLOW, &path);
1210 /* ignore this error */
1213 pcc_dentry = path.dentry;
1214 #ifndef HAVE_VFS_SETXATTR
1215 if (!pcc_dentry->d_inode->i_op->getxattr)
1216 /* ignore this error */
1217 GOTO(out_put_path, rc = 0);
1219 rc = pcc_dentry->d_inode->i_op->getxattr(pcc_dentry, pcc_xattr_layout,
1220 &pcc_gen, sizeof(pcc_gen));
1222 rc = __vfs_getxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
1223 &pcc_gen, sizeof(pcc_gen));
1227 /* ignore this error */
1228 GOTO(out_put_path, rc = 0);
1231 /* The file is still valid cached in PCC, attach it immediately. */
1232 if (pcc_gen == gen) {
1233 CDEBUG(D_CACHE, DFID" L.Gen (%d) consistent, auto attached.\n",
1234 PFID(&lli->lli_fid), gen);
1236 OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
1238 GOTO(out_put_path, rc = -ENOMEM);
1240 pcc_inode_init(pcci, lli);
1242 pcc_inode_attach_init(dataset, pcci, pcc_dentry, type);
1245 * This happened when a file was once attached into
1246 * PCC, and some processes keep this file opened
1247 * (pcci->refcount > 1) and corresponding PCC file
1248 * without any I/O activity, and then this file was
1249 * detached by the manual detach command or the
1250 * revocation of the layout lock (i.e. cached LRU lock
1253 pcc_inode_get(pcci);
1254 pcci->pcci_type = type;
1256 pcc_layout_gen_set(pcci, gen);
1262 revert_creds(old_cred);
1263 OBD_FREE(pathname, PATH_MAX);
/*
 * Walk the datasets of the inode's superblock, under the read side of
 * pccs_rw_sem, and try pcc_try_dataset_attach() on each dataset that has
 * auto-attach enabled.  An error, or a successful attach reported through
 * *cached, is tested after every attempt.
 */
1267 static int pcc_try_datasets_attach(struct inode *inode, __u32 gen,
1268 enum lu_pcc_type type, bool *cached)
1270 struct pcc_dataset *dataset, *tmp;
1271 struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
1276 down_read(&super->pccs_rw_sem);
1277 list_for_each_entry_safe(dataset, tmp,
1278 &super->pccs_datasets, pccd_linkage) {
1279 if (!pcc_auto_attach_enabled(dataset))
1281 rc = pcc_try_dataset_attach(inode, gen, type, dataset, cached);
1282 if (rc < 0 || (!rc && *cached))
1285 up_read(&super->pccs_rw_sem);
/*
 * Attempt an automatic attach of @inode to an existing PCC copy.
 *
 * Steps visible here: check that some dataset is configured; check
 * whether the inode has a layout version at all; refresh the layout and
 * fetch its details; for non-open callers compare the refreshed
 * generation with the fetched one (a difference is logged as a layout
 * change); finally, when the layout is marked released, try the datasets
 * in LU_PCC_READWRITE mode.
 * NOTE(review): the dataset list emptiness test is made without taking
 * pccs_rw_sem -- presumably an intentional fast path; confirm.
 */
1290 static int pcc_try_auto_attach(struct inode *inode, bool *cached, bool is_open)
1292 struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
1293 struct cl_layout clt = {
1295 .cl_is_released = false,
1297 struct ll_inode_info *lli = ll_i2info(inode);
1304 * Quick check whether there is PCC device.
1306 if (list_empty(&super->pccs_datasets))
1310 * The file layout lock was cancelled. And this open does not
1311 * obtain valid layout lock from MDT (i.e. the file is being
1315 if (ll_layout_version_get(lli) == CL_LAYOUT_GEN_NONE)
1318 rc = ll_layout_refresh(inode, &gen);
1323 rc = pcc_get_layout_info(inode, &clt);
1327 if (!is_open && gen != clt.cl_layout_gen) {
1328 CDEBUG(D_CACHE, DFID" layout changed from %d to %d.\n",
1329 PFID(ll_inode2fid(inode)), gen, clt.cl_layout_gen);
1333 if (clt.cl_is_released)
1334 rc = pcc_try_datasets_attach(inode, clt.cl_layout_gen,
1335 LU_PCC_READWRITE, cached);
/*
 * Open hook: when @inode is a regular file cached in PCC, open the backing
 * cache file and remember it in the per-open pcc_file.
 *
 * Runs under the per-inode PCC lock.  Nothing is done while an attach is
 * in progress (PCC_STATE_FL_ATTACHING).  When the inode has no valid PCC
 * layout, an auto-attach is tried if PCC_STATE_FL_OPEN_ATTACH is set.  On
 * the cached path a pcc_inode reference is taken and the cache file is
 * opened with the caller's open flags and the PCC credential; if that open
 * fails the reference is dropped again.
 */
1340 int pcc_file_open(struct inode *inode, struct file *file)
1342 struct pcc_inode *pcci;
1343 struct ll_inode_info *lli = ll_i2info(inode);
1344 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1345 struct pcc_file *pccf = &fd->fd_pcc_file;
1346 struct file *pcc_file;
1349 bool cached = false;
1354 if (!S_ISREG(inode->i_mode))
1357 pcc_inode_lock(inode);
1358 pcci = ll_i2pcci(inode);
1360 if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
1361 GOTO(out_unlock, rc = 0);
1363 if (!pcci || !pcc_inode_has_layout(pcci)) {
1364 if (lli->lli_pcc_state & PCC_STATE_FL_OPEN_ATTACH)
1365 rc = pcc_try_auto_attach(inode, &cached, true);
1367 if (rc < 0 || !cached)
1368 GOTO(out_unlock, rc);
/* the auto-attach may have installed a pcc_inode; reload the pointer */
1371 pcci = ll_i2pcci(inode);
1374 pcc_inode_get(pcci);
1375 WARN_ON(pccf->pccf_file);
1377 path = &pcci->pcci_path;
1378 dname = &path->dentry->d_name;
1379 CDEBUG(D_CACHE, "opening pcc file '%.*s'\n", dname->len,
1382 #ifdef HAVE_DENTRY_OPEN_USE_PATH
1383 pcc_file = dentry_open(path, file->f_flags,
1384 pcc_super_cred(inode->i_sb));
1386 pcc_file = dentry_open(path->dentry, path->mnt, file->f_flags,
1387 pcc_super_cred(inode->i_sb));
1389 if (IS_ERR_OR_NULL(pcc_file)) {
1390 rc = pcc_file == NULL ? -EINVAL : PTR_ERR(pcc_file);
1391 pcc_inode_put(pcci);
1393 pccf->pccf_file = pcc_file;
1394 pccf->pccf_type = pcci->pcci_type;
1398 pcc_inode_unlock(inode);
/*
 * Release hook, counterpart of pcc_file_open(): for a regular file with
 * per-open data and an open cache file, drop the pcc_inode reference taken
 * at open, fput() the cache file and clear the pointer.  Runs under the
 * per-inode PCC lock.
 */
1402 void pcc_file_release(struct inode *inode, struct file *file)
1404 struct pcc_inode *pcci;
1405 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1406 struct pcc_file *pccf;
1412 if (!S_ISREG(inode->i_mode) || fd == NULL)
1415 pccf = &fd->fd_pcc_file;
1416 pcc_inode_lock(inode);
1417 if (pccf->pccf_file == NULL)
1420 pcci = ll_i2pcci(inode);
1422 path = &pcci->pcci_path;
1423 dname = &path->dentry->d_name;
1424 CDEBUG(D_CACHE, "releasing pcc file \"%.*s\"\n", dname->len,
1426 pcc_inode_put(pcci);
1427 fput(pccf->pccf_file);
1428 pccf->pccf_file = NULL;
1430 pcc_inode_unlock(inode);
/*
 * Begin an I/O of type @iot on @inode.  Under the per-inode PCC lock:
 * when the inode has a pcc_inode with a valid layout, its active-I/O
 * counter is bumped.  Otherwise an auto-attach is attempted when the
 * inode's state allows it for this I/O type (IO_ATTACH for anything but
 * PIT_GETATTR, STAT_ATTACH for PIT_GETATTR); the attach result is
 * intentionally ignored, and the active-I/O counter is bumped on the
 * attached pcc_inode.  Paired with pcc_io_fini().
 */
1434 static void pcc_io_init(struct inode *inode, enum pcc_io_type iot, bool *cached)
1436 struct ll_inode_info *lli = ll_i2info(inode);
1437 struct pcc_inode *pcci;
1439 pcc_inode_lock(inode);
1440 pcci = ll_i2pcci(inode);
1441 if (pcci && pcc_inode_has_layout(pcci)) {
1442 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1443 atomic_inc(&pcci->pcci_active_ios);
1447 if ((lli->lli_pcc_state & PCC_STATE_FL_IO_ATTACH &&
1448 iot != PIT_GETATTR) ||
1449 (iot == PIT_GETATTR &&
1450 lli->lli_pcc_state & PCC_STATE_FL_STAT_ATTACH)) {
1451 (void) pcc_try_auto_attach(inode, cached, false);
1453 pcci = ll_i2pcci(inode);
1454 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1455 atomic_inc(&pcci->pcci_active_ios);
1459 pcc_inode_unlock(inode);
/*
 * End an I/O started with pcc_io_init(): decrement the active-I/O counter
 * and, when it reaches zero, wake everyone sleeping on pcci_waitq.
 */
1462 static void pcc_io_fini(struct inode *inode)
1464 struct pcc_inode *pcci = ll_i2pcci(inode);
1466 LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0);
1467 if (atomic_dec_and_test(&pcci->pcci_active_ios))
1468 wake_up_all(&pcci->pcci_waitq);
/*
 * Read from the file currently set in iocb->ki_filp.
 * With HAVE_FILE_OPERATIONS_READ_WRITE_ITER the file's ->read_iter is
 * called directly.  Otherwise each iovec segment is submitted through
 * ->aio_read, waiting for completion when the request is queued
 * (-EIOCBQUEUED); a segment that returns less than its length is tested
 * for (short read), and the iterator is advanced by the bytes consumed.
 */
1473 __pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
1475 struct file *file = iocb->ki_filp;
1477 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1478 return file->f_op->read_iter(iocb, iter);
1484 iov_for_each(iov, i, *iter) {
1487 res = file->f_op->aio_read(iocb, &iov, 1, iocb->ki_pos);
1488 if (-EIOCBQUEUED == res)
1489 res = wait_on_sync_kiocb(iocb);
1497 if (res < iov.iov_len)
1502 iov_iter_advance(iter, bytes);
/*
 * Read from the PCC copy that backs an open Lustre file.
 *
 * @iocb:   kiocb of the Lustre file; ki_filp is temporarily redirected
 * @iter:   destination iterator
 * @cached: output flag passed on to pcc_io_init()
 *
 * The function checks pccf_file for NULL, registers the I/O with
 * pcc_io_init(PIT_READ), points iocb->ki_filp at the PCC backing file for
 * the duration of __pcc_file_read_iter() and then restores the original
 * file pointer.
 */
1507 ssize_t pcc_file_read_iter(struct kiocb *iocb,
1508 struct iov_iter *iter, bool *cached)
1510 struct file *file = iocb->ki_filp;
1511 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1512 struct pcc_file *pccf = &fd->fd_pcc_file;
1513 struct inode *inode = file_inode(file);
1518 if (pccf->pccf_file == NULL) {
1523 pcc_io_init(inode, PIT_READ, cached);
1527 iocb->ki_filp = pccf->pccf_file;
1528 /* generic_file_aio_read does not support ext4-dax,
1529 * __pcc_file_read_iter uses ->aio_read hook directly
1530 * to add support for ext4-dax.
1532 result = __pcc_file_read_iter(iocb, iter);
1533 iocb->ki_filp = file;
/*
 * Issue a write on iocb->ki_filp through that file's file_operations.
 *
 * With HAVE_FILE_OPERATIONS_READ_WRITE_ITER the request is forwarded to
 * ->write_iter(). Otherwise the segments of @iter are walked with
 * iov_for_each() and passed to ->aio_write() one iovec at a time; a result
 * of -EIOCBQUEUED is turned into a synchronous wait, the result is compared
 * against the segment length (short write), and @iter is advanced by the
 * accumulated byte count.
 */
1540 __pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
1542 struct file *file = iocb->ki_filp;
1544 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1545 return file->f_op->write_iter(iocb, iter);
1551 iov_for_each(iov, i, *iter) {
1554 res = file->f_op->aio_write(iocb, &iov, 1, iocb->ki_pos);
1555 if (-EIOCBQUEUED == res)
1556 res = wait_on_sync_kiocb(iocb);
1564 if (res < iov.iov_len)
1569 iov_iter_advance(iter, bytes);
/*
 * Write to the PCC copy that backs an open Lustre file.
 *
 * @iocb:   kiocb of the Lustre file; ki_filp is temporarily redirected
 * @iter:   source iterator
 * @cached: output flag passed on to pcc_io_init()
 *
 * The function checks pccf_file for NULL and pccf_type against
 * LU_PCC_READWRITE, then registers the I/O with pcc_io_init(PIT_WRITE).
 * The OBD_FAIL_LLITE_PCC_FAKE_ERROR fault-injection point makes the call
 * fail with -ENOSPC. For the real write iocb->ki_filp is pointed at the
 * PCC backing file around __pcc_file_write_iter() and restored afterwards.
 */
1574 ssize_t pcc_file_write_iter(struct kiocb *iocb,
1575 struct iov_iter *iter, bool *cached)
1577 struct file *file = iocb->ki_filp;
1578 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1579 struct pcc_file *pccf = &fd->fd_pcc_file;
1580 struct inode *inode = file_inode(file);
1585 if (pccf->pccf_file == NULL) {
1590 if (pccf->pccf_type != LU_PCC_READWRITE) {
1595 pcc_io_init(inode, PIT_WRITE, cached);
1599 if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
1600 GOTO(out, result = -ENOSPC);
1602 iocb->ki_filp = pccf->pccf_file;
1604 /* Since __pcc_file_write_iter makes write calls via
1605 * the normal vfs interface to the local PCC file system,
1606 * the inode lock is not needed.
1608 result = __pcc_file_write_iter(iocb, iter);
1609 iocb->ki_filp = file;
/*
 * Apply an attribute change to the PCC copy of @inode.
 *
 * A private copy of @attr is made and its ia_valid is restricted to the
 * size, atime/mtime/ctime (including the *_SET variants), uid and gid
 * bits. After a regular-file check and pcc_io_init(PIT_SETATTR), the
 * backing inode's ->setattr() is invoked on the PCC dentry with the
 * backing inode locked and with the credentials overridden to
 * pcc_super_cred() for the duration of the call.
 */
1615 int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
1619 const struct cred *old_cred;
1620 struct iattr attr2 = *attr;
1621 struct dentry *pcc_dentry;
1622 struct pcc_inode *pcci;
1626 if (!S_ISREG(inode->i_mode)) {
1631 pcc_io_init(inode, PIT_SETATTR, cached);
/* Only forward the attribute bits listed below to the PCC copy. */
1635 attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME |
1636 ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET |
1637 ATTR_CTIME | ATTR_UID | ATTR_GID);
1638 pcci = ll_i2pcci(inode);
1639 pcc_dentry = pcci->pcci_path.dentry;
1640 inode_lock(pcc_dentry->d_inode);
1641 old_cred = override_creds(pcc_super_cred(inode->i_sb));
1642 rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
1643 revert_creds(old_cred);
1644 inode_unlock(pcc_dentry->d_inode);
/*
 * Refresh the attributes of a Lustre inode from its PCC copy.
 *
 * After a regular-file check and pcc_io_init(PIT_GETATTR), the PCC path is
 * stat'ed via ll_vfs_getattr() under pcc_super_cred() credentials. Under
 * the Lustre inode size lock the in-memory timestamps are first synced
 * from the ll_inode_info (lli_atime/lli_mtime/lli_ctime), then each of
 * atime/mtime/ctime is raised to the PCC copy's value when that one is
 * newer. i_size and i_blocks are taken from the PCC stat result.
 */
1650 int pcc_inode_getattr(struct inode *inode, bool *cached)
1652 struct ll_inode_info *lli = ll_i2info(inode);
1653 const struct cred *old_cred;
1662 if (!S_ISREG(inode->i_mode)) {
1667 pcc_io_init(inode, PIT_GETATTR, cached);
1671 old_cred = override_creds(pcc_super_cred(inode->i_sb));
1672 rc = ll_vfs_getattr(&ll_i2pcci(inode)->pcci_path, &stat);
1673 revert_creds(old_cred);
1677 ll_inode_size_lock(inode);
1678 if (inode->i_atime.tv_sec < lli->lli_atime ||
1679 lli->lli_update_atime) {
1680 inode->i_atime.tv_sec = lli->lli_atime;
1681 lli->lli_update_atime = 0;
1683 inode->i_mtime.tv_sec = lli->lli_mtime;
1684 inode->i_ctime.tv_sec = lli->lli_ctime;
1686 atime = inode->i_atime.tv_sec;
1687 mtime = inode->i_mtime.tv_sec;
1688 ctime = inode->i_ctime.tv_sec;
/* Keep whichever timestamp is newer: the Lustre one or the PCC copy's. */
1690 if (atime < stat.atime.tv_sec)
1691 atime = stat.atime.tv_sec;
1693 if (ctime < stat.ctime.tv_sec)
1694 ctime = stat.ctime.tv_sec;
1696 if (mtime < stat.mtime.tv_sec)
1697 mtime = stat.mtime.tv_sec;
1699 i_size_write(inode, stat.size);
1700 inode->i_blocks = stat.blocks;
1702 inode->i_atime.tv_sec = atime;
1703 inode->i_mtime.tv_sec = mtime;
1704 inode->i_ctime.tv_sec = ctime;
1706 ll_inode_size_unlock(inode);
/*
 * splice_read on the PCC copy of an open Lustre file.
 *
 * The PCC backing file is taken from the ll_file_data of @in_file. The
 * function tests whether the backing inode's file_operations provide
 * ->splice_read, registers the I/O with pcc_io_init(PIT_SPLICE_READ) and
 * forwards the request to the backing file's ->splice_read().
 */
1712 ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos,
1713 struct pipe_inode_info *pipe,
1714 size_t count, unsigned int flags,
1717 struct inode *inode = file_inode(in_file);
1718 struct ll_file_data *fd = LUSTRE_FPRIVATE(in_file);
1719 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1728 if (!file_inode(pcc_file)->i_fop->splice_read)
1731 pcc_io_init(inode, PIT_SPLICE_READ, cached);
1735 result = file_inode(pcc_file)->i_fop->splice_read(pcc_file,
/*
 * fsync the PCC copy of an open Lustre file.
 *
 * Registers the I/O with pcc_io_init(PIT_FSYNC) and forwards @start, @end
 * and @datasync to the ->fsync() method of the PCC backing file, which is
 * taken from the ll_file_data of @file. @cached is the output flag passed
 * on to pcc_io_init().
 */
1743 int pcc_fsync(struct file *file, loff_t start, loff_t end,
1744 int datasync, bool *cached)
1746 struct inode *inode = file_inode(file);
1747 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1748 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1758 pcc_io_init(inode, PIT_FSYNC, cached);
1762 rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
1763 start, end, datasync);
/*
 * mmap an open Lustre file through its PCC copy.
 *
 * The function tests for a missing backing file or a backing fs without
 * ->mmap. Under the PCC inode lock, when the pcc_inode has a valid layout,
 * vma->vm_file is switched to the PCC backing file around the backend
 * ->mmap() call and restored afterwards. The vm_ops installed by the
 * backend are stashed in vma->vm_private_data; the other pcc_vm_* and
 * fault handlers read them back from there.
 */
1769 int pcc_file_mmap(struct file *file, struct vm_area_struct *vma,
1772 struct inode *inode = file_inode(file);
1773 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1774 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1775 struct pcc_inode *pcci;
1780 if (!pcc_file || !file_inode(pcc_file)->i_fop->mmap) {
1785 pcc_inode_lock(inode);
1786 pcci = ll_i2pcci(inode);
1787 if (pcci && pcc_inode_has_layout(pcci)) {
1788 LASSERT(atomic_read(&pcci->pcci_refcount) > 1);
1790 vma->vm_file = pcc_file;
1791 rc = file_inode(pcc_file)->i_fop->mmap(pcc_file, vma);
1792 vma->vm_file = file;
1793 /* Save the vm ops of backend PCC */
1794 vma->vm_private_data = (void *)vma->vm_ops;
1798 pcc_inode_unlock(inode);
/*
 * Forward a VMA open to the PCC backend's vm_operations.
 *
 * The backend vm_ops are read from vma->vm_private_data. The function
 * tests for a missing backing file, missing vm_ops or a missing ->open
 * hook. Under the PCC inode lock, when the pcc_inode has a valid layout,
 * vma->vm_file is switched to the PCC backing file around the backend
 * ->open() call and restored afterwards.
 */
1803 void pcc_vm_open(struct vm_area_struct *vma)
1805 struct pcc_inode *pcci;
1806 struct file *file = vma->vm_file;
1807 struct inode *inode = file_inode(file);
1808 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1809 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1810 struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1814 if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->open)
1817 pcc_inode_lock(inode);
1818 pcci = ll_i2pcci(inode);
1819 if (pcci && pcc_inode_has_layout(pcci)) {
1820 vma->vm_file = pcc_file;
1821 pcc_vm_ops->open(vma);
1822 vma->vm_file = file;
1824 pcc_inode_unlock(inode);
/*
 * Forward a VMA close to the PCC backend's vm_operations.
 *
 * The backend vm_ops are read from vma->vm_private_data. The function
 * tests for a missing backing file, missing vm_ops or a missing ->close
 * hook. Unlike pcc_vm_open(), no layout check is visible here: under the
 * PCC inode lock vma->vm_file is switched to the PCC backing file around
 * the backend ->close() call and restored afterwards.
 */
1828 void pcc_vm_close(struct vm_area_struct *vma)
1830 struct file *file = vma->vm_file;
1831 struct inode *inode = file_inode(file);
1832 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1833 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1834 struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1838 if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->close)
1841 pcc_inode_lock(inode);
1842 /* Layout lock maybe revoked here */
1843 vma->vm_file = pcc_file;
1844 pcc_vm_ops->close(vma);
1845 vma->vm_file = file;
1846 pcc_inode_unlock(inode);
/*
 * ->page_mkwrite handler for a VMA that maps a PCC-cached Lustre file.
 *
 * The backend vm_ops are read from vma->vm_private_data and the backing
 * file from the ll_file_data of vma->vm_file. Visible paths:
 *
 *  - The backend has no ->page_mkwrite but the fault page belongs to the
 *    PCC file's mapping: the file is detached with PCC_DETACH_OPT_UNCACHE,
 *    mmap_sem is released and VM_FAULT_RETRY | VM_FAULT_NOPAGE is returned
 *    so that the fault is retried.
 *  - After pcc_io_init(PIT_PAGE_MKWRITE), a fault page that still belongs
 *    to the PCC file's mapping also ends in release of mmap_sem and
 *    VM_FAULT_RETRY | VM_FAULT_NOPAGE.
 *  - The OBD_FAIL_LLITE_PCC_DETACH_MKWRITE fault-injection point forces a
 *    detach followed by the same retry return.
 *  - Otherwise vma->vm_file is switched to the PCC backing file around the
 *    backend ->page_mkwrite() call (whose signature depends on
 *    HAVE_VM_OPS_USE_VM_FAULT_ONLY) and restored afterwards.
 *
 * OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE only inserts a delay for testing races
 * with a concurrent detach.
 */
1850 int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
1853 struct page *page = vmf->page;
1854 struct mm_struct *mm = vma->vm_mm;
1855 struct file *file = vma->vm_file;
1856 struct inode *inode = file_inode(file);
1857 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1858 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1859 struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1864 if (!pcc_file || !pcc_vm_ops) {
1869 if (!pcc_vm_ops->page_mkwrite &&
1870 page->mapping == pcc_file->f_mapping) {
1872 "%s: PCC backend fs not support ->page_mkwrite()\n",
1873 ll_i2sbi(inode)->ll_fsname);
1874 pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
1875 up_read(&mm->mmap_sem);
1877 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1879 /* Pause to allow for a race with concurrent detach */
1880 OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
1882 pcc_io_init(inode, PIT_PAGE_MKWRITE, cached);
1884 /* This happens when the file is detached from PCC after got
1885 * the fault page via ->fault() on the inode of the PCC copy.
1886 * Here it can not simply fall back to normal Lustre I/O path.
1887 * The reason is that the address space of fault page used by
1888 * ->page_mkwrite() is still the one of PCC inode. In the
1889 * normal Lustre ->page_mkwrite() I/O path, it will be wrongly
1890 * handled as the address space of the fault page is not
1891 * consistent with the one of the Lustre inode (though the
1892 * fault page was truncated).
1893 * As the file is detached from PCC, the fault page must
1894 * be released frist, and retry the mmap write (->fault() and
1896 * We use an ugly and tricky method by returning
1897 * VM_FAULT_NOPAGE | VM_FAULT_RETRY to the caller
1898 * __do_page_fault and retry the memory fault handling.
1900 if (page->mapping == pcc_file->f_mapping) {
1902 up_read(&mm->mmap_sem);
1903 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1910 * This fault injection can also be used to simulate -ENOSPC and
1911 * -EDQUOT failure of underlying PCC backend fs.
1913 if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE)) {
1915 pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
1916 up_read(&mm->mmap_sem);
1917 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1920 vma->vm_file = pcc_file;
1921 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
1922 rc = pcc_vm_ops->page_mkwrite(vmf);
1924 rc = pcc_vm_ops->page_mkwrite(vma, vmf);
1926 vma->vm_file = file;
/*
 * ->fault handler for a VMA that maps a PCC-cached Lustre file.
 *
 * The backend vm_ops are read from vma->vm_private_data. The function
 * tests for a missing backing file, missing vm_ops or a missing ->fault
 * hook, registers the I/O with pcc_io_init(PIT_FAULT), and switches
 * vma->vm_file to the PCC backing file around the backend ->fault() call
 * (whose signature depends on HAVE_VM_OPS_USE_VM_FAULT_ONLY) before
 * restoring it.
 */
1932 int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
1935 struct file *file = vma->vm_file;
1936 struct inode *inode = file_inode(file);
1937 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1938 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1939 struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1944 if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->fault) {
1949 pcc_io_init(inode, PIT_FAULT, cached);
1953 vma->vm_file = pcc_file;
1954 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
1955 rc = pcc_vm_ops->fault(vmf);
1957 rc = pcc_vm_ops->fault(vma, vmf);
1959 vma->vm_file = file;
/*
 * Block until no PCC I/O is in flight on @pcci.
 *
 * Loops while pcci_active_ios is positive, logging the current count and
 * sleeping on pcci_waitq until the counter reads zero. pcc_io_fini() is
 * the visible waker of that queue.
 */
1965 static void pcc_layout_wait(struct pcc_inode *pcci)
1967 struct l_wait_info lwi = { 0 };
1969 while (atomic_read(&pcci->pcci_active_ios) > 0) {
1970 CDEBUG(D_CACHE, "Waiting for IO completion: %d\n",
1971 atomic_read(&pcci->pcci_active_ios));
1972 l_wait_event(pcci->pcci_waitq,
1973 atomic_read(&pcci->pcci_active_ios) == 0, &lwi);
/*
 * Mark @pcci as no longer cached and drain its in-flight I/O.
 *
 * Resets the type to LU_PCC_NONE and the layout generation to
 * CL_LAYOUT_GEN_NONE first, then waits in pcc_layout_wait() for the
 * active I/O counter to reach zero.
 */
1977 static void __pcc_layout_invalidate(struct pcc_inode *pcci)
1979 pcci->pcci_type = LU_PCC_NONE;
1980 pcc_layout_gen_set(pcci, CL_LAYOUT_GEN_NONE);
1981 pcc_layout_wait(pcci);
/*
 * Invalidate the PCC layout of @inode, if it has one.
 *
 * Under the PCC inode lock: when the inode has a pcc_inode with a valid
 * layout, the layout is invalidated via __pcc_layout_invalidate(), the
 * event is logged with the file FID and layout generation, and one
 * pcc_inode reference is dropped.
 */
1984 void pcc_layout_invalidate(struct inode *inode)
1986 struct pcc_inode *pcci;
1990 pcc_inode_lock(inode);
1991 pcci = ll_i2pcci(inode);
1992 if (pcci && pcc_inode_has_layout(pcci)) {
1993 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1994 __pcc_layout_invalidate(pcci);
1996 CDEBUG(D_CACHE, "Invalidate "DFID" layout gen %d\n",
1997 PFID(&ll_i2info(inode)->lli_fid), pcci->pcci_layout_gen);
1999 pcc_inode_put(pcci);
2001 pcc_inode_unlock(inode);
/*
 * Unlink the PCC copy @pcc_dentry from its parent directory.
 *
 * @inode is the Lustre inode; it is only used to obtain the file system
 * name for the warning that reports a failed unlink.
 */
2006 static int pcc_inode_remove(struct inode *inode, struct dentry *pcc_dentry)
2010 rc = ll_vfs_unlink(pcc_dentry->d_parent->d_inode, pcc_dentry);
2012 CWARN("%s: failed to unlink PCC file %.*s, rc = %d\n",
2013 ll_i2sbi(inode)->ll_fsname, pcc_dentry->d_name.len,
2014 pcc_dentry->d_name.name, rc);
/*
 * @base: parent directory dentry
 * @name: single path component to look up / create under @base
 * @mode: mode passed to vfs_mkdir()
 *
 * The name is looked up with lookup_one_len(); a positive dentry is
 * detected with d_is_positive(), otherwise vfs_mkdir() is called in
 * @base's inode. A failure code is converted to an ERR_PTR() dentry.
 */
2019 /* Create directory under base if directory does not exist */
2020 static struct dentry *
2021 pcc_mkdir(struct dentry *base, const char *name, umode_t mode)
2024 struct dentry *dentry;
2025 struct inode *dir = base->d_inode;
2028 dentry = lookup_one_len(name, base, strlen(name));
2032 if (d_is_positive(dentry))
2035 rc = vfs_mkdir(dir, dentry, mode);
2038 dentry = ERR_PTR(rc);
/*
 * Create the directory components of @path below @root.
 *
 * Starts from a new reference on @root and scans @path for '/' separators
 * with strchr(), calling pcc_mkdir() for each entry name under the current
 * parent. The result defaults to ERR_PTR(-EINVAL).
 * NOTE(review): @path is a non-const buffer; presumably the separators are
 * temporarily overwritten while walking -- confirm.
 */
2046 static struct dentry *
2047 pcc_mkdir_p(struct dentry *root, char *path, umode_t mode)
2049 char *ptr, *entry_name;
2050 struct dentry *parent;
2051 struct dentry *child = ERR_PTR(-EINVAL);
2058 parent = dget(root);
2059 while ((ptr = strchr(ptr, '/')) != NULL) {
2061 child = pcc_mkdir(parent, entry_name, mode);
/*
 * @base: parent directory dentry
 * @name: file name to create under @base
 * @mode: mode passed to vfs_create()
 *
 * The name is looked up with lookup_one_len(); a positive (already
 * existing) dentry is detected with d_is_positive(), otherwise
 * vfs_create() is called in @base's inode. A failure code is converted to
 * an ERR_PTR() dentry.
 */
2075 /* Create file under base. If file already exist, return failure */
2076 static struct dentry *
2077 pcc_create(struct dentry *base, const char *name, umode_t mode)
2080 struct dentry *dentry;
2081 struct inode *dir = base->d_inode;
2084 dentry = lookup_one_len(name, base, strlen(name));
2088 if (d_is_positive(dentry))
2091 rc = vfs_create(dir, dentry, mode, LL_VFS_CREATE_FALSE);
2094 dentry = ERR_PTR(rc);
/*
 * Create the backing file for a FID inside a PCC dataset.
 *
 * A scratch buffer of MAX_PCC_DATABASE_PATH bytes is used twice: first for
 * the directory path derived from the FID by pcc_fid2dataset_path(), which
 * pcc_mkdir_p() creates below the dataset root with mode 0; then for the
 * file name, formatted as DFID_NOBRACE, which pcc_create() creates in that
 * directory with mode 0. The buffer is freed before returning.
 * The resulting dentry is reported through @dentry (the assignment itself
 * is not visible here -- TODO confirm).
 */
2102 static int __pcc_inode_create(struct pcc_dataset *dataset,
2104 struct dentry **dentry)
2107 struct dentry *base;
2108 struct dentry *child;
2111 OBD_ALLOC(path, MAX_PCC_DATABASE_PATH);
2115 pcc_fid2dataset_path(path, MAX_PCC_DATABASE_PATH, fid);
2117 base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0);
2123 snprintf(path, MAX_PCC_DATABASE_PATH, DFID_NOBRACE, PFID(fid));
2124 child = pcc_create(base, path, 0);
2125 if (IS_ERR(child)) {
2126 rc = PTR_ERR(child);
2134 OBD_FREE(path, MAX_PCC_DATABASE_PATH);
/*
 * @dentry: dentry of the PCC copy
 * @valid:  ATTR_* mask stored in ia_valid
 * @uid, @gid, @size: new values; only @size's assignment is visible below
 *
 * The change is applied with notify_change(); the inode is unlocked
 * afterwards.
 */
2139 * Reset uid, gid or size for the PCC copy masked by @valid.
2140 * TODO: Set the project ID for PCC copy.
2142 int pcc_inode_reset_iattr(struct dentry *dentry, unsigned int valid,
2143 kuid_t uid, kgid_t gid, loff_t size)
2145 struct inode *inode = dentry->d_inode;
2151 attr.ia_valid = valid;
2154 attr.ia_size = size;
2157 rc = notify_change(dentry, &attr, NULL);
2158 inode_unlock(inode);
/*
 * Create the PCC backing file for @fid in @dataset.
 *
 * Thin wrapper around __pcc_inode_create() that runs it with the
 * credentials of the PCC super block (pcc_super_cred(@sb)) and restores
 * the caller's credentials afterwards. The new dentry is returned through
 * @pcc_dentry.
 */
2163 int pcc_inode_create(struct super_block *sb, struct pcc_dataset *dataset,
2164 struct lu_fid *fid, struct dentry **pcc_dentry)
2166 const struct cred *old_cred;
2169 old_cred = override_creds(pcc_super_cred(sb));
2170 rc = __pcc_inode_create(dataset, fid, pcc_dentry);
2171 revert_creds(old_cred);
/*
 * Finish attaching a newly created file to PCC.
 *
 * @inode: Lustre inode of the new file
 * @pca:   create-attach context carrying the dataset and the PCC dentry
 *
 * With PCC super credentials and the PCC inode lock held, a pcc_inode is
 * allocated from pcc_inode_slab (GFP_NOFS; -ENOMEM on failure), the PCC
 * copy's owner is reset to the saved credentials' suid/sgid, and the
 * pcc_inode is initialised and bound to the dataset and dentry. The layout
 * xattr is then written with generation 0; if that fails the PCC copy is
 * removed and the pcc_inode reference dropped. On success the in-memory
 * layout generation is set to 0.
 *
 * The cleanup tail removes the PCC copy, frees the slab object, unlocks,
 * restores credentials and drops the dataset reference; which labels guard
 * which steps is not visible in this view.
 */
2175 int pcc_inode_create_fini(struct inode *inode, struct pcc_create_attach *pca)
2177 struct dentry *pcc_dentry = pca->pca_dentry;
2178 const struct cred *old_cred;
2179 struct pcc_inode *pcci;
2184 if (!pca->pca_dataset)
2188 GOTO(out_dataset_put, rc = 0);
2190 LASSERT(pcc_dentry);
2192 old_cred = override_creds(pcc_super_cred(inode->i_sb));
2193 pcc_inode_lock(inode);
2194 LASSERT(ll_i2pcci(inode) == NULL);
2195 OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2197 GOTO(out_put, rc = -ENOMEM);
2199 rc = pcc_inode_reset_iattr(pcc_dentry, ATTR_UID | ATTR_GID,
2200 old_cred->suid, old_cred->sgid, 0);
2204 pcc_inode_init(pcci, ll_i2info(inode));
2205 pcc_inode_attach_init(pca->pca_dataset, pcci, pcc_dentry,
2208 rc = pcc_layout_xattr_set(pcci, 0);
2210 (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
2211 pcc_inode_put(pcci);
2212 GOTO(out_unlock, rc);
2215 /* Set the layout generation of newly created file with 0 */
2216 pcc_layout_gen_set(pcci, 0);
2220 (void) pcc_inode_remove(inode, pcc_dentry);
2224 OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
2227 pcc_inode_unlock(inode);
2228 revert_creds(old_cred);
2230 pcc_dataset_put(pca->pca_dataset);
/*
 * Undo a create-attach that will not be completed.
 *
 * Tests @pca for a dataset first. If a PCC dentry was created, it is
 * unlinked under PCC super credentials (a failure is only logged), the
 * credentials are restored and the dentry reference is dropped. The
 * dataset reference is released at the end.
 */
2234 void pcc_create_attach_cleanup(struct super_block *sb,
2235 struct pcc_create_attach *pca)
2237 if (!pca->pca_dataset)
2240 if (pca->pca_dentry) {
2241 const struct cred *old_cred;
2244 old_cred = override_creds(pcc_super_cred(sb));
2245 rc = ll_vfs_unlink(pca->pca_dentry->d_parent->d_inode,
2248 CWARN("failed to unlink PCC file %.*s, rc = %d\n",
2249 pca->pca_dentry->d_name.len,
2250 pca->pca_dentry->d_name.name, rc);
2251 /* ignore the unlink failure */
2252 revert_creds(old_cred);
2253 dput(pca->pca_dentry);
2256 pcc_dataset_put(pca->pca_dataset);
/*
 * Write @count bytes from @buf to @filp using cfs_kernel_write().
 * NOTE(review): how the byte count returned by cfs_kernel_write() is
 * mapped to this function's int return value is not visible here.
 */
2259 static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
2265 size = cfs_kernel_write(filp, buf, count, offset);
/*
 * Copy the contents of @src into @dst through a bounce buffer.
 *
 * A 1048576-byte (1 MiB) buffer is allocated with OBD_ALLOC_LARGE and
 * freed on exit. Data is read from @src with cfs_kernel_read() and written
 * to @dst with pcc_filp_write(). A pending signal aborts the copy with
 * -EINTR and a read error is propagated.
 * The caller uses the return value as the size of the copy; presumably it
 * is the total number of bytes copied on success -- TODO confirm.
 */
2274 static ssize_t pcc_copy_data(struct file *src, struct file *dst)
2278 loff_t pos, offset = 0;
2279 size_t buf_len = 1048576;
2284 OBD_ALLOC_LARGE(buf, buf_len);
2289 if (signal_pending(current))
2290 GOTO(out_free, rc = -EINTR);
2293 rc2 = cfs_kernel_read(src, buf, buf_len, &pos);
2295 GOTO(out_free, rc = rc2);
2300 rc = pcc_filp_write(dst, buf, rc2, &pos);
2308 OBD_FREE_LARGE(buf, buf_len);
/*
 * Check whether @inode may be attached to PCC and claim the attach.
 *
 * Under the PCC inode lock:
 *  - -EBUSY  if PCC_STATE_FL_ATTACHING is already set in lli_pcc_state;
 *  - -EEXIST if the inode already has a pcc_inode with a valid layout;
 *  - otherwise PCC_STATE_FL_ATTACHING is set to mark the attach as in
 *    progress (pcc_readwrite_attach_fini() clears it).
 */
2312 static int pcc_attach_allowed_check(struct inode *inode)
2314 struct ll_inode_info *lli = ll_i2info(inode);
2315 struct pcc_inode *pcci;
2320 pcc_inode_lock(inode);
2321 if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
2322 GOTO(out_unlock, rc = -EBUSY);
2324 pcci = ll_i2pcci(inode);
2325 if (pcci && pcc_inode_has_layout(pcci))
2326 GOTO(out_unlock, rc = -EEXIST);
2328 lli->lli_pcc_state |= PCC_STATE_FL_ATTACHING;
2330 pcc_inode_unlock(inode);
/*
 * Attach an existing Lustre file to a read-write PCC dataset.
 *
 * Visible sequence:
 *  1. pcc_attach_allowed_check() claims the attach.
 *  2. The LU_PCC_READWRITE dataset matching archive_id is looked up.
 *  3. Under PCC super credentials the backing file is created with
 *     __pcc_inode_create() and opened O_WRONLY | O_LARGEFILE.
 *  4. The PCC copy's uid/gid are reset to those of the saved (caller)
 *     credentials.
 *  5. The file data is copied from @file with pcc_copy_data() and the PCC
 *     copy is then truncated to the copied size (see the LU-13023 note).
 *  6. Under the PCC inode lock a pcc_inode is allocated from
 *     pcc_inode_slab, initialised and bound to the dataset and dentry as
 *     LU_PCC_READWRITE.
 *
 * The error tail removes the PCC copy, drops the dataset reference and
 * restores the credentials; which labels guard which steps is not visible
 * in this view. OBD_FAIL_LLITE_PCC_ATTACH_PAUSE only inserts a test delay.
 */
2334 int pcc_readwrite_attach(struct file *file, struct inode *inode,
2337 struct pcc_dataset *dataset;
2338 struct ll_inode_info *lli = ll_i2info(inode);
2339 struct pcc_inode *pcci;
2340 const struct cred *old_cred;
2341 struct dentry *dentry;
2342 struct file *pcc_filp;
2349 rc = pcc_attach_allowed_check(inode);
2353 dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super,
2354 LU_PCC_READWRITE, archive_id);
2355 if (dataset == NULL)
2358 old_cred = override_creds(pcc_super_cred(inode->i_sb));
2359 rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry);
2361 GOTO(out_dataset_put, rc);
2363 path.mnt = dataset->pccd_path.mnt;
2364 path.dentry = dentry;
2365 #ifdef HAVE_DENTRY_OPEN_USE_PATH
2366 pcc_filp = dentry_open(&path, O_WRONLY | O_LARGEFILE, current_cred());
2368 pcc_filp = dentry_open(path.dentry, path.mnt, O_WRONLY | O_LARGEFILE,
2371 if (IS_ERR_OR_NULL(pcc_filp)) {
2372 rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
2373 GOTO(out_dentry, rc);
2376 rc = pcc_inode_reset_iattr(dentry, ATTR_UID | ATTR_GID,
2377 old_cred->uid, old_cred->gid, 0);
2381 ret = pcc_copy_data(file, pcc_filp);
2383 GOTO(out_fput, rc = ret);
2386 * It must to truncate the PCC copy to the same size of the Lustre
2387 * copy after copy data. Otherwise, it may get wrong file size after
2388 * re-attach a file. See LU-13023 for details.
2390 rc = pcc_inode_reset_iattr(dentry, ATTR_SIZE, KUIDT_INIT(0),
2391 KGIDT_INIT(0), ret);
2395 /* Pause to allow for a race with concurrent HSM remove */
2396 OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
2398 pcc_inode_lock(inode);
2399 pcci = ll_i2pcci(inode);
2401 OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2403 GOTO(out_unlock, rc = -ENOMEM);
2405 pcc_inode_init(pcci, lli);
2406 pcc_inode_attach_init(dataset, pcci, dentry, LU_PCC_READWRITE);
2408 pcc_inode_unlock(inode);
2413 (void) pcc_inode_remove(inode, dentry);
2417 pcc_dataset_put(dataset);
2418 revert_creds(old_cred);
/*
 * Complete (or roll back) a read-write PCC attach.
 *
 * @gen:          layout generation to record for the PCC copy
 * @lease_broken: the lease protecting the attach was lost
 * @rc:           result of the preceding attach steps
 *
 * Runs with PCC super credentials and the PCC inode lock held.
 *  - If @rc is non-zero or the lease was broken, the pcc_inode reference
 *    is dropped when the file had been attached, and the function bails
 *    out.
 *  - -ESTALE is returned on the path commented as "PCC inode may be
 *    released".
 *  - Otherwise the layout xattr is written with @gen and the Lustre layout
 *    is refreshed. Two outcomes follow: the in-memory layout generation is
 *    set to @gen, or a "layout changed" message is logged and -ESTALE is
 *    returned; presumably selected by comparing the refreshed generation
 *    with @gen -- TODO confirm.
 *  - The failure tail removes the PCC copy and drops the pcc_inode
 *    reference.
 * PCC_STATE_FL_ATTACHING is cleared before unlocking in every visible exit.
 */
2423 int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
2424 __u32 gen, bool lease_broken, int rc,
2427 struct ll_inode_info *lli = ll_i2info(inode);
2428 const struct cred *old_cred;
2429 struct pcc_inode *pcci;
2434 old_cred = override_creds(pcc_super_cred(inode->i_sb));
2435 pcc_inode_lock(inode);
2436 pcci = ll_i2pcci(inode);
2437 if (rc || lease_broken) {
2438 if (attached && pcci)
2439 pcc_inode_put(pcci);
2441 GOTO(out_unlock, rc);
2444 /* PCC inode may be released due to layout lock revocatioin */
2446 GOTO(out_unlock, rc = -ESTALE);
2449 rc = pcc_layout_xattr_set(pcci, gen);
2453 LASSERT(lli->lli_pcc_state & PCC_STATE_FL_ATTACHING);
2454 rc = ll_layout_refresh(inode, &gen2);
2457 pcc_layout_gen_set(pcci, gen);
2460 DFID" layout changed from %d to %d.\n",
2461 PFID(ll_inode2fid(inode)), gen, gen2);
2462 GOTO(out_put, rc = -ESTALE);
2468 (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
2469 pcc_inode_put(pcci);
2472 lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
2473 pcc_inode_unlock(inode);
2474 revert_creds(old_cred);
/*
 * Restore the file to Lustre and ask HSM to remove its archived copy.
 *
 * First the whole file (offset 0 to OBD_OBJECT_EOF) is restored with
 * ll_layout_restore(); a failure is logged. The layout is then refreshed
 * (the return value of ll_layout_refresh() is not checked) and a
 * single-item hsm_user_request is built: action HUA_REMOVE, archive id 0,
 * flags 0, the inode's FID, and an extent covering the whole file. The
 * request is sent with LL_IOC_HSM_REQUEST on the MD export; a failure is
 * logged.
 */
2478 static int pcc_hsm_remove(struct inode *inode)
2480 struct hsm_user_request *hur;
2487 rc = ll_layout_restore(inode, 0, OBD_OBJECT_EOF);
2489 CDEBUG(D_CACHE, DFID" RESTORE failure: %d\n",
2490 PFID(&ll_i2info(inode)->lli_fid), rc);
2494 ll_layout_refresh(inode, &gen);
/* Room for the request header plus exactly one user item. */
2496 len = sizeof(struct hsm_user_request) +
2497 sizeof(struct hsm_user_item);
2498 OBD_ALLOC(hur, len);
2502 hur->hur_request.hr_action = HUA_REMOVE;
2503 hur->hur_request.hr_archive_id = 0;
2504 hur->hur_request.hr_flags = 0;
2505 memcpy(&hur->hur_user_item[0].hui_fid, &ll_i2info(inode)->lli_fid,
2506 sizeof(hur->hur_user_item[0].hui_fid));
2507 hur->hur_user_item[0].hui_extent.offset = 0;
2508 hur->hur_user_item[0].hui_extent.length = OBD_OBJECT_EOF;
2509 hur->hur_request.hr_itemcount = 1;
2510 rc = obd_iocontrol(LL_IOC_HSM_REQUEST, ll_i2sbi(inode)->ll_md_exp,
2513 CDEBUG(D_CACHE, DFID" HSM REMOVE failure: %d\n",
2514 PFID(&ll_i2info(inode)->lli_fid), rc);
/*
 * Detach @inode from PCC.
 *
 * @opt: detach option; PCC_DETACH_OPT_UNCACHE is tested for read-write
 *       PCC files.
 *
 * Under the PCC inode lock the call is a successful no-op (rc = 0) when
 * the inode has no pcc_inode, an attach is in progress
 * (PCC_STATE_FL_ATTACHING) or the pcc_inode has no valid layout.
 * Otherwise the layout is invalidated, which waits for in-flight PCC I/O,
 * and one pcc_inode reference is dropped. After the lock is released,
 * pcc_hsm_remove() is run under PCC super credentials; presumably only
 * when hsm_remove was set for a LU_PCC_READWRITE file detached with
 * PCC_DETACH_OPT_UNCACHE -- TODO confirm.
 */
2520 int pcc_ioctl_detach(struct inode *inode, __u32 opt)
2522 struct ll_inode_info *lli = ll_i2info(inode);
2523 struct pcc_inode *pcci;
2524 bool hsm_remove = false;
2529 pcc_inode_lock(inode);
2530 pcci = lli->lli_pcc_inode;
2531 if (!pcci || lli->lli_pcc_state & PCC_STATE_FL_ATTACHING ||
2532 !pcc_inode_has_layout(pcci))
2533 GOTO(out_unlock, rc = 0);
2535 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
2537 if (pcci->pcci_type == LU_PCC_READWRITE) {
2538 if (opt == PCC_DETACH_OPT_UNCACHE)
2541 __pcc_layout_invalidate(pcci);
2542 pcc_inode_put(pcci);
2546 pcc_inode_unlock(inode);
2548 const struct cred *old_cred;
2550 old_cred = override_creds(pcc_super_cred(inode->i_sb));
2551 rc = pcc_hsm_remove(inode);
2552 revert_creds(old_cred);
/*
 * Report the PCC state of @inode into @state.
 *
 * A scratch buffer the size of state->pccs_path is allocated for path
 * resolution and freed on exit. Under the PCC inode lock:
 *  - pccs_type is set to LU_PCC_NONE and 0 returned on the early-exit path
 *    that follows the pcc_inode lookup;
 *  - a second early exit, after the reference count is read, reports
 *    LU_PCC_NONE with an open count of 0;
 *  - otherwise pccs_type, pccs_open_count and pccs_flags (from
 *    lli_pcc_state) are filled in and the PCC dentry's path, as produced
 *    by dentry_path_raw(), is copied into pccs_path. A dentry_path_raw()
 *    error is propagated and a path that does not fit yields
 *    -ENAMETOOLONG.
 * NOTE(review): the adjustments made to the count when the layout is valid
 * and when this open holds a PCC file are not visible in this view.
 */
2558 int pcc_ioctl_state(struct file *file, struct inode *inode,
2559 struct lu_pcc_state *state)
2565 int buf_len = sizeof(state->pccs_path);
2566 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2567 struct pcc_file *pccf = &fd->fd_pcc_file;
2568 struct pcc_inode *pcci;
2575 OBD_ALLOC(buf, buf_len);
2579 pcc_inode_lock(inode);
2580 pcci = ll_i2pcci(inode);
2582 state->pccs_type = LU_PCC_NONE;
2583 GOTO(out_unlock, rc = 0);
2586 count = atomic_read(&pcci->pcci_refcount);
2588 state->pccs_type = LU_PCC_NONE;
2589 state->pccs_open_count = 0;
2590 GOTO(out_unlock, rc = 0);
2593 if (pcc_inode_has_layout(pcci))
2595 if (pccf->pccf_file != NULL)
2597 state->pccs_type = pcci->pcci_type;
2598 state->pccs_open_count = count;
2599 state->pccs_flags = ll_i2info(inode)->lli_pcc_state;
2600 path = dentry_path_raw(pcci->pcci_path.dentry, buf, buf_len);
2602 GOTO(out_unlock, rc = PTR_ERR(path));
2604 if (strlcpy(state->pccs_path, path, buf_len) >= buf_len)
2605 GOTO(out_unlock, rc = -ENAMETOOLONG);
2608 pcc_inode_unlock(inode);
2609 OBD_FREE(buf, buf_len);