4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2017, DDN Storage Corporation.
26 * Persistent Client Cache
28 * PCC is a new framework which provides a group of local cache on Lustre
29 * client side. It works in two modes: RW-PCC enables a read-write cache on the
30 * local SSDs of a single client; RO-PCC provides a read-only cache on the
31 * local SSDs of multiple clients. Less overhead is visible to the applications
32 * and network latencies and lock conflicts can be significantly reduced.
34 * For RW-PCC, no global namespace will be provided. Each client uses its own
35 * local storage as a cache for itself. Local file system is used to manage
36 * the data on local caches. Cached I/O is directed to local file system while
37 * normal I/O is directed to OSTs. RW-PCC uses HSM for data synchronization.
38 * It uses HSM copytool to restore file from local caches to Lustre OSTs. Each
39 * PCC has a copytool instance running with unique archive number. Any remote
40 * access from another Lustre client would trigger the data synchronization. If
41 * a client with RW-PCC goes offline, the cached data becomes inaccessible for
42 * other clients temporarily. And after the RW-PCC client reboots and the
43 * copytool restarts, the data will be accessible again.
45 * Following is what will happen in different conditions for RW-PCC:
47 * > When file is being created on RW-PCC
49 * A normal HSM released file is created on MDT;
50 * An empty mirror file is created on local cache;
51 * The HSM status of the Lustre file will be set to archived and released;
52 * The archive number will be set to the proper value.
54 * > When file is being prefetched to RW-PCC
56 * A file is copied to the local cache;
57 * The HSM status of the Lustre file will be set to archived and released;
58 * The archive number will be set to the proper value.
60 * > When file is being accessed from PCC
62 * Data will be read directly from local cache;
63 * Metadata will be read from MDT, except file size;
64 * File size will be got from local cache.
66 * > When PCC cached file is being accessed on another client
68 * RW-PCC cached files are automatically restored when a process on another
69 * client tries to read or modify them. The corresponding I/O will block
70 * waiting for the released file to be restored. This is transparent to the
73 * For RW-PCC, when a file is being created, a rule-based policy is used to
74 * determine whether it will be cached. Rule-based caching of newly created
75 * files can determine which file can use a cache on PCC directly without any
78 * RW-PCC design can accelerate I/O intensive applications with one-to-one
79 * mappings between files and accessing clients. However, in several use cases,
80 * files will never be updated, but need to be read simultaneously from many
81 * clients. RO-PCC implements a read-only caching on Lustre clients using
82 * SSDs. RO-PCC is based on the same framework as RW-PCC, except
83 * that no HSM mechanism is used.
85 * The main advantages to use this SSD cache on the Lustre clients via PCC
87 * - The I/O stack becomes much simpler for the cached data, as there is no
88 * interference with I/Os from other clients, which enables easier
89 * performance optimizations;
90 * - The requirements on the HW inside the client nodes are small, any kind of
91 * SSDs or even HDDs can be used as cache devices;
92 * - Caching reduces the pressure on the object storage targets (OSTs), as
93 * small or random I/Os can be regularized to big sequential I/Os and
94 * temporary files do not even need to be flushed to OSTs.
96 * PCC can accelerate applications with certain I/O patterns:
97 * - small-sized random writes (< 1MB) from a single client
98 * - repeated read of data that is larger than RAM
99 * - clients with high network latency
101 * Author: Li Xi <lixi@ddn.com>
102 * Author: Qian Yingjin <qian@ddn.com>
105 #define DEBUG_SUBSYSTEM S_LLITE
108 #include <linux/namei.h>
109 #include <linux/file.h>
110 #include <lustre_compat.h>
111 #include "llite_internal.h"
113 struct kmem_cache *pcc_inode_slab;
115 int pcc_super_init(struct pcc_super *super)
119 super->pccs_cred = cred = prepare_creds();
123 /* Never override disk quota limits or use reserved space */
124 cap_lower(cred->cap_effective, CAP_SYS_RESOURCE);
125 init_rwsem(&super->pccs_rw_sem);
126 INIT_LIST_HEAD(&super->pccs_datasets);
127 super->pccs_generation = 1;
132 /* Rule based auto caching */
133 static void pcc_id_list_free(struct list_head *id_list)
135 struct pcc_match_id *id, *n;
137 list_for_each_entry_safe(id, n, id_list, pmi_linkage) {
138 list_del_init(&id->pmi_linkage);
143 static void pcc_fname_list_free(struct list_head *fname_list)
145 struct pcc_match_fname *fname, *n;
147 list_for_each_entry_safe(fname, n, fname_list, pmf_linkage) {
148 OBD_FREE(fname->pmf_name, strlen(fname->pmf_name) + 1);
149 list_del_init(&fname->pmf_linkage);
154 static void pcc_expression_free(struct pcc_expression *expr)
156 LASSERT(expr->pe_field >= PCC_FIELD_UID &&
157 expr->pe_field < PCC_FIELD_MAX);
158 switch (expr->pe_field) {
161 case PCC_FIELD_PROJID:
162 pcc_id_list_free(&expr->pe_cond);
164 case PCC_FIELD_FNAME:
165 pcc_fname_list_free(&expr->pe_cond);
173 static void pcc_conjunction_free(struct pcc_conjunction *conjunction)
175 struct pcc_expression *expression, *n;
177 LASSERT(list_empty(&conjunction->pc_linkage));
178 list_for_each_entry_safe(expression, n,
179 &conjunction->pc_expressions,
181 list_del_init(&expression->pe_linkage);
182 pcc_expression_free(expression);
184 OBD_FREE_PTR(conjunction);
187 static void pcc_rule_conds_free(struct list_head *cond_list)
189 struct pcc_conjunction *conjunction, *n;
191 list_for_each_entry_safe(conjunction, n, cond_list, pc_linkage) {
192 list_del_init(&conjunction->pc_linkage);
193 pcc_conjunction_free(conjunction);
197 static void pcc_cmd_fini(struct pcc_cmd *cmd)
199 if (cmd->pccc_cmd == PCC_ADD_DATASET) {
200 if (!list_empty(&cmd->u.pccc_add.pccc_conds))
201 pcc_rule_conds_free(&cmd->u.pccc_add.pccc_conds);
202 if (cmd->u.pccc_add.pccc_conds_str)
203 OBD_FREE(cmd->u.pccc_add.pccc_conds_str,
204 strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
208 #define PCC_DISJUNCTION_DELIM (',')
209 #define PCC_CONJUNCTION_DELIM ('&')
210 #define PCC_EXPRESSION_DELIM ('=')
213 pcc_fname_list_add(struct cfs_lstr *id, struct list_head *fname_list)
215 struct pcc_match_fname *fname;
217 OBD_ALLOC(fname, sizeof(struct pcc_match_fname));
221 OBD_ALLOC(fname->pmf_name, id->ls_len + 1);
222 if (fname->pmf_name == NULL) {
223 OBD_FREE(fname, sizeof(struct pcc_match_fname));
227 memcpy(fname->pmf_name, id->ls_str, id->ls_len);
228 list_add_tail(&fname->pmf_linkage, fname_list);
233 pcc_fname_list_parse(char *str, int len, struct list_head *fname_list)
243 INIT_LIST_HEAD(fname_list);
245 rc = cfs_gettok(&src, ' ', &res);
250 rc = pcc_fname_list_add(&res, fname_list);
255 pcc_fname_list_free(fname_list);
260 pcc_id_list_parse(char *str, int len, struct list_head *id_list,
269 if (type != PCC_FIELD_UID && type != PCC_FIELD_GID &&
270 type != PCC_FIELD_PROJID)
275 INIT_LIST_HEAD(id_list);
277 struct pcc_match_id *id;
280 if (cfs_gettok(&src, ' ', &res) == 0)
281 GOTO(out, rc = -EINVAL);
283 if (!cfs_str2num_check(res.ls_str, res.ls_len,
284 &id_val, 0, (u32)~0U))
285 GOTO(out, rc = -EINVAL);
289 GOTO(out, rc = -ENOMEM);
292 list_add_tail(&id->pmi_linkage, id_list);
296 pcc_id_list_free(id_list);
301 pcc_check_field(struct cfs_lstr *field, char *str)
303 int len = strlen(str);
305 return (field->ls_len == len &&
306 strncmp(field->ls_str, str, len) == 0);
310 pcc_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
312 struct pcc_expression *expr;
313 struct cfs_lstr field;
316 OBD_ALLOC(expr, sizeof(struct pcc_expression));
320 rc = cfs_gettok(src, PCC_EXPRESSION_DELIM, &field);
321 if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
322 src->ls_str[src->ls_len - 1] != '}')
323 GOTO(out, rc = -EINVAL);
325 /* Skip '{' and '}' */
329 if (pcc_check_field(&field, "uid")) {
330 if (pcc_id_list_parse(src->ls_str,
334 GOTO(out, rc = -EINVAL);
335 expr->pe_field = PCC_FIELD_UID;
336 } else if (pcc_check_field(&field, "gid")) {
337 if (pcc_id_list_parse(src->ls_str,
341 GOTO(out, rc = -EINVAL);
342 expr->pe_field = PCC_FIELD_GID;
343 } else if (pcc_check_field(&field, "projid")) {
344 if (pcc_id_list_parse(src->ls_str,
347 PCC_FIELD_PROJID) < 0)
348 GOTO(out, rc = -EINVAL);
349 expr->pe_field = PCC_FIELD_PROJID;
350 } else if (pcc_check_field(&field, "fname")) {
351 if (pcc_fname_list_parse(src->ls_str,
354 GOTO(out, rc = -EINVAL);
355 expr->pe_field = PCC_FIELD_FNAME;
357 GOTO(out, rc = -EINVAL);
360 list_add_tail(&expr->pe_linkage, cond_list);
368 pcc_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
370 struct pcc_conjunction *conjunction;
371 struct cfs_lstr expr;
374 OBD_ALLOC(conjunction, sizeof(struct pcc_conjunction));
375 if (conjunction == NULL)
378 INIT_LIST_HEAD(&conjunction->pc_expressions);
379 list_add_tail(&conjunction->pc_linkage, cond_list);
381 while (src->ls_str) {
382 rc = cfs_gettok(src, PCC_CONJUNCTION_DELIM, &expr);
387 rc = pcc_expression_parse(&expr,
388 &conjunction->pc_expressions);
395 static int pcc_conds_parse(char *str, int len, struct list_head *cond_list)
403 INIT_LIST_HEAD(cond_list);
405 rc = cfs_gettok(&src, PCC_DISJUNCTION_DELIM, &res);
410 rc = pcc_conjunction_parse(&res, cond_list);
417 static int pcc_id_parse(struct pcc_cmd *cmd, const char *id)
421 OBD_ALLOC(cmd->u.pccc_add.pccc_conds_str, strlen(id) + 1);
422 if (cmd->u.pccc_add.pccc_conds_str == NULL)
425 memcpy(cmd->u.pccc_add.pccc_conds_str, id, strlen(id));
427 rc = pcc_conds_parse(cmd->u.pccc_add.pccc_conds_str,
428 strlen(cmd->u.pccc_add.pccc_conds_str),
429 &cmd->u.pccc_add.pccc_conds);
437 pcc_parse_value_pair(struct pcc_cmd *cmd, char *buffer)
444 key = strsep(&val, "=");
445 if (val == NULL || strlen(val) == 0)
448 /* Key of the value pair */
449 if (strcmp(key, "rwid") == 0) {
450 rc = kstrtoul(val, 10, &id);
455 cmd->u.pccc_add.pccc_rwid = id;
456 } else if (strcmp(key, "roid") == 0) {
457 rc = kstrtoul(val, 10, &id);
462 cmd->u.pccc_add.pccc_roid = id;
463 } else if (strcmp(key, "auto_attach") == 0) {
464 rc = kstrtoul(val, 10, &id);
468 cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_AUTO_ATTACH;
469 } else if (strcmp(key, "open_attach") == 0) {
470 rc = kstrtoul(val, 10, &id);
474 cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_OPEN_ATTACH;
475 } else if (strcmp(key, "io_attach") == 0) {
476 rc = kstrtoul(val, 10, &id);
480 cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_IO_ATTACH;
481 } else if (strcmp(key, "stat_attach") == 0) {
482 rc = kstrtoul(val, 10, &id);
486 cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_STAT_ATTACH;
487 } else if (strcmp(key, "rwpcc") == 0) {
488 rc = kstrtoul(val, 10, &id);
492 cmd->u.pccc_add.pccc_flags |= PCC_DATASET_RWPCC;
493 } else if (strcmp(key, "ropcc") == 0) {
494 rc = kstrtoul(val, 10, &id);
498 cmd->u.pccc_add.pccc_flags |= PCC_DATASET_ROPCC;
507 pcc_parse_value_pairs(struct pcc_cmd *cmd, char *buffer)
513 switch (cmd->pccc_cmd) {
514 case PCC_ADD_DATASET:
515 /* Enable auto attach by default */
516 cmd->u.pccc_add.pccc_flags |= PCC_DATASET_AUTO_ATTACH;
518 case PCC_DEL_DATASET:
526 while (val != NULL && strlen(val) != 0) {
527 token = strsep(&val, " ");
528 rc = pcc_parse_value_pair(cmd, token);
533 switch (cmd->pccc_cmd) {
534 case PCC_ADD_DATASET:
535 if (cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC &&
536 cmd->u.pccc_add.pccc_flags & PCC_DATASET_ROPCC)
539 * By default, a PCC backend can provide caching service for
540 * both RW-PCC and RO-PCC.
542 if ((cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCC_ALL) == 0)
543 cmd->u.pccc_add.pccc_flags |= PCC_DATASET_PCC_ALL;
545 /* For RW-PCC, the value of @rwid must be non zero. */
546 if (cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC &&
547 cmd->u.pccc_add.pccc_rwid == 0)
551 case PCC_DEL_DATASET:
561 pcc_dataset_rule_fini(struct pcc_match_rule *rule)
563 if (!list_empty(&rule->pmr_conds))
564 pcc_rule_conds_free(&rule->pmr_conds);
565 LASSERT(rule->pmr_conds_str != NULL);
566 OBD_FREE(rule->pmr_conds_str, strlen(rule->pmr_conds_str) + 1);
570 pcc_dataset_rule_init(struct pcc_match_rule *rule, struct pcc_cmd *cmd)
574 LASSERT(cmd->u.pccc_add.pccc_conds_str);
575 OBD_ALLOC(rule->pmr_conds_str,
576 strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
577 if (rule->pmr_conds_str == NULL)
580 memcpy(rule->pmr_conds_str,
581 cmd->u.pccc_add.pccc_conds_str,
582 strlen(cmd->u.pccc_add.pccc_conds_str));
584 INIT_LIST_HEAD(&rule->pmr_conds);
585 if (!list_empty(&cmd->u.pccc_add.pccc_conds))
586 rc = pcc_conds_parse(rule->pmr_conds_str,
587 strlen(rule->pmr_conds_str),
591 pcc_dataset_rule_fini(rule);
598 pcc_id_list_match(struct list_head *id_list, __u32 id_val)
600 struct pcc_match_id *id;
602 list_for_each_entry(id, id_list, pmi_linkage) {
603 if (id->pmi_id == id_val)
/*
 * Match @content against @pattern, where '*' in @pattern stands for any
 * run of characters (including an empty one) and every other character
 * must match literally.
 *
 * The match is done iteratively: on a mismatch we return to the most
 * recent '*' and let it absorb one more character of @content.  This keeps
 * the cost bounded by strlen(pattern) * strlen(content) and uses no
 * recursion, which matters because the pattern comes from configuration
 * and the code runs on a kernel stack.
 *
 * Returns 1 when the whole of @content matches the whole of @pattern,
 * 0 otherwise.
 */
static int
cfs_match_wildcard(const char *pattern, const char *content)
{
	const char *star = NULL;	/* last '*' seen in @pattern */
	const char *resume = NULL;	/* where that '*' stopped absorbing */

	while (*content != '\0') {
		if (*pattern == '*') {
			/* Start by letting the '*' match nothing. */
			star = pattern++;
			resume = content;
		} else if (*pattern == *content) {
			pattern++;
			content++;
		} else if (star != NULL) {
			/* Backtrack: the last '*' swallows one more char. */
			pattern = star + 1;
			content = ++resume;
		} else {
			return 0;
		}
	}

	/* Any trailing '*' may match the empty remainder. */
	while (*pattern == '*')
		pattern++;

	return *pattern == '\0';
}
637 pcc_fname_list_match(struct list_head *fname_list, const char *name)
639 struct pcc_match_fname *fname;
641 list_for_each_entry(fname, fname_list, pmf_linkage) {
642 if (cfs_match_wildcard(fname->pmf_name, name))
649 pcc_expression_match(struct pcc_expression *expr, struct pcc_matcher *matcher)
651 switch (expr->pe_field) {
653 return pcc_id_list_match(&expr->pe_cond, matcher->pm_uid);
655 return pcc_id_list_match(&expr->pe_cond, matcher->pm_gid);
656 case PCC_FIELD_PROJID:
657 return pcc_id_list_match(&expr->pe_cond, matcher->pm_projid);
658 case PCC_FIELD_FNAME:
659 return pcc_fname_list_match(&expr->pe_cond,
660 matcher->pm_name->name);
667 pcc_conjunction_match(struct pcc_conjunction *conjunction,
668 struct pcc_matcher *matcher)
670 struct pcc_expression *expr;
673 list_for_each_entry(expr, &conjunction->pc_expressions, pe_linkage) {
674 matched = pcc_expression_match(expr, matcher);
683 pcc_cond_match(struct pcc_match_rule *rule, struct pcc_matcher *matcher)
685 struct pcc_conjunction *conjunction;
688 list_for_each_entry(conjunction, &rule->pmr_conds, pc_linkage) {
689 matched = pcc_conjunction_match(conjunction, matcher);
698 pcc_dataset_match_get(struct pcc_super *super, struct pcc_matcher *matcher)
700 struct pcc_dataset *dataset;
701 struct pcc_dataset *selected = NULL;
703 down_read(&super->pccs_rw_sem);
704 list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
705 if (!(dataset->pccd_flags & PCC_DATASET_RWPCC))
708 if (pcc_cond_match(&dataset->pccd_rule, matcher)) {
709 atomic_inc(&dataset->pccd_refcount);
714 up_read(&super->pccs_rw_sem);
716 CDEBUG(D_CACHE, "PCC create, matched %s - %d:%d:%d:%s\n",
717 dataset->pccd_rule.pmr_conds_str,
718 matcher->pm_uid, matcher->pm_gid,
719 matcher->pm_projid, matcher->pm_name->name);
725 * pcc_dataset_add - Add a Cache policy to control which files need be
726 * cached and where it will be cached.
728 * @super: superblock of pcc
732 pcc_dataset_add(struct pcc_super *super, struct pcc_cmd *cmd)
734 char *pathname = cmd->pccc_pathname;
735 struct pcc_dataset *dataset;
736 struct pcc_dataset *tmp;
740 OBD_ALLOC_PTR(dataset);
744 rc = kern_path(pathname, LOOKUP_DIRECTORY, &dataset->pccd_path);
746 OBD_FREE_PTR(dataset);
749 strncpy(dataset->pccd_pathname, pathname, PATH_MAX);
750 dataset->pccd_rwid = cmd->u.pccc_add.pccc_rwid;
751 dataset->pccd_roid = cmd->u.pccc_add.pccc_roid;
752 dataset->pccd_flags = cmd->u.pccc_add.pccc_flags;
753 atomic_set(&dataset->pccd_refcount, 1);
755 rc = pcc_dataset_rule_init(&dataset->pccd_rule, cmd);
757 pcc_dataset_put(dataset);
761 down_write(&super->pccs_rw_sem);
762 list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) {
763 if (strcmp(tmp->pccd_pathname, pathname) == 0 ||
764 (dataset->pccd_rwid != 0 &&
765 dataset->pccd_rwid == tmp->pccd_rwid) ||
766 (dataset->pccd_roid != 0 &&
767 dataset->pccd_roid == tmp->pccd_roid)) {
773 list_add(&dataset->pccd_linkage, &super->pccs_datasets);
774 up_write(&super->pccs_rw_sem);
777 pcc_dataset_put(dataset);
785 pcc_dataset_get(struct pcc_super *super, enum lu_pcc_type type, __u32 id)
787 struct pcc_dataset *dataset;
788 struct pcc_dataset *selected = NULL;
794 * archive ID (read-write ID) or read-only ID is unique in the list,
795 * we just return last added one as first priority.
797 down_read(&super->pccs_rw_sem);
798 list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
799 if (type == LU_PCC_READWRITE && (dataset->pccd_rwid != id ||
800 !(dataset->pccd_flags & PCC_DATASET_RWPCC)))
802 atomic_inc(&dataset->pccd_refcount);
806 up_read(&super->pccs_rw_sem);
808 CDEBUG(D_CACHE, "matched id %u, PCC mode %d\n", id, type);
814 pcc_dataset_put(struct pcc_dataset *dataset)
816 if (atomic_dec_and_test(&dataset->pccd_refcount)) {
817 pcc_dataset_rule_fini(&dataset->pccd_rule);
818 path_put(&dataset->pccd_path);
819 OBD_FREE_PTR(dataset);
824 pcc_dataset_del(struct pcc_super *super, char *pathname)
826 struct list_head *l, *tmp;
827 struct pcc_dataset *dataset;
830 down_write(&super->pccs_rw_sem);
831 list_for_each_safe(l, tmp, &super->pccs_datasets) {
832 dataset = list_entry(l, struct pcc_dataset, pccd_linkage);
833 if (strcmp(dataset->pccd_pathname, pathname) == 0) {
834 list_del_init(&dataset->pccd_linkage);
835 pcc_dataset_put(dataset);
836 super->pccs_generation++;
841 up_write(&super->pccs_rw_sem);
846 pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
848 seq_printf(m, "%s:\n", dataset->pccd_pathname);
849 seq_printf(m, " rwid: %u\n", dataset->pccd_rwid);
850 seq_printf(m, " flags: %x\n", dataset->pccd_flags);
851 seq_printf(m, " autocache: %s\n", dataset->pccd_rule.pmr_conds_str);
855 pcc_super_dump(struct pcc_super *super, struct seq_file *m)
857 struct pcc_dataset *dataset;
859 down_read(&super->pccs_rw_sem);
860 list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
861 pcc_dataset_dump(dataset, m);
863 up_read(&super->pccs_rw_sem);
867 static void pcc_remove_datasets(struct pcc_super *super)
869 struct pcc_dataset *dataset, *tmp;
871 down_write(&super->pccs_rw_sem);
872 list_for_each_entry_safe(dataset, tmp,
873 &super->pccs_datasets, pccd_linkage) {
874 list_del(&dataset->pccd_linkage);
875 pcc_dataset_put(dataset);
877 super->pccs_generation++;
878 up_write(&super->pccs_rw_sem);
881 void pcc_super_fini(struct pcc_super *super)
883 pcc_remove_datasets(super);
884 put_cred(super->pccs_cred);
/*
 * Tell whether @pathname is usable as a dataset path: it has to be a
 * non-NULL, non-empty string that starts with '/' and is shorter than
 * PATH_MAX.
 */
static bool pathname_is_valid(const char *pathname)
{
	size_t len;

	if (pathname == NULL)
		return false;

	/* Needs to be absolute path (this also rejects the empty string) */
	if (pathname[0] != '/')
		return false;

	len = strlen(pathname);
	return len < PATH_MAX;
}
896 static struct pcc_cmd *
897 pcc_cmd_parse(char *buffer, unsigned long count)
899 static struct pcc_cmd *cmd;
906 GOTO(out, rc = -ENOMEM);
908 /* clear all setting */
909 if (strncmp(buffer, "clear", 5) == 0) {
910 cmd->pccc_cmd = PCC_CLEAR_ALL;
915 token = strsep(&val, " ");
916 if (val == NULL || strlen(val) == 0)
917 GOTO(out_free_cmd, rc = -EINVAL);
919 /* Type of the command */
920 if (strcmp(token, "add") == 0)
921 cmd->pccc_cmd = PCC_ADD_DATASET;
922 else if (strcmp(token, "del") == 0)
923 cmd->pccc_cmd = PCC_DEL_DATASET;
925 GOTO(out_free_cmd, rc = -EINVAL);
927 /* Pathname of the dataset */
928 token = strsep(&val, " ");
929 if ((val == NULL && cmd->pccc_cmd != PCC_DEL_DATASET) ||
930 !pathname_is_valid(token))
931 GOTO(out_free_cmd, rc = -EINVAL);
932 cmd->pccc_pathname = token;
934 if (cmd->pccc_cmd == PCC_ADD_DATASET) {
938 val = strrchr(token, '}');
940 GOTO(out_free_cmd, rc = -EINVAL);
946 } else if (*val == ' ') {
950 GOTO(out_free_cmd, rc = -EINVAL);
953 rc = pcc_id_parse(cmd, token);
955 GOTO(out_free_cmd, rc);
957 rc = pcc_parse_value_pairs(cmd, val);
959 GOTO(out_cmd_fini, rc = -EINVAL);
972 int pcc_cmd_handle(char *buffer, unsigned long count,
973 struct pcc_super *super)
978 cmd = pcc_cmd_parse(buffer, count);
982 switch (cmd->pccc_cmd) {
983 case PCC_ADD_DATASET:
984 rc = pcc_dataset_add(super, cmd);
986 case PCC_DEL_DATASET:
987 rc = pcc_dataset_del(super, cmd->pccc_pathname);
990 pcc_remove_datasets(super);
1002 static inline void pcc_inode_lock(struct inode *inode)
1004 mutex_lock(&ll_i2info(inode)->lli_pcc_lock);
1007 static inline void pcc_inode_unlock(struct inode *inode)
1009 mutex_unlock(&ll_i2info(inode)->lli_pcc_lock);
1012 static void pcc_inode_init(struct pcc_inode *pcci, struct ll_inode_info *lli)
1014 pcci->pcci_lli = lli;
1015 lli->lli_pcc_inode = pcci;
1016 atomic_set(&pcci->pcci_refcount, 0);
1017 pcci->pcci_type = LU_PCC_NONE;
1018 pcci->pcci_layout_gen = CL_LAYOUT_GEN_NONE;
1019 atomic_set(&pcci->pcci_active_ios, 0);
1020 init_waitqueue_head(&pcci->pcci_waitq);
1023 static void pcc_inode_fini(struct pcc_inode *pcci)
1025 struct ll_inode_info *lli = pcci->pcci_lli;
1027 path_put(&pcci->pcci_path);
1028 pcci->pcci_type = LU_PCC_NONE;
1029 OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
1030 lli->lli_pcc_inode = NULL;
1033 static void pcc_inode_get(struct pcc_inode *pcci)
1035 atomic_inc(&pcci->pcci_refcount);
1038 static void pcc_inode_put(struct pcc_inode *pcci)
1040 if (atomic_dec_and_test(&pcci->pcci_refcount))
1041 pcc_inode_fini(pcci);
1044 void pcc_inode_free(struct inode *inode)
1046 struct pcc_inode *pcci = ll_i2pcci(inode);
1049 WARN_ON(atomic_read(&pcci->pcci_refcount) > 1);
1050 pcc_inode_put(pcci);
1056 * As Andreas suggested, we'd better use new layout to
1058 * (fid->f_oid >> 16 & 0xFFFF)/FID
1060 #define MAX_PCC_DATABASE_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
1061 static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid)
1063 return snprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
1065 (fid)->f_oid & 0xFFFF,
1066 (fid)->f_oid >> 16 & 0xFFFF,
1067 (unsigned int)((fid)->f_seq & 0xFFFF),
1068 (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
1069 (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
1070 (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
1074 static inline const struct cred *pcc_super_cred(struct super_block *sb)
1076 return ll_s2sbi(sb)->ll_pcc_super.pccs_cred;
1079 void pcc_file_init(struct pcc_file *pccf)
1081 pccf->pccf_file = NULL;
1082 pccf->pccf_type = LU_PCC_NONE;
1085 static inline bool pcc_auto_attach_enabled(enum pcc_dataset_flags flags,
1086 enum pcc_io_type iot)
1088 if (iot == PIT_OPEN)
1089 return flags & PCC_DATASET_OPEN_ATTACH;
1090 if (iot == PIT_GETATTR)
1091 return flags & PCC_DATASET_STAT_ATTACH;
1093 return flags & PCC_DATASET_AUTO_ATTACH;
1096 static const char pcc_xattr_layout[] = XATTR_USER_PREFIX "PCC.layout";
1098 static int pcc_layout_xattr_set(struct pcc_inode *pcci, __u32 gen)
1100 struct dentry *pcc_dentry = pcci->pcci_path.dentry;
1101 struct ll_inode_info *lli = pcci->pcci_lli;
1106 if (!(lli->lli_pcc_dsflags & PCC_DATASET_AUTO_ATTACH))
1109 #ifndef HAVE_VFS_SETXATTR
1110 if (!pcc_dentry->d_inode->i_op->setxattr)
1113 rc = pcc_dentry->d_inode->i_op->setxattr(pcc_dentry, pcc_xattr_layout,
1114 &gen, sizeof(gen), 0);
1116 rc = __vfs_setxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
1117 &gen, sizeof(gen), 0);
1122 static int pcc_get_layout_info(struct inode *inode, struct cl_layout *clt)
1125 struct ll_inode_info *lli = ll_i2info(inode);
1134 env = cl_env_get(&refcheck);
1136 RETURN(PTR_ERR(env));
1138 rc = cl_object_layout_get(env, lli->lli_clob, clt);
1140 CDEBUG(D_INODE, "Cannot get layout for "DFID"\n",
1141 PFID(ll_inode2fid(inode)));
1143 cl_env_put(env, &refcheck);
1147 static int pcc_fid2dataset_fullpath(char *buf, int sz, struct lu_fid *fid,
1148 struct pcc_dataset *dataset)
1150 return snprintf(buf, sz, "%s/%04x/%04x/%04x/%04x/%04x/%04x/"
1152 dataset->pccd_pathname,
1153 (fid)->f_oid & 0xFFFF,
1154 (fid)->f_oid >> 16 & 0xFFFF,
1155 (unsigned int)((fid)->f_seq & 0xFFFF),
1156 (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
1157 (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
1158 (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
1162 /* Must be called with pcci->pcci_lock held */
1163 static void pcc_inode_attach_init(struct pcc_dataset *dataset,
1164 struct pcc_inode *pcci,
1165 struct dentry *dentry,
1166 enum lu_pcc_type type)
1168 pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
1169 pcci->pcci_path.dentry = dentry;
1170 LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
1171 atomic_set(&pcci->pcci_refcount, 1);
1172 pcci->pcci_type = type;
1173 pcci->pcci_attr_valid = false;
1176 static inline void pcc_inode_dsflags_set(struct ll_inode_info *lli,
1177 struct pcc_dataset *dataset)
1179 lli->lli_pcc_generation = ll_info2pccs(lli)->pccs_generation;
1180 lli->lli_pcc_dsflags = dataset->pccd_flags;
1183 static void pcc_inode_attach_set(struct pcc_super *super,
1184 struct pcc_dataset *dataset,
1185 struct ll_inode_info *lli,
1186 struct pcc_inode *pcci,
1187 struct dentry *dentry,
1188 enum lu_pcc_type type)
1190 pcc_inode_init(pcci, lli);
1191 pcc_inode_attach_init(dataset, pcci, dentry, type);
1192 down_read(&super->pccs_rw_sem);
1193 pcc_inode_dsflags_set(lli, dataset);
1194 up_read(&super->pccs_rw_sem);
1197 static inline void pcc_layout_gen_set(struct pcc_inode *pcci,
1200 pcci->pcci_layout_gen = gen;
1203 static inline bool pcc_inode_has_layout(struct pcc_inode *pcci)
1205 return pcci->pcci_layout_gen != CL_LAYOUT_GEN_NONE;
1208 static int pcc_try_dataset_attach(struct inode *inode, __u32 gen,
1209 enum lu_pcc_type type,
1210 struct pcc_dataset *dataset,
1213 struct ll_inode_info *lli = ll_i2info(inode);
1214 struct pcc_inode *pcci = lli->lli_pcc_inode;
1215 const struct cred *old_cred;
1216 struct dentry *pcc_dentry;
1224 if (type == LU_PCC_READWRITE &&
1225 !(dataset->pccd_flags & PCC_DATASET_RWPCC))
1228 OBD_ALLOC(pathname, PATH_MAX);
1229 if (pathname == NULL)
1232 pcc_fid2dataset_fullpath(pathname, PATH_MAX, &lli->lli_fid, dataset);
1234 old_cred = override_creds(pcc_super_cred(inode->i_sb));
1235 rc = kern_path(pathname, LOOKUP_FOLLOW, &path);
1237 /* ignore this error */
1240 pcc_dentry = path.dentry;
1241 #ifndef HAVE_VFS_SETXATTR
1242 if (!pcc_dentry->d_inode->i_op->getxattr)
1243 /* ignore this error */
1244 GOTO(out_put_path, rc = 0);
1246 rc = pcc_dentry->d_inode->i_op->getxattr(pcc_dentry, pcc_xattr_layout,
1247 &pcc_gen, sizeof(pcc_gen));
1249 rc = __vfs_getxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
1250 &pcc_gen, sizeof(pcc_gen));
1254 /* ignore this error */
1255 GOTO(out_put_path, rc = 0);
1258 /* The file is still valid cached in PCC, attach it immediately. */
1259 if (pcc_gen == gen) {
1260 CDEBUG(D_CACHE, DFID" L.Gen (%d) consistent, auto attached.\n",
1261 PFID(&lli->lli_fid), gen);
1263 OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
1265 GOTO(out_put_path, rc = -ENOMEM);
1267 pcc_inode_init(pcci, lli);
1269 pcc_inode_attach_init(dataset, pcci, pcc_dentry, type);
1272 * This happened when a file was once attached into
1273 * PCC, and some processes keep this file opened
1274 * (pcci->refcount > 1) and corresponding PCC file
1275 * without any I/O activity, and then this file was
1276 * detached by the manual detach command or the
1277 * revocation of the layout lock (i.e. cached LRU lock
1280 pcc_inode_get(pcci);
1281 pcci->pcci_type = type;
1283 pcc_inode_dsflags_set(lli, dataset);
1284 pcc_layout_gen_set(pcci, gen);
1290 revert_creds(old_cred);
1291 OBD_FREE(pathname, PATH_MAX);
1295 static int pcc_try_datasets_attach(struct inode *inode, enum pcc_io_type iot,
1296 __u32 gen, enum lu_pcc_type type,
1299 struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
1300 struct ll_inode_info *lli = ll_i2info(inode);
1301 struct pcc_dataset *dataset = NULL, *tmp;
1306 down_read(&super->pccs_rw_sem);
1307 list_for_each_entry_safe(dataset, tmp,
1308 &super->pccs_datasets, pccd_linkage) {
1309 if (!pcc_auto_attach_enabled(dataset->pccd_flags, iot))
1312 rc = pcc_try_dataset_attach(inode, gen, type, dataset, cached);
1313 if (rc < 0 || (!rc && *cached))
1318 * Update the saved dataset flags for the inode accordingly if failed.
1320 if (!rc && !*cached) {
1322 * Currently auto attach strategy for a PCC backend is
1323 * unchangeable once it was added into the PCC datasets on
1324 * a client as the support to change auto attach strategy is
1325 * not implemented yet.
1328 * If tried to attach from one PCC backend:
1329 * @lli_pcc_generation > 0:
1330 * 1) The file was once attached into PCC, but now the
1331 * corresponding PCC backend should be removed from the client;
1332 * 2) The layout generation was changed, the data has been
1334 * 3) The corresponding PCC copy does not exist on PCC
1335 * @lli_pcc_generation == 0:
1336 * The file is never attached into PCC but in a HSM released
1337 * state, or once attached into PCC but the inode was evicted
1338 * from icache later.
1339 * Set the saved dataset flags with PCC_DATASET_NONE. Then this
1340 * file will skip from the candidates to try auto attach until
1341 * the file is attached into PCC again.
1343 * If the file was never attached into PCC, or once attached but
1344 * its inode was evicted from icache (lli_pcc_generation == 0),
1345 * set the saved dataset flags with PCC_DATASET_NONE.
1347 * If the file was once attached into PCC but the corresponding
1348 * dataset was removed from the client, set the saved dataset
1349 * flags with PCC_DATASET_NONE.
1351 * TODO: If the file was once attached into PCC but not try to
1352 * auto attach due to the change of the configuration parameters
1353 * for this dataset (i.e. change from auto attach enabled to
1354 * auto attach disabled for this dataset), update the saved
1355 * dataset flags with the found one.
1357 lli->lli_pcc_dsflags = PCC_DATASET_NONE;
1359 up_read(&super->pccs_rw_sem);
1365 * TODO: For RW-PCC, it is desirable to store HSM info as a layout (LU-10606).
1366 * Thus the client can get archive ID from the layout directly. When try to
1367 * attach the file automatically which is in HSM released state (according to
1368 * LOV_PATTERN_F_RELEASED in the layout), it can determine whether the file is
1369 * valid cached on PCC more precisely according to the @rwid (archive ID) in
1370 * the PCC dataset and the archive ID in HSM attrs.
1372 static int pcc_try_auto_attach(struct inode *inode, bool *cached,
1373 enum pcc_io_type iot)
1375 struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
1376 struct cl_layout clt = {
1378 .cl_is_released = false,
1380 struct ll_inode_info *lli = ll_i2info(inode);
1387 * Quick check whether there is PCC device.
1389 if (list_empty(&super->pccs_datasets))
1393 * The file layout lock was cancelled. And this open does not
1394 * obtain valid layout lock from MDT (i.e. the file is being
1397 if (iot == PIT_OPEN) {
1398 if (ll_layout_version_get(lli) == CL_LAYOUT_GEN_NONE)
1401 rc = ll_layout_refresh(inode, &gen);
1406 rc = pcc_get_layout_info(inode, &clt);
1410 if (iot != PIT_OPEN && gen != clt.cl_layout_gen) {
1411 CDEBUG(D_CACHE, DFID" layout changed from %d to %d.\n",
1412 PFID(ll_inode2fid(inode)), gen, clt.cl_layout_gen);
1416 if (clt.cl_is_released)
1417 rc = pcc_try_datasets_attach(inode, iot, clt.cl_layout_gen,
1418 LU_PCC_READWRITE, cached);
1423 static inline bool pcc_may_auto_attach(struct inode *inode,
1424 enum pcc_io_type iot)
1426 struct ll_inode_info *lli = ll_i2info(inode);
1427 struct pcc_super *super = ll_i2pccs(inode);
1429 /* Known the file was not in any PCC backend. */
1430 if (lli->lli_pcc_dsflags & PCC_DATASET_NONE)
1434 * lli_pcc_generation = 0 means that the file was never attached into
1435 * PCC, or may be once attached into PCC but detached as the inode is
1436 * evicted from icache (i.e. "echo 3 > /proc/sys/vm/drop_caches" or
1437 * icache shrinking due to the memory pressure), which will cause the
1438 * file detach from PCC when releasing the inode from icache.
1439 * In either case, we still try to attach.
1441 /* lli_pcc_generation == 0, or the PCC setting was changed,
1442 * or there is no PCC setup on the client and the try will return
1443 * immediately in pcc_try_auto_attch().
1445 if (super->pccs_generation != lli->lli_pcc_generation)
1448 /* The cached setting @lli_pcc_dsflags is valid */
1449 if (iot == PIT_OPEN)
1450 return lli->lli_pcc_dsflags & PCC_DATASET_OPEN_ATTACH;
1452 if (iot == PIT_GETATTR)
1453 return lli->lli_pcc_dsflags & PCC_DATASET_STAT_ATTACH;
1455 return lli->lli_pcc_dsflags & PCC_DATASET_IO_ATTACH;
1458 int pcc_file_open(struct inode *inode, struct file *file)
1460 struct pcc_inode *pcci;
1461 struct ll_inode_info *lli = ll_i2info(inode);
1462 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1463 struct pcc_file *pccf = &fd->fd_pcc_file;
1464 struct file *pcc_file;
1467 bool cached = false;
1472 if (!S_ISREG(inode->i_mode))
1475 pcc_inode_lock(inode);
1476 pcci = ll_i2pcci(inode);
1478 if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
1479 GOTO(out_unlock, rc = 0);
1481 if (!pcci || !pcc_inode_has_layout(pcci)) {
1482 if (pcc_may_auto_attach(inode, PIT_OPEN))
1483 rc = pcc_try_auto_attach(inode, &cached, PIT_OPEN);
1485 if (rc < 0 || !cached)
1486 GOTO(out_unlock, rc);
1489 pcci = ll_i2pcci(inode);
1492 pcc_inode_get(pcci);
1493 WARN_ON(pccf->pccf_file);
1495 path = &pcci->pcci_path;
1496 dname = &path->dentry->d_name;
1497 CDEBUG(D_CACHE, "opening pcc file '%.*s'\n", dname->len,
1500 #ifdef HAVE_DENTRY_OPEN_USE_PATH
1501 pcc_file = dentry_open(path, file->f_flags,
1502 pcc_super_cred(inode->i_sb));
1504 pcc_file = dentry_open(path->dentry, path->mnt, file->f_flags,
1505 pcc_super_cred(inode->i_sb));
1507 if (IS_ERR_OR_NULL(pcc_file)) {
1508 rc = pcc_file == NULL ? -EINVAL : PTR_ERR(pcc_file);
1509 pcc_inode_put(pcci);
1511 pccf->pccf_file = pcc_file;
1512 pccf->pccf_type = pcci->pcci_type;
1516 pcc_inode_unlock(inode);
1520 void pcc_file_release(struct inode *inode, struct file *file)
1522 struct pcc_inode *pcci;
1523 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1524 struct pcc_file *pccf;
1530 if (!S_ISREG(inode->i_mode) || fd == NULL)
1533 pccf = &fd->fd_pcc_file;
1534 pcc_inode_lock(inode);
1535 if (pccf->pccf_file == NULL)
1538 pcci = ll_i2pcci(inode);
1540 path = &pcci->pcci_path;
1541 dname = &path->dentry->d_name;
1542 CDEBUG(D_CACHE, "releasing pcc file \"%.*s\"\n", dname->len,
1544 pcc_inode_put(pcci);
1545 fput(pccf->pccf_file);
1546 pccf->pccf_file = NULL;
1548 pcc_inode_unlock(inode);
1552 static void pcc_io_init(struct inode *inode, enum pcc_io_type iot, bool *cached)
1554 struct pcc_inode *pcci;
1556 pcc_inode_lock(inode);
1557 pcci = ll_i2pcci(inode);
1558 if (pcci && pcc_inode_has_layout(pcci)) {
1559 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1560 atomic_inc(&pcci->pcci_active_ios);
1564 if (pcc_may_auto_attach(inode, iot)) {
1565 (void) pcc_try_auto_attach(inode, cached, iot);
1567 pcci = ll_i2pcci(inode);
1568 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1569 atomic_inc(&pcci->pcci_active_ios);
1573 pcc_inode_unlock(inode);
1576 static void pcc_io_fini(struct inode *inode)
1578 struct pcc_inode *pcci = ll_i2pcci(inode);
1580 LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0);
1581 if (atomic_dec_and_test(&pcci->pcci_active_ios))
1582 wake_up_all(&pcci->pcci_waitq);
1587 __pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
1589 struct file *file = iocb->ki_filp;
1591 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1592 return file->f_op->read_iter(iocb, iter);
1598 iov_for_each(iov, i, *iter) {
1601 res = file->f_op->aio_read(iocb, &iov, 1, iocb->ki_pos);
1602 if (-EIOCBQUEUED == res)
1603 res = wait_on_sync_kiocb(iocb);
1611 if (res < iov.iov_len)
1616 iov_iter_advance(iter, bytes);
1621 ssize_t pcc_file_read_iter(struct kiocb *iocb,
1622 struct iov_iter *iter, bool *cached)
1624 struct file *file = iocb->ki_filp;
1625 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1626 struct pcc_file *pccf = &fd->fd_pcc_file;
1627 struct inode *inode = file_inode(file);
1632 if (pccf->pccf_file == NULL) {
1637 pcc_io_init(inode, PIT_READ, cached);
1641 iocb->ki_filp = pccf->pccf_file;
1642 /* generic_file_aio_read does not support ext4-dax,
1643 * __pcc_file_read_iter uses ->aio_read hook directly
1644 * to add support for ext4-dax.
1646 result = __pcc_file_read_iter(iocb, iter);
1647 iocb->ki_filp = file;
1654 __pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
1656 struct file *file = iocb->ki_filp;
1658 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1659 return file->f_op->write_iter(iocb, iter);
1665 iov_for_each(iov, i, *iter) {
1668 res = file->f_op->aio_write(iocb, &iov, 1, iocb->ki_pos);
1669 if (-EIOCBQUEUED == res)
1670 res = wait_on_sync_kiocb(iocb);
1678 if (res < iov.iov_len)
1683 iov_iter_advance(iter, bytes);
1688 ssize_t pcc_file_write_iter(struct kiocb *iocb,
1689 struct iov_iter *iter, bool *cached)
1691 struct file *file = iocb->ki_filp;
1692 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1693 struct pcc_file *pccf = &fd->fd_pcc_file;
1694 struct inode *inode = file_inode(file);
1699 if (pccf->pccf_file == NULL) {
1704 if (pccf->pccf_type != LU_PCC_READWRITE) {
1709 pcc_io_init(inode, PIT_WRITE, cached);
1713 if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
1714 GOTO(out, result = -ENOSPC);
1716 iocb->ki_filp = pccf->pccf_file;
1718 /* Since __pcc_file_write_iter makes write calls via
1719 * the normal vfs interface to the local PCC file system,
1720 * the inode lock is not needed.
1722 result = __pcc_file_write_iter(iocb, iter);
1723 iocb->ki_filp = file;
1729 int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
1733 const struct cred *old_cred;
1734 struct iattr attr2 = *attr;
1735 struct dentry *pcc_dentry;
1736 struct pcc_inode *pcci;
1740 if (!S_ISREG(inode->i_mode)) {
1745 pcc_io_init(inode, PIT_SETATTR, cached);
1749 attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME |
1750 ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET |
1751 ATTR_CTIME | ATTR_UID | ATTR_GID);
1752 pcci = ll_i2pcci(inode);
1753 pcc_dentry = pcci->pcci_path.dentry;
1754 inode_lock(pcc_dentry->d_inode);
1755 old_cred = override_creds(pcc_super_cred(inode->i_sb));
1756 rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
1757 revert_creds(old_cred);
1758 inode_unlock(pcc_dentry->d_inode);
1764 int pcc_inode_getattr(struct inode *inode, bool *cached)
1766 struct ll_inode_info *lli = ll_i2info(inode);
1767 const struct cred *old_cred;
1776 if (!S_ISREG(inode->i_mode)) {
1781 pcc_io_init(inode, PIT_GETATTR, cached);
1785 old_cred = override_creds(pcc_super_cred(inode->i_sb));
1786 rc = ll_vfs_getattr(&ll_i2pcci(inode)->pcci_path, &stat);
1787 revert_creds(old_cred);
1791 ll_inode_size_lock(inode);
1792 if (inode->i_atime.tv_sec < lli->lli_atime ||
1793 lli->lli_update_atime) {
1794 inode->i_atime.tv_sec = lli->lli_atime;
1795 lli->lli_update_atime = 0;
1797 inode->i_mtime.tv_sec = lli->lli_mtime;
1798 inode->i_ctime.tv_sec = lli->lli_ctime;
1800 atime = inode->i_atime.tv_sec;
1801 mtime = inode->i_mtime.tv_sec;
1802 ctime = inode->i_ctime.tv_sec;
1804 if (atime < stat.atime.tv_sec)
1805 atime = stat.atime.tv_sec;
1807 if (ctime < stat.ctime.tv_sec)
1808 ctime = stat.ctime.tv_sec;
1810 if (mtime < stat.mtime.tv_sec)
1811 mtime = stat.mtime.tv_sec;
1813 i_size_write(inode, stat.size);
1814 inode->i_blocks = stat.blocks;
1816 inode->i_atime.tv_sec = atime;
1817 inode->i_mtime.tv_sec = mtime;
1818 inode->i_ctime.tv_sec = ctime;
1820 ll_inode_size_unlock(inode);
1826 ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos,
1827 struct pipe_inode_info *pipe,
1828 size_t count, unsigned int flags,
1831 struct inode *inode = file_inode(in_file);
1832 struct ll_file_data *fd = LUSTRE_FPRIVATE(in_file);
1833 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1842 if (!file_inode(pcc_file)->i_fop->splice_read)
1845 pcc_io_init(inode, PIT_SPLICE_READ, cached);
1849 result = file_inode(pcc_file)->i_fop->splice_read(pcc_file,
1857 int pcc_fsync(struct file *file, loff_t start, loff_t end,
1858 int datasync, bool *cached)
1860 struct inode *inode = file_inode(file);
1861 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1862 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1872 pcc_io_init(inode, PIT_FSYNC, cached);
1876 rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
1877 start, end, datasync);
1883 int pcc_file_mmap(struct file *file, struct vm_area_struct *vma,
1886 struct inode *inode = file_inode(file);
1887 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1888 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1889 struct pcc_inode *pcci;
1894 if (!pcc_file || !file_inode(pcc_file)->i_fop->mmap) {
1899 pcc_inode_lock(inode);
1900 pcci = ll_i2pcci(inode);
1901 if (pcci && pcc_inode_has_layout(pcci)) {
1902 LASSERT(atomic_read(&pcci->pcci_refcount) > 1);
1904 vma->vm_file = pcc_file;
1905 rc = file_inode(pcc_file)->i_fop->mmap(pcc_file, vma);
1906 vma->vm_file = file;
1907 /* Save the vm ops of backend PCC */
1908 vma->vm_private_data = (void *)vma->vm_ops;
1912 pcc_inode_unlock(inode);
1917 void pcc_vm_open(struct vm_area_struct *vma)
1919 struct pcc_inode *pcci;
1920 struct file *file = vma->vm_file;
1921 struct inode *inode = file_inode(file);
1922 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1923 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1924 struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1928 if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->open)
1931 pcc_inode_lock(inode);
1932 pcci = ll_i2pcci(inode);
1933 if (pcci && pcc_inode_has_layout(pcci)) {
1934 vma->vm_file = pcc_file;
1935 pcc_vm_ops->open(vma);
1936 vma->vm_file = file;
1938 pcc_inode_unlock(inode);
1942 void pcc_vm_close(struct vm_area_struct *vma)
1944 struct file *file = vma->vm_file;
1945 struct inode *inode = file_inode(file);
1946 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1947 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1948 struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1952 if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->close)
1955 pcc_inode_lock(inode);
1956 /* Layout lock maybe revoked here */
1957 vma->vm_file = pcc_file;
1958 pcc_vm_ops->close(vma);
1959 vma->vm_file = file;
1960 pcc_inode_unlock(inode);
1964 int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
1967 struct page *page = vmf->page;
1968 struct mm_struct *mm = vma->vm_mm;
1969 struct file *file = vma->vm_file;
1970 struct inode *inode = file_inode(file);
1971 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1972 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1973 struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1978 if (!pcc_file || !pcc_vm_ops) {
1983 if (!pcc_vm_ops->page_mkwrite &&
1984 page->mapping == pcc_file->f_mapping) {
1986 "%s: PCC backend fs not support ->page_mkwrite()\n",
1987 ll_i2sbi(inode)->ll_fsname);
1988 pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
1989 up_read(&mm->mmap_sem);
1991 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1993 /* Pause to allow for a race with concurrent detach */
1994 OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
1996 pcc_io_init(inode, PIT_PAGE_MKWRITE, cached);
1998 /* This happens when the file is detached from PCC after got
1999 * the fault page via ->fault() on the inode of the PCC copy.
2000 * Here it can not simply fall back to normal Lustre I/O path.
2001 * The reason is that the address space of fault page used by
2002 * ->page_mkwrite() is still the one of PCC inode. In the
2003 * normal Lustre ->page_mkwrite() I/O path, it will be wrongly
2004 * handled as the address space of the fault page is not
2005 * consistent with the one of the Lustre inode (though the
2006 * fault page was truncated).
2007 * As the file is detached from PCC, the fault page must
2008 * be released first, and retry the mmap write (->fault() and
2010 * We use an ugly and tricky method by returning
2011 * VM_FAULT_NOPAGE | VM_FAULT_RETRY to the caller
2012 * __do_page_fault and retry the memory fault handling.
2014 if (page->mapping == pcc_file->f_mapping) {
2016 up_read(&mm->mmap_sem);
2017 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
2024 * This fault injection can also be used to simulate -ENOSPC and
2025 * -EDQUOT failure of underlying PCC backend fs.
2027 if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE)) {
2029 pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
2030 up_read(&mm->mmap_sem);
2031 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
2034 vma->vm_file = pcc_file;
2035 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
2036 rc = pcc_vm_ops->page_mkwrite(vmf);
2038 rc = pcc_vm_ops->page_mkwrite(vma, vmf);
2040 vma->vm_file = file;
2046 int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
2049 struct file *file = vma->vm_file;
2050 struct inode *inode = file_inode(file);
2051 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2052 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
2053 struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
2058 if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->fault) {
2063 pcc_io_init(inode, PIT_FAULT, cached);
2067 vma->vm_file = pcc_file;
2068 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
2069 rc = pcc_vm_ops->fault(vmf);
2071 rc = pcc_vm_ops->fault(vma, vmf);
2073 vma->vm_file = file;
2079 static void pcc_layout_wait(struct pcc_inode *pcci)
2081 struct l_wait_info lwi = { 0 };
2083 while (atomic_read(&pcci->pcci_active_ios) > 0) {
2084 CDEBUG(D_CACHE, "Waiting for IO completion: %d\n",
2085 atomic_read(&pcci->pcci_active_ios));
2086 l_wait_event(pcci->pcci_waitq,
2087 atomic_read(&pcci->pcci_active_ios) == 0, &lwi);
2091 static void __pcc_layout_invalidate(struct pcc_inode *pcci)
2093 pcci->pcci_type = LU_PCC_NONE;
2094 pcc_layout_gen_set(pcci, CL_LAYOUT_GEN_NONE);
2095 pcc_layout_wait(pcci);
2098 void pcc_layout_invalidate(struct inode *inode)
2100 struct pcc_inode *pcci;
2104 pcc_inode_lock(inode);
2105 pcci = ll_i2pcci(inode);
2106 if (pcci && pcc_inode_has_layout(pcci)) {
2107 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
2108 __pcc_layout_invalidate(pcci);
2110 CDEBUG(D_CACHE, "Invalidate "DFID" layout gen %d\n",
2111 PFID(&ll_i2info(inode)->lli_fid), pcci->pcci_layout_gen);
2113 pcc_inode_put(pcci);
2115 pcc_inode_unlock(inode);
2120 static int pcc_inode_remove(struct inode *inode, struct dentry *pcc_dentry)
2124 rc = ll_vfs_unlink(pcc_dentry->d_parent->d_inode, pcc_dentry);
2126 CWARN("%s: failed to unlink PCC file %.*s, rc = %d\n",
2127 ll_i2sbi(inode)->ll_fsname, pcc_dentry->d_name.len,
2128 pcc_dentry->d_name.name, rc);
2133 /* Create directory under base if directory does not exist */
2134 static struct dentry *
2135 pcc_mkdir(struct dentry *base, const char *name, umode_t mode)
2138 struct dentry *dentry;
2139 struct inode *dir = base->d_inode;
2142 dentry = lookup_one_len(name, base, strlen(name));
2146 if (d_is_positive(dentry))
2149 rc = vfs_mkdir(dir, dentry, mode);
2152 dentry = ERR_PTR(rc);
2160 static struct dentry *
2161 pcc_mkdir_p(struct dentry *root, char *path, umode_t mode)
2163 char *ptr, *entry_name;
2164 struct dentry *parent;
2165 struct dentry *child = ERR_PTR(-EINVAL);
2172 parent = dget(root);
2173 while ((ptr = strchr(ptr, '/')) != NULL) {
2175 child = pcc_mkdir(parent, entry_name, mode);
2189 /* Create file under base. If file already exist, return failure */
2190 static struct dentry *
2191 pcc_create(struct dentry *base, const char *name, umode_t mode)
2194 struct dentry *dentry;
2195 struct inode *dir = base->d_inode;
2198 dentry = lookup_one_len(name, base, strlen(name));
2202 if (d_is_positive(dentry))
2205 rc = vfs_create(dir, dentry, mode, LL_VFS_CREATE_FALSE);
2208 dentry = ERR_PTR(rc);
2216 static int __pcc_inode_create(struct pcc_dataset *dataset,
2218 struct dentry **dentry)
2221 struct dentry *base;
2222 struct dentry *child;
2225 OBD_ALLOC(path, MAX_PCC_DATABASE_PATH);
2229 pcc_fid2dataset_path(path, MAX_PCC_DATABASE_PATH, fid);
2231 base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0);
2237 snprintf(path, MAX_PCC_DATABASE_PATH, DFID_NOBRACE, PFID(fid));
2238 child = pcc_create(base, path, 0);
2239 if (IS_ERR(child)) {
2240 rc = PTR_ERR(child);
2248 OBD_FREE(path, MAX_PCC_DATABASE_PATH);
2253 * Reset uid, gid or size for the PCC copy masked by @valid.
2254 * TODO: Set the project ID for PCC copy.
2256 int pcc_inode_reset_iattr(struct dentry *dentry, unsigned int valid,
2257 kuid_t uid, kgid_t gid, loff_t size)
2259 struct inode *inode = dentry->d_inode;
2265 attr.ia_valid = valid;
2268 attr.ia_size = size;
2271 rc = notify_change(dentry, &attr, NULL);
2272 inode_unlock(inode);
/*
 * Create the PCC copy for @fid in @dataset while running with the PCC
 * credentials of super block @sb. The result of __pcc_inode_create() is
 * returned; the caller's credentials are restored before returning.
 */
int pcc_inode_create(struct super_block *sb, struct pcc_dataset *dataset,
		     struct lu_fid *fid, struct dentry **pcc_dentry)
{
	const struct cred *saved_cred = override_creds(pcc_super_cred(sb));
	int rc = __pcc_inode_create(dataset, fid, pcc_dentry);

	revert_creds(saved_cred);
	return rc;
}
2289 int pcc_inode_create_fini(struct inode *inode, struct pcc_create_attach *pca)
2291 struct dentry *pcc_dentry = pca->pca_dentry;
2292 struct pcc_super *super = ll_i2pccs(inode);
2293 const struct cred *old_cred;
2294 struct pcc_inode *pcci;
2299 if (!pca->pca_dataset)
2303 GOTO(out_dataset_put, rc = 0);
2305 LASSERT(pcc_dentry);
2307 old_cred = override_creds(super->pccs_cred);
2308 pcc_inode_lock(inode);
2309 LASSERT(ll_i2pcci(inode) == NULL);
2310 OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2312 GOTO(out_put, rc = -ENOMEM);
2314 rc = pcc_inode_reset_iattr(pcc_dentry, ATTR_UID | ATTR_GID,
2315 old_cred->suid, old_cred->sgid, 0);
2319 pcc_inode_attach_set(super, pca->pca_dataset, ll_i2info(inode),
2320 pcci, pcc_dentry, LU_PCC_READWRITE);
2322 rc = pcc_layout_xattr_set(pcci, 0);
2324 (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
2325 pcc_inode_put(pcci);
2326 GOTO(out_unlock, rc);
2329 /* Set the layout generation of newly created file with 0 */
2330 pcc_layout_gen_set(pcci, 0);
2334 (void) pcc_inode_remove(inode, pcc_dentry);
2338 OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
2341 pcc_inode_unlock(inode);
2342 revert_creds(old_cred);
2344 pcc_dataset_put(pca->pca_dataset);
2348 void pcc_create_attach_cleanup(struct super_block *sb,
2349 struct pcc_create_attach *pca)
2351 if (!pca->pca_dataset)
2354 if (pca->pca_dentry) {
2355 const struct cred *old_cred;
2358 old_cred = override_creds(pcc_super_cred(sb));
2359 rc = ll_vfs_unlink(pca->pca_dentry->d_parent->d_inode,
2362 CWARN("failed to unlink PCC file %.*s, rc = %d\n",
2363 pca->pca_dentry->d_name.len,
2364 pca->pca_dentry->d_name.name, rc);
2365 /* ignore the unlink failure */
2366 revert_creds(old_cred);
2367 dput(pca->pca_dentry);
2370 pcc_dataset_put(pca->pca_dataset);
2373 static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
2379 size = cfs_kernel_write(filp, buf, count, offset);
2388 static ssize_t pcc_copy_data(struct file *src, struct file *dst)
2392 loff_t pos, offset = 0;
2393 size_t buf_len = 1048576;
2398 OBD_ALLOC_LARGE(buf, buf_len);
2403 if (signal_pending(current))
2404 GOTO(out_free, rc = -EINTR);
2407 rc2 = cfs_kernel_read(src, buf, buf_len, &pos);
2409 GOTO(out_free, rc = rc2);
2414 rc = pcc_filp_write(dst, buf, rc2, &pos);
2422 OBD_FREE_LARGE(buf, buf_len);
2426 static int pcc_attach_allowed_check(struct inode *inode)
2428 struct ll_inode_info *lli = ll_i2info(inode);
2429 struct pcc_inode *pcci;
2434 pcc_inode_lock(inode);
2435 if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
2436 GOTO(out_unlock, rc = -EBUSY);
2438 pcci = ll_i2pcci(inode);
2439 if (pcci && pcc_inode_has_layout(pcci))
2440 GOTO(out_unlock, rc = -EEXIST);
2442 lli->lli_pcc_state |= PCC_STATE_FL_ATTACHING;
2444 pcc_inode_unlock(inode);
2448 int pcc_readwrite_attach(struct file *file, struct inode *inode,
2451 struct pcc_dataset *dataset;
2452 struct ll_inode_info *lli = ll_i2info(inode);
2453 struct pcc_super *super = ll_i2pccs(inode);
2454 struct pcc_inode *pcci;
2455 const struct cred *old_cred;
2456 struct dentry *dentry;
2457 struct file *pcc_filp;
2464 rc = pcc_attach_allowed_check(inode);
2468 dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super,
2469 LU_PCC_READWRITE, archive_id);
2470 if (dataset == NULL)
2473 old_cred = override_creds(super->pccs_cred);
2474 rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry);
2476 GOTO(out_dataset_put, rc);
2478 path.mnt = dataset->pccd_path.mnt;
2479 path.dentry = dentry;
2480 #ifdef HAVE_DENTRY_OPEN_USE_PATH
2481 pcc_filp = dentry_open(&path, O_WRONLY | O_LARGEFILE, current_cred());
2483 pcc_filp = dentry_open(path.dentry, path.mnt, O_WRONLY | O_LARGEFILE,
2486 if (IS_ERR_OR_NULL(pcc_filp)) {
2487 rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
2488 GOTO(out_dentry, rc);
2491 rc = pcc_inode_reset_iattr(dentry, ATTR_UID | ATTR_GID,
2492 old_cred->uid, old_cred->gid, 0);
2496 ret = pcc_copy_data(file, pcc_filp);
2498 GOTO(out_fput, rc = ret);
2501 * The PCC copy must be truncated to the same size as the Lustre
2502 * copy after copying the data. Otherwise, a wrong file size may be
2503 * seen after re-attaching a file. See LU-13023 for details.
2505 rc = pcc_inode_reset_iattr(dentry, ATTR_SIZE, KUIDT_INIT(0),
2506 KGIDT_INIT(0), ret);
2510 /* Pause to allow for a race with concurrent HSM remove */
2511 OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
2513 pcc_inode_lock(inode);
2514 pcci = ll_i2pcci(inode);
2516 OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2518 GOTO(out_unlock, rc = -ENOMEM);
2520 pcc_inode_attach_set(super, dataset, lli, pcci,
2521 dentry, LU_PCC_READWRITE);
2523 pcc_inode_unlock(inode);
2528 (void) pcc_inode_remove(inode, dentry);
2532 pcc_dataset_put(dataset);
2533 revert_creds(old_cred);
2538 int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
2539 __u32 gen, bool lease_broken, int rc,
2542 struct ll_inode_info *lli = ll_i2info(inode);
2543 const struct cred *old_cred;
2544 struct pcc_inode *pcci;
2549 old_cred = override_creds(pcc_super_cred(inode->i_sb));
2550 pcc_inode_lock(inode);
2551 pcci = ll_i2pcci(inode);
2552 if (rc || lease_broken) {
2553 if (attached && pcci)
2554 pcc_inode_put(pcci);
2556 GOTO(out_unlock, rc);
2559 /* PCC inode may be released due to layout lock revocation */
2561 GOTO(out_unlock, rc = -ESTALE);
2564 rc = pcc_layout_xattr_set(pcci, gen);
2568 LASSERT(lli->lli_pcc_state & PCC_STATE_FL_ATTACHING);
2569 rc = ll_layout_refresh(inode, &gen2);
2572 pcc_layout_gen_set(pcci, gen);
2575 DFID" layout changed from %d to %d.\n",
2576 PFID(ll_inode2fid(inode)), gen, gen2);
2577 GOTO(out_put, rc = -ESTALE);
2583 (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
2584 pcc_inode_put(pcci);
2587 lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
2588 pcc_inode_unlock(inode);
2589 revert_creds(old_cred);
2593 static int pcc_hsm_remove(struct inode *inode)
2595 struct hsm_user_request *hur;
2602 rc = ll_layout_restore(inode, 0, OBD_OBJECT_EOF);
2604 CDEBUG(D_CACHE, DFID" RESTORE failure: %d\n",
2605 PFID(&ll_i2info(inode)->lli_fid), rc);
2609 ll_layout_refresh(inode, &gen);
2611 len = sizeof(struct hsm_user_request) +
2612 sizeof(struct hsm_user_item);
2613 OBD_ALLOC(hur, len);
2617 hur->hur_request.hr_action = HUA_REMOVE;
2618 hur->hur_request.hr_archive_id = 0;
2619 hur->hur_request.hr_flags = 0;
2620 memcpy(&hur->hur_user_item[0].hui_fid, &ll_i2info(inode)->lli_fid,
2621 sizeof(hur->hur_user_item[0].hui_fid));
2622 hur->hur_user_item[0].hui_extent.offset = 0;
2623 hur->hur_user_item[0].hui_extent.length = OBD_OBJECT_EOF;
2624 hur->hur_request.hr_itemcount = 1;
2625 rc = obd_iocontrol(LL_IOC_HSM_REQUEST, ll_i2sbi(inode)->ll_md_exp,
2628 CDEBUG(D_CACHE, DFID" HSM REMOVE failure: %d\n",
2629 PFID(&ll_i2info(inode)->lli_fid), rc);
2635 int pcc_ioctl_detach(struct inode *inode, __u32 opt)
2637 struct ll_inode_info *lli = ll_i2info(inode);
2638 struct pcc_inode *pcci;
2639 bool hsm_remove = false;
2644 pcc_inode_lock(inode);
2645 pcci = lli->lli_pcc_inode;
2646 if (!pcci || lli->lli_pcc_state & PCC_STATE_FL_ATTACHING ||
2647 !pcc_inode_has_layout(pcci))
2648 GOTO(out_unlock, rc = 0);
2650 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
2652 if (pcci->pcci_type == LU_PCC_READWRITE) {
2653 if (opt == PCC_DETACH_OPT_UNCACHE) {
2656 * The file will be removed from PCC, set the flags
2657 * with PCC_DATASET_NONE even the later removal of the
2660 lli->lli_pcc_dsflags = PCC_DATASET_NONE;
2663 __pcc_layout_invalidate(pcci);
2664 pcc_inode_put(pcci);
2668 pcc_inode_unlock(inode);
2670 const struct cred *old_cred;
2672 old_cred = override_creds(pcc_super_cred(inode->i_sb));
2673 rc = pcc_hsm_remove(inode);
2674 revert_creds(old_cred);
2680 int pcc_ioctl_state(struct file *file, struct inode *inode,
2681 struct lu_pcc_state *state)
2687 int buf_len = sizeof(state->pccs_path);
2688 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2689 struct pcc_file *pccf = &fd->fd_pcc_file;
2690 struct pcc_inode *pcci;
2697 OBD_ALLOC(buf, buf_len);
2701 pcc_inode_lock(inode);
2702 pcci = ll_i2pcci(inode);
2704 state->pccs_type = LU_PCC_NONE;
2705 GOTO(out_unlock, rc = 0);
2708 count = atomic_read(&pcci->pcci_refcount);
2710 state->pccs_type = LU_PCC_NONE;
2711 state->pccs_open_count = 0;
2712 GOTO(out_unlock, rc = 0);
2715 if (pcc_inode_has_layout(pcci))
2717 if (pccf->pccf_file != NULL)
2719 state->pccs_type = pcci->pcci_type;
2720 state->pccs_open_count = count;
2721 state->pccs_flags = ll_i2info(inode)->lli_pcc_state;
2722 path = dentry_path_raw(pcci->pcci_path.dentry, buf, buf_len);
2724 GOTO(out_unlock, rc = PTR_ERR(path));
2726 if (strlcpy(state->pccs_path, path, buf_len) >= buf_len)
2727 GOTO(out_unlock, rc = -ENAMETOOLONG);
2730 pcc_inode_unlock(inode);
2731 OBD_FREE(buf, buf_len);