Whamcloud - gitweb
LU-10918 pcc: auto RO-PCC caching when O_RDONLY open files
[fs/lustre-release.git] / lustre / llite / pcc.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, DDN Storage Corporation.
24  */
25 /*
26  * Persistent Client Cache
27  *
28  * PCC is a new framework which provides a group of local cache on Lustre
29  * client side. It works in two modes: RW-PCC enables a read-write cache on the
30  * local SSDs of a single client; RO-PCC provides a read-only cache on the
31  * local SSDs of multiple clients. Less overhead is visible to the applications
32  * and network latencies and lock conflicts can be significantly reduced.
33  *
34  * For RW-PCC, no global namespace will be provided. Each client uses its own
35  * local storage as a cache for itself. Local file system is used to manage
36  * the data on local caches. Cached I/O is directed to local file system while
37  * normal I/O is directed to OSTs. RW-PCC uses HSM for data synchronization.
38  * It uses HSM copytool to restore file from local caches to Lustre OSTs. Each
39  * PCC has a copytool instance running with unique archive number. Any remote
40  * access from another Lustre client would trigger the data synchronization. If
41  * a client with RW-PCC goes offline, the cached data becomes inaccessible for
42  * other client temporarily. And after the RW-PCC client reboots and the
43  * copytool restarts, the data will be accessible again.
44  *
45  * Following is what will happen in different conditions for RW-PCC:
46  *
47  * > When file is being created on RW-PCC
48  *
49  * A normal HSM released file is created on MDT;
50  * An empty mirror file is created on local cache;
51  * The HSM status of the Lustre file will be set to archived and released;
52  * The archive number will be set to the proper value.
53  *
54  * > When file is being prefetched to RW-PCC
55  *
56  * An file is copied to the local cache;
57  * The HSM status of the Lustre file will be set to archived and released;
58  * The archive number will be set to the proper value.
59  *
60  * > When file is being accessed from PCC
61  *
62  * Data will be read directly from local cache;
63  * Metadata will be read from MDT, except file size;
64  * File size will be got from local cache.
65  *
66  * > When PCC cached file is being accessed on another client
67  *
68  * RW-PCC cached files are automatically restored when a process on another
69  * client tries to read or modify them. The corresponding I/O will block
70  * waiting for the released file to be restored. This is transparent to the
71  * process.
72  *
73  * For RW-PCC, when a file is being created, a rule-based policy is used to
74  * determine whether it will be cached. Rule-based caching of newly created
75  * files can determine which file can use a cache on PCC directly without any
76  * admission control.
77  *
78  * RW-PCC design can accelerate I/O intensive applications with one-to-one
79  * mappings between files and accessing clients. However, in several use cases,
80  * files will never be updated, but need to be read simultaneously from many
81  * clients. RO-PCC implements a read-only caching on Lustre clients using
82  * SSDs. RO-PCC is based on the same framework as RW-PCC, expect
83  * that no HSM mechanism is used.
84  *
85  * The main advantages to use this SSD cache on the Lustre clients via PCC
86  * is that:
87  * - The I/O stack becomes much simpler for the cached data, as there is no
88  *   interference with I/Os from other clients, which enables easier
89  *   performance optimizations;
90  * - The requirements on the HW inside the client nodes are small, any kind of
91  *   SSDs or even HDDs can be used as cache devices;
92  * - Caching reduces the pressure on the object storage targets (OSTs), as
93  *   small or random I/Os can be regularized to big sequential I/Os and
94  *   temporary files do not even need to be flushed to OSTs.
95  *
96  * PCC can accelerate applications with certain I/O patterns:
97  * - small-sized random writes (< 1MB) from a single client
98  * - repeated read of data that is larger than RAM
99  * - clients with high network latency
100  *
101  * Author: Li Xi <lixi@ddn.com>
102  * Author: Qian Yingjin <qian@ddn.com>
103  */
104
105 #define DEBUG_SUBSYSTEM S_LLITE
106
107 #include "pcc.h"
108 #include <linux/namei.h>
109 #include <linux/file.h>
110 #include <lustre_compat.h>
111 #include "llite_internal.h"
112
113 struct kmem_cache *pcc_inode_slab;
114
115 int pcc_super_init(struct pcc_super *super)
116 {
117         struct cred *cred;
118
119         super->pccs_cred = cred = prepare_creds();
120         if (!cred)
121                 return -ENOMEM;
122
123         /* Never override disk quota limits or use reserved space */
124         cap_lower(cred->cap_effective, CAP_SYS_RESOURCE);
125         init_rwsem(&super->pccs_rw_sem);
126         INIT_LIST_HEAD(&super->pccs_datasets);
127         super->pccs_generation = 1;
128
129         return 0;
130 }
131
132 /* Rule based auto caching */
133 static void pcc_id_list_free(struct list_head *id_list)
134 {
135         struct pcc_match_id *id, *n;
136
137         list_for_each_entry_safe(id, n, id_list, pmi_linkage) {
138                 list_del_init(&id->pmi_linkage);
139                 OBD_FREE_PTR(id);
140         }
141 }
142
143 static void pcc_fname_list_free(struct list_head *fname_list)
144 {
145         struct pcc_match_fname *fname, *n;
146
147         list_for_each_entry_safe(fname, n, fname_list, pmf_linkage) {
148                 OBD_FREE(fname->pmf_name, strlen(fname->pmf_name) + 1);
149                 list_del_init(&fname->pmf_linkage);
150                 OBD_FREE_PTR(fname);
151         }
152 }
153
154 static void pcc_expression_free(struct pcc_expression *expr)
155 {
156         LASSERT(expr->pe_field >= PCC_FIELD_UID &&
157                 expr->pe_field < PCC_FIELD_MAX);
158         switch (expr->pe_field) {
159         case PCC_FIELD_UID:
160         case PCC_FIELD_GID:
161         case PCC_FIELD_PROJID:
162                 pcc_id_list_free(&expr->pe_cond);
163                 break;
164         case PCC_FIELD_FNAME:
165                 pcc_fname_list_free(&expr->pe_cond);
166                 break;
167         default:
168                 LBUG();
169         }
170         OBD_FREE_PTR(expr);
171 }
172
173 static void pcc_conjunction_free(struct pcc_conjunction *conjunction)
174 {
175         struct pcc_expression *expression, *n;
176
177         LASSERT(list_empty(&conjunction->pc_linkage));
178         list_for_each_entry_safe(expression, n,
179                                  &conjunction->pc_expressions,
180                                  pe_linkage) {
181                 list_del_init(&expression->pe_linkage);
182                 pcc_expression_free(expression);
183         }
184         OBD_FREE_PTR(conjunction);
185 }
186
187 static void pcc_rule_conds_free(struct list_head *cond_list)
188 {
189         struct pcc_conjunction *conjunction, *n;
190
191         list_for_each_entry_safe(conjunction, n, cond_list, pc_linkage) {
192                 list_del_init(&conjunction->pc_linkage);
193                 pcc_conjunction_free(conjunction);
194         }
195 }
196
197 static void pcc_cmd_fini(struct pcc_cmd *cmd)
198 {
199         if (cmd->pccc_cmd == PCC_ADD_DATASET) {
200                 if (!list_empty(&cmd->u.pccc_add.pccc_conds))
201                         pcc_rule_conds_free(&cmd->u.pccc_add.pccc_conds);
202                 if (cmd->u.pccc_add.pccc_conds_str)
203                         OBD_FREE(cmd->u.pccc_add.pccc_conds_str,
204                                  strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
205         }
206 }
207
208 #define PCC_DISJUNCTION_DELIM   (",")
209 #define PCC_CONJUNCTION_DELIM   ("&")
210 #define PCC_EXPRESSION_DELIM    ("=")
211
212 static int
213 pcc_fname_list_add(char *id, struct list_head *fname_list)
214 {
215         struct pcc_match_fname *fname;
216
217         OBD_ALLOC_PTR(fname);
218         if (fname == NULL)
219                 return -ENOMEM;
220
221         OBD_ALLOC(fname->pmf_name, strlen(id) + 1);
222         if (fname->pmf_name == NULL) {
223                 OBD_FREE_PTR(fname);
224                 return -ENOMEM;
225         }
226
227         strcpy(fname->pmf_name, id);
228         list_add_tail(&fname->pmf_linkage, fname_list);
229         return 0;
230 }
231
232 static int
233 pcc_fname_list_parse(char *str, struct list_head *fname_list)
234 {
235         int rc = 0;
236
237         ENTRY;
238
239         INIT_LIST_HEAD(fname_list);
240         while (rc == 0 && str) {
241                 char *fname = strsep(&str, " ");
242
243                 if (*fname)
244                         rc = pcc_fname_list_add(fname, fname_list);
245         }
246         if (list_empty(fname_list))
247                 rc = -EINVAL;
248         if (rc)
249                 pcc_fname_list_free(fname_list);
250         RETURN(rc);
251 }
252
253 static int
254 pcc_id_list_parse(char *str, struct list_head *id_list,
255                   enum pcc_field type)
256 {
257         int rc = 0;
258
259         ENTRY;
260
261         if (type != PCC_FIELD_UID && type != PCC_FIELD_GID &&
262             type != PCC_FIELD_PROJID)
263                 RETURN(-EINVAL);
264
265         INIT_LIST_HEAD(id_list);
266         while (str) {
267                 char *num;
268                 struct pcc_match_id *id;
269                 unsigned long id_val;
270
271                 num = strsep(&str, " ");
272                 if (!*num)
273                         continue;
274                 rc = kstrtoul(num, 0, &id_val);
275                 if (rc)
276                         GOTO(out, rc);
277
278                 OBD_ALLOC_PTR(id);
279                 if (id == NULL)
280                         GOTO(out, rc = -ENOMEM);
281
282                 id->pmi_id = id_val;
283                 list_add_tail(&id->pmi_linkage, id_list);
284         }
285         if (list_empty(id_list))
286                 rc = -EINVAL;
287 out:
288         if (rc)
289                 pcc_id_list_free(id_list);
290         RETURN(rc);
291 }
292
293 static int
294 pcc_expression_parse(char *str, struct list_head *cond_list)
295 {
296         struct pcc_expression *expr;
297         char *field;
298         int len;
299         int rc = 0;
300
301         OBD_ALLOC_PTR(expr);
302         if (expr == NULL)
303                 return -ENOMEM;
304
305         field = strim(strsep(&str, PCC_EXPRESSION_DELIM));
306         if (!*field || !str)
307                 /* No LHS or no '=' */
308                 GOTO(out, rc = -EINVAL);
309         str = skip_spaces(str);
310         len = strlen(str);
311         if (str[0] != '{' || str[len - 1] != '}')
312                 GOTO(out, rc = -EINVAL);
313
314         /* Skip '{' and '}' */
315         str[len - 1] = '\0';
316         str += 1;
317
318         if (strcmp(field, "uid") == 0) {
319                 if (pcc_id_list_parse(str,
320                                       &expr->pe_cond,
321                                       PCC_FIELD_UID) < 0)
322                         GOTO(out, rc = -EINVAL);
323                 expr->pe_field = PCC_FIELD_UID;
324         } else if (strcmp(field, "gid") == 0) {
325                 if (pcc_id_list_parse(str,
326                                       &expr->pe_cond,
327                                       PCC_FIELD_GID) < 0)
328                         GOTO(out, rc = -EINVAL);
329                 expr->pe_field = PCC_FIELD_GID;
330         } else if (strcmp(field, "projid") == 0) {
331                 if (pcc_id_list_parse(str,
332                                       &expr->pe_cond,
333                                       PCC_FIELD_PROJID) < 0)
334                         GOTO(out, rc = -EINVAL);
335                 expr->pe_field = PCC_FIELD_PROJID;
336         } else if (strcmp(field, "fname") == 0) {
337                 if (pcc_fname_list_parse(str, &expr->pe_cond) < 0)
338                         GOTO(out, rc = -EINVAL);
339                 expr->pe_field = PCC_FIELD_FNAME;
340         } else {
341                 GOTO(out, rc = -EINVAL);
342         }
343
344         list_add_tail(&expr->pe_linkage, cond_list);
345         return 0;
346 out:
347         OBD_FREE_PTR(expr);
348         return rc;
349 }
350
351 static int
352 pcc_conjunction_parse(char *str, struct list_head *cond_list)
353 {
354         struct pcc_conjunction *conjunction;
355         int rc = 0;
356
357         OBD_ALLOC_PTR(conjunction);
358         if (conjunction == NULL)
359                 return -ENOMEM;
360
361         INIT_LIST_HEAD(&conjunction->pc_expressions);
362         list_add_tail(&conjunction->pc_linkage, cond_list);
363
364         while (rc == 0 && str) {
365                 char *expr = strsep(&str, PCC_CONJUNCTION_DELIM);
366
367                 rc = pcc_expression_parse(expr, &conjunction->pc_expressions);
368         }
369         return rc;
370 }
371
372 static int pcc_conds_parse(char *orig, struct list_head *cond_list)
373 {
374         char *str;
375         int rc = 0;
376
377         orig = kstrdup(orig, GFP_KERNEL);
378         if (!orig)
379                 return -ENOMEM;
380         str = orig;
381
382         INIT_LIST_HEAD(cond_list);
383         while (rc == 0 && str) {
384                 char *term = strsep(&str, PCC_DISJUNCTION_DELIM);
385
386                 rc = pcc_conjunction_parse(term, cond_list);
387         }
388         kfree(orig);
389         return rc;
390 }
391
392 static int pcc_id_parse(struct pcc_cmd *cmd, const char *id)
393 {
394         int rc;
395
396         OBD_ALLOC(cmd->u.pccc_add.pccc_conds_str, strlen(id) + 1);
397         if (cmd->u.pccc_add.pccc_conds_str == NULL)
398                 return -ENOMEM;
399
400         memcpy(cmd->u.pccc_add.pccc_conds_str, id, strlen(id));
401
402         rc = pcc_conds_parse(cmd->u.pccc_add.pccc_conds_str,
403                              &cmd->u.pccc_add.pccc_conds);
404         if (rc)
405                 pcc_cmd_fini(cmd);
406
407         return rc;
408 }
409
410 static int
411 pcc_parse_value_pair(struct pcc_cmd *cmd, char *buffer)
412 {
413         char *key, *val;
414         unsigned long id;
415         int rc;
416
417         val = buffer;
418         key = strsep(&val, "=");
419         if (val == NULL || strlen(val) == 0)
420                 return -EINVAL;
421
422         /* Key of the value pair */
423         if (strcmp(key, "rwid") == 0) {
424                 rc = kstrtoul(val, 10, &id);
425                 if (rc)
426                         return rc;
427                 if (id <= 0)
428                         return -EINVAL;
429                 cmd->u.pccc_add.pccc_rwid = id;
430         } else if (strcmp(key, "roid") == 0) {
431                 rc = kstrtoul(val, 10, &id);
432                 if (rc)
433                         return rc;
434                 if (id <= 0)
435                         return -EINVAL;
436                 cmd->u.pccc_add.pccc_roid = id;
437         } else if (strcmp(key, "auto_attach") == 0) {
438                 rc = kstrtoul(val, 10, &id);
439                 if (rc)
440                         return rc;
441                 if (id == 0)
442                         cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_AUTO_ATTACH;
443         } else if (strcmp(key, "open_attach") == 0) {
444                 rc = kstrtoul(val, 10, &id);
445                 if (rc)
446                         return rc;
447                 if (id == 0)
448                         cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_OPEN_ATTACH;
449         } else if (strcmp(key, "io_attach") == 0) {
450                 rc = kstrtoul(val, 10, &id);
451                 if (rc)
452                         return rc;
453                 if (id == 0)
454                         cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_IO_ATTACH;
455         } else if (strcmp(key, "stat_attach") == 0) {
456                 rc = kstrtoul(val, 10, &id);
457                 if (rc)
458                         return rc;
459                 if (id == 0)
460                         cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_STAT_ATTACH;
461         } else if (strcmp(key, "rwpcc") == 0 || strcmp(key, "pccrw") == 0) {
462                 rc = kstrtoul(val, 10, &id);
463                 if (rc)
464                         return rc;
465                 if (id > 0)
466                         cmd->u.pccc_add.pccc_flags |= PCC_DATASET_PCCRW;
467         } else if (strcmp(key, "ropcc") == 0 || strcmp(key, "pccro") == 0) {
468                 rc = kstrtoul(val, 10, &id);
469                 if (rc)
470                         return rc;
471                 if (id > 0)
472                         cmd->u.pccc_add.pccc_flags |= PCC_DATASET_PCCRO;
473         } else {
474                 return -EINVAL;
475         }
476
477         return 0;
478 }
479
480 static int
481 pcc_parse_value_pairs(struct pcc_cmd *cmd, char *buffer)
482 {
483         char *val;
484         char *token;
485         int rc;
486
487         switch (cmd->pccc_cmd) {
488         case PCC_ADD_DATASET:
489                 /* Enable auto attach by default */
490                 cmd->u.pccc_add.pccc_flags |= PCC_DATASET_AUTO_ATTACH;
491                 break;
492         case PCC_DEL_DATASET:
493         case PCC_CLEAR_ALL:
494                 break;
495         default:
496                 return -EINVAL;
497         }
498
499         val = buffer;
500         while (val != NULL && strlen(val) != 0) {
501                 token = strsep(&val, " ");
502                 rc = pcc_parse_value_pair(cmd, token);
503                 if (rc)
504                         return rc;
505         }
506
507         return 0;
508 }
509
510 static void
511 pcc_dataset_rule_fini(struct pcc_match_rule *rule)
512 {
513         if (!list_empty(&rule->pmr_conds))
514                 pcc_rule_conds_free(&rule->pmr_conds);
515         LASSERT(rule->pmr_conds_str != NULL);
516         OBD_FREE(rule->pmr_conds_str, strlen(rule->pmr_conds_str) + 1);
517 }
518
519 static int
520 pcc_dataset_rule_init(struct pcc_match_rule *rule, struct pcc_cmd *cmd)
521 {
522         int rc = 0;
523
524         LASSERT(cmd->u.pccc_add.pccc_conds_str);
525         OBD_ALLOC(rule->pmr_conds_str,
526                   strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
527         if (rule->pmr_conds_str == NULL)
528                 return -ENOMEM;
529
530         memcpy(rule->pmr_conds_str,
531                cmd->u.pccc_add.pccc_conds_str,
532                strlen(cmd->u.pccc_add.pccc_conds_str));
533
534         INIT_LIST_HEAD(&rule->pmr_conds);
535         if (!list_empty(&cmd->u.pccc_add.pccc_conds))
536                 rc = pcc_conds_parse(rule->pmr_conds_str,
537                                      &rule->pmr_conds);
538
539         if (rc)
540                 pcc_dataset_rule_fini(rule);
541
542         return rc;
543 }
544
545 /* Rule Matching */
546 static int
547 pcc_id_list_match(struct list_head *id_list, __u32 id_val)
548 {
549         struct pcc_match_id *id;
550
551         list_for_each_entry(id, id_list, pmi_linkage) {
552                 if (id->pmi_id == id_val)
553                         return 1;
554         }
555         return 0;
556 }
557
558 static bool
559 cfs_match_wildcard(const char *pattern, const char *content)
560 {
561         if (*pattern == '\0' && *content == '\0')
562                 return true;
563
564         if (*pattern == '*' && *(pattern + 1) != '\0' && *content == '\0')
565                 return false;
566
567         while (*pattern == *content) {
568                 pattern++;
569                 content++;
570                 if (*pattern == '\0' && *content == '\0')
571                         return true;
572
573                 if (*pattern == '*' && *(pattern + 1) != '\0' &&
574                     *content == '\0')
575                         return false;
576         }
577
578         if (*pattern == '*')
579                 return (cfs_match_wildcard(pattern + 1, content) ||
580                         cfs_match_wildcard(pattern, content + 1));
581
582         return false;
583 }
584
585 static int
586 pcc_fname_list_match(struct list_head *fname_list, const char *name)
587 {
588         struct pcc_match_fname *fname;
589
590         list_for_each_entry(fname, fname_list, pmf_linkage) {
591                 if (cfs_match_wildcard(fname->pmf_name, name))
592                         return 1;
593         }
594         return 0;
595 }
596
597 static int
598 pcc_expression_match(struct pcc_expression *expr, struct pcc_matcher *matcher)
599 {
600         switch (expr->pe_field) {
601         case PCC_FIELD_UID:
602                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_uid);
603         case PCC_FIELD_GID:
604                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_gid);
605         case PCC_FIELD_PROJID:
606                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_projid);
607         case PCC_FIELD_FNAME:
608                 return pcc_fname_list_match(&expr->pe_cond,
609                                             matcher->pm_name->name);
610         default:
611                 return 0;
612         }
613 }
614
615 static int
616 pcc_conjunction_match(struct pcc_conjunction *conjunction,
617                       struct pcc_matcher *matcher)
618 {
619         struct pcc_expression *expr;
620         int matched;
621
622         list_for_each_entry(expr, &conjunction->pc_expressions, pe_linkage) {
623                 matched = pcc_expression_match(expr, matcher);
624                 if (!matched)
625                         return 0;
626         }
627
628         return 1;
629 }
630
631 static int
632 pcc_cond_match(struct pcc_match_rule *rule, struct pcc_matcher *matcher)
633 {
634         struct pcc_conjunction *conjunction;
635         int matched;
636
637         list_for_each_entry(conjunction, &rule->pmr_conds, pc_linkage) {
638                 matched = pcc_conjunction_match(conjunction, matcher);
639                 if (matched)
640                         return 1;
641         }
642
643         return 0;
644 }
645
646 static inline bool
647 pcc_dataset_attach_allowed(struct pcc_dataset *dataset, enum lu_pcc_type type)
648 {
649         if (type == LU_PCC_READWRITE && dataset->pccd_flags & PCC_DATASET_PCCRW)
650                 return true;
651
652         if (type == LU_PCC_READONLY && dataset->pccd_flags & PCC_DATASET_PCCRO)
653                 return true;
654
655         return false;
656 }
657
658 struct pcc_dataset*
659 pcc_dataset_match_get(struct pcc_super *super, enum lu_pcc_type type,
660                       struct pcc_matcher *matcher)
661 {
662         struct pcc_dataset *dataset;
663         struct pcc_dataset *selected = NULL;
664
665         down_read(&super->pccs_rw_sem);
666         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
667                 if (pcc_dataset_attach_allowed(dataset, type) &&
668                     pcc_cond_match(&dataset->pccd_rule, matcher)) {
669                         atomic_inc(&dataset->pccd_refcount);
670                         selected = dataset;
671                         break;
672                 }
673         }
674         up_read(&super->pccs_rw_sem);
675         if (selected)
676                 CDEBUG(D_CACHE, "PCC create, matched %s - %d:%d:%d:%s\n",
677                        dataset->pccd_rule.pmr_conds_str,
678                        matcher->pm_uid, matcher->pm_gid,
679                        matcher->pm_projid, matcher->pm_name->name);
680
681         return selected;
682 }
683
684 static int
685 pcc_dataset_flags_check(struct pcc_super *super, struct pcc_cmd *cmd)
686 {
687         struct ll_sb_info *sbi;
688
689         sbi = container_of(super, struct ll_sb_info, ll_pcc_super);
690
691         /*
692          * A PCC backend can provide caching service for both PCC-RW and PCC-RO.
693          * It defaults to readonly PCC as long as the server supports it.
694          */
695         if (!(exp_connect_flags2(sbi->ll_md_exp) & OBD_CONNECT2_PCCRO)) {
696                 if (cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCCRO ||
697                     !(cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCCRW))
698                         return -EOPNOTSUPP;
699         } else if ((cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCC_ALL) == 0) {
700                 cmd->u.pccc_add.pccc_flags |= PCC_DATASET_PCC_DEFAULT;
701         } /* else RWPCC or ROPCC must have been given */
702
703         if (cmd->u.pccc_add.pccc_rwid == 0 &&
704             cmd->u.pccc_add.pccc_roid == 0)
705                 return -EINVAL;
706
707         if (cmd->u.pccc_add.pccc_rwid == 0 &&
708             cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCCRW)
709                 cmd->u.pccc_add.pccc_rwid = cmd->u.pccc_add.pccc_roid;
710
711         if (cmd->u.pccc_add.pccc_roid == 0 &&
712             cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCCRO)
713                 cmd->u.pccc_add.pccc_roid = cmd->u.pccc_add.pccc_rwid;
714
715         return 0;
716 }
717
718 /**
719  * pcc_dataset_add - Add a Cache policy to control which files need be
720  * cached and where it will be cached.
721  *
722  * @super:      superblock of pcc
723  * @cmd:        pcc command
724  */
725 static int
726 pcc_dataset_add(struct pcc_super *super, struct pcc_cmd *cmd)
727 {
728         char *pathname = cmd->pccc_pathname;
729         struct pcc_dataset *dataset;
730         struct pcc_dataset *tmp;
731         bool found = false;
732         int rc;
733
734         rc = pcc_dataset_flags_check(super, cmd);
735         if (rc)
736                 return rc;
737
738         OBD_ALLOC_PTR(dataset);
739         if (dataset == NULL)
740                 return -ENOMEM;
741
742         rc = kern_path(pathname, LOOKUP_DIRECTORY, &dataset->pccd_path);
743         if (unlikely(rc)) {
744                 OBD_FREE_PTR(dataset);
745                 return rc;
746         }
747         strncpy(dataset->pccd_pathname, pathname, PATH_MAX);
748         dataset->pccd_rwid = cmd->u.pccc_add.pccc_rwid;
749         dataset->pccd_roid = cmd->u.pccc_add.pccc_roid;
750         dataset->pccd_flags = cmd->u.pccc_add.pccc_flags;
751         atomic_set(&dataset->pccd_refcount, 1);
752
753         rc = pcc_dataset_rule_init(&dataset->pccd_rule, cmd);
754         if (rc) {
755                 pcc_dataset_put(dataset);
756                 return rc;
757         }
758
759         down_write(&super->pccs_rw_sem);
760         list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) {
761                 if (strcmp(tmp->pccd_pathname, pathname) == 0 ||
762                     (dataset->pccd_rwid != 0 &&
763                      dataset->pccd_rwid == tmp->pccd_rwid) ||
764                     (dataset->pccd_roid != 0 &&
765                      dataset->pccd_roid == tmp->pccd_roid)) {
766                         found = true;
767                         break;
768                 }
769         }
770         if (!found)
771                 list_add(&dataset->pccd_linkage, &super->pccs_datasets);
772         up_write(&super->pccs_rw_sem);
773
774         if (found) {
775                 pcc_dataset_put(dataset);
776                 rc = -EEXIST;
777         }
778
779         return rc;
780 }
781
782 static struct pcc_dataset *
783 pcc_dataset_get(struct pcc_super *super, enum lu_pcc_type type, __u32 id)
784 {
785         struct pcc_dataset *dataset;
786         struct pcc_dataset *selected = NULL;
787
788         if (id == 0)
789                 return NULL;
790
791         /*
792          * archive ID (read-write ID) or read-only ID is unique in the list,
793          * we just return last added one as first priority.
794          */
795         down_read(&super->pccs_rw_sem);
796         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
797                 if (type == LU_PCC_READWRITE && (dataset->pccd_rwid != id ||
798                     !(dataset->pccd_flags & PCC_DATASET_PCCRW)))
799                         continue;
800                 if (type == LU_PCC_READONLY && (dataset->pccd_roid != id ||
801                     !(dataset->pccd_flags & PCC_DATASET_PCCRO)))
802                         continue;
803                 atomic_inc(&dataset->pccd_refcount);
804                 selected = dataset;
805                 break;
806         }
807         up_read(&super->pccs_rw_sem);
808         if (selected)
809                 CDEBUG(D_CACHE, "matched id %u, PCC mode %d\n", id, type);
810
811         return selected;
812 }
813
814 void
815 pcc_dataset_put(struct pcc_dataset *dataset)
816 {
817         if (atomic_dec_and_test(&dataset->pccd_refcount)) {
818                 pcc_dataset_rule_fini(&dataset->pccd_rule);
819                 path_put(&dataset->pccd_path);
820                 OBD_FREE_PTR(dataset);
821         }
822 }
823
824 static int
825 pcc_dataset_del(struct pcc_super *super, char *pathname)
826 {
827         struct list_head *l, *tmp;
828         struct pcc_dataset *dataset;
829         int rc = -ENOENT;
830
831         down_write(&super->pccs_rw_sem);
832         list_for_each_safe(l, tmp, &super->pccs_datasets) {
833                 dataset = list_entry(l, struct pcc_dataset, pccd_linkage);
834                 if (strcmp(dataset->pccd_pathname, pathname) == 0) {
835                         list_del_init(&dataset->pccd_linkage);
836                         pcc_dataset_put(dataset);
837                         super->pccs_generation++;
838                         rc = 0;
839                         break;
840                 }
841         }
842         up_write(&super->pccs_rw_sem);
843         return rc;
844 }
845
846 static void
847 pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
848 {
849         seq_printf(m, "%s:\n", dataset->pccd_pathname);
850         seq_printf(m, "  rwid: %u\n", dataset->pccd_rwid);
851         seq_printf(m, "  flags: %x\n", dataset->pccd_flags);
852         seq_printf(m, "  autocache: %s\n", dataset->pccd_rule.pmr_conds_str);
853 }
854
855 int
856 pcc_super_dump(struct pcc_super *super, struct seq_file *m)
857 {
858         struct pcc_dataset *dataset;
859
860         down_read(&super->pccs_rw_sem);
861         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
862                 pcc_dataset_dump(dataset, m);
863         }
864         up_read(&super->pccs_rw_sem);
865         return 0;
866 }
867
868 static void pcc_remove_datasets(struct pcc_super *super)
869 {
870         struct pcc_dataset *dataset, *tmp;
871
872         down_write(&super->pccs_rw_sem);
873         list_for_each_entry_safe(dataset, tmp,
874                                  &super->pccs_datasets, pccd_linkage) {
875                 list_del(&dataset->pccd_linkage);
876                 pcc_dataset_put(dataset);
877         }
878         super->pccs_generation++;
879         up_write(&super->pccs_rw_sem);
880 }
881
882 void pcc_super_fini(struct pcc_super *super)
883 {
884         pcc_remove_datasets(super);
885         put_cred(super->pccs_cred);
886 }
887
888 static bool pathname_is_valid(const char *pathname)
889 {
890         /* Needs to be absolute path */
891         if (pathname == NULL || strlen(pathname) == 0 ||
892             strlen(pathname) >= PATH_MAX || pathname[0] != '/')
893                 return false;
894         return true;
895 }
896
897 static struct pcc_cmd *
898 pcc_cmd_parse(char *buffer, unsigned long count)
899 {
900         static struct pcc_cmd *cmd;
901         char *token;
902         char *val;
903         int rc = 0;
904
905         OBD_ALLOC_PTR(cmd);
906         if (cmd == NULL)
907                 GOTO(out, rc = -ENOMEM);
908
909         /* clear all setting */
910         if (strncmp(buffer, "clear", 5) == 0) {
911                 cmd->pccc_cmd = PCC_CLEAR_ALL;
912                 GOTO(out, rc = 0);
913         }
914
915         val = buffer;
916         token = strsep(&val, " ");
917         if (val == NULL || strlen(val) == 0)
918                 GOTO(out_free_cmd, rc = -EINVAL);
919
920         /* Type of the command */
921         if (strcmp(token, "add") == 0)
922                 cmd->pccc_cmd = PCC_ADD_DATASET;
923         else if (strcmp(token, "del") == 0)
924                 cmd->pccc_cmd = PCC_DEL_DATASET;
925         else
926                 GOTO(out_free_cmd, rc = -EINVAL);
927
928         /* Pathname of the dataset */
929         token = strsep(&val, " ");
930         if ((val == NULL && cmd->pccc_cmd != PCC_DEL_DATASET) ||
931             !pathname_is_valid(token))
932                 GOTO(out_free_cmd, rc = -EINVAL);
933         cmd->pccc_pathname = token;
934
935         if (cmd->pccc_cmd == PCC_ADD_DATASET) {
936                 /* List of ID */
937                 LASSERT(val);
938                 token = val;
939                 val = strrchr(token, '}');
940                 if (!val)
941                         GOTO(out_free_cmd, rc = -EINVAL);
942
943                 /* Skip '}' */
944                 val++;
945                 if (*val == '\0') {
946                         val = NULL;
947                 } else if (*val == ' ') {
948                         *val = '\0';
949                         val++;
950                 } else {
951                         GOTO(out_free_cmd, rc = -EINVAL);
952                 }
953
954                 rc = pcc_id_parse(cmd, token);
955                 if (rc)
956                         GOTO(out_free_cmd, rc);
957
958                 rc = pcc_parse_value_pairs(cmd, val);
959                 if (rc)
960                         GOTO(out_cmd_fini, rc = -EINVAL);
961         }
962         goto out;
963 out_cmd_fini:
964         pcc_cmd_fini(cmd);
965 out_free_cmd:
966         OBD_FREE_PTR(cmd);
967 out:
968         if (rc)
969                 cmd = ERR_PTR(rc);
970         return cmd;
971 }
972
973 int pcc_cmd_handle(char *buffer, unsigned long count,
974                    struct pcc_super *super)
975 {
976         int rc = 0;
977         struct pcc_cmd *cmd;
978
979         cmd = pcc_cmd_parse(buffer, count);
980         if (IS_ERR(cmd))
981                 return PTR_ERR(cmd);
982
983         switch (cmd->pccc_cmd) {
984         case PCC_ADD_DATASET:
985                 rc = pcc_dataset_add(super, cmd);
986                 break;
987         case PCC_DEL_DATASET:
988                 rc = pcc_dataset_del(super, cmd->pccc_pathname);
989                 break;
990         case PCC_CLEAR_ALL:
991                 pcc_remove_datasets(super);
992                 break;
993         default:
994                 rc = -EINVAL;
995                 break;
996         }
997
998         pcc_cmd_fini(cmd);
999         OBD_FREE_PTR(cmd);
1000         return rc;
1001 }
1002
1003 static inline void pcc_inode_lock(struct inode *inode)
1004 {
1005         mutex_lock(&ll_i2info(inode)->lli_pcc_lock);
1006 }
1007
1008 static inline void pcc_inode_unlock(struct inode *inode)
1009 {
1010         mutex_unlock(&ll_i2info(inode)->lli_pcc_lock);
1011 }
1012
1013 static void pcc_inode_init(struct pcc_inode *pcci, struct ll_inode_info *lli)
1014 {
1015         pcci->pcci_lli = lli;
1016         lli->lli_pcc_inode = pcci;
1017         atomic_set(&pcci->pcci_refcount, 0);
1018         pcci->pcci_type = LU_PCC_NONE;
1019         pcci->pcci_layout_gen = CL_LAYOUT_GEN_NONE;
1020         atomic_set(&pcci->pcci_active_ios, 0);
1021         init_waitqueue_head(&pcci->pcci_waitq);
1022 }
1023
1024 static void pcc_inode_fini(struct pcc_inode *pcci)
1025 {
1026         struct ll_inode_info *lli = pcci->pcci_lli;
1027
1028         path_put(&pcci->pcci_path);
1029         pcci->pcci_type = LU_PCC_NONE;
1030         OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
1031         lli->lli_pcc_inode = NULL;
1032 }
1033
1034 static void pcc_inode_get(struct pcc_inode *pcci)
1035 {
1036         atomic_inc(&pcci->pcci_refcount);
1037 }
1038
1039 static void pcc_inode_put(struct pcc_inode *pcci)
1040 {
1041         if (atomic_dec_and_test(&pcci->pcci_refcount))
1042                 pcc_inode_fini(pcci);
1043 }
1044
1045 void pcc_inode_free(struct inode *inode)
1046 {
1047         struct pcc_inode *pcci = ll_i2pcci(inode);
1048
1049         if (pcci) {
1050                 WARN_ON(atomic_read(&pcci->pcci_refcount) > 1);
1051                 pcc_inode_put(pcci);
1052         }
1053 }
1054
1055 /*
1056  * TODO:
1057  * As Andreas suggested, we'd better use new layout to
1058  * reduce overhead:
1059  * (fid->f_oid >> 16 & oxFFFF)/FID
1060  */
1061 #define PCC_DATASET_MAX_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
1062 static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid)
1063 {
1064         return scnprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
1065                          DFID_NOBRACE,
1066                          (fid)->f_oid       & 0xFFFF,
1067                          (fid)->f_oid >> 16 & 0xFFFF,
1068                          (unsigned int)((fid)->f_seq       & 0xFFFF),
1069                          (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
1070                          (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
1071                          (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
1072                          PFID(fid));
1073 }
1074
1075 static inline const struct cred *pcc_super_cred(struct super_block *sb)
1076 {
1077         return ll_s2sbi(sb)->ll_pcc_super.pccs_cred;
1078 }
1079
1080 void pcc_file_init(struct pcc_file *pccf)
1081 {
1082         pccf->pccf_file = NULL;
1083         pccf->pccf_type = LU_PCC_NONE;
1084 }
1085
1086 static inline bool pcc_auto_attach_enabled(struct pcc_dataset *dataset,
1087                                            enum lu_pcc_type type,
1088                                            enum pcc_io_type iot)
1089 {
1090         if (pcc_dataset_attach_allowed(dataset, type)) {
1091                 if (iot == PIT_OPEN)
1092                         return dataset->pccd_flags & PCC_DATASET_OPEN_ATTACH;
1093                 if (iot == PIT_GETATTR)
1094                         return dataset->pccd_flags & PCC_DATASET_STAT_ATTACH;
1095                 else
1096                         return dataset->pccd_flags & PCC_DATASET_AUTO_ATTACH;
1097         }
1098
1099         return false;
1100 }
1101
1102 static const char pcc_xattr_layout[] = XATTR_USER_PREFIX "PCC.layout";
1103
1104 static int pcc_layout_xattr_set(struct pcc_inode *pcci, __u32 gen)
1105 {
1106         struct dentry *pcc_dentry = pcci->pcci_path.dentry;
1107         struct ll_inode_info *lli = pcci->pcci_lli;
1108         int rc;
1109
1110         ENTRY;
1111
1112         if (!(lli->lli_pcc_dsflags & PCC_DATASET_AUTO_ATTACH))
1113                 RETURN(0);
1114
1115         rc = ll_vfs_setxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
1116                              &gen, sizeof(gen), 0);
1117
1118         RETURN(rc);
1119 }
1120
1121 static int pcc_get_layout_info(struct inode *inode, struct cl_layout *clt)
1122 {
1123         struct lu_env *env;
1124         struct ll_inode_info *lli = ll_i2info(inode);
1125         __u16 refcheck;
1126         int rc;
1127
1128         ENTRY;
1129
1130         if (!lli->lli_clob)
1131                 RETURN(-EINVAL);
1132
1133         env = cl_env_get(&refcheck);
1134         if (IS_ERR(env))
1135                 RETURN(PTR_ERR(env));
1136
1137         rc = cl_object_layout_get(env, lli->lli_clob, clt);
1138         if (rc < 0)
1139                 CDEBUG(D_INODE, "Cannot get layout for "DFID"\n",
1140                        PFID(ll_inode2fid(inode)));
1141
1142         cl_env_put(env, &refcheck);
1143         RETURN(rc < 0 ? rc : 0);
1144 }
1145
1146 /* Must be called with pcci->pcci_lock held */
1147 static void pcc_inode_attach_init(struct pcc_dataset *dataset,
1148                                   struct pcc_inode *pcci,
1149                                   struct dentry *dentry,
1150                                   enum lu_pcc_type type)
1151 {
1152         pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
1153         pcci->pcci_path.dentry = dentry;
1154         LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
1155         atomic_set(&pcci->pcci_refcount, 1);
1156         pcci->pcci_type = type;
1157         pcci->pcci_attr_valid = false;
1158 }
1159
1160 static inline void pcc_inode_dsflags_set(struct ll_inode_info *lli,
1161                                          struct pcc_dataset *dataset)
1162 {
1163         lli->lli_pcc_generation = ll_info2pccs(lli)->pccs_generation;
1164         lli->lli_pcc_dsflags = dataset->pccd_flags;
1165 }
1166
1167 static void pcc_inode_attach_set(struct pcc_super *super,
1168                                  struct pcc_dataset *dataset,
1169                                  struct ll_inode_info *lli,
1170                                  struct pcc_inode *pcci,
1171                                  struct dentry *dentry,
1172                                  enum lu_pcc_type type)
1173 {
1174         pcc_inode_init(pcci, lli);
1175         pcc_inode_attach_init(dataset, pcci, dentry, type);
1176         down_read(&super->pccs_rw_sem);
1177         pcc_inode_dsflags_set(lli, dataset);
1178         up_read(&super->pccs_rw_sem);
1179 }
1180
1181 static inline void pcc_layout_gen_set(struct pcc_inode *pcci,
1182                                       __u32 gen)
1183 {
1184         pcci->pcci_layout_gen = gen;
1185 }
1186
1187 static inline bool pcc_inode_has_layout(struct pcc_inode *pcci)
1188 {
1189         return pcci->pcci_layout_gen != CL_LAYOUT_GEN_NONE;
1190 }
1191
1192 static struct dentry *pcc_lookup(struct dentry *base, char *pathname)
1193 {
1194         char *ptr = NULL, *component;
1195         struct dentry *parent;
1196         struct dentry *child = ERR_PTR(-ENOENT);
1197
1198         ptr = pathname;
1199
1200         /* move past any initial '/' to the start of the first path component*/
1201         while (*ptr == '/')
1202                 ptr++;
1203
1204         /* store the start of the first path component */
1205         component = ptr;
1206
1207         parent = dget(base);
1208         while (ptr) {
1209                 /* find the start of the next component - if we don't find it,
1210                  * the current component is the last component
1211                  */
1212                 ptr = strchr(ptr, '/');
1213                 /* put a NUL char in place of the '/' before the next compnent
1214                  * so we can treat this component as a string; note the full
1215                  * path string is NUL terminated to this is not needed for the
1216                  * last component
1217                  */
1218                 if (ptr)
1219                         *ptr = '\0';
1220
1221                 /* look up the current component */
1222                 inode_lock(parent->d_inode);
1223                 child = lookup_one_len(component, parent, strlen(component));
1224                 inode_unlock(parent->d_inode);
1225
1226                 /* repair the path string: put '/' back in place of the NUL */
1227                 if (ptr)
1228                         *ptr = '/';
1229
1230                 dput(parent);
1231
1232                 if (IS_ERR_OR_NULL(child))
1233                         break;
1234
1235                 /* we may find a cached negative dentry */
1236                 if (!d_is_positive(child)) {
1237                         dput(child);
1238                         child = NULL;
1239                         break;
1240                 }
1241
1242                 /* descend in to the next level of the path */
1243                 parent = child;
1244
1245                 /* move the pointer past the '/' to the next component */
1246                 if (ptr)
1247                         ptr++;
1248                 component = ptr;
1249         }
1250
1251         /* NULL child means we didn't find anything */
1252         if (!child)
1253                 child = ERR_PTR(-ENOENT);
1254
1255         return child;
1256 }
1257
1258 static int pcc_try_dataset_attach(struct inode *inode, __u32 gen,
1259                                   enum lu_pcc_type type,
1260                                   struct pcc_dataset *dataset,
1261                                   bool *cached)
1262 {
1263         struct ll_inode_info *lli = ll_i2info(inode);
1264         struct pcc_inode *pcci = lli->lli_pcc_inode;
1265         const struct cred *old_cred;
1266         struct dentry *pcc_dentry = NULL;
1267         char pathname[PCC_DATASET_MAX_PATH];
1268         __u32 pcc_gen;
1269         int rc;
1270
1271         ENTRY;
1272
1273         if (type == LU_PCC_READWRITE &&
1274             !(dataset->pccd_flags & PCC_DATASET_PCCRW))
1275                 RETURN(0);
1276
1277         if (type == LU_PCC_READONLY &&
1278             !(dataset->pccd_flags & PCC_DATASET_PCCRO))
1279                 RETURN(0);
1280
1281         rc = pcc_fid2dataset_path(pathname, PCC_DATASET_MAX_PATH,
1282                                   &lli->lli_fid);
1283
1284         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1285         pcc_dentry = pcc_lookup(dataset->pccd_path.dentry, pathname);
1286         if (IS_ERR(pcc_dentry)) {
1287                 rc = PTR_ERR(pcc_dentry);
1288                 CDEBUG(D_CACHE, "%s: path lookup error on "DFID":%s: rc = %d\n",
1289                        ll_i2sbi(inode)->ll_fsname, PFID(&lli->lli_fid),
1290                        pathname, rc);
1291                 /* ignore this error */
1292                 GOTO(out, rc = 0);
1293         }
1294
1295         rc = ll_vfs_getxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
1296                              &pcc_gen, sizeof(pcc_gen));
1297         if (rc < 0)
1298                 /* ignore this error */
1299                 GOTO(out_put_pcc_dentry, rc = 0);
1300
1301         rc = 0;
1302         /* The file is still valid cached in PCC, attach it immediately. */
1303         if (pcc_gen == gen) {
1304                 CDEBUG(D_CACHE, DFID" L.Gen (%d) consistent, auto attached.\n",
1305                        PFID(&lli->lli_fid), gen);
1306                 if (!pcci) {
1307                         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
1308                         if (pcci == NULL)
1309                                 GOTO(out_put_pcc_dentry, rc = -ENOMEM);
1310
1311                         pcc_inode_init(pcci, lli);
1312                         dget(pcc_dentry);
1313                         pcc_inode_attach_init(dataset, pcci, pcc_dentry, type);
1314                 } else {
1315                         /*
1316                          * This happened when a file was once attached into
1317                          * PCC, and some processes keep this file opened
1318                          * (pcci->refcount > 1) and corresponding PCC file
1319                          * without any I/O activity, and then this file was
1320                          * detached by the manual detach command or the
1321                          * revocation of the layout lock (i.e. cached LRU lock
1322                          * shrinking).
1323                          */
1324                         pcc_inode_get(pcci);
1325                         pcci->pcci_type = type;
1326                 }
1327                 pcc_inode_dsflags_set(lli, dataset);
1328                 pcc_layout_gen_set(pcci, gen);
1329                 *cached = true;
1330         }
1331 out_put_pcc_dentry:
1332         dput(pcc_dentry);
1333 out:
1334         revert_creds(old_cred);
1335         RETURN(rc);
1336 }
1337
1338 static int pcc_try_datasets_attach(struct inode *inode, enum pcc_io_type iot,
1339                                    __u32 gen, enum lu_pcc_type type,
1340                                    bool *cached)
1341 {
1342         struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
1343         struct ll_inode_info *lli = ll_i2info(inode);
1344         struct pcc_dataset *dataset = NULL, *tmp;
1345         int rc = 0;
1346
1347         ENTRY;
1348
1349         down_read(&super->pccs_rw_sem);
1350         list_for_each_entry_safe(dataset, tmp,
1351                                  &super->pccs_datasets, pccd_linkage) {
1352                 if (!pcc_auto_attach_enabled(dataset, type, iot))
1353                         break;
1354
1355                 rc = pcc_try_dataset_attach(inode, gen, type, dataset, cached);
1356                 if (rc < 0 || (!rc && *cached))
1357                         break;
1358         }
1359
1360         /*
1361          * Update the saved dataset flags for the inode accordingly if failed.
1362          */
1363         if (!rc && !*cached) {
1364                 /*
1365                  * Currently auto attach strategy for a PCC backend is
1366                  * unchangeable once once it was added into the PCC datasets on
1367                  * a client as the support to change auto attach strategy is
1368                  * not implemented yet.
1369                  */
1370                 /*
1371                  * If tried to attach from one PCC backend:
1372                  * @lli_pcc_generation > 0:
1373                  * 1) The file was once attached into PCC, but now the
1374                  * corresponding PCC backend should be removed from the client;
1375                  * 2) The layout generation was changed, the data has been
1376                  * restored;
1377                  * 3) The corresponding PCC copy is not existed on PCC
1378                  * @lli_pcc_generation == 0:
1379                  * The file is never attached into PCC but in a HSM released
1380                  * state, or once attached into PCC but the inode was evicted
1381                  * from icache later.
1382                  * Set the saved dataset flags with PCC_DATASET_NONE. Then this
1383                  * file will skip from the candidates to try auto attach until
1384                  * the file is attached into PCC again.
1385                  *
1386                  * If the file was never attached into PCC, or once attached but
1387                  * its inode was evicted from icache (lli_pcc_generation == 0),
1388                  * or the corresponding dataset was removed from the client,
1389                  * set the saved dataset flags with PCC_DATASET_NONE.
1390                  *
1391                  * TODO: If the file was once attached into PCC but not try to
1392                  * auto attach due to the change of the configuration parameters
1393                  * for this dataset (i.e. change from auto attach enabled to
1394                  * auto attach disabled for this dataset), update the saved
1395                  * dataset flags with the found one.
1396                  */
1397                 lli->lli_pcc_dsflags = PCC_DATASET_NONE;
1398         }
1399         up_read(&super->pccs_rw_sem);
1400
1401         RETURN(rc);
1402 }
1403
1404 static int pcc_readonly_ioctl_attach(struct file *file, struct inode *inode,
1405                                      __u32 roid);
1406
1407 /* Call with pcci_mutex hold */
1408 static int pcc_try_readonly_open_attach(struct inode *inode, struct file *file,
1409                                         bool *cached)
1410 {
1411         struct dentry *dentry = file->f_path.dentry;
1412         struct pcc_dataset *dataset;
1413         struct pcc_matcher item;
1414         struct pcc_inode *pcci;
1415         int rc = 0;
1416
1417         ENTRY;
1418
1419         if (!((file->f_flags & O_ACCMODE) == O_RDONLY))
1420                 RETURN(0);
1421
1422         item.pm_uid = from_kuid(&init_user_ns, current_uid());
1423         item.pm_gid = from_kgid(&init_user_ns, current_gid());
1424         item.pm_projid = ll_i2info(inode)->lli_projid;
1425         item.pm_name = &dentry->d_name;
1426         dataset = pcc_dataset_match_get(&ll_i2sbi(inode)->ll_pcc_super,
1427                                         LU_PCC_READONLY, &item);
1428         if (dataset == NULL)
1429                 RETURN(0);
1430
1431         if ((dataset->pccd_flags & PCC_DATASET_PCC_ALL) == PCC_DATASET_PCCRO) {
1432                 pcc_inode_unlock(inode);
1433                 rc = pcc_readonly_ioctl_attach(file, inode, dataset->pccd_roid);
1434                 pcc_inode_lock(inode);
1435                 pcci = ll_i2pcci(inode);
1436                 if (pcci && pcc_inode_has_layout(pcci))
1437                         *cached = true;
1438                 if (rc) {
1439                         CDEBUG(D_CACHE,
1440                                "Failed to try PCC-RO attach "DFID", rc = %d\n",
1441                                PFID(&ll_i2info(inode)->lli_fid), rc);
1442                         /* ignore the error during auto PCC-RO attach. */
1443                         rc = 0;
1444                 }
1445         }
1446
1447         pcc_dataset_put(dataset);
1448         RETURN(rc);
1449 }
1450
1451 /*
1452  * TODO: For RW-PCC, it is desirable to store HSM info as a layout (LU-10606).
1453  * Thus the client can get archive ID from the layout directly. When try to
1454  * attach the file automatically which is in HSM released state (according to
1455  * LOV_PATTERN_F_RELEASED in the layout), it can determine whether the file is
1456  * valid cached on PCC more precisely according to the @rwid (archive ID) in
1457  * the PCC dataset and the archive ID in HSM attrs.
1458  */
1459 static int pcc_try_auto_attach(struct inode *inode, bool *cached,
1460                                enum pcc_io_type iot)
1461 {
1462         struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
1463         struct cl_layout clt = {
1464                 .cl_layout_gen = 0,
1465                 .cl_is_released = false,
1466         };
1467         struct ll_inode_info *lli = ll_i2info(inode);
1468         __u32 gen;
1469         int rc;
1470
1471         ENTRY;
1472
1473         /*
1474          * Quick check whether there is PCC device.
1475          */
1476         if (list_empty(&super->pccs_datasets))
1477                 RETURN(0);
1478
1479         /*
1480          * The file layout lock was cancelled. And this open does not
1481          * obtain valid layout lock from MDT (i.e. the file is being
1482          * HSM restoring).
1483          */
1484         if (iot == PIT_OPEN) {
1485                 if (ll_layout_version_get(lli) == CL_LAYOUT_GEN_NONE)
1486                         RETURN(0);
1487         } else {
1488                 rc = ll_layout_refresh(inode, &gen);
1489                 if (rc)
1490                         RETURN(rc);
1491         }
1492
1493         rc = pcc_get_layout_info(inode, &clt);
1494         if (rc)
1495                 RETURN(rc);
1496
1497         if (iot != PIT_OPEN && gen != clt.cl_layout_gen) {
1498                 CDEBUG(D_CACHE, DFID" layout changed from %d to %d.\n",
1499                        PFID(ll_inode2fid(inode)), gen, clt.cl_layout_gen);
1500                 RETURN(-EINVAL);
1501         }
1502
1503         if (clt.cl_is_released)
1504                 rc = pcc_try_datasets_attach(inode, iot, clt.cl_layout_gen,
1505                                              LU_PCC_READWRITE, cached);
1506         else if (clt.cl_is_rdonly)
1507                 rc = pcc_try_datasets_attach(inode, iot, clt.cl_layout_gen,
1508                                              LU_PCC_READONLY, cached);
1509
1510         RETURN(rc);
1511 }
1512
1513 static inline bool pcc_may_auto_attach(struct inode *inode,
1514                                        enum pcc_io_type iot)
1515 {
1516         struct ll_inode_info *lli = ll_i2info(inode);
1517         struct pcc_super *super = ll_i2pccs(inode);
1518
1519         ENTRY;
1520
1521         /* Known the file was not in any PCC backend. */
1522         if (lli->lli_pcc_dsflags & PCC_DATASET_NONE)
1523                 RETURN(false);
1524
1525         /*
1526          * lli_pcc_generation == 0 means that the file was never attached into
1527          * PCC, or may be once attached into PCC but detached as the inode is
1528          * evicted from icache (i.e. "echo 3 > /proc/sys/vm/drop_caches" or
1529          * icache shrinking due to the memory pressure), which will cause the
1530          * file detach from PCC when releasing the inode from icache.
1531          * In either case, we still try to attach.
1532          */
1533         /* lli_pcc_generation == 0, or the PCC setting was changed,
1534          * or there is no PCC setup on the client and the try will return
1535          * immediately in pcc_try_auto_attach().
1536          */
1537         if (super->pccs_generation != lli->lli_pcc_generation)
1538                 RETURN(true);
1539
1540         /* The cached setting @lli_pcc_dsflags is valid */
1541         if (iot == PIT_OPEN)
1542                 RETURN(lli->lli_pcc_dsflags & PCC_DATASET_OPEN_ATTACH);
1543
1544         if (iot == PIT_GETATTR)
1545                 RETURN(lli->lli_pcc_dsflags & PCC_DATASET_STAT_ATTACH);
1546
1547         RETURN(lli->lli_pcc_dsflags & PCC_DATASET_IO_ATTACH);
1548 }
1549
1550 int pcc_file_open(struct inode *inode, struct file *file)
1551 {
1552         struct pcc_inode *pcci;
1553         struct ll_inode_info *lli = ll_i2info(inode);
1554         struct ll_file_data *fd = file->private_data;
1555         struct pcc_file *pccf = &fd->fd_pcc_file;
1556         struct file *pcc_file;
1557         struct path *path;
1558         bool cached = false;
1559         int rc = 0;
1560
1561         ENTRY;
1562
1563         if (!S_ISREG(inode->i_mode))
1564                 RETURN(0);
1565
1566         if (IS_ENCRYPTED(inode))
1567                 RETURN(0);
1568
1569         pcc_inode_lock(inode);
1570         pcci = ll_i2pcci(inode);
1571
1572         if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
1573                 GOTO(out_unlock, rc = 0);
1574
1575         if (!pcci || !pcc_inode_has_layout(pcci)) {
1576                 if (pcc_may_auto_attach(inode, PIT_OPEN))
1577                         rc = pcc_try_auto_attach(inode, &cached, PIT_OPEN);
1578
1579                 if (rc == 0 && !cached)
1580                         rc = pcc_try_readonly_open_attach(inode, file, &cached);
1581
1582                 if (rc < 0 || !cached)
1583                         GOTO(out_unlock, rc);
1584
1585                 if (!pcci)
1586                         pcci = ll_i2pcci(inode);
1587         }
1588
1589         pcc_inode_get(pcci);
1590         WARN_ON(pccf->pccf_file);
1591
1592         path = &pcci->pcci_path;
1593         CDEBUG(D_CACHE, "opening pcc file '%pd'\n", path->dentry);
1594
1595         pcc_file = dentry_open(path, file->f_flags,
1596                                pcc_super_cred(inode->i_sb));
1597         if (IS_ERR_OR_NULL(pcc_file)) {
1598                 rc = pcc_file == NULL ? -EINVAL : PTR_ERR(pcc_file);
1599                 pcc_inode_put(pcci);
1600         } else {
1601                 pccf->pccf_file = pcc_file;
1602                 pccf->pccf_type = pcci->pcci_type;
1603         }
1604
1605 out_unlock:
1606         pcc_inode_unlock(inode);
1607         RETURN(rc);
1608 }
1609
1610 void pcc_file_release(struct inode *inode, struct file *file)
1611 {
1612         struct pcc_inode *pcci;
1613         struct ll_file_data *fd = file->private_data;
1614         struct pcc_file *pccf;
1615         struct path *path;
1616
1617         ENTRY;
1618
1619         if (!S_ISREG(inode->i_mode) || fd == NULL)
1620                 RETURN_EXIT;
1621
1622         pccf = &fd->fd_pcc_file;
1623         pcc_inode_lock(inode);
1624         if (pccf->pccf_file == NULL)
1625                 goto out;
1626
1627         pcci = ll_i2pcci(inode);
1628         LASSERT(pcci);
1629         path = &pcci->pcci_path;
1630         CDEBUG(D_CACHE, "releasing pcc file \"%pd\"\n", path->dentry);
1631         pcc_inode_put(pcci);
1632         fput(pccf->pccf_file);
1633         pccf->pccf_file = NULL;
1634 out:
1635         pcc_inode_unlock(inode);
1636         RETURN_EXIT;
1637 }
1638
1639 /* Tolerate the IO failure on PCC and fall back to normal Lustre IO path */
1640 static bool pcc_io_tolerate(struct pcc_inode *pcci,
1641                             enum pcc_io_type iot, int rc)
1642 {
1643         if (pcci->pcci_type == LU_PCC_READWRITE) {
1644                 if (iot == PIT_WRITE && (rc == -ENOSPC || rc == -EDQUOT))
1645                         return false;
1646                 /* Handle the ->page_mkwrite failure tolerance separately
1647                  * in pcc_page_mkwrite().
1648                  */
1649         } else if (pcci->pcci_type == LU_PCC_READONLY) {
1650                 /*
1651                  * For async I/O engine such as libaio and io_uring, PCC read
1652                  * should not tolerate -EAGAIN/-EIOCBQUEUED errors, return
1653                  * the error code to the caller directly.
1654                  */
1655                 if ((iot == PIT_READ || iot == PIT_GETATTR ||
1656                      iot == PIT_SPLICE_READ) && rc < 0 && rc != -ENOMEM &&
1657                      rc != -EAGAIN && rc != -EIOCBQUEUED)
1658                         return false;
1659                 if (iot == PIT_FAULT && (rc & VM_FAULT_SIGBUS) &&
1660                     !(rc & VM_FAULT_OOM))
1661                         return false;
1662         }
1663
1664         return true;
1665 }
1666
1667 static void pcc_io_init(struct inode *inode, enum pcc_io_type iot, bool *cached)
1668 {
1669         struct pcc_inode *pcci;
1670
1671         pcc_inode_lock(inode);
1672         pcci = ll_i2pcci(inode);
1673         if (pcci && pcc_inode_has_layout(pcci)) {
1674                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1675                 if (pcci->pcci_type == LU_PCC_READONLY &&
1676                     (iot == PIT_WRITE || iot == PIT_SETATTR ||
1677                      iot == PIT_PAGE_MKWRITE)) {
1678                         /* Fall back to normal I/O path */
1679                         *cached = false;
1680                         /* For mmap write, we need to detach the file from
1681                          * RO-PCC, release the page got from ->fault(), and
1682                          * then retry the memory fault handling (->fault()
1683                          * and ->page_mkwrite()).
1684                          * These are done in pcc_page_mkwrite();
1685                          */
1686                 } else {
1687                         atomic_inc(&pcci->pcci_active_ios);
1688                         *cached = true;
1689                 }
1690         } else {
1691                 *cached = false;
1692                 if (pcc_may_auto_attach(inode, iot)) {
1693                         (void) pcc_try_auto_attach(inode, cached, iot);
1694                         if (*cached) {
1695                                 pcci = ll_i2pcci(inode);
1696                                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1697                                 atomic_inc(&pcci->pcci_active_ios);
1698                         }
1699                 }
1700         }
1701         pcc_inode_unlock(inode);
1702 }
1703
1704 static void pcc_io_fini(struct inode *inode, enum pcc_io_type iot,
1705                         int rc, bool *cached)
1706 {
1707         struct pcc_inode *pcci = ll_i2pcci(inode);
1708
1709         LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0 && *cached);
1710
1711         *cached = pcc_io_tolerate(pcci, iot, rc);
1712         if (atomic_dec_and_test(&pcci->pcci_active_ios))
1713                 wake_up(&pcci->pcci_waitq);
1714 }
1715
1716
1717 static ssize_t
1718 __pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
1719 {
1720         struct file *file = iocb->ki_filp;
1721
1722 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1723         return file->f_op->read_iter(iocb, iter);
1724 #else
1725         struct iovec iov;
1726         struct iov_iter i;
1727         ssize_t bytes = 0;
1728
1729         iov_for_each(iov, i, *iter) {
1730                 ssize_t res;
1731
1732                 res = file->f_op->aio_read(iocb, &iov, 1, iocb->ki_pos);
1733                 if (-EIOCBQUEUED == res)
1734                         res = wait_on_sync_kiocb(iocb);
1735                 if (res <= 0) {
1736                         if (bytes == 0)
1737                                 bytes = res;
1738                         break;
1739                 }
1740
1741                 bytes += res;
1742                 if (res < iov.iov_len)
1743                         break;
1744         }
1745
1746         if (bytes > 0)
1747                 iov_iter_advance(iter, bytes);
1748         return bytes;
1749 #endif
1750 }
1751
1752 ssize_t pcc_file_read_iter(struct kiocb *iocb,
1753                            struct iov_iter *iter, bool *cached)
1754 {
1755         struct file *file = iocb->ki_filp;
1756         struct ll_file_data *fd = file->private_data;
1757         struct pcc_file *pccf = &fd->fd_pcc_file;
1758         struct inode *inode = file_inode(file);
1759         ssize_t result;
1760
1761         ENTRY;
1762
1763         if (pccf->pccf_file == NULL) {
1764                 *cached = false;
1765                 RETURN(0);
1766         }
1767
1768         pcc_io_init(inode, PIT_READ, cached);
1769         if (!*cached)
1770                 RETURN(0);
1771
1772         /* Fake I/O error on RO-PCC */
1773         if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
1774                 GOTO(out, result = -EIO);
1775
1776         iocb->ki_filp = pccf->pccf_file;
1777         /* generic_file_aio_read does not support ext4-dax,
1778          * __pcc_file_read_iter uses ->aio_read hook directly
1779          * to add support for ext4-dax.
1780          */
1781         result = __pcc_file_read_iter(iocb, iter);
1782         iocb->ki_filp = file;
1783 out:
1784         pcc_io_fini(inode, PIT_READ, result, cached);
1785         RETURN(result);
1786 }
1787
1788 static ssize_t
1789 __pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
1790 {
1791         struct file *file = iocb->ki_filp;
1792
1793 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1794         return file->f_op->write_iter(iocb, iter);
1795 #else
1796         struct iovec iov;
1797         struct iov_iter i;
1798         ssize_t bytes = 0;
1799
1800         iov_for_each(iov, i, *iter) {
1801                 ssize_t res;
1802
1803                 res = file->f_op->aio_write(iocb, &iov, 1, iocb->ki_pos);
1804                 if (-EIOCBQUEUED == res)
1805                         res = wait_on_sync_kiocb(iocb);
1806                 if (res <= 0) {
1807                         if (bytes == 0)
1808                                 bytes = res;
1809                         break;
1810                 }
1811
1812                 bytes += res;
1813                 if (res < iov.iov_len)
1814                         break;
1815         }
1816
1817         if (bytes > 0)
1818                 iov_iter_advance(iter, bytes);
1819         return bytes;
1820 #endif
1821 }
1822
1823 ssize_t pcc_file_write_iter(struct kiocb *iocb,
1824                             struct iov_iter *iter, bool *cached)
1825 {
1826         struct file *file = iocb->ki_filp;
1827         struct ll_file_data *fd = file->private_data;
1828         struct pcc_file *pccf = &fd->fd_pcc_file;
1829         struct inode *inode = file_inode(file);
1830         ssize_t result;
1831
1832         ENTRY;
1833
1834         if (pccf->pccf_file == NULL) {
1835                 *cached = false;
1836                 RETURN(0);
1837         }
1838
1839         if (pccf->pccf_type != LU_PCC_READWRITE) {
1840                 *cached = false;
1841                 RETURN(-EAGAIN);
1842         }
1843
1844         pcc_io_init(inode, PIT_WRITE, cached);
1845         if (!*cached)
1846                 RETURN(0);
1847
1848         if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
1849                 GOTO(out, result = -ENOSPC);
1850
1851         iocb->ki_filp = pccf->pccf_file;
1852
1853         /* Since __pcc_file_write_iter makes write calls via
1854          * the normal vfs interface to the local PCC file system,
1855          * the inode lock is not needed.
1856          */
1857         result = __pcc_file_write_iter(iocb, iter);
1858         iocb->ki_filp = file;
1859 out:
1860         pcc_io_fini(inode, PIT_WRITE, result, cached);
1861         RETURN(result);
1862 }
1863
1864 int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
1865                       bool *cached)
1866 {
1867         int rc;
1868         const struct cred *old_cred;
1869         struct iattr attr2 = *attr;
1870         struct dentry *pcc_dentry;
1871         struct pcc_inode *pcci;
1872
1873         ENTRY;
1874
1875         if (!S_ISREG(inode->i_mode)) {
1876                 *cached = false;
1877                 RETURN(0);
1878         }
1879
1880         pcc_io_init(inode, PIT_SETATTR, cached);
1881         if (!*cached)
1882                 RETURN(0);
1883
1884         attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME |
1885                          ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET |
1886                          ATTR_CTIME | ATTR_UID | ATTR_GID);
1887         pcci = ll_i2pcci(inode);
1888         pcc_dentry = pcci->pcci_path.dentry;
1889         inode_lock(pcc_dentry->d_inode);
1890         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1891 #ifdef HAVE_USER_NAMESPACE_ARG
1892         rc = pcc_dentry->d_inode->i_op->setattr(&nop_mnt_idmap, pcc_dentry,
1893                                                 &attr2);
1894 #else
1895         rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
1896 #endif
1897         revert_creds(old_cred);
1898         inode_unlock(pcc_dentry->d_inode);
1899
1900         pcc_io_fini(inode, PIT_SETATTR, rc, cached);
1901         RETURN(rc);
1902 }
1903
1904 int pcc_inode_getattr(struct inode *inode, u32 request_mask,
1905                       unsigned int flags, bool *cached)
1906 {
1907         struct ll_inode_info *lli = ll_i2info(inode);
1908         const struct cred *old_cred;
1909         struct kstat stat;
1910         s64 atime;
1911         s64 mtime;
1912         s64 ctime;
1913         int rc;
1914
1915         ENTRY;
1916
1917         if (!S_ISREG(inode->i_mode)) {
1918                 *cached = false;
1919                 RETURN(0);
1920         }
1921
1922         pcc_io_init(inode, PIT_GETATTR, cached);
1923         if (!*cached)
1924                 RETURN(0);
1925
1926         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1927         rc = ll_vfs_getattr(&ll_i2pcci(inode)->pcci_path, &stat, request_mask,
1928                             flags);
1929         revert_creds(old_cred);
1930         if (rc)
1931                 GOTO(out, rc);
1932
1933         ll_inode_size_lock(inode);
1934         if (test_and_clear_bit(LLIF_UPDATE_ATIME, &lli->lli_flags) ||
1935             inode_get_atime_sec(inode) < lli->lli_atime)
1936                 inode_set_atime(inode, lli->lli_atime, 0);
1937
1938         inode_set_mtime(inode, lli->lli_mtime, 0);
1939         inode_set_ctime(inode, lli->lli_ctime, 0);
1940
1941         atime = inode_get_atime_sec(inode);
1942         mtime = inode_get_mtime_sec(inode);
1943         ctime = inode_get_ctime_sec(inode);
1944
1945         if (atime < stat.atime.tv_sec)
1946                 atime = stat.atime.tv_sec;
1947
1948         if (ctime < stat.ctime.tv_sec)
1949                 ctime = stat.ctime.tv_sec;
1950
1951         if (mtime < stat.mtime.tv_sec)
1952                 mtime = stat.mtime.tv_sec;
1953
1954         i_size_write(inode, stat.size);
1955         inode->i_blocks = stat.blocks;
1956
1957         inode_set_atime(inode, atime, 0);
1958         inode_set_mtime(inode, mtime, 0);
1959         inode_set_ctime(inode, ctime, 0);
1960
1961         ll_inode_size_unlock(inode);
1962 out:
1963         pcc_io_fini(inode, PIT_GETATTR, rc, cached);
1964         RETURN(rc);
1965 }
1966
1967 #ifdef HAVE_DEFAULT_FILE_SPLICE_READ_EXPORT
1968 ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos,
1969                              struct pipe_inode_info *pipe,
1970                              size_t count, unsigned int flags)
1971 {
1972         struct inode *inode = file_inode(in_file);
1973         struct ll_file_data *fd = in_file->private_data;
1974         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1975         bool cached = false;
1976         ssize_t result;
1977
1978         ENTRY;
1979
1980         if (!pcc_file)
1981                 RETURN(default_file_splice_read(in_file, ppos, pipe,
1982                                                 count, flags));
1983
1984         pcc_io_init(inode, PIT_SPLICE_READ, &cached);
1985         if (!cached)
1986                 RETURN(default_file_splice_read(in_file, ppos, pipe,
1987                                                 count, flags));
1988
1989         result = default_file_splice_read(pcc_file, ppos, pipe, count, flags);
1990
1991         pcc_io_fini(inode, PIT_SPLICE_READ, result, &cached);
1992         RETURN(result);
1993 }
1994 #endif /* HAVE_DEFAULT_FILE_SPLICE_READ_EXPORT */
1995
1996 int pcc_fsync(struct file *file, loff_t start, loff_t end,
1997               int datasync, bool *cached)
1998 {
1999         struct inode *inode = file_inode(file);
2000         struct ll_file_data *fd = file->private_data;
2001         struct pcc_file *pccf = &fd->fd_pcc_file;
2002         struct file *pcc_file = pccf->pccf_file;
2003         int rc;
2004
2005         ENTRY;
2006
2007         if (!pcc_file) {
2008                 *cached = false;
2009                 RETURN(0);
2010         }
2011
2012         if (!S_ISREG(inode->i_mode)) {
2013                 *cached = false;
2014                 RETURN(0);
2015         }
2016
2017         /*
2018          * After the file is attached into RO-PCC, its dirty pages on this
2019          * client may not be flushed. So fsync() should fall back to normal
2020          * Lustre I/O path flushing dirty data to OSTs. And flush on RO-PCC
2021          * copy is meaningless.
2022          */
2023         if (pccf->pccf_type == LU_PCC_READONLY) {
2024                 *cached = false;
2025                 RETURN(-EAGAIN);
2026         }
2027
2028         pcc_io_init(inode, PIT_FSYNC, cached);
2029         if (!*cached)
2030                 RETURN(0);
2031
2032         rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
2033                                                 start, end, datasync);
2034
2035         pcc_io_fini(inode, PIT_FSYNC, rc, cached);
2036         RETURN(rc);
2037 }
2038
2039 int pcc_file_mmap(struct file *file, struct vm_area_struct *vma,
2040                   bool *cached)
2041 {
2042         struct inode *inode = file_inode(file);
2043         struct ll_file_data *fd = file->private_data;
2044         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
2045         struct pcc_inode *pcci;
2046         int rc = 0;
2047
2048         ENTRY;
2049
2050         if (!pcc_file || !file_inode(pcc_file)->i_fop->mmap) {
2051                 *cached = false;
2052                 RETURN(0);
2053         }
2054
2055         pcc_inode_lock(inode);
2056         pcci = ll_i2pcci(inode);
2057         if (pcci && pcc_inode_has_layout(pcci)) {
2058                 LASSERT(atomic_read(&pcci->pcci_refcount) > 1);
2059                 *cached = true;
2060                 vma->vm_file = pcc_file;
2061                 rc = file_inode(pcc_file)->i_fop->mmap(pcc_file, vma);
2062                 vma->vm_file = file;
2063                 /* Save the vm ops of backend PCC */
2064                 vma->vm_private_data = (void *)vma->vm_ops;
2065         } else {
2066                 *cached = false;
2067         }
2068         pcc_inode_unlock(inode);
2069
2070         RETURN(rc);
2071 }
2072
2073 void pcc_vm_open(struct vm_area_struct *vma)
2074 {
2075         struct pcc_inode *pcci;
2076         struct file *file = vma->vm_file;
2077         struct inode *inode = file_inode(file);
2078         struct ll_file_data *fd = file->private_data;
2079         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
2080         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
2081
2082         ENTRY;
2083
2084         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->open)
2085                 RETURN_EXIT;
2086
2087         pcc_inode_lock(inode);
2088         pcci = ll_i2pcci(inode);
2089         if (pcci && pcc_inode_has_layout(pcci)) {
2090                 vma->vm_file = pcc_file;
2091                 pcc_vm_ops->open(vma);
2092                 vma->vm_file = file;
2093         }
2094         pcc_inode_unlock(inode);
2095         EXIT;
2096 }
2097
2098 void pcc_vm_close(struct vm_area_struct *vma)
2099 {
2100         struct file *file = vma->vm_file;
2101         struct inode *inode = file_inode(file);
2102         struct ll_file_data *fd = file->private_data;
2103         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
2104         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
2105
2106         ENTRY;
2107
2108         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->close)
2109                 RETURN_EXIT;
2110
2111         pcc_inode_lock(inode);
2112         /* Layout lock maybe revoked here */
2113         vma->vm_file = pcc_file;
2114         pcc_vm_ops->close(vma);
2115         vma->vm_file = file;
2116         pcc_inode_unlock(inode);
2117         EXIT;
2118 }
2119
2120 int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
2121                      bool *cached)
2122 {
2123         struct page *page = vmf->page;
2124         struct mm_struct *mm = vma->vm_mm;
2125         struct file *file = vma->vm_file;
2126         struct inode *inode = file_inode(file);
2127         struct ll_file_data *fd = file->private_data;
2128         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
2129         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
2130         int rc;
2131
2132         ENTRY;
2133
2134         if (!pcc_file || !pcc_vm_ops) {
2135                 *cached = false;
2136                 RETURN(0);
2137         }
2138
2139         if (!pcc_vm_ops->page_mkwrite &&
2140             page->mapping == pcc_file->f_mapping) {
2141                 CDEBUG(D_MMAP,
2142                        "%s: PCC backend fs not support ->page_mkwrite()\n",
2143                        ll_i2sbi(inode)->ll_fsname);
2144                 pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
2145                 mmap_read_unlock(mm);
2146                 *cached = true;
2147                 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
2148         }
2149         /* Pause to allow for a race with concurrent detach */
2150         CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
2151
2152         pcc_io_init(inode, PIT_PAGE_MKWRITE, cached);
2153         if (!*cached) {
2154                 /* This happens when the file is detached from PCC after got
2155                  * the fault page via ->fault() on the inode of the PCC copy.
2156                  * Here it can not simply fall back to normal Lustre I/O path.
2157                  * The reason is that the address space of fault page used by
2158                  * ->page_mkwrite() is still the one of PCC inode. In the
2159                  * normal Lustre ->page_mkwrite() I/O path, it will be wrongly
2160                  * handled as the address space of the fault page is not
2161                  * consistent with the one of the Lustre inode (though the
2162                  * fault page was truncated).
2163                  * As the file is detached from PCC, the fault page must
2164                  * be released frist, and retry the mmap write (->fault() and
2165                  * ->page_mkwrite).
2166                  * We use an ugly and tricky method by returning
2167                  * VM_FAULT_NOPAGE | VM_FAULT_RETRY to the caller
2168                  * __do_page_fault and retry the memory fault handling.
2169                  */
2170                 if (page->mapping == pcc_file->f_mapping) {
2171                         pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
2172                         *cached = true;
2173                         mmap_read_unlock(mm);
2174                         RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
2175                 }
2176
2177                 RETURN(0);
2178         }
2179
2180         /*
2181          * This fault injection can also be used to simulate -ENOSPC and
2182          * -EDQUOT failure of underlying PCC backend fs.
2183          */
2184         if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE))
2185                 GOTO(out, rc = VM_FAULT_SIGBUS);
2186
2187         vma->vm_file = pcc_file;
2188 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
2189         rc = pcc_vm_ops->page_mkwrite(vmf);
2190 #else
2191         rc = pcc_vm_ops->page_mkwrite(vma, vmf);
2192 #endif
2193         vma->vm_file = file;
2194
2195 out:
2196         pcc_io_fini(inode, PIT_PAGE_MKWRITE, rc, cached);
2197
2198         /* VM_FAULT_SIGBUG usually means that underlying PCC backend fs returns
2199          * -EIO, -ENOSPC or -EDQUOT. Thus we can retry this IO from the normal
2200          * Lustre I/O path.
2201          */
2202         if (rc & VM_FAULT_SIGBUS) {
2203                 pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
2204                 mmap_read_unlock(mm);
2205                 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
2206         }
2207         RETURN(rc);
2208 }
2209
2210 int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
2211               bool *cached)
2212 {
2213         struct file *file = vma->vm_file;
2214         struct inode *inode = file_inode(file);
2215         struct ll_file_data *fd = file->private_data;
2216         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
2217         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
2218         int rc;
2219
2220         ENTRY;
2221
2222         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->fault) {
2223                 *cached = false;
2224                 RETURN(0);
2225         }
2226
2227         if (!S_ISREG(inode->i_mode)) {
2228                 *cached = false;
2229                 RETURN(0);
2230         }
2231
2232         pcc_io_init(inode, PIT_FAULT, cached);
2233         if (!*cached)
2234                 RETURN(0);
2235
2236         /* Tolerate the mmap read failure for RO-PCC */
2237         if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
2238                 GOTO(out, rc = VM_FAULT_SIGBUS);
2239
2240         vma->vm_file = pcc_file;
2241 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
2242         rc = pcc_vm_ops->fault(vmf);
2243 #else
2244         rc = pcc_vm_ops->fault(vma, vmf);
2245 #endif
2246         vma->vm_file = file;
2247 out:
2248         pcc_io_fini(inode, PIT_FAULT, rc, cached);
2249         RETURN(rc);
2250 }
2251
2252 static void __pcc_layout_invalidate(struct pcc_inode *pcci)
2253 {
2254         pcci->pcci_type = LU_PCC_NONE;
2255         pcc_layout_gen_set(pcci, CL_LAYOUT_GEN_NONE);
2256         if (atomic_read(&pcci->pcci_active_ios) == 0)
2257                 return;
2258
2259         CDEBUG(D_CACHE, "Waiting for IO completion: %d\n",
2260                        atomic_read(&pcci->pcci_active_ios));
2261         wait_event_idle(pcci->pcci_waitq,
2262                         atomic_read(&pcci->pcci_active_ios) == 0);
2263 }
2264
2265 void pcc_layout_invalidate(struct inode *inode)
2266 {
2267         struct pcc_inode *pcci;
2268
2269         ENTRY;
2270
2271         pcc_inode_lock(inode);
2272         pcci = ll_i2pcci(inode);
2273         if (pcci && pcc_inode_has_layout(pcci)) {
2274                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
2275                 __pcc_layout_invalidate(pcci);
2276
2277                 CDEBUG(D_CACHE, "Invalidate "DFID" layout gen %d\n",
2278                        PFID(&ll_i2info(inode)->lli_fid), pcci->pcci_layout_gen);
2279
2280                 pcc_inode_put(pcci);
2281         }
2282         pcc_inode_unlock(inode);
2283
2284         EXIT;
2285 }
2286
2287 static int pcc_inode_remove(struct inode *inode, struct dentry *pcc_dentry)
2288 {
2289         struct dentry *parent = dget_parent(pcc_dentry);
2290         int rc;
2291
2292         rc = vfs_unlink(&nop_mnt_idmap, d_inode(parent), pcc_dentry);
2293         if (rc)
2294                 CWARN("%s: failed to unlink PCC file %pd, rc = %d\n",
2295                       ll_i2sbi(inode)->ll_fsname, pcc_dentry, rc);
2296
2297         dput(parent);
2298         return rc;
2299 }
2300
2301 /* Create directory under base if directory does not exist */
2302 static struct dentry *
2303 pcc_mkdir(struct dentry *base, const char *name, umode_t mode)
2304 {
2305         int rc;
2306         struct dentry *dentry;
2307         struct inode *dir = base->d_inode;
2308
2309         inode_lock(dir);
2310         dentry = lookup_one_len(name, base, strlen(name));
2311         if (IS_ERR(dentry))
2312                 goto out;
2313
2314         if (d_is_positive(dentry))
2315                 goto out;
2316
2317         rc = vfs_mkdir(&nop_mnt_idmap, dir, dentry, mode);
2318         if (rc) {
2319                 dput(dentry);
2320                 dentry = ERR_PTR(rc);
2321                 goto out;
2322         }
2323 out:
2324         inode_unlock(dir);
2325         return dentry;
2326 }
2327
2328 static struct dentry *
2329 pcc_mkdir_p(struct dentry *root, char *path, umode_t mode)
2330 {
2331         char *ptr, *entry_name;
2332         struct dentry *parent;
2333         struct dentry *child = ERR_PTR(-EINVAL);
2334
2335         ptr = path;
2336         while (*ptr == '/')
2337                 ptr++;
2338
2339         entry_name = ptr;
2340         parent = dget(root);
2341         while ((ptr = strchr(ptr, '/')) != NULL) {
2342                 *ptr = '\0';
2343                 child = pcc_mkdir(parent, entry_name, mode);
2344                 *ptr = '/';
2345                 dput(parent);
2346                 if (IS_ERR(child))
2347                         break;
2348
2349                 parent = child;
2350                 ptr++;
2351                 entry_name = ptr;
2352         }
2353
2354         return child;
2355 }
2356
2357 /* Create file under base. If file already exist, return failure */
2358 static struct dentry *
2359 pcc_create(struct dentry *base, const char *name, umode_t mode)
2360 {
2361         int rc;
2362         struct dentry *dentry;
2363         struct inode *dir = base->d_inode;
2364
2365         inode_lock(dir);
2366         dentry = lookup_one_len(name, base, strlen(name));
2367         if (IS_ERR(dentry))
2368                 goto out;
2369
2370         if (d_is_positive(dentry))
2371                 goto out;
2372
2373         rc = vfs_create(&nop_mnt_idmap, dir, dentry, mode, false);
2374         if (rc) {
2375                 dput(dentry);
2376                 dentry = ERR_PTR(rc);
2377                 goto out;
2378         }
2379 out:
2380         inode_unlock(dir);
2381         return dentry;
2382 }
2383
2384 static int __pcc_inode_create(struct pcc_dataset *dataset,
2385                               struct lu_fid *fid,
2386                               struct dentry **dentry)
2387 {
2388         char *path;
2389         struct dentry *base;
2390         struct dentry *child;
2391         int rc = 0;
2392
2393         OBD_ALLOC(path, PCC_DATASET_MAX_PATH);
2394         if (path == NULL)
2395                 return -ENOMEM;
2396
2397         pcc_fid2dataset_path(path, PCC_DATASET_MAX_PATH, fid);
2398
2399         base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0);
2400         if (IS_ERR(base)) {
2401                 rc = PTR_ERR(base);
2402                 GOTO(out, rc);
2403         }
2404
2405         snprintf(path, PCC_DATASET_MAX_PATH, DFID_NOBRACE, PFID(fid));
2406         child = pcc_create(base, path, 0);
2407         if (IS_ERR(child)) {
2408                 rc = PTR_ERR(child);
2409                 GOTO(out_base, rc);
2410         }
2411         *dentry = child;
2412
2413 out_base:
2414         dput(base);
2415 out:
2416         OBD_FREE(path, PCC_DATASET_MAX_PATH);
2417         return rc;
2418 }
2419
2420 /*
2421  * Reset uid, gid or size for the PCC copy masked by @valid.
2422  * TODO: Set the project ID for PCC copy.
2423  */
2424 static int pcc_inode_reset_iattr(struct dentry *dentry, unsigned int valid,
2425                                  kuid_t uid, kgid_t gid, loff_t size)
2426 {
2427         struct inode *inode = dentry->d_inode;
2428         struct iattr attr;
2429         int rc;
2430
2431         ENTRY;
2432
2433         attr.ia_valid = valid;
2434         attr.ia_uid = uid;
2435         attr.ia_gid = gid;
2436         attr.ia_size = size;
2437
2438         inode_lock(inode);
2439         rc = notify_change(&nop_mnt_idmap, dentry, &attr, NULL);
2440         inode_unlock(inode);
2441
2442         RETURN(rc);
2443 }
2444
2445 int pcc_inode_create(struct super_block *sb, struct pcc_dataset *dataset,
2446                      struct lu_fid *fid, struct dentry **pcc_dentry)
2447 {
2448         const struct cred *old_cred;
2449         int rc;
2450
2451         old_cred = override_creds(pcc_super_cred(sb));
2452         rc = __pcc_inode_create(dataset, fid, pcc_dentry);
2453         revert_creds(old_cred);
2454         return rc;
2455 }
2456
2457 int pcc_inode_create_fini(struct inode *inode, struct pcc_create_attach *pca)
2458 {
2459         struct dentry *pcc_dentry = pca->pca_dentry;
2460         const struct cred *old_cred;
2461         struct pcc_super *super;
2462         struct pcc_inode *pcci;
2463         int rc;
2464
2465         ENTRY;
2466
2467         if (!pca->pca_dataset)
2468                 RETURN(0);
2469
2470         if (!inode)
2471                 GOTO(out_dataset_put, rc = 0);
2472
2473         super = ll_i2pccs(inode);
2474
2475         LASSERT(pcc_dentry);
2476
2477         old_cred = override_creds(super->pccs_cred);
2478         pcc_inode_lock(inode);
2479         LASSERT(ll_i2pcci(inode) == NULL);
2480         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2481         if (pcci == NULL)
2482                 GOTO(out_put, rc = -ENOMEM);
2483
2484         rc = pcc_inode_reset_iattr(pcc_dentry, ATTR_UID | ATTR_GID,
2485                                    old_cred->suid, old_cred->sgid, 0);
2486         if (rc)
2487                 GOTO(out_put, rc);
2488
2489         pcc_inode_attach_set(super, pca->pca_dataset, ll_i2info(inode),
2490                              pcci, pcc_dentry, LU_PCC_READWRITE);
2491
2492         rc = pcc_layout_xattr_set(pcci, 0);
2493         if (rc) {
2494                 if (!pcci->pcci_unlinked)
2495                         (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
2496                 pcc_inode_put(pcci);
2497                 GOTO(out_unlock, rc);
2498         }
2499
2500         /* Set the layout generation of newly created file with 0 */
2501         pcc_layout_gen_set(pcci, 0);
2502
2503 out_put:
2504         if (rc) {
2505                 (void) pcc_inode_remove(inode, pcc_dentry);
2506                 dput(pcc_dentry);
2507
2508                 if (pcci)
2509                         OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
2510         }
2511 out_unlock:
2512         pcc_inode_unlock(inode);
2513         revert_creds(old_cred);
2514 out_dataset_put:
2515         pcc_dataset_put(pca->pca_dataset);
2516         RETURN(rc);
2517 }
2518
2519 void pcc_create_attach_cleanup(struct super_block *sb,
2520                                struct pcc_create_attach *pca)
2521 {
2522         if (!pca->pca_dataset)
2523                 return;
2524
2525         if (pca->pca_dentry) {
2526                 struct dentry *parent;
2527                 struct inode *i_dir;
2528                 const struct cred *old_cred;
2529                 int rc;
2530
2531                 old_cred = override_creds(pcc_super_cred(sb));
2532                 parent = dget_parent(pca->pca_dentry);
2533                 i_dir = d_inode(parent);
2534                 rc = vfs_unlink(&nop_mnt_idmap, i_dir, pca->pca_dentry);
2535                 dput(parent);
2536                 if (rc)
2537                         CWARN("%s: failed to unlink PCC file %pd: rc = %d\n",
2538                               ll_s2sbi(sb)->ll_fsname, pca->pca_dentry, rc);
2539                 /* ignore the unlink failure */
2540                 revert_creds(old_cred);
2541                 dput(pca->pca_dentry);
2542         }
2543
2544         pcc_dataset_put(pca->pca_dataset);
2545 }
2546
2547 static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
2548                           loff_t *offset)
2549 {
2550         while (count > 0) {
2551                 ssize_t size;
2552
2553                 size = cfs_kernel_write(filp, buf, count, offset);
2554                 if (size < 0)
2555                         return size;
2556                 count -= size;
2557                 buf += size;
2558         }
2559         return 0;
2560 }
2561
2562 static ssize_t pcc_copy_data(struct file *src, struct file *dst)
2563 {
2564         ssize_t rc = 0;
2565         ssize_t rc2;
2566         loff_t pos, offset = 0;
2567         size_t buf_len = 1048576;
2568         void *buf;
2569
2570         ENTRY;
2571
2572 #ifdef FMODE_CAN_READ
2573         /* Need to add FMODE_CAN_READ flags here, otherwise the check in
2574          * kernel_read() during open() for auto PCC-RO attach will fail.
2575          */
2576         if ((src->f_mode & FMODE_READ) &&
2577             likely(src->f_op->read || src->f_op->read_iter))
2578                 src->f_mode |= FMODE_CAN_READ;
2579 #endif
2580
2581         OBD_ALLOC_LARGE(buf, buf_len);
2582         if (buf == NULL)
2583                 RETURN(-ENOMEM);
2584
2585         while (1) {
2586                 if (signal_pending(current))
2587                         GOTO(out_free, rc = -EINTR);
2588
2589                 pos = offset;
2590                 rc2 = cfs_kernel_read(src, buf, buf_len, &pos);
2591                 if (rc2 < 0)
2592                         GOTO(out_free, rc = rc2);
2593                 else if (rc2 == 0)
2594                         break;
2595
2596                 pos = offset;
2597                 rc = pcc_filp_write(dst, buf, rc2, &pos);
2598                 if (rc < 0)
2599                         GOTO(out_free, rc);
2600                 offset += rc2;
2601         }
2602
2603         rc = offset;
2604 out_free:
2605         OBD_FREE_LARGE(buf, buf_len);
2606         RETURN(rc);
2607 }
2608
2609 static int pcc_attach_allowed_check(struct inode *inode)
2610 {
2611         struct ll_inode_info *lli = ll_i2info(inode);
2612         struct pcc_inode *pcci;
2613         int rc = 0;
2614
2615         ENTRY;
2616
2617         pcc_inode_lock(inode);
2618         if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
2619                 GOTO(out_unlock, rc = -EBUSY);
2620
2621         pcci = ll_i2pcci(inode);
2622         if (pcci && pcc_inode_has_layout(pcci))
2623                 GOTO(out_unlock, rc = -EEXIST);
2624
2625         lli->lli_pcc_state |= PCC_STATE_FL_ATTACHING;
2626 out_unlock:
2627         pcc_inode_unlock(inode);
2628         RETURN(rc);
2629 }
2630
2631 static int pcc_attach_data_archive(struct file *file, struct inode *inode,
2632                                    struct pcc_dataset *dataset,
2633                                    struct dentry **dentry)
2634 {
2635         const struct cred *old_cred;
2636         struct file *pcc_filp;
2637         bool direct = false;
2638         struct path path;
2639         ssize_t ret;
2640         int rc;
2641
2642         ENTRY;
2643
2644         old_cred = override_creds(pcc_super_cred(inode->i_sb));
2645         rc = __pcc_inode_create(dataset, &ll_i2info(inode)->lli_fid, dentry);
2646         if (rc)
2647                 GOTO(out_cred, rc);
2648
2649         path.mnt = dataset->pccd_path.mnt;
2650         path.dentry = *dentry;
2651         pcc_filp = dentry_open(&path, O_WRONLY | O_LARGEFILE, current_cred());
2652         if (IS_ERR_OR_NULL(pcc_filp)) {
2653                 rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
2654                 GOTO(out_dentry, rc);
2655         }
2656
2657         rc = pcc_inode_reset_iattr(*dentry, ATTR_UID | ATTR_GID,
2658                                    old_cred->uid, old_cred->gid, 0);
2659         if (rc)
2660                 GOTO(out_fput, rc);
2661
2662         /*
2663          * When attach a file at file open() time with direct I/O mode, the
2664          * data copy from Lustre OSTs to PCC copy in kernel will report
2665          * -EFAULT error as the buffer is allocated in the kernel space, not
2666          * from the user space.
2667          * Thus it needs to unmask O_DIRECT flag from the file handle during
2668          * data copy. After finished data copying, restore the flag in the
2669          * file handle.
2670          */
2671         if (file->f_flags & O_DIRECT) {
2672                 file->f_flags &= ~O_DIRECT;
2673                 direct = true;
2674         }
2675
2676         ret = pcc_copy_data(file, pcc_filp);
2677         if (direct)
2678                 file->f_flags |= O_DIRECT;
2679         if (ret < 0)
2680                 GOTO(out_fput, rc = ret);
2681
2682         /*
2683          * It must to truncate the PCC copy to the same size of the Lustre
2684          * copy after copy data. Otherwise, it may get wrong file size after
2685          * re-attach a file. See LU-13023 for details.
2686          */
2687         rc = pcc_inode_reset_iattr(*dentry, ATTR_SIZE, KUIDT_INIT(0),
2688                                    KGIDT_INIT(0), ret);
2689 out_fput:
2690         fput(pcc_filp);
2691 out_dentry:
2692         if (rc) {
2693                 pcc_inode_remove(inode, *dentry);
2694                 dput(*dentry);
2695         }
2696 out_cred:
2697         revert_creds(old_cred);
2698         RETURN(rc);
2699 }
2700
2701 int pcc_readwrite_attach(struct file *file, struct inode *inode,
2702                          __u32 archive_id)
2703 {
2704         struct pcc_dataset *dataset;
2705         struct ll_inode_info *lli = ll_i2info(inode);
2706         struct pcc_super *super = ll_i2pccs(inode);
2707         struct pcc_inode *pcci;
2708         struct dentry *dentry;
2709         int rc;
2710
2711         ENTRY;
2712
2713         rc = pcc_attach_allowed_check(inode);
2714         if (rc)
2715                 RETURN(rc);
2716
2717         dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super,
2718                                   LU_PCC_READWRITE, archive_id);
2719         if (dataset == NULL)
2720                 RETURN(-ENOENT);
2721
2722         rc = pcc_attach_data_archive(file, inode, dataset, &dentry);
2723         if (rc)
2724                 GOTO(out_dataset_put, rc);
2725
2726         /* Pause to allow for a race with concurrent HSM remove */
2727         CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
2728
2729         pcc_inode_lock(inode);
2730         pcci = ll_i2pcci(inode);
2731         LASSERT(!pcci);
2732         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2733         if (pcci == NULL)
2734                 GOTO(out_unlock, rc = -ENOMEM);
2735
2736         pcc_inode_attach_set(super, dataset, lli, pcci,
2737                              dentry, LU_PCC_READWRITE);
2738 out_unlock:
2739         pcc_inode_unlock(inode);
2740         if (rc) {
2741                 const struct cred *old_cred;
2742
2743                 old_cred = override_creds(pcc_super_cred(inode->i_sb));
2744                 (void) pcc_inode_remove(inode, dentry);
2745                 revert_creds(old_cred);
2746                 dput(dentry);
2747         }
2748 out_dataset_put:
2749         pcc_dataset_put(dataset);
2750         RETURN(rc);
2751 }
2752
2753 int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
2754                               __u32 gen, bool lease_broken, int rc,
2755                               bool attached)
2756 {
2757         struct ll_inode_info *lli = ll_i2info(inode);
2758         const struct cred *old_cred;
2759         struct pcc_inode *pcci;
2760         __u32 gen2;
2761
2762         ENTRY;
2763
2764         old_cred = override_creds(pcc_super_cred(inode->i_sb));
2765         pcc_inode_lock(inode);
2766         pcci = ll_i2pcci(inode);
2767         if (rc || lease_broken) {
2768                 if (attached && pcci)
2769                         pcc_inode_put(pcci);
2770
2771                 GOTO(out_unlock, rc);
2772         }
2773
2774         /* PCC inode may be released due to layout lock revocatioin */
2775         if (!pcci)
2776                 GOTO(out_unlock, rc = -ESTALE);
2777
2778         LASSERT(attached);
2779         rc = pcc_layout_xattr_set(pcci, gen);
2780         if (rc)
2781                 GOTO(out_put, rc);
2782
2783         LASSERT(lli->lli_pcc_state & PCC_STATE_FL_ATTACHING);
2784         rc = ll_layout_refresh(inode, &gen2);
2785         if (!rc) {
2786                 if (gen2 == gen) {
2787                         pcc_layout_gen_set(pcci, gen);
2788                 } else {
2789                         CDEBUG(D_CACHE,
2790                                DFID" layout changed from %d to %d.\n",
2791                                PFID(ll_inode2fid(inode)), gen, gen2);
2792                         GOTO(out_put, rc = -ESTALE);
2793                 }
2794         }
2795
2796 out_put:
2797         if (rc) {
2798                 if (!pcci->pcci_unlinked)
2799                         (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
2800                 pcc_inode_put(pcci);
2801         }
2802 out_unlock:
2803         lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
2804         pcc_inode_unlock(inode);
2805         revert_creds(old_cred);
2806         RETURN(rc);
2807 }
2808
2809 static int pcc_layout_rdonly_set(struct inode *inode, __u32 *gen)
2810
2811 {
2812         struct ll_inode_info *lli = ll_i2info(inode);
2813         struct lu_extent ext = {
2814                 .e_start = 0,
2815                 .e_end = OBD_OBJECT_EOF,
2816         };
2817         struct cl_layout clt = {
2818                 .cl_layout_gen = 0,
2819                 .cl_is_released = false,
2820                 .cl_is_rdonly = false,
2821         };
2822         int retries = 0;
2823         int rc;
2824
2825         ENTRY;
2826
2827 repeat:
2828         rc = pcc_get_layout_info(inode, &clt);
2829         if (rc)
2830                 RETURN(rc);
2831
2832         /*
2833          * For the HSM released file, restore the data first.
2834          */
2835         if (clt.cl_is_released) {
2836                 retries++;
2837                 if (retries > 2)
2838                         RETURN(-EBUSY);
2839
2840                 if (ll_layout_version_get(lli) != CL_LAYOUT_GEN_NONE) {
2841                         rc = ll_layout_restore(inode, 0, OBD_OBJECT_EOF);
2842                         if (rc) {
2843                                 CDEBUG(D_CACHE, DFID" RESTORE failure: %d\n",
2844                                        PFID(&lli->lli_fid), rc);
2845                                 RETURN(rc);
2846                         }
2847                 }
2848                 rc = ll_layout_refresh(inode, gen);
2849                 if (rc)
2850                         RETURN(rc);
2851
2852                 goto repeat;
2853         }
2854
2855
2856         if (!clt.cl_is_rdonly) {
2857                 rc = ll_layout_write_intent(inode, LAYOUT_INTENT_PCCRO_SET,
2858                                             &ext);
2859                 if (rc)
2860                         RETURN(rc);
2861
2862                 rc = ll_layout_refresh(inode, gen);
2863                 if (rc)
2864                         RETURN(rc);
2865         } else { /* Readonly layout */
2866                 *gen = clt.cl_layout_gen;
2867         }
2868
2869         RETURN(rc);
2870 }
2871
2872 static void pcc_readonly_attach_fini(struct inode *inode)
2873 {
2874         pcc_inode_lock(inode);
2875         ll_i2info(inode)->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
2876         pcc_inode_unlock(inode);
2877 }
2878
2879 static int pcc_readonly_ioctl_attach(struct file *file,
2880                                      struct inode *inode,
2881                                      __u32 roid)
2882 {
2883         struct ll_sb_info *sbi = ll_i2sbi(inode);
2884         struct pcc_super *super = ll_i2pccs(inode);
2885         struct ll_inode_info *lli = ll_i2info(inode);
2886         const struct cred *old_cred;
2887         struct pcc_dataset *dataset;
2888         struct pcc_inode *pcci;
2889         struct dentry *dentry;
2890         bool attached = false;
2891         bool unlinked = false;
2892         __u32 gen;
2893         int rc;
2894
2895         ENTRY;
2896
2897         if (!test_bit(LL_SBI_LAYOUT_LOCK, sbi->ll_flags))
2898                 RETURN(-EOPNOTSUPP);
2899
2900         rc = pcc_attach_allowed_check(inode);
2901         if (rc)
2902                 RETURN(rc);
2903
2904         rc = pcc_layout_rdonly_set(inode, &gen);
2905         if (rc)
2906                 GOTO(out_fini, rc);
2907
2908         dataset = pcc_dataset_get(&ll_s2sbi(inode->i_sb)->ll_pcc_super,
2909                                   LU_PCC_READONLY, roid);
2910         if (dataset == NULL)
2911                 GOTO(out_fini, rc = -ENOENT);
2912
2913         rc = pcc_attach_data_archive(file, inode, dataset, &dentry);
2914         if (rc)
2915                 GOTO(out_dataset_put, rc);
2916
2917         mutex_lock(&lli->lli_layout_mutex);
2918         pcc_inode_lock(inode);
2919         old_cred = override_creds(super->pccs_cred);
2920         lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
2921         if (gen != ll_layout_version_get(lli))
2922                 GOTO(out_put_unlock, rc = -ESTALE);
2923
2924         pcci = ll_i2pcci(inode);
2925         if (!pcci) {
2926                 OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2927                 if (pcci == NULL)
2928                         GOTO(out_put_unlock, rc = -ENOMEM);
2929
2930                 pcc_inode_attach_set(super, dataset, lli, pcci,
2931                                      dentry, LU_PCC_READONLY);
2932         } else {
2933                 atomic_inc(&pcci->pcci_refcount);
2934                 path_put(&pcci->pcci_path);
2935                 pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
2936                 pcci->pcci_path.dentry = dentry;
2937                 pcci->pcci_type = LU_PCC_READONLY;
2938         }
2939         attached = true;
2940         rc = pcc_layout_xattr_set(pcci, gen);
2941         if (rc) {
2942                 pcci->pcci_type = LU_PCC_NONE;
2943                 unlinked = pcci->pcci_unlinked;
2944                 GOTO(out_put_unlock, rc);
2945         }
2946
2947         pcc_layout_gen_set(pcci, gen);
2948 out_put_unlock:
2949         if (rc) {
2950                 if (!unlinked)
2951                         (void) pcc_inode_remove(inode, dentry);
2952                 if (attached)
2953                         pcc_inode_put(pcci);
2954                 else
2955                         dput(dentry);
2956         }
2957         revert_creds(old_cred);
2958         pcc_inode_unlock(inode);
2959         mutex_unlock(&lli->lli_layout_mutex);
2960 out_dataset_put:
2961         pcc_dataset_put(dataset);
2962 out_fini:
2963         pcc_readonly_attach_fini(inode);
2964
2965         RETURN(rc);
2966 }
2967
2968 int pcc_ioctl_attach(struct file *file, struct inode *inode,
2969                      struct lu_pcc_attach *attach)
2970 {
2971         int rc = 0;
2972
2973         ENTRY;
2974
2975         switch (attach->pcca_type) {
2976         case LU_PCC_READWRITE:
2977                 rc = -EOPNOTSUPP;
2978                 break;
2979         case LU_PCC_READONLY:
2980                 rc = pcc_readonly_ioctl_attach(file, inode,
2981                                                attach->pcca_id);
2982                 break;
2983         default:
2984                 rc = -EINVAL;
2985                 break;
2986         }
2987
2988         RETURN(rc);
2989 }
2990
2991 static int pcc_hsm_remove(struct inode *inode)
2992 {
2993         struct hsm_user_request *hur;
2994         __u32 gen;
2995         int len;
2996         int rc;
2997
2998         ENTRY;
2999
3000         rc = ll_layout_restore(inode, 0, OBD_OBJECT_EOF);
3001         if (rc) {
3002                 CDEBUG(D_CACHE, DFID" RESTORE failure: %d\n",
3003                        PFID(&ll_i2info(inode)->lli_fid), rc);
3004                 RETURN(rc);
3005         }
3006
3007         ll_layout_refresh(inode, &gen);
3008
3009         len = sizeof(struct hsm_user_request) +
3010               sizeof(struct hsm_user_item);
3011         OBD_ALLOC(hur, len);
3012         if (hur == NULL)
3013                 RETURN(-ENOMEM);
3014
3015         hur->hur_request.hr_action = HUA_REMOVE;
3016         hur->hur_request.hr_archive_id = 0;
3017         hur->hur_request.hr_flags = 0;
3018         memcpy(&hur->hur_user_item[0].hui_fid, &ll_i2info(inode)->lli_fid,
3019                sizeof(hur->hur_user_item[0].hui_fid));
3020         hur->hur_user_item[0].hui_extent.offset = 0;
3021         hur->hur_user_item[0].hui_extent.length = OBD_OBJECT_EOF;
3022         hur->hur_request.hr_itemcount = 1;
3023         rc = obd_iocontrol(LL_IOC_HSM_REQUEST, ll_i2sbi(inode)->ll_md_exp,
3024                            len, hur, NULL);
3025         if (rc)
3026                 CDEBUG(D_CACHE, DFID" HSM REMOVE failure: %d\n",
3027                        PFID(&ll_i2info(inode)->lli_fid), rc);
3028
3029         OBD_FREE(hur, len);
3030         RETURN(rc);
3031 }
3032
3033 int pcc_ioctl_detach(struct inode *inode, __u32 opt)
3034 {
3035         struct ll_inode_info *lli = ll_i2info(inode);
3036         struct pcc_inode *pcci;
3037         const struct cred *old_cred;
3038         bool hsm_remove = false;
3039         int rc = 0;
3040
3041         ENTRY;
3042
3043         pcc_inode_lock(inode);
3044         pcci = lli->lli_pcc_inode;
3045         if (!pcci || lli->lli_pcc_state & PCC_STATE_FL_ATTACHING ||
3046             !pcc_inode_has_layout(pcci))
3047                 GOTO(out_unlock, rc = 0);
3048
3049         LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
3050
3051         if (pcci->pcci_type == LU_PCC_READWRITE) {
3052                 if (opt == PCC_DETACH_OPT_UNCACHE) {
3053                         hsm_remove = true;
3054                         /*
3055                          * The file will be removed from PCC, set the flags
3056                          * with PCC_DATASET_NONE even the later removal of the
3057                          * PCC copy fails.
3058                          */
3059                         lli->lli_pcc_dsflags = PCC_DATASET_NONE;
3060                 }
3061
3062                 __pcc_layout_invalidate(pcci);
3063                 pcc_inode_put(pcci);
3064         } else if (pcci->pcci_type == LU_PCC_READONLY) {
3065                 __pcc_layout_invalidate(pcci);
3066
3067                 if (opt == PCC_DETACH_OPT_UNCACHE && !pcci->pcci_unlinked) {
3068                         old_cred =  override_creds(pcc_super_cred(inode->i_sb));
3069                         rc = pcc_inode_remove(inode, pcci->pcci_path.dentry);
3070                         revert_creds(old_cred);
3071                         if (!rc)
3072                                 pcci->pcci_unlinked = true;
3073                 }
3074
3075                 pcc_inode_put(pcci);
3076         } else {
3077                 rc = -EOPNOTSUPP;
3078         }
3079
3080 out_unlock:
3081         pcc_inode_unlock(inode);
3082         if (hsm_remove) {
3083                 old_cred = override_creds(pcc_super_cred(inode->i_sb));
3084                 rc = pcc_hsm_remove(inode);
3085                 revert_creds(old_cred);
3086         }
3087
3088         RETURN(rc);
3089 }
3090
3091 int pcc_ioctl_state(struct file *file, struct inode *inode,
3092                     struct lu_pcc_state *state)
3093 {
3094         int rc = 0;
3095         int count;
3096         char *buf;
3097         char *path;
3098         int buf_len = sizeof(state->pccs_path);
3099         struct ll_file_data *fd = file->private_data;
3100         struct pcc_file *pccf = &fd->fd_pcc_file;
3101         struct pcc_inode *pcci;
3102
3103         ENTRY;
3104
3105         if (buf_len <= 0)
3106                 RETURN(-EINVAL);
3107
3108         OBD_ALLOC(buf, buf_len);
3109         if (buf == NULL)
3110                 RETURN(-ENOMEM);
3111
3112         pcc_inode_lock(inode);
3113         pcci = ll_i2pcci(inode);
3114         if (pcci == NULL) {
3115                 state->pccs_type = LU_PCC_NONE;
3116                 GOTO(out_unlock, rc = 0);
3117         }
3118
3119         count = atomic_read(&pcci->pcci_refcount);
3120         if (count == 0) {
3121                 state->pccs_type = LU_PCC_NONE;
3122                 state->pccs_open_count = 0;
3123                 GOTO(out_unlock, rc = 0);
3124         }
3125
3126         if (pcc_inode_has_layout(pcci))
3127                 count--;
3128         if (pccf->pccf_file != NULL)
3129                 count--;
3130         state->pccs_type = pcci->pcci_type;
3131         state->pccs_open_count = count;
3132         state->pccs_flags = ll_i2info(inode)->lli_pcc_state;
3133         path = dentry_path_raw(pcci->pcci_path.dentry, buf, buf_len);
3134         if (IS_ERR(path))
3135                 GOTO(out_unlock, rc = PTR_ERR(path));
3136
3137         if (strscpy(state->pccs_path, path, buf_len) < 0)
3138                 GOTO(out_unlock, rc = -ENAMETOOLONG);
3139
3140 out_unlock:
3141         pcc_inode_unlock(inode);
3142         OBD_FREE(buf, buf_len);
3143         RETURN(rc);
3144 }