Whamcloud - gitweb
LU-17744 ldiskfs: mballoc stats fixes
[fs/lustre-release.git] / lustre / llite / pcc.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, DDN Storage Corporation.
24  */
25 /*
26  * Persistent Client Cache
27  *
28  * PCC is a new framework which provides a group of local cache on Lustre
29  * client side. It works in two modes: RW-PCC enables a read-write cache on the
30  * local SSDs of a single client; RO-PCC provides a read-only cache on the
31  * local SSDs of multiple clients. Less overhead is visible to the applications
32  * and network latencies and lock conflicts can be significantly reduced.
33  *
34  * For RW-PCC, no global namespace will be provided. Each client uses its own
35  * local storage as a cache for itself. Local file system is used to manage
36  * the data on local caches. Cached I/O is directed to local file system while
37  * normal I/O is directed to OSTs. RW-PCC uses HSM for data synchronization.
38  * It uses HSM copytool to restore file from local caches to Lustre OSTs. Each
39  * PCC has a copytool instance running with unique archive number. Any remote
40  * access from another Lustre client would trigger the data synchronization. If
41  * a client with RW-PCC goes offline, the cached data becomes inaccessible for
42  * other client temporarily. And after the RW-PCC client reboots and the
43  * copytool restarts, the data will be accessible again.
44  *
45  * Following is what will happen in different conditions for RW-PCC:
46  *
47  * > When file is being created on RW-PCC
48  *
49  * A normal HSM released file is created on MDT;
50  * An empty mirror file is created on local cache;
51  * The HSM status of the Lustre file will be set to archived and released;
52  * The archive number will be set to the proper value.
53  *
54  * > When file is being prefetched to RW-PCC
55  *
56  * A file is copied to the local cache;
57  * The HSM status of the Lustre file will be set to archived and released;
58  * The archive number will be set to the proper value.
59  *
60  * > When file is being accessed from PCC
61  *
62  * Data will be read directly from local cache;
63  * Metadata will be read from MDT, except file size;
64  * File size will be got from local cache.
65  *
66  * > When PCC cached file is being accessed on another client
67  *
68  * RW-PCC cached files are automatically restored when a process on another
69  * client tries to read or modify them. The corresponding I/O will block
70  * waiting for the released file to be restored. This is transparent to the
71  * process.
72  *
73  * For RW-PCC, when a file is being created, a rule-based policy is used to
74  * determine whether it will be cached. Rule-based caching of newly created
75  * files can determine which file can use a cache on PCC directly without any
76  * admission control.
77  *
78  * RW-PCC design can accelerate I/O intensive applications with one-to-one
79  * mappings between files and accessing clients. However, in several use cases,
80  * files will never be updated, but need to be read simultaneously from many
81  * clients. RO-PCC implements a read-only caching on Lustre clients using
82  * SSDs. RO-PCC is based on the same framework as RW-PCC, except
83  * that no HSM mechanism is used.
84  *
85  * The main advantages of using this SSD cache on the Lustre clients via
86  * PCC are:
87  * - The I/O stack becomes much simpler for the cached data, as there is no
88  *   interference with I/Os from other clients, which enables easier
89  *   performance optimizations;
90  * - The requirements on the HW inside the client nodes are small, any kind of
91  *   SSDs or even HDDs can be used as cache devices;
92  * - Caching reduces the pressure on the object storage targets (OSTs), as
93  *   small or random I/Os can be regularized to big sequential I/Os and
94  *   temporary files do not even need to be flushed to OSTs.
95  *
96  * PCC can accelerate applications with certain I/O patterns:
97  * - small-sized random writes (< 1MB) from a single client
98  * - repeated read of data that is larger than RAM
99  * - clients with high network latency
100  *
101  * Author: Li Xi <lixi@ddn.com>
102  * Author: Qian Yingjin <qian@ddn.com>
103  */
104
105 #define DEBUG_SUBSYSTEM S_LLITE
106
107 #include "pcc.h"
108 #include <linux/namei.h>
109 #include <linux/file.h>
110 #include <lustre_compat.h>
111 #include "llite_internal.h"
112
113 struct kmem_cache *pcc_inode_slab;
114
115 int pcc_super_init(struct pcc_super *super)
116 {
117         struct cred *cred;
118
119         super->pccs_cred = cred = prepare_creds();
120         if (!cred)
121                 return -ENOMEM;
122
123         /* Never override disk quota limits or use reserved space */
124         cap_lower(cred->cap_effective, CAP_SYS_RESOURCE);
125         init_rwsem(&super->pccs_rw_sem);
126         INIT_LIST_HEAD(&super->pccs_datasets);
127         super->pccs_generation = 1;
128
129         return 0;
130 }
131
132 /* Rule based auto caching */
133 static void pcc_id_list_free(struct list_head *id_list)
134 {
135         struct pcc_match_id *id, *n;
136
137         list_for_each_entry_safe(id, n, id_list, pmi_linkage) {
138                 list_del_init(&id->pmi_linkage);
139                 OBD_FREE_PTR(id);
140         }
141 }
142
143 static void pcc_fname_list_free(struct list_head *fname_list)
144 {
145         struct pcc_match_fname *fname, *n;
146
147         list_for_each_entry_safe(fname, n, fname_list, pmf_linkage) {
148                 OBD_FREE(fname->pmf_name, strlen(fname->pmf_name) + 1);
149                 list_del_init(&fname->pmf_linkage);
150                 OBD_FREE_PTR(fname);
151         }
152 }
153
154 static void pcc_expression_free(struct pcc_expression *expr)
155 {
156         LASSERT(expr->pe_field >= PCC_FIELD_UID &&
157                 expr->pe_field < PCC_FIELD_MAX);
158         switch (expr->pe_field) {
159         case PCC_FIELD_UID:
160         case PCC_FIELD_GID:
161         case PCC_FIELD_PROJID:
162                 pcc_id_list_free(&expr->pe_cond);
163                 break;
164         case PCC_FIELD_FNAME:
165                 pcc_fname_list_free(&expr->pe_cond);
166                 break;
167         default:
168                 LBUG();
169         }
170         OBD_FREE_PTR(expr);
171 }
172
173 static void pcc_conjunction_free(struct pcc_conjunction *conjunction)
174 {
175         struct pcc_expression *expression, *n;
176
177         LASSERT(list_empty(&conjunction->pc_linkage));
178         list_for_each_entry_safe(expression, n,
179                                  &conjunction->pc_expressions,
180                                  pe_linkage) {
181                 list_del_init(&expression->pe_linkage);
182                 pcc_expression_free(expression);
183         }
184         OBD_FREE_PTR(conjunction);
185 }
186
187 static void pcc_rule_conds_free(struct list_head *cond_list)
188 {
189         struct pcc_conjunction *conjunction, *n;
190
191         list_for_each_entry_safe(conjunction, n, cond_list, pc_linkage) {
192                 list_del_init(&conjunction->pc_linkage);
193                 pcc_conjunction_free(conjunction);
194         }
195 }
196
197 static void pcc_cmd_fini(struct pcc_cmd *cmd)
198 {
199         if (cmd->pccc_cmd == PCC_ADD_DATASET) {
200                 if (!list_empty(&cmd->u.pccc_add.pccc_conds))
201                         pcc_rule_conds_free(&cmd->u.pccc_add.pccc_conds);
202                 if (cmd->u.pccc_add.pccc_conds_str)
203                         OBD_FREE(cmd->u.pccc_add.pccc_conds_str,
204                                  strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
205         }
206 }
207
208 #define PCC_DISJUNCTION_DELIM   (",")
209 #define PCC_CONJUNCTION_DELIM   ("&")
210 #define PCC_EXPRESSION_DELIM    ("=")
211
212 static int
213 pcc_fname_list_add(char *id, struct list_head *fname_list)
214 {
215         struct pcc_match_fname *fname;
216
217         OBD_ALLOC_PTR(fname);
218         if (fname == NULL)
219                 return -ENOMEM;
220
221         OBD_ALLOC(fname->pmf_name, strlen(id) + 1);
222         if (fname->pmf_name == NULL) {
223                 OBD_FREE_PTR(fname);
224                 return -ENOMEM;
225         }
226
227         strcpy(fname->pmf_name, id);
228         list_add_tail(&fname->pmf_linkage, fname_list);
229         return 0;
230 }
231
232 static int
233 pcc_fname_list_parse(char *str, struct list_head *fname_list)
234 {
235         int rc = 0;
236
237         ENTRY;
238
239         INIT_LIST_HEAD(fname_list);
240         while (rc == 0 && str) {
241                 char *fname = strsep(&str, " ");
242
243                 if (*fname)
244                         rc = pcc_fname_list_add(fname, fname_list);
245         }
246         if (list_empty(fname_list))
247                 rc = -EINVAL;
248         if (rc)
249                 pcc_fname_list_free(fname_list);
250         RETURN(rc);
251 }
252
253 static int
254 pcc_id_list_parse(char *str, struct list_head *id_list,
255                   enum pcc_field type)
256 {
257         int rc = 0;
258
259         ENTRY;
260
261         if (type != PCC_FIELD_UID && type != PCC_FIELD_GID &&
262             type != PCC_FIELD_PROJID)
263                 RETURN(-EINVAL);
264
265         INIT_LIST_HEAD(id_list);
266         while (str) {
267                 char *num;
268                 struct pcc_match_id *id;
269                 unsigned long id_val;
270
271                 num = strsep(&str, " ");
272                 if (!*num)
273                         continue;
274                 rc = kstrtoul(num, 0, &id_val);
275                 if (rc)
276                         GOTO(out, rc);
277
278                 OBD_ALLOC_PTR(id);
279                 if (id == NULL)
280                         GOTO(out, rc = -ENOMEM);
281
282                 id->pmi_id = id_val;
283                 list_add_tail(&id->pmi_linkage, id_list);
284         }
285         if (list_empty(id_list))
286                 rc = -EINVAL;
287 out:
288         if (rc)
289                 pcc_id_list_free(id_list);
290         RETURN(rc);
291 }
292
293 static int
294 pcc_expression_parse(char *str, struct list_head *cond_list)
295 {
296         struct pcc_expression *expr;
297         char *field;
298         int len;
299         int rc = 0;
300
301         OBD_ALLOC_PTR(expr);
302         if (expr == NULL)
303                 return -ENOMEM;
304
305         field = strim(strsep(&str, PCC_EXPRESSION_DELIM));
306         if (!*field || !str)
307                 /* No LHS or no '=' */
308                 GOTO(out, rc = -EINVAL);
309         str = skip_spaces(str);
310         len = strlen(str);
311         if (str[0] != '{' || str[len - 1] != '}')
312                 GOTO(out, rc = -EINVAL);
313
314         /* Skip '{' and '}' */
315         str[len - 1] = '\0';
316         str += 1;
317
318         if (strcmp(field, "uid") == 0) {
319                 if (pcc_id_list_parse(str,
320                                       &expr->pe_cond,
321                                       PCC_FIELD_UID) < 0)
322                         GOTO(out, rc = -EINVAL);
323                 expr->pe_field = PCC_FIELD_UID;
324         } else if (strcmp(field, "gid") == 0) {
325                 if (pcc_id_list_parse(str,
326                                       &expr->pe_cond,
327                                       PCC_FIELD_GID) < 0)
328                         GOTO(out, rc = -EINVAL);
329                 expr->pe_field = PCC_FIELD_GID;
330         } else if (strcmp(field, "projid") == 0) {
331                 if (pcc_id_list_parse(str,
332                                       &expr->pe_cond,
333                                       PCC_FIELD_PROJID) < 0)
334                         GOTO(out, rc = -EINVAL);
335                 expr->pe_field = PCC_FIELD_PROJID;
336         } else if (strcmp(field, "fname") == 0) {
337                 if (pcc_fname_list_parse(str, &expr->pe_cond) < 0)
338                         GOTO(out, rc = -EINVAL);
339                 expr->pe_field = PCC_FIELD_FNAME;
340         } else {
341                 GOTO(out, rc = -EINVAL);
342         }
343
344         list_add_tail(&expr->pe_linkage, cond_list);
345         return 0;
346 out:
347         OBD_FREE_PTR(expr);
348         return rc;
349 }
350
351 static int
352 pcc_conjunction_parse(char *str, struct list_head *cond_list)
353 {
354         struct pcc_conjunction *conjunction;
355         int rc = 0;
356
357         OBD_ALLOC_PTR(conjunction);
358         if (conjunction == NULL)
359                 return -ENOMEM;
360
361         INIT_LIST_HEAD(&conjunction->pc_expressions);
362         list_add_tail(&conjunction->pc_linkage, cond_list);
363
364         while (rc == 0 && str) {
365                 char *expr = strsep(&str, PCC_CONJUNCTION_DELIM);
366
367                 rc = pcc_expression_parse(expr, &conjunction->pc_expressions);
368         }
369         return rc;
370 }
371
372 static int pcc_conds_parse(char *orig, struct list_head *cond_list)
373 {
374         char *str;
375         int rc = 0;
376
377         orig = kstrdup(orig, GFP_KERNEL);
378         if (!orig)
379                 return -ENOMEM;
380         str = orig;
381
382         INIT_LIST_HEAD(cond_list);
383         while (rc == 0 && str) {
384                 char *term = strsep(&str, PCC_DISJUNCTION_DELIM);
385
386                 rc = pcc_conjunction_parse(term, cond_list);
387         }
388         kfree(orig);
389         return rc;
390 }
391
392 static int pcc_id_parse(struct pcc_cmd *cmd, const char *id)
393 {
394         int rc;
395
396         OBD_ALLOC(cmd->u.pccc_add.pccc_conds_str, strlen(id) + 1);
397         if (cmd->u.pccc_add.pccc_conds_str == NULL)
398                 return -ENOMEM;
399
400         memcpy(cmd->u.pccc_add.pccc_conds_str, id, strlen(id));
401
402         rc = pcc_conds_parse(cmd->u.pccc_add.pccc_conds_str,
403                              &cmd->u.pccc_add.pccc_conds);
404         if (rc)
405                 pcc_cmd_fini(cmd);
406
407         return rc;
408 }
409
410 static int
411 pcc_parse_value_pair(struct pcc_cmd *cmd, char *buffer)
412 {
413         char *key, *val;
414         unsigned long id;
415         int rc;
416
417         val = buffer;
418         key = strsep(&val, "=");
419         if (val == NULL || strlen(val) == 0)
420                 return -EINVAL;
421
422         /* Key of the value pair */
423         if (strcmp(key, "rwid") == 0) {
424                 rc = kstrtoul(val, 10, &id);
425                 if (rc)
426                         return rc;
427                 if (id <= 0)
428                         return -EINVAL;
429                 cmd->u.pccc_add.pccc_rwid = id;
430         } else if (strcmp(key, "roid") == 0) {
431                 rc = kstrtoul(val, 10, &id);
432                 if (rc)
433                         return rc;
434                 if (id <= 0)
435                         return -EINVAL;
436                 cmd->u.pccc_add.pccc_roid = id;
437         } else if (strcmp(key, "auto_attach") == 0) {
438                 rc = kstrtoul(val, 10, &id);
439                 if (rc)
440                         return rc;
441                 if (id == 0)
442                         cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_AUTO_ATTACH;
443         } else if (strcmp(key, "open_attach") == 0) {
444                 rc = kstrtoul(val, 10, &id);
445                 if (rc)
446                         return rc;
447                 if (id == 0)
448                         cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_OPEN_ATTACH;
449         } else if (strcmp(key, "io_attach") == 0) {
450                 rc = kstrtoul(val, 10, &id);
451                 if (rc)
452                         return rc;
453                 if (id == 0)
454                         cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_IO_ATTACH;
455         } else if (strcmp(key, "stat_attach") == 0) {
456                 rc = kstrtoul(val, 10, &id);
457                 if (rc)
458                         return rc;
459                 if (id == 0)
460                         cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_STAT_ATTACH;
461         } else if (strcmp(key, "rwpcc") == 0) {
462                 rc = kstrtoul(val, 10, &id);
463                 if (rc)
464                         return rc;
465                 if (id > 0)
466                         cmd->u.pccc_add.pccc_flags |= PCC_DATASET_RWPCC;
467         } else if (strcmp(key, "ropcc") == 0) {
468                 rc = kstrtoul(val, 10, &id);
469                 if (rc)
470                         return rc;
471                 if (id > 0)
472                         cmd->u.pccc_add.pccc_flags |= PCC_DATASET_ROPCC;
473         } else {
474                 return -EINVAL;
475         }
476
477         return 0;
478 }
479
480 static int
481 pcc_parse_value_pairs(struct pcc_cmd *cmd, char *buffer)
482 {
483         char *val;
484         char *token;
485         int rc;
486
487         switch (cmd->pccc_cmd) {
488         case PCC_ADD_DATASET:
489                 /* Enable auto attach by default */
490                 cmd->u.pccc_add.pccc_flags |= PCC_DATASET_AUTO_ATTACH;
491                 break;
492         case PCC_DEL_DATASET:
493         case PCC_CLEAR_ALL:
494                 break;
495         default:
496                 return -EINVAL;
497         }
498
499         val = buffer;
500         while (val != NULL && strlen(val) != 0) {
501                 token = strsep(&val, " ");
502                 rc = pcc_parse_value_pair(cmd, token);
503                 if (rc)
504                         return rc;
505         }
506
507         switch (cmd->pccc_cmd) {
508         case PCC_ADD_DATASET:
509                 if (cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC &&
510                     cmd->u.pccc_add.pccc_flags & PCC_DATASET_ROPCC)
511                         return -EINVAL;
512                 /*
513                  * By default, a PCC backend can provide caching service for
514                  * both PCC-RW and PCC-RO.
515                  */
516                 if ((cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCC_ALL) == 0)
517                         cmd->u.pccc_add.pccc_flags |= PCC_DATASET_PCC_ALL;
518
519                 if (cmd->u.pccc_add.pccc_rwid == 0 &&
520                     cmd->u.pccc_add.pccc_roid == 0)
521                         return -EINVAL;
522
523                 if (cmd->u.pccc_add.pccc_rwid == 0 &&
524                     cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC)
525                         cmd->u.pccc_add.pccc_rwid = cmd->u.pccc_add.pccc_roid;
526
527                 if (cmd->u.pccc_add.pccc_roid == 0 &&
528                     cmd->u.pccc_add.pccc_flags & PCC_DATASET_ROPCC)
529                         cmd->u.pccc_add.pccc_roid = cmd->u.pccc_add.pccc_rwid;
530
531                 break;
532         case PCC_DEL_DATASET:
533         case PCC_CLEAR_ALL:
534                 break;
535         default:
536                 return -EINVAL;
537         }
538         return 0;
539 }
540
541 static void
542 pcc_dataset_rule_fini(struct pcc_match_rule *rule)
543 {
544         if (!list_empty(&rule->pmr_conds))
545                 pcc_rule_conds_free(&rule->pmr_conds);
546         LASSERT(rule->pmr_conds_str != NULL);
547         OBD_FREE(rule->pmr_conds_str, strlen(rule->pmr_conds_str) + 1);
548 }
549
550 static int
551 pcc_dataset_rule_init(struct pcc_match_rule *rule, struct pcc_cmd *cmd)
552 {
553         int rc = 0;
554
555         LASSERT(cmd->u.pccc_add.pccc_conds_str);
556         OBD_ALLOC(rule->pmr_conds_str,
557                   strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
558         if (rule->pmr_conds_str == NULL)
559                 return -ENOMEM;
560
561         memcpy(rule->pmr_conds_str,
562                cmd->u.pccc_add.pccc_conds_str,
563                strlen(cmd->u.pccc_add.pccc_conds_str));
564
565         INIT_LIST_HEAD(&rule->pmr_conds);
566         if (!list_empty(&cmd->u.pccc_add.pccc_conds))
567                 rc = pcc_conds_parse(rule->pmr_conds_str,
568                                      &rule->pmr_conds);
569
570         if (rc)
571                 pcc_dataset_rule_fini(rule);
572
573         return rc;
574 }
575
576 /* Rule Matching */
577 static int
578 pcc_id_list_match(struct list_head *id_list, __u32 id_val)
579 {
580         struct pcc_match_id *id;
581
582         list_for_each_entry(id, id_list, pmi_linkage) {
583                 if (id->pmi_id == id_val)
584                         return 1;
585         }
586         return 0;
587 }
588
/*
 * Match @content against @pattern, where '*' in the pattern stands for
 * any (possibly empty) run of characters. '*' is the only
 * metacharacter; every other pattern character must match literally.
 * The whole of @content must be consumed for a match.
 *
 * The matcher is iterative: it remembers the most recent '*' and, on a
 * mismatch, lets that star swallow one more character of @content and
 * retries. This avoids recursion (and therefore stack growth and
 * exponential backtracking with several stars), treats a '*' in the
 * pattern as a wildcard even when @content has a literal '*' at that
 * position, and lets consecutive stars match the empty string.
 *
 * Returns true on a match, false otherwise.
 */
static bool
cfs_match_wildcard(const char *pattern, const char *content)
{
	const char *after_star = NULL;	/* pattern just past the last '*' */
	const char *resume = NULL;	/* content position that star began at */

	while (*content != '\0') {
		if (*pattern == '*') {
			/* Start by letting the star match nothing. */
			after_star = ++pattern;
			resume = content;
		} else if (*pattern == *content) {
			pattern++;
			content++;
		} else if (after_star != NULL) {
			/* Mismatch: extend the last star by one character. */
			pattern = after_star;
			content = ++resume;
		} else {
			return false;
		}
	}

	/* Content exhausted: only trailing stars may remain. */
	while (*pattern == '*')
		pattern++;

	return *pattern == '\0';
}
615
616 static int
617 pcc_fname_list_match(struct list_head *fname_list, const char *name)
618 {
619         struct pcc_match_fname *fname;
620
621         list_for_each_entry(fname, fname_list, pmf_linkage) {
622                 if (cfs_match_wildcard(fname->pmf_name, name))
623                         return 1;
624         }
625         return 0;
626 }
627
628 static int
629 pcc_expression_match(struct pcc_expression *expr, struct pcc_matcher *matcher)
630 {
631         switch (expr->pe_field) {
632         case PCC_FIELD_UID:
633                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_uid);
634         case PCC_FIELD_GID:
635                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_gid);
636         case PCC_FIELD_PROJID:
637                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_projid);
638         case PCC_FIELD_FNAME:
639                 return pcc_fname_list_match(&expr->pe_cond,
640                                             matcher->pm_name->name);
641         default:
642                 return 0;
643         }
644 }
645
646 static int
647 pcc_conjunction_match(struct pcc_conjunction *conjunction,
648                       struct pcc_matcher *matcher)
649 {
650         struct pcc_expression *expr;
651         int matched;
652
653         list_for_each_entry(expr, &conjunction->pc_expressions, pe_linkage) {
654                 matched = pcc_expression_match(expr, matcher);
655                 if (!matched)
656                         return 0;
657         }
658
659         return 1;
660 }
661
662 static int
663 pcc_cond_match(struct pcc_match_rule *rule, struct pcc_matcher *matcher)
664 {
665         struct pcc_conjunction *conjunction;
666         int matched;
667
668         list_for_each_entry(conjunction, &rule->pmr_conds, pc_linkage) {
669                 matched = pcc_conjunction_match(conjunction, matcher);
670                 if (matched)
671                         return 1;
672         }
673
674         return 0;
675 }
676
677 struct pcc_dataset*
678 pcc_dataset_match_get(struct pcc_super *super, struct pcc_matcher *matcher)
679 {
680         struct pcc_dataset *dataset;
681         struct pcc_dataset *selected = NULL;
682
683         down_read(&super->pccs_rw_sem);
684         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
685                 if (!(dataset->pccd_flags & PCC_DATASET_RWPCC))
686                         continue;
687
688                 if (pcc_cond_match(&dataset->pccd_rule, matcher)) {
689                         atomic_inc(&dataset->pccd_refcount);
690                         selected = dataset;
691                         break;
692                 }
693         }
694         up_read(&super->pccs_rw_sem);
695         if (selected)
696                 CDEBUG(D_CACHE, "PCC create, matched %s - %d:%d:%d:%s\n",
697                        dataset->pccd_rule.pmr_conds_str,
698                        matcher->pm_uid, matcher->pm_gid,
699                        matcher->pm_projid, matcher->pm_name->name);
700
701         return selected;
702 }
703
704 /**
705  * pcc_dataset_add - Add a Cache policy to control which files need be
706  * cached and where it will be cached.
707  *
708  * @super:      superblock of pcc
709  * @cmd:        pcc command
710  */
711 static int
712 pcc_dataset_add(struct pcc_super *super, struct pcc_cmd *cmd)
713 {
714         char *pathname = cmd->pccc_pathname;
715         struct pcc_dataset *dataset;
716         struct pcc_dataset *tmp;
717         bool found = false;
718         int rc;
719
720         OBD_ALLOC_PTR(dataset);
721         if (dataset == NULL)
722                 return -ENOMEM;
723
724         rc = kern_path(pathname, LOOKUP_DIRECTORY, &dataset->pccd_path);
725         if (unlikely(rc)) {
726                 OBD_FREE_PTR(dataset);
727                 return rc;
728         }
729         strncpy(dataset->pccd_pathname, pathname, PATH_MAX);
730         dataset->pccd_rwid = cmd->u.pccc_add.pccc_rwid;
731         dataset->pccd_roid = cmd->u.pccc_add.pccc_roid;
732         dataset->pccd_flags = cmd->u.pccc_add.pccc_flags;
733         atomic_set(&dataset->pccd_refcount, 1);
734
735         rc = pcc_dataset_rule_init(&dataset->pccd_rule, cmd);
736         if (rc) {
737                 pcc_dataset_put(dataset);
738                 return rc;
739         }
740
741         down_write(&super->pccs_rw_sem);
742         list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) {
743                 if (strcmp(tmp->pccd_pathname, pathname) == 0 ||
744                     (dataset->pccd_rwid != 0 &&
745                      dataset->pccd_rwid == tmp->pccd_rwid) ||
746                     (dataset->pccd_roid != 0 &&
747                      dataset->pccd_roid == tmp->pccd_roid)) {
748                         found = true;
749                         break;
750                 }
751         }
752         if (!found)
753                 list_add(&dataset->pccd_linkage, &super->pccs_datasets);
754         up_write(&super->pccs_rw_sem);
755
756         if (found) {
757                 pcc_dataset_put(dataset);
758                 rc = -EEXIST;
759         }
760
761         return rc;
762 }
763
764 static struct pcc_dataset *
765 pcc_dataset_get(struct pcc_super *super, enum lu_pcc_type type, __u32 id)
766 {
767         struct pcc_dataset *dataset;
768         struct pcc_dataset *selected = NULL;
769
770         if (id == 0)
771                 return NULL;
772
773         /*
774          * archive ID (read-write ID) or read-only ID is unique in the list,
775          * we just return last added one as first priority.
776          */
777         down_read(&super->pccs_rw_sem);
778         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
779                 if (type == LU_PCC_READWRITE && (dataset->pccd_rwid != id ||
780                     !(dataset->pccd_flags & PCC_DATASET_RWPCC)))
781                         continue;
782                 if (type == LU_PCC_READONLY && (dataset->pccd_roid != id ||
783                     !(dataset->pccd_flags & PCC_DATASET_ROPCC)))
784                         continue;
785                 atomic_inc(&dataset->pccd_refcount);
786                 selected = dataset;
787                 break;
788         }
789         up_read(&super->pccs_rw_sem);
790         if (selected)
791                 CDEBUG(D_CACHE, "matched id %u, PCC mode %d\n", id, type);
792
793         return selected;
794 }
795
796 void
797 pcc_dataset_put(struct pcc_dataset *dataset)
798 {
799         if (atomic_dec_and_test(&dataset->pccd_refcount)) {
800                 pcc_dataset_rule_fini(&dataset->pccd_rule);
801                 path_put(&dataset->pccd_path);
802                 OBD_FREE_PTR(dataset);
803         }
804 }
805
806 static int
807 pcc_dataset_del(struct pcc_super *super, char *pathname)
808 {
809         struct list_head *l, *tmp;
810         struct pcc_dataset *dataset;
811         int rc = -ENOENT;
812
813         down_write(&super->pccs_rw_sem);
814         list_for_each_safe(l, tmp, &super->pccs_datasets) {
815                 dataset = list_entry(l, struct pcc_dataset, pccd_linkage);
816                 if (strcmp(dataset->pccd_pathname, pathname) == 0) {
817                         list_del_init(&dataset->pccd_linkage);
818                         pcc_dataset_put(dataset);
819                         super->pccs_generation++;
820                         rc = 0;
821                         break;
822                 }
823         }
824         up_write(&super->pccs_rw_sem);
825         return rc;
826 }
827
828 static void
829 pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
830 {
831         seq_printf(m, "%s:\n", dataset->pccd_pathname);
832         seq_printf(m, "  rwid: %u\n", dataset->pccd_rwid);
833         seq_printf(m, "  flags: %x\n", dataset->pccd_flags);
834         seq_printf(m, "  autocache: %s\n", dataset->pccd_rule.pmr_conds_str);
835 }
836
837 int
838 pcc_super_dump(struct pcc_super *super, struct seq_file *m)
839 {
840         struct pcc_dataset *dataset;
841
842         down_read(&super->pccs_rw_sem);
843         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
844                 pcc_dataset_dump(dataset, m);
845         }
846         up_read(&super->pccs_rw_sem);
847         return 0;
848 }
849
850 static void pcc_remove_datasets(struct pcc_super *super)
851 {
852         struct pcc_dataset *dataset, *tmp;
853
854         down_write(&super->pccs_rw_sem);
855         list_for_each_entry_safe(dataset, tmp,
856                                  &super->pccs_datasets, pccd_linkage) {
857                 list_del(&dataset->pccd_linkage);
858                 pcc_dataset_put(dataset);
859         }
860         super->pccs_generation++;
861         up_write(&super->pccs_rw_sem);
862 }
863
/*
 * Tear down the PCC state of a superblock: remove all datasets, then
 * drop the reference on the credentials stored by pcc_super_init().
 */
void pcc_super_fini(struct pcc_super *super)
{
        pcc_remove_datasets(super);
        put_cred(super->pccs_cred);
}
869
/*
 * Check that @pathname is usable as a dataset path: non-NULL, absolute
 * (starts with '/') and shorter than PATH_MAX.
 */
static bool pathname_is_valid(const char *pathname)
{
	if (pathname == NULL)
		return false;

	/* Needs to be absolute path; this also rejects the empty string */
	if (pathname[0] != '/')
		return false;

	return strlen(pathname) < PATH_MAX;
}
878
879 static struct pcc_cmd *
880 pcc_cmd_parse(char *buffer, unsigned long count)
881 {
882         static struct pcc_cmd *cmd;
883         char *token;
884         char *val;
885         int rc = 0;
886
887         OBD_ALLOC_PTR(cmd);
888         if (cmd == NULL)
889                 GOTO(out, rc = -ENOMEM);
890
891         /* clear all setting */
892         if (strncmp(buffer, "clear", 5) == 0) {
893                 cmd->pccc_cmd = PCC_CLEAR_ALL;
894                 GOTO(out, rc = 0);
895         }
896
897         val = buffer;
898         token = strsep(&val, " ");
899         if (val == NULL || strlen(val) == 0)
900                 GOTO(out_free_cmd, rc = -EINVAL);
901
902         /* Type of the command */
903         if (strcmp(token, "add") == 0)
904                 cmd->pccc_cmd = PCC_ADD_DATASET;
905         else if (strcmp(token, "del") == 0)
906                 cmd->pccc_cmd = PCC_DEL_DATASET;
907         else
908                 GOTO(out_free_cmd, rc = -EINVAL);
909
910         /* Pathname of the dataset */
911         token = strsep(&val, " ");
912         if ((val == NULL && cmd->pccc_cmd != PCC_DEL_DATASET) ||
913             !pathname_is_valid(token))
914                 GOTO(out_free_cmd, rc = -EINVAL);
915         cmd->pccc_pathname = token;
916
917         if (cmd->pccc_cmd == PCC_ADD_DATASET) {
918                 /* List of ID */
919                 LASSERT(val);
920                 token = val;
921                 val = strrchr(token, '}');
922                 if (!val)
923                         GOTO(out_free_cmd, rc = -EINVAL);
924
925                 /* Skip '}' */
926                 val++;
927                 if (*val == '\0') {
928                         val = NULL;
929                 } else if (*val == ' ') {
930                         *val = '\0';
931                         val++;
932                 } else {
933                         GOTO(out_free_cmd, rc = -EINVAL);
934                 }
935
936                 rc = pcc_id_parse(cmd, token);
937                 if (rc)
938                         GOTO(out_free_cmd, rc);
939
940                 rc = pcc_parse_value_pairs(cmd, val);
941                 if (rc)
942                         GOTO(out_cmd_fini, rc = -EINVAL);
943         }
944         goto out;
945 out_cmd_fini:
946         pcc_cmd_fini(cmd);
947 out_free_cmd:
948         OBD_FREE_PTR(cmd);
949 out:
950         if (rc)
951                 cmd = ERR_PTR(rc);
952         return cmd;
953 }
954
955 int pcc_cmd_handle(char *buffer, unsigned long count,
956                    struct pcc_super *super)
957 {
958         int rc = 0;
959         struct pcc_cmd *cmd;
960
961         cmd = pcc_cmd_parse(buffer, count);
962         if (IS_ERR(cmd))
963                 return PTR_ERR(cmd);
964
965         switch (cmd->pccc_cmd) {
966         case PCC_ADD_DATASET:
967                 rc = pcc_dataset_add(super, cmd);
968                 break;
969         case PCC_DEL_DATASET:
970                 rc = pcc_dataset_del(super, cmd->pccc_pathname);
971                 break;
972         case PCC_CLEAR_ALL:
973                 pcc_remove_datasets(super);
974                 break;
975         default:
976                 rc = -EINVAL;
977                 break;
978         }
979
980         pcc_cmd_fini(cmd);
981         OBD_FREE_PTR(cmd);
982         return rc;
983 }
984
985 static inline void pcc_inode_lock(struct inode *inode)
986 {
987         mutex_lock(&ll_i2info(inode)->lli_pcc_lock);
988 }
989
990 static inline void pcc_inode_unlock(struct inode *inode)
991 {
992         mutex_unlock(&ll_i2info(inode)->lli_pcc_lock);
993 }
994
995 static void pcc_inode_init(struct pcc_inode *pcci, struct ll_inode_info *lli)
996 {
997         pcci->pcci_lli = lli;
998         lli->lli_pcc_inode = pcci;
999         atomic_set(&pcci->pcci_refcount, 0);
1000         pcci->pcci_type = LU_PCC_NONE;
1001         pcci->pcci_layout_gen = CL_LAYOUT_GEN_NONE;
1002         atomic_set(&pcci->pcci_active_ios, 0);
1003         init_waitqueue_head(&pcci->pcci_waitq);
1004 }
1005
1006 static void pcc_inode_fini(struct pcc_inode *pcci)
1007 {
1008         struct ll_inode_info *lli = pcci->pcci_lli;
1009
1010         path_put(&pcci->pcci_path);
1011         pcci->pcci_type = LU_PCC_NONE;
1012         OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
1013         lli->lli_pcc_inode = NULL;
1014 }
1015
1016 static void pcc_inode_get(struct pcc_inode *pcci)
1017 {
1018         atomic_inc(&pcci->pcci_refcount);
1019 }
1020
1021 static void pcc_inode_put(struct pcc_inode *pcci)
1022 {
1023         if (atomic_dec_and_test(&pcci->pcci_refcount))
1024                 pcc_inode_fini(pcci);
1025 }
1026
1027 void pcc_inode_free(struct inode *inode)
1028 {
1029         struct pcc_inode *pcci = ll_i2pcci(inode);
1030
1031         if (pcci) {
1032                 WARN_ON(atomic_read(&pcci->pcci_refcount) > 1);
1033                 pcc_inode_put(pcci);
1034         }
1035 }
1036
1037 /*
1038  * TODO:
1039  * As Andreas suggested, we'd better use new layout to
1040  * reduce overhead:
1041  * (fid->f_oid >> 16 & oxFFFF)/FID
1042  */
1043 #define PCC_DATASET_MAX_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
1044 static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid)
1045 {
1046         return scnprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
1047                          DFID_NOBRACE,
1048                          (fid)->f_oid       & 0xFFFF,
1049                          (fid)->f_oid >> 16 & 0xFFFF,
1050                          (unsigned int)((fid)->f_seq       & 0xFFFF),
1051                          (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
1052                          (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
1053                          (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
1054                          PFID(fid));
1055 }
1056
1057 static inline const struct cred *pcc_super_cred(struct super_block *sb)
1058 {
1059         return ll_s2sbi(sb)->ll_pcc_super.pccs_cred;
1060 }
1061
1062 void pcc_file_init(struct pcc_file *pccf)
1063 {
1064         pccf->pccf_file = NULL;
1065         pccf->pccf_type = LU_PCC_NONE;
1066 }
1067
1068 static inline bool pcc_auto_attach_enabled(enum pcc_dataset_flags flags,
1069                                            enum pcc_io_type iot)
1070 {
1071         if (iot == PIT_OPEN)
1072                 return flags & PCC_DATASET_OPEN_ATTACH;
1073         if (iot == PIT_GETATTR)
1074                 return flags & PCC_DATASET_STAT_ATTACH;
1075         else
1076                 return flags & PCC_DATASET_AUTO_ATTACH;
1077 }
1078
1079 static const char pcc_xattr_layout[] = XATTR_USER_PREFIX "PCC.layout";
1080
1081 static int pcc_layout_xattr_set(struct pcc_inode *pcci, __u32 gen)
1082 {
1083         struct dentry *pcc_dentry = pcci->pcci_path.dentry;
1084         struct ll_inode_info *lli = pcci->pcci_lli;
1085         int rc;
1086
1087         ENTRY;
1088
1089         if (!(lli->lli_pcc_dsflags & PCC_DATASET_AUTO_ATTACH))
1090                 RETURN(0);
1091
1092         rc = ll_vfs_setxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
1093                              &gen, sizeof(gen), 0);
1094
1095         RETURN(rc);
1096 }
1097
1098 static int pcc_get_layout_info(struct inode *inode, struct cl_layout *clt)
1099 {
1100         struct lu_env *env;
1101         struct ll_inode_info *lli = ll_i2info(inode);
1102         __u16 refcheck;
1103         int rc;
1104
1105         ENTRY;
1106
1107         if (!lli->lli_clob)
1108                 RETURN(-EINVAL);
1109
1110         env = cl_env_get(&refcheck);
1111         if (IS_ERR(env))
1112                 RETURN(PTR_ERR(env));
1113
1114         rc = cl_object_layout_get(env, lli->lli_clob, clt);
1115         if (rc < 0)
1116                 CDEBUG(D_INODE, "Cannot get layout for "DFID"\n",
1117                        PFID(ll_inode2fid(inode)));
1118
1119         cl_env_put(env, &refcheck);
1120         RETURN(rc < 0 ? rc : 0);
1121 }
1122
1123 /* Must be called with pcci->pcci_lock held */
1124 static void pcc_inode_attach_init(struct pcc_dataset *dataset,
1125                                   struct pcc_inode *pcci,
1126                                   struct dentry *dentry,
1127                                   enum lu_pcc_type type)
1128 {
1129         pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
1130         pcci->pcci_path.dentry = dentry;
1131         LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
1132         atomic_set(&pcci->pcci_refcount, 1);
1133         pcci->pcci_type = type;
1134         pcci->pcci_attr_valid = false;
1135 }
1136
1137 static inline void pcc_inode_dsflags_set(struct ll_inode_info *lli,
1138                                          struct pcc_dataset *dataset)
1139 {
1140         lli->lli_pcc_generation = ll_info2pccs(lli)->pccs_generation;
1141         lli->lli_pcc_dsflags = dataset->pccd_flags;
1142 }
1143
1144 static void pcc_inode_attach_set(struct pcc_super *super,
1145                                  struct pcc_dataset *dataset,
1146                                  struct ll_inode_info *lli,
1147                                  struct pcc_inode *pcci,
1148                                  struct dentry *dentry,
1149                                  enum lu_pcc_type type)
1150 {
1151         pcc_inode_init(pcci, lli);
1152         pcc_inode_attach_init(dataset, pcci, dentry, type);
1153         down_read(&super->pccs_rw_sem);
1154         pcc_inode_dsflags_set(lli, dataset);
1155         up_read(&super->pccs_rw_sem);
1156 }
1157
1158 static inline void pcc_layout_gen_set(struct pcc_inode *pcci,
1159                                       __u32 gen)
1160 {
1161         pcci->pcci_layout_gen = gen;
1162 }
1163
1164 static inline bool pcc_inode_has_layout(struct pcc_inode *pcci)
1165 {
1166         return pcci->pcci_layout_gen != CL_LAYOUT_GEN_NONE;
1167 }
1168
1169 static struct dentry *pcc_lookup(struct dentry *base, char *pathname)
1170 {
1171         char *ptr = NULL, *component;
1172         struct dentry *parent;
1173         struct dentry *child = ERR_PTR(-ENOENT);
1174
1175         ptr = pathname;
1176
1177         /* move past any initial '/' to the start of the first path component*/
1178         while (*ptr == '/')
1179                 ptr++;
1180
1181         /* store the start of the first path component */
1182         component = ptr;
1183
1184         parent = dget(base);
1185         while (ptr) {
1186                 /* find the start of the next component - if we don't find it,
1187                  * the current component is the last component
1188                  */
1189                 ptr = strchr(ptr, '/');
1190                 /* put a NUL char in place of the '/' before the next compnent
1191                  * so we can treat this component as a string; note the full
1192                  * path string is NUL terminated to this is not needed for the
1193                  * last component
1194                  */
1195                 if (ptr)
1196                         *ptr = '\0';
1197
1198                 /* look up the current component */
1199                 inode_lock(parent->d_inode);
1200                 child = lookup_one_len(component, parent, strlen(component));
1201                 inode_unlock(parent->d_inode);
1202
1203                 /* repair the path string: put '/' back in place of the NUL */
1204                 if (ptr)
1205                         *ptr = '/';
1206
1207                 dput(parent);
1208
1209                 if (IS_ERR_OR_NULL(child))
1210                         break;
1211
1212                 /* we may find a cached negative dentry */
1213                 if (!d_is_positive(child)) {
1214                         dput(child);
1215                         child = NULL;
1216                         break;
1217                 }
1218
1219                 /* descend in to the next level of the path */
1220                 parent = child;
1221
1222                 /* move the pointer past the '/' to the next component */
1223                 if (ptr)
1224                         ptr++;
1225                 component = ptr;
1226         }
1227
1228         /* NULL child means we didn't find anything */
1229         if (!child)
1230                 child = ERR_PTR(-ENOENT);
1231
1232         return child;
1233 }
1234
1235 static int pcc_try_dataset_attach(struct inode *inode, __u32 gen,
1236                                   enum lu_pcc_type type,
1237                                   struct pcc_dataset *dataset,
1238                                   bool *cached)
1239 {
1240         struct ll_inode_info *lli = ll_i2info(inode);
1241         struct pcc_inode *pcci = lli->lli_pcc_inode;
1242         const struct cred *old_cred;
1243         struct dentry *pcc_dentry = NULL;
1244         char pathname[PCC_DATASET_MAX_PATH];
1245         __u32 pcc_gen;
1246         int rc;
1247
1248         ENTRY;
1249
1250         if (type == LU_PCC_READWRITE &&
1251             !(dataset->pccd_flags & PCC_DATASET_RWPCC))
1252                 RETURN(0);
1253
1254         if (type == LU_PCC_READONLY &&
1255             !(dataset->pccd_flags & PCC_DATASET_ROPCC))
1256                 RETURN(0);
1257
1258         rc = pcc_fid2dataset_path(pathname, PCC_DATASET_MAX_PATH,
1259                                   &lli->lli_fid);
1260
1261         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1262         pcc_dentry = pcc_lookup(dataset->pccd_path.dentry, pathname);
1263         if (IS_ERR(pcc_dentry)) {
1264                 rc = PTR_ERR(pcc_dentry);
1265                 CDEBUG(D_CACHE, "%s: path lookup error on "DFID":%s: rc = %d\n",
1266                        ll_i2sbi(inode)->ll_fsname, PFID(&lli->lli_fid),
1267                        pathname, rc);
1268                 /* ignore this error */
1269                 GOTO(out, rc = 0);
1270         }
1271
1272         rc = ll_vfs_getxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
1273                              &pcc_gen, sizeof(pcc_gen));
1274         if (rc < 0)
1275                 /* ignore this error */
1276                 GOTO(out_put_pcc_dentry, rc = 0);
1277
1278         rc = 0;
1279         /* The file is still valid cached in PCC, attach it immediately. */
1280         if (pcc_gen == gen) {
1281                 CDEBUG(D_CACHE, DFID" L.Gen (%d) consistent, auto attached.\n",
1282                        PFID(&lli->lli_fid), gen);
1283                 if (!pcci) {
1284                         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
1285                         if (pcci == NULL)
1286                                 GOTO(out_put_pcc_dentry, rc = -ENOMEM);
1287
1288                         pcc_inode_init(pcci, lli);
1289                         dget(pcc_dentry);
1290                         pcc_inode_attach_init(dataset, pcci, pcc_dentry, type);
1291                 } else {
1292                         /*
1293                          * This happened when a file was once attached into
1294                          * PCC, and some processes keep this file opened
1295                          * (pcci->refcount > 1) and corresponding PCC file
1296                          * without any I/O activity, and then this file was
1297                          * detached by the manual detach command or the
1298                          * revocation of the layout lock (i.e. cached LRU lock
1299                          * shrinking).
1300                          */
1301                         pcc_inode_get(pcci);
1302                         pcci->pcci_type = type;
1303                 }
1304                 pcc_inode_dsflags_set(lli, dataset);
1305                 pcc_layout_gen_set(pcci, gen);
1306                 *cached = true;
1307         }
1308 out_put_pcc_dentry:
1309         dput(pcc_dentry);
1310 out:
1311         revert_creds(old_cred);
1312         RETURN(rc);
1313 }
1314
1315 static int pcc_try_datasets_attach(struct inode *inode, enum pcc_io_type iot,
1316                                    __u32 gen, enum lu_pcc_type type,
1317                                    bool *cached)
1318 {
1319         struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
1320         struct ll_inode_info *lli = ll_i2info(inode);
1321         struct pcc_dataset *dataset = NULL, *tmp;
1322         int rc = 0;
1323
1324         ENTRY;
1325
1326         down_read(&super->pccs_rw_sem);
1327         list_for_each_entry_safe(dataset, tmp,
1328                                  &super->pccs_datasets, pccd_linkage) {
1329                 if (!pcc_auto_attach_enabled(dataset->pccd_flags, iot))
1330                         break;
1331
1332                 rc = pcc_try_dataset_attach(inode, gen, type, dataset, cached);
1333                 if (rc < 0 || (!rc && *cached))
1334                         break;
1335         }
1336
1337         /*
1338          * Update the saved dataset flags for the inode accordingly if failed.
1339          */
1340         if (!rc && !*cached) {
1341                 /*
1342                  * Currently auto attach strategy for a PCC backend is
1343                  * unchangeable once once it was added into the PCC datasets on
1344                  * a client as the support to change auto attach strategy is
1345                  * not implemented yet.
1346                  */
1347                 /*
1348                  * If tried to attach from one PCC backend:
1349                  * @lli_pcc_generation > 0:
1350                  * 1) The file was once attached into PCC, but now the
1351                  * corresponding PCC backend should be removed from the client;
1352                  * 2) The layout generation was changed, the data has been
1353                  * restored;
1354                  * 3) The corresponding PCC copy is not existed on PCC
1355                  * @lli_pcc_generation == 0:
1356                  * The file is never attached into PCC but in a HSM released
1357                  * state, or once attached into PCC but the inode was evicted
1358                  * from icache later.
1359                  * Set the saved dataset flags with PCC_DATASET_NONE. Then this
1360                  * file will skip from the candidates to try auto attach until
1361                  * the file is attached into PCC again.
1362                  *
1363                  * If the file was never attached into PCC, or once attached but
1364                  * its inode was evicted from icache (lli_pcc_generation == 0),
1365                  * or the corresponding dataset was removed from the client,
1366                  * set the saved dataset flags with PCC_DATASET_NONE.
1367                  *
1368                  * TODO: If the file was once attached into PCC but not try to
1369                  * auto attach due to the change of the configuration parameters
1370                  * for this dataset (i.e. change from auto attach enabled to
1371                  * auto attach disabled for this dataset), update the saved
1372                  * dataset flags with the found one.
1373                  */
1374                 lli->lli_pcc_dsflags = PCC_DATASET_NONE;
1375         }
1376         up_read(&super->pccs_rw_sem);
1377
1378         RETURN(rc);
1379 }
1380
1381 /*
1382  * TODO: For RW-PCC, it is desirable to store HSM info as a layout (LU-10606).
1383  * Thus the client can get archive ID from the layout directly. When try to
1384  * attach the file automatically which is in HSM released state (according to
1385  * LOV_PATTERN_F_RELEASED in the layout), it can determine whether the file is
1386  * valid cached on PCC more precisely according to the @rwid (archive ID) in
1387  * the PCC dataset and the archive ID in HSM attrs.
1388  */
1389 static int pcc_try_auto_attach(struct inode *inode, bool *cached,
1390                                enum pcc_io_type iot)
1391 {
1392         struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
1393         struct cl_layout clt = {
1394                 .cl_layout_gen = 0,
1395                 .cl_is_released = false,
1396         };
1397         struct ll_inode_info *lli = ll_i2info(inode);
1398         __u32 gen;
1399         int rc;
1400
1401         ENTRY;
1402
1403         /*
1404          * Quick check whether there is PCC device.
1405          */
1406         if (list_empty(&super->pccs_datasets))
1407                 RETURN(0);
1408
1409         /*
1410          * The file layout lock was cancelled. And this open does not
1411          * obtain valid layout lock from MDT (i.e. the file is being
1412          * HSM restoring).
1413          */
1414         if (iot == PIT_OPEN) {
1415                 if (ll_layout_version_get(lli) == CL_LAYOUT_GEN_NONE)
1416                         RETURN(0);
1417         } else {
1418                 rc = ll_layout_refresh(inode, &gen);
1419                 if (rc)
1420                         RETURN(rc);
1421         }
1422
1423         rc = pcc_get_layout_info(inode, &clt);
1424         if (rc)
1425                 RETURN(rc);
1426
1427         if (iot != PIT_OPEN && gen != clt.cl_layout_gen) {
1428                 CDEBUG(D_CACHE, DFID" layout changed from %d to %d.\n",
1429                        PFID(ll_inode2fid(inode)), gen, clt.cl_layout_gen);
1430                 RETURN(-EINVAL);
1431         }
1432
1433         if (clt.cl_is_released)
1434                 rc = pcc_try_datasets_attach(inode, iot, clt.cl_layout_gen,
1435                                              LU_PCC_READWRITE, cached);
1436         else if (clt.cl_is_rdonly)
1437                 rc = pcc_try_datasets_attach(inode, iot, clt.cl_layout_gen,
1438                                              LU_PCC_READONLY, cached);
1439
1440         RETURN(rc);
1441 }
1442
1443 static inline bool pcc_may_auto_attach(struct inode *inode,
1444                                        enum pcc_io_type iot)
1445 {
1446         struct ll_inode_info *lli = ll_i2info(inode);
1447         struct pcc_super *super = ll_i2pccs(inode);
1448
1449         ENTRY;
1450
1451         /* Known the file was not in any PCC backend. */
1452         if (lli->lli_pcc_dsflags & PCC_DATASET_NONE)
1453                 RETURN(false);
1454
1455         /*
1456          * lli_pcc_generation == 0 means that the file was never attached into
1457          * PCC, or may be once attached into PCC but detached as the inode is
1458          * evicted from icache (i.e. "echo 3 > /proc/sys/vm/drop_caches" or
1459          * icache shrinking due to the memory pressure), which will cause the
1460          * file detach from PCC when releasing the inode from icache.
1461          * In either case, we still try to attach.
1462          */
1463         /* lli_pcc_generation == 0, or the PCC setting was changed,
1464          * or there is no PCC setup on the client and the try will return
1465          * immediately in pcc_try_auto_attach().
1466          */
1467         if (super->pccs_generation != lli->lli_pcc_generation)
1468                 RETURN(true);
1469
1470         /* The cached setting @lli_pcc_dsflags is valid */
1471         if (iot == PIT_OPEN)
1472                 RETURN(lli->lli_pcc_dsflags & PCC_DATASET_OPEN_ATTACH);
1473
1474         if (iot == PIT_GETATTR)
1475                 RETURN(lli->lli_pcc_dsflags & PCC_DATASET_STAT_ATTACH);
1476
1477         RETURN(lli->lli_pcc_dsflags & PCC_DATASET_IO_ATTACH);
1478 }
1479
1480 int pcc_file_open(struct inode *inode, struct file *file)
1481 {
1482         struct pcc_inode *pcci;
1483         struct ll_inode_info *lli = ll_i2info(inode);
1484         struct ll_file_data *fd = file->private_data;
1485         struct pcc_file *pccf = &fd->fd_pcc_file;
1486         struct file *pcc_file;
1487         struct path *path;
1488         bool cached = false;
1489         int rc = 0;
1490
1491         ENTRY;
1492
1493         if (!S_ISREG(inode->i_mode))
1494                 RETURN(0);
1495
1496         if (IS_ENCRYPTED(inode))
1497                 RETURN(0);
1498
1499         pcc_inode_lock(inode);
1500         pcci = ll_i2pcci(inode);
1501
1502         if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
1503                 GOTO(out_unlock, rc = 0);
1504
1505         if (!pcci || !pcc_inode_has_layout(pcci)) {
1506                 if (pcc_may_auto_attach(inode, PIT_OPEN))
1507                         rc = pcc_try_auto_attach(inode, &cached, PIT_OPEN);
1508
1509                 if (rc < 0 || !cached)
1510                         GOTO(out_unlock, rc);
1511
1512                 if (!pcci)
1513                         pcci = ll_i2pcci(inode);
1514         }
1515
1516         pcc_inode_get(pcci);
1517         WARN_ON(pccf->pccf_file);
1518
1519         path = &pcci->pcci_path;
1520         CDEBUG(D_CACHE, "opening pcc file '%pd'\n", path->dentry);
1521
1522         pcc_file = dentry_open(path, file->f_flags,
1523                                pcc_super_cred(inode->i_sb));
1524         if (IS_ERR_OR_NULL(pcc_file)) {
1525                 rc = pcc_file == NULL ? -EINVAL : PTR_ERR(pcc_file);
1526                 pcc_inode_put(pcci);
1527         } else {
1528                 pccf->pccf_file = pcc_file;
1529                 pccf->pccf_type = pcci->pcci_type;
1530         }
1531
1532 out_unlock:
1533         pcc_inode_unlock(inode);
1534         RETURN(rc);
1535 }
1536
1537 void pcc_file_release(struct inode *inode, struct file *file)
1538 {
1539         struct pcc_inode *pcci;
1540         struct ll_file_data *fd = file->private_data;
1541         struct pcc_file *pccf;
1542         struct path *path;
1543
1544         ENTRY;
1545
1546         if (!S_ISREG(inode->i_mode) || fd == NULL)
1547                 RETURN_EXIT;
1548
1549         pccf = &fd->fd_pcc_file;
1550         pcc_inode_lock(inode);
1551         if (pccf->pccf_file == NULL)
1552                 goto out;
1553
1554         pcci = ll_i2pcci(inode);
1555         LASSERT(pcci);
1556         path = &pcci->pcci_path;
1557         CDEBUG(D_CACHE, "releasing pcc file \"%pd\"\n", path->dentry);
1558         pcc_inode_put(pcci);
1559         fput(pccf->pccf_file);
1560         pccf->pccf_file = NULL;
1561 out:
1562         pcc_inode_unlock(inode);
1563         RETURN_EXIT;
1564 }
1565
1566 /* Tolerate the IO failure on PCC and fall back to normal Lustre IO path */
1567 static bool pcc_io_tolerate(struct pcc_inode *pcci,
1568                             enum pcc_io_type iot, int rc)
1569 {
1570         if (pcci->pcci_type == LU_PCC_READWRITE) {
1571                 if (iot == PIT_WRITE && (rc == -ENOSPC || rc == -EDQUOT))
1572                         return false;
1573                 /* Handle the ->page_mkwrite failure tolerance separately
1574                  * in pcc_page_mkwrite().
1575                  */
1576         } else if (pcci->pcci_type == LU_PCC_READONLY) {
1577                 if ((iot == PIT_READ || iot == PIT_GETATTR ||
1578                      iot == PIT_SPLICE_READ) && rc < 0 && rc != -ENOMEM)
1579                         return false;
1580                 if (iot == PIT_FAULT && (rc & VM_FAULT_SIGBUS) &&
1581                     !(rc & VM_FAULT_OOM))
1582                         return false;
1583         }
1584
1585         return true;
1586 }
1587
1588 static void pcc_io_init(struct inode *inode, enum pcc_io_type iot, bool *cached)
1589 {
1590         struct pcc_inode *pcci;
1591
1592         pcc_inode_lock(inode);
1593         pcci = ll_i2pcci(inode);
1594         if (pcci && pcc_inode_has_layout(pcci)) {
1595                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1596                 if (pcci->pcci_type == LU_PCC_READONLY &&
1597                     (iot == PIT_WRITE || iot == PIT_SETATTR ||
1598                      iot == PIT_PAGE_MKWRITE)) {
1599                         /* Fall back to normal I/O path */
1600                         *cached = false;
1601                         /* For mmap write, we need to detach the file from
1602                          * RO-PCC, release the page got from ->fault(), and
1603                          * then retry the memory fault handling (->fault()
1604                          * and ->page_mkwrite()).
1605                          * These are done in pcc_page_mkwrite();
1606                          */
1607                 } else {
1608                         atomic_inc(&pcci->pcci_active_ios);
1609                         *cached = true;
1610                 }
1611         } else {
1612                 *cached = false;
1613                 if (pcc_may_auto_attach(inode, iot)) {
1614                         (void) pcc_try_auto_attach(inode, cached, iot);
1615                         if (*cached) {
1616                                 pcci = ll_i2pcci(inode);
1617                                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1618                                 atomic_inc(&pcci->pcci_active_ios);
1619                         }
1620                 }
1621         }
1622         pcc_inode_unlock(inode);
1623 }
1624
1625 static void pcc_io_fini(struct inode *inode, enum pcc_io_type iot,
1626                         int rc, bool *cached)
1627 {
1628         struct pcc_inode *pcci = ll_i2pcci(inode);
1629
1630         LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0 && *cached);
1631
1632         *cached = pcc_io_tolerate(pcci, iot, rc);
1633         if (atomic_dec_and_test(&pcci->pcci_active_ios))
1634                 wake_up(&pcci->pcci_waitq);
1635 }
1636
1637
1638 static ssize_t
1639 __pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
1640 {
1641         struct file *file = iocb->ki_filp;
1642
1643 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1644         return file->f_op->read_iter(iocb, iter);
1645 #else
1646         struct iovec iov;
1647         struct iov_iter i;
1648         ssize_t bytes = 0;
1649
1650         iov_for_each(iov, i, *iter) {
1651                 ssize_t res;
1652
1653                 res = file->f_op->aio_read(iocb, &iov, 1, iocb->ki_pos);
1654                 if (-EIOCBQUEUED == res)
1655                         res = wait_on_sync_kiocb(iocb);
1656                 if (res <= 0) {
1657                         if (bytes == 0)
1658                                 bytes = res;
1659                         break;
1660                 }
1661
1662                 bytes += res;
1663                 if (res < iov.iov_len)
1664                         break;
1665         }
1666
1667         if (bytes > 0)
1668                 iov_iter_advance(iter, bytes);
1669         return bytes;
1670 #endif
1671 }
1672
1673 ssize_t pcc_file_read_iter(struct kiocb *iocb,
1674                            struct iov_iter *iter, bool *cached)
1675 {
1676         struct file *file = iocb->ki_filp;
1677         struct ll_file_data *fd = file->private_data;
1678         struct pcc_file *pccf = &fd->fd_pcc_file;
1679         struct inode *inode = file_inode(file);
1680         ssize_t result;
1681
1682         ENTRY;
1683
1684         if (pccf->pccf_file == NULL) {
1685                 *cached = false;
1686                 RETURN(0);
1687         }
1688
1689         pcc_io_init(inode, PIT_READ, cached);
1690         if (!*cached)
1691                 RETURN(0);
1692
1693         /* Fake I/O error on RO-PCC */
1694         if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
1695                 GOTO(out, result = -EIO);
1696
1697         iocb->ki_filp = pccf->pccf_file;
1698         /* generic_file_aio_read does not support ext4-dax,
1699          * __pcc_file_read_iter uses ->aio_read hook directly
1700          * to add support for ext4-dax.
1701          */
1702         result = __pcc_file_read_iter(iocb, iter);
1703         iocb->ki_filp = file;
1704 out:
1705         pcc_io_fini(inode, PIT_READ, result, cached);
1706         RETURN(result);
1707 }
1708
1709 static ssize_t
1710 __pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
1711 {
1712         struct file *file = iocb->ki_filp;
1713
1714 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1715         return file->f_op->write_iter(iocb, iter);
1716 #else
1717         struct iovec iov;
1718         struct iov_iter i;
1719         ssize_t bytes = 0;
1720
1721         iov_for_each(iov, i, *iter) {
1722                 ssize_t res;
1723
1724                 res = file->f_op->aio_write(iocb, &iov, 1, iocb->ki_pos);
1725                 if (-EIOCBQUEUED == res)
1726                         res = wait_on_sync_kiocb(iocb);
1727                 if (res <= 0) {
1728                         if (bytes == 0)
1729                                 bytes = res;
1730                         break;
1731                 }
1732
1733                 bytes += res;
1734                 if (res < iov.iov_len)
1735                         break;
1736         }
1737
1738         if (bytes > 0)
1739                 iov_iter_advance(iter, bytes);
1740         return bytes;
1741 #endif
1742 }
1743
1744 ssize_t pcc_file_write_iter(struct kiocb *iocb,
1745                             struct iov_iter *iter, bool *cached)
1746 {
1747         struct file *file = iocb->ki_filp;
1748         struct ll_file_data *fd = file->private_data;
1749         struct pcc_file *pccf = &fd->fd_pcc_file;
1750         struct inode *inode = file_inode(file);
1751         ssize_t result;
1752
1753         ENTRY;
1754
1755         if (pccf->pccf_file == NULL) {
1756                 *cached = false;
1757                 RETURN(0);
1758         }
1759
1760         if (pccf->pccf_type != LU_PCC_READWRITE) {
1761                 *cached = false;
1762                 RETURN(-EAGAIN);
1763         }
1764
1765         pcc_io_init(inode, PIT_WRITE, cached);
1766         if (!*cached)
1767                 RETURN(0);
1768
1769         if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
1770                 GOTO(out, result = -ENOSPC);
1771
1772         iocb->ki_filp = pccf->pccf_file;
1773
1774         /* Since __pcc_file_write_iter makes write calls via
1775          * the normal vfs interface to the local PCC file system,
1776          * the inode lock is not needed.
1777          */
1778         result = __pcc_file_write_iter(iocb, iter);
1779         iocb->ki_filp = file;
1780 out:
1781         pcc_io_fini(inode, PIT_WRITE, result, cached);
1782         RETURN(result);
1783 }
1784
1785 int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
1786                       bool *cached)
1787 {
1788         int rc;
1789         const struct cred *old_cred;
1790         struct iattr attr2 = *attr;
1791         struct dentry *pcc_dentry;
1792         struct pcc_inode *pcci;
1793
1794         ENTRY;
1795
1796         if (!S_ISREG(inode->i_mode)) {
1797                 *cached = false;
1798                 RETURN(0);
1799         }
1800
1801         pcc_io_init(inode, PIT_SETATTR, cached);
1802         if (!*cached)
1803                 RETURN(0);
1804
1805         attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME |
1806                          ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET |
1807                          ATTR_CTIME | ATTR_UID | ATTR_GID);
1808         pcci = ll_i2pcci(inode);
1809         pcc_dentry = pcci->pcci_path.dentry;
1810         inode_lock(pcc_dentry->d_inode);
1811         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1812 #ifdef HAVE_USER_NAMESPACE_ARG
1813         rc = pcc_dentry->d_inode->i_op->setattr(&nop_mnt_idmap, pcc_dentry,
1814                                                 &attr2);
1815 #else
1816         rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
1817 #endif
1818         revert_creds(old_cred);
1819         inode_unlock(pcc_dentry->d_inode);
1820
1821         pcc_io_fini(inode, PIT_SETATTR, rc, cached);
1822         RETURN(rc);
1823 }
1824
1825 int pcc_inode_getattr(struct inode *inode, u32 request_mask,
1826                       unsigned int flags, bool *cached)
1827 {
1828         struct ll_inode_info *lli = ll_i2info(inode);
1829         const struct cred *old_cred;
1830         struct kstat stat;
1831         s64 atime;
1832         s64 mtime;
1833         s64 ctime;
1834         int rc;
1835
1836         ENTRY;
1837
1838         if (!S_ISREG(inode->i_mode)) {
1839                 *cached = false;
1840                 RETURN(0);
1841         }
1842
1843         pcc_io_init(inode, PIT_GETATTR, cached);
1844         if (!*cached)
1845                 RETURN(0);
1846
1847         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1848         rc = ll_vfs_getattr(&ll_i2pcci(inode)->pcci_path, &stat, request_mask,
1849                             flags);
1850         revert_creds(old_cred);
1851         if (rc)
1852                 GOTO(out, rc);
1853
1854         ll_inode_size_lock(inode);
1855         if (test_and_clear_bit(LLIF_UPDATE_ATIME, &lli->lli_flags) ||
1856             inode_get_atime_sec(inode) < lli->lli_atime)
1857                 inode_set_atime(inode, lli->lli_atime, 0);
1858
1859         inode_set_mtime(inode, lli->lli_mtime, 0);
1860         inode_set_ctime(inode, lli->lli_ctime, 0);
1861
1862         atime = inode_get_atime_sec(inode);
1863         mtime = inode_get_mtime_sec(inode);
1864         ctime = inode_get_ctime_sec(inode);
1865
1866         if (atime < stat.atime.tv_sec)
1867                 atime = stat.atime.tv_sec;
1868
1869         if (ctime < stat.ctime.tv_sec)
1870                 ctime = stat.ctime.tv_sec;
1871
1872         if (mtime < stat.mtime.tv_sec)
1873                 mtime = stat.mtime.tv_sec;
1874
1875         i_size_write(inode, stat.size);
1876         inode->i_blocks = stat.blocks;
1877
1878         inode_set_atime(inode, atime, 0);
1879         inode_set_mtime(inode, mtime, 0);
1880         inode_set_ctime(inode, ctime, 0);
1881
1882         ll_inode_size_unlock(inode);
1883 out:
1884         pcc_io_fini(inode, PIT_GETATTR, rc, cached);
1885         RETURN(rc);
1886 }
1887
#ifdef HAVE_DEFAULT_FILE_SPLICE_READ_EXPORT
/*
 * splice_read for a file that may have a PCC copy.
 *
 * When a PCC file is open and pcc_io_init() reports the file as cached,
 * default_file_splice_read() is run on the PCC file; in every other case it
 * is run on @in_file itself.  The remaining arguments are passed through
 * unchanged.
 */
ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos,
                             struct pipe_inode_info *pipe,
                             size_t count, unsigned int flags)
{
        struct ll_file_data *lfd = in_file->private_data;
        struct file *pcc_file = lfd->fd_pcc_file.pccf_file;
        struct inode *inode = file_inode(in_file);
        bool cached = false;
        ssize_t nbytes;

        ENTRY;

        if (pcc_file == NULL)
                RETURN(default_file_splice_read(in_file, ppos, pipe,
                                                count, flags));

        pcc_io_init(inode, PIT_SPLICE_READ, &cached);
        if (!cached)
                RETURN(default_file_splice_read(in_file, ppos, pipe,
                                                count, flags));

        nbytes = default_file_splice_read(pcc_file, ppos, pipe, count, flags);

        pcc_io_fini(inode, PIT_SPLICE_READ, nbytes, &cached);
        RETURN(nbytes);
}
#endif /* HAVE_DEFAULT_FILE_SPLICE_READ_EXPORT */
1916
1917 int pcc_fsync(struct file *file, loff_t start, loff_t end,
1918               int datasync, bool *cached)
1919 {
1920         struct inode *inode = file_inode(file);
1921         struct ll_file_data *fd = file->private_data;
1922         struct pcc_file *pccf = &fd->fd_pcc_file;
1923         struct file *pcc_file = pccf->pccf_file;
1924         int rc;
1925
1926         ENTRY;
1927
1928         if (!pcc_file) {
1929                 *cached = false;
1930                 RETURN(0);
1931         }
1932
1933         if (!S_ISREG(inode->i_mode)) {
1934                 *cached = false;
1935                 RETURN(0);
1936         }
1937
1938         /*
1939          * After the file is attached into RO-PCC, its dirty pages on this
1940          * client may not be flushed. So fsync() should fall back to normal
1941          * Lustre I/O path flushing dirty data to OSTs. And flush on RO-PCC
1942          * copy is meaningless.
1943          */
1944         if (pccf->pccf_type == LU_PCC_READONLY) {
1945                 *cached = false;
1946                 RETURN(-EAGAIN);
1947         }
1948
1949         pcc_io_init(inode, PIT_FSYNC, cached);
1950         if (!*cached)
1951                 RETURN(0);
1952
1953         rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
1954                                                 start, end, datasync);
1955
1956         pcc_io_fini(inode, PIT_FSYNC, rc, cached);
1957         RETURN(rc);
1958 }
1959
1960 int pcc_file_mmap(struct file *file, struct vm_area_struct *vma,
1961                   bool *cached)
1962 {
1963         struct inode *inode = file_inode(file);
1964         struct ll_file_data *fd = file->private_data;
1965         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1966         struct pcc_inode *pcci;
1967         int rc = 0;
1968
1969         ENTRY;
1970
1971         if (!pcc_file || !file_inode(pcc_file)->i_fop->mmap) {
1972                 *cached = false;
1973                 RETURN(0);
1974         }
1975
1976         pcc_inode_lock(inode);
1977         pcci = ll_i2pcci(inode);
1978         if (pcci && pcc_inode_has_layout(pcci)) {
1979                 LASSERT(atomic_read(&pcci->pcci_refcount) > 1);
1980                 *cached = true;
1981                 vma->vm_file = pcc_file;
1982                 rc = file_inode(pcc_file)->i_fop->mmap(pcc_file, vma);
1983                 vma->vm_file = file;
1984                 /* Save the vm ops of backend PCC */
1985                 vma->vm_private_data = (void *)vma->vm_ops;
1986         } else {
1987                 *cached = false;
1988         }
1989         pcc_inode_unlock(inode);
1990
1991         RETURN(rc);
1992 }
1993
1994 void pcc_vm_open(struct vm_area_struct *vma)
1995 {
1996         struct pcc_inode *pcci;
1997         struct file *file = vma->vm_file;
1998         struct inode *inode = file_inode(file);
1999         struct ll_file_data *fd = file->private_data;
2000         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
2001         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
2002
2003         ENTRY;
2004
2005         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->open)
2006                 RETURN_EXIT;
2007
2008         pcc_inode_lock(inode);
2009         pcci = ll_i2pcci(inode);
2010         if (pcci && pcc_inode_has_layout(pcci)) {
2011                 vma->vm_file = pcc_file;
2012                 pcc_vm_ops->open(vma);
2013                 vma->vm_file = file;
2014         }
2015         pcc_inode_unlock(inode);
2016         EXIT;
2017 }
2018
2019 void pcc_vm_close(struct vm_area_struct *vma)
2020 {
2021         struct file *file = vma->vm_file;
2022         struct inode *inode = file_inode(file);
2023         struct ll_file_data *fd = file->private_data;
2024         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
2025         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
2026
2027         ENTRY;
2028
2029         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->close)
2030                 RETURN_EXIT;
2031
2032         pcc_inode_lock(inode);
2033         /* Layout lock maybe revoked here */
2034         vma->vm_file = pcc_file;
2035         pcc_vm_ops->close(vma);
2036         vma->vm_file = file;
2037         pcc_inode_unlock(inode);
2038         EXIT;
2039 }
2040
2041 int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
2042                      bool *cached)
2043 {
2044         struct page *page = vmf->page;
2045         struct mm_struct *mm = vma->vm_mm;
2046         struct file *file = vma->vm_file;
2047         struct inode *inode = file_inode(file);
2048         struct ll_file_data *fd = file->private_data;
2049         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
2050         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
2051         int rc;
2052
2053         ENTRY;
2054
2055         if (!pcc_file || !pcc_vm_ops) {
2056                 *cached = false;
2057                 RETURN(0);
2058         }
2059
2060         if (!pcc_vm_ops->page_mkwrite &&
2061             page->mapping == pcc_file->f_mapping) {
2062                 CDEBUG(D_MMAP,
2063                        "%s: PCC backend fs not support ->page_mkwrite()\n",
2064                        ll_i2sbi(inode)->ll_fsname);
2065                 pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
2066                 mmap_read_unlock(mm);
2067                 *cached = true;
2068                 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
2069         }
2070         /* Pause to allow for a race with concurrent detach */
2071         CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
2072
2073         pcc_io_init(inode, PIT_PAGE_MKWRITE, cached);
2074         if (!*cached) {
2075                 /* This happens when the file is detached from PCC after got
2076                  * the fault page via ->fault() on the inode of the PCC copy.
2077                  * Here it can not simply fall back to normal Lustre I/O path.
2078                  * The reason is that the address space of fault page used by
2079                  * ->page_mkwrite() is still the one of PCC inode. In the
2080                  * normal Lustre ->page_mkwrite() I/O path, it will be wrongly
2081                  * handled as the address space of the fault page is not
2082                  * consistent with the one of the Lustre inode (though the
2083                  * fault page was truncated).
2084                  * As the file is detached from PCC, the fault page must
2085                  * be released frist, and retry the mmap write (->fault() and
2086                  * ->page_mkwrite).
2087                  * We use an ugly and tricky method by returning
2088                  * VM_FAULT_NOPAGE | VM_FAULT_RETRY to the caller
2089                  * __do_page_fault and retry the memory fault handling.
2090                  */
2091                 if (page->mapping == pcc_file->f_mapping) {
2092                         pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
2093                         *cached = true;
2094                         mmap_read_unlock(mm);
2095                         RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
2096                 }
2097
2098                 RETURN(0);
2099         }
2100
2101         /*
2102          * This fault injection can also be used to simulate -ENOSPC and
2103          * -EDQUOT failure of underlying PCC backend fs.
2104          */
2105         if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE))
2106                 GOTO(out, rc = VM_FAULT_SIGBUS);
2107
2108         vma->vm_file = pcc_file;
2109 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
2110         rc = pcc_vm_ops->page_mkwrite(vmf);
2111 #else
2112         rc = pcc_vm_ops->page_mkwrite(vma, vmf);
2113 #endif
2114         vma->vm_file = file;
2115
2116 out:
2117         pcc_io_fini(inode, PIT_PAGE_MKWRITE, rc, cached);
2118
2119         /* VM_FAULT_SIGBUG usually means that underlying PCC backend fs returns
2120          * -EIO, -ENOSPC or -EDQUOT. Thus we can retry this IO from the normal
2121          * Lustre I/O path.
2122          */
2123         if (rc & VM_FAULT_SIGBUS) {
2124                 pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
2125                 mmap_read_unlock(mm);
2126                 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
2127         }
2128         RETURN(rc);
2129 }
2130
2131 int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
2132               bool *cached)
2133 {
2134         struct file *file = vma->vm_file;
2135         struct inode *inode = file_inode(file);
2136         struct ll_file_data *fd = file->private_data;
2137         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
2138         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
2139         int rc;
2140
2141         ENTRY;
2142
2143         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->fault) {
2144                 *cached = false;
2145                 RETURN(0);
2146         }
2147
2148         if (!S_ISREG(inode->i_mode)) {
2149                 *cached = false;
2150                 RETURN(0);
2151         }
2152
2153         pcc_io_init(inode, PIT_FAULT, cached);
2154         if (!*cached)
2155                 RETURN(0);
2156
2157         /* Tolerate the mmap read failure for RO-PCC */
2158         if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
2159                 GOTO(out, rc = VM_FAULT_SIGBUS);
2160
2161         vma->vm_file = pcc_file;
2162 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
2163         rc = pcc_vm_ops->fault(vmf);
2164 #else
2165         rc = pcc_vm_ops->fault(vma, vmf);
2166 #endif
2167         vma->vm_file = file;
2168 out:
2169         pcc_io_fini(inode, PIT_FAULT, rc, cached);
2170         RETURN(rc);
2171 }
2172
2173 static void __pcc_layout_invalidate(struct pcc_inode *pcci)
2174 {
2175         pcci->pcci_type = LU_PCC_NONE;
2176         pcc_layout_gen_set(pcci, CL_LAYOUT_GEN_NONE);
2177         if (atomic_read(&pcci->pcci_active_ios) == 0)
2178                 return;
2179
2180         CDEBUG(D_CACHE, "Waiting for IO completion: %d\n",
2181                        atomic_read(&pcci->pcci_active_ios));
2182         wait_event_idle(pcci->pcci_waitq,
2183                         atomic_read(&pcci->pcci_active_ios) == 0);
2184 }
2185
2186 void pcc_layout_invalidate(struct inode *inode)
2187 {
2188         struct pcc_inode *pcci;
2189
2190         ENTRY;
2191
2192         pcc_inode_lock(inode);
2193         pcci = ll_i2pcci(inode);
2194         if (pcci && pcc_inode_has_layout(pcci)) {
2195                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
2196                 __pcc_layout_invalidate(pcci);
2197
2198                 CDEBUG(D_CACHE, "Invalidate "DFID" layout gen %d\n",
2199                        PFID(&ll_i2info(inode)->lli_fid), pcci->pcci_layout_gen);
2200
2201                 pcc_inode_put(pcci);
2202         }
2203         pcc_inode_unlock(inode);
2204
2205         EXIT;
2206 }
2207
2208 static int pcc_inode_remove(struct inode *inode, struct dentry *pcc_dentry)
2209 {
2210         struct dentry *parent = dget_parent(pcc_dentry);
2211         int rc;
2212
2213         rc = vfs_unlink(&nop_mnt_idmap, d_inode(parent), pcc_dentry);
2214         if (rc)
2215                 CWARN("%s: failed to unlink PCC file %pd, rc = %d\n",
2216                       ll_i2sbi(inode)->ll_fsname, pcc_dentry, rc);
2217
2218         dput(parent);
2219         return rc;
2220 }
2221
2222 /* Create directory under base if directory does not exist */
2223 static struct dentry *
2224 pcc_mkdir(struct dentry *base, const char *name, umode_t mode)
2225 {
2226         int rc;
2227         struct dentry *dentry;
2228         struct inode *dir = base->d_inode;
2229
2230         inode_lock(dir);
2231         dentry = lookup_one_len(name, base, strlen(name));
2232         if (IS_ERR(dentry))
2233                 goto out;
2234
2235         if (d_is_positive(dentry))
2236                 goto out;
2237
2238         rc = vfs_mkdir(&nop_mnt_idmap, dir, dentry, mode);
2239         if (rc) {
2240                 dput(dentry);
2241                 dentry = ERR_PTR(rc);
2242                 goto out;
2243         }
2244 out:
2245         inode_unlock(dir);
2246         return dentry;
2247 }
2248
2249 static struct dentry *
2250 pcc_mkdir_p(struct dentry *root, char *path, umode_t mode)
2251 {
2252         char *ptr, *entry_name;
2253         struct dentry *parent;
2254         struct dentry *child = ERR_PTR(-EINVAL);
2255
2256         ptr = path;
2257         while (*ptr == '/')
2258                 ptr++;
2259
2260         entry_name = ptr;
2261         parent = dget(root);
2262         while ((ptr = strchr(ptr, '/')) != NULL) {
2263                 *ptr = '\0';
2264                 child = pcc_mkdir(parent, entry_name, mode);
2265                 *ptr = '/';
2266                 dput(parent);
2267                 if (IS_ERR(child))
2268                         break;
2269
2270                 parent = child;
2271                 ptr++;
2272                 entry_name = ptr;
2273         }
2274
2275         return child;
2276 }
2277
2278 /* Create file under base. If file already exist, return failure */
2279 static struct dentry *
2280 pcc_create(struct dentry *base, const char *name, umode_t mode)
2281 {
2282         int rc;
2283         struct dentry *dentry;
2284         struct inode *dir = base->d_inode;
2285
2286         inode_lock(dir);
2287         dentry = lookup_one_len(name, base, strlen(name));
2288         if (IS_ERR(dentry))
2289                 goto out;
2290
2291         if (d_is_positive(dentry))
2292                 goto out;
2293
2294         rc = vfs_create(&nop_mnt_idmap, dir, dentry, mode, false);
2295         if (rc) {
2296                 dput(dentry);
2297                 dentry = ERR_PTR(rc);
2298                 goto out;
2299         }
2300 out:
2301         inode_unlock(dir);
2302         return dentry;
2303 }
2304
2305 static int __pcc_inode_create(struct pcc_dataset *dataset,
2306                               struct lu_fid *fid,
2307                               struct dentry **dentry)
2308 {
2309         char *path;
2310         struct dentry *base;
2311         struct dentry *child;
2312         int rc = 0;
2313
2314         OBD_ALLOC(path, PCC_DATASET_MAX_PATH);
2315         if (path == NULL)
2316                 return -ENOMEM;
2317
2318         pcc_fid2dataset_path(path, PCC_DATASET_MAX_PATH, fid);
2319
2320         base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0);
2321         if (IS_ERR(base)) {
2322                 rc = PTR_ERR(base);
2323                 GOTO(out, rc);
2324         }
2325
2326         snprintf(path, PCC_DATASET_MAX_PATH, DFID_NOBRACE, PFID(fid));
2327         child = pcc_create(base, path, 0);
2328         if (IS_ERR(child)) {
2329                 rc = PTR_ERR(child);
2330                 GOTO(out_base, rc);
2331         }
2332         *dentry = child;
2333
2334 out_base:
2335         dput(base);
2336 out:
2337         OBD_FREE(path, PCC_DATASET_MAX_PATH);
2338         return rc;
2339 }
2340
2341 /*
2342  * Reset uid, gid or size for the PCC copy masked by @valid.
2343  * TODO: Set the project ID for PCC copy.
2344  */
2345 static int pcc_inode_reset_iattr(struct dentry *dentry, unsigned int valid,
2346                                  kuid_t uid, kgid_t gid, loff_t size)
2347 {
2348         struct inode *inode = dentry->d_inode;
2349         struct iattr attr;
2350         int rc;
2351
2352         ENTRY;
2353
2354         attr.ia_valid = valid;
2355         attr.ia_uid = uid;
2356         attr.ia_gid = gid;
2357         attr.ia_size = size;
2358
2359         inode_lock(inode);
2360         rc = notify_change(&nop_mnt_idmap, dentry, &attr, NULL);
2361         inode_unlock(inode);
2362
2363         RETURN(rc);
2364 }
2365
/*
 * Run __pcc_inode_create() under the PCC super credentials of @sb.
 *
 * Returns the __pcc_inode_create() result; on success *pcc_dentry holds the
 * dentry of the PCC file.
 */
int pcc_inode_create(struct super_block *sb, struct pcc_dataset *dataset,
                     struct lu_fid *fid, struct dentry **pcc_dentry)
{
        const struct cred *saved_cred = override_creds(pcc_super_cred(sb));
        int rc = __pcc_inode_create(dataset, fid, pcc_dentry);

        revert_creds(saved_cred);

        return rc;
}
2377
2378 int pcc_inode_create_fini(struct inode *inode, struct pcc_create_attach *pca)
2379 {
2380         struct dentry *pcc_dentry = pca->pca_dentry;
2381         const struct cred *old_cred;
2382         struct pcc_super *super;
2383         struct pcc_inode *pcci;
2384         int rc;
2385
2386         ENTRY;
2387
2388         if (!pca->pca_dataset)
2389                 RETURN(0);
2390
2391         if (!inode)
2392                 GOTO(out_dataset_put, rc = 0);
2393
2394         super = ll_i2pccs(inode);
2395
2396         LASSERT(pcc_dentry);
2397
2398         old_cred = override_creds(super->pccs_cred);
2399         pcc_inode_lock(inode);
2400         LASSERT(ll_i2pcci(inode) == NULL);
2401         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2402         if (pcci == NULL)
2403                 GOTO(out_put, rc = -ENOMEM);
2404
2405         rc = pcc_inode_reset_iattr(pcc_dentry, ATTR_UID | ATTR_GID,
2406                                    old_cred->suid, old_cred->sgid, 0);
2407         if (rc)
2408                 GOTO(out_put, rc);
2409
2410         pcc_inode_attach_set(super, pca->pca_dataset, ll_i2info(inode),
2411                              pcci, pcc_dentry, LU_PCC_READWRITE);
2412
2413         rc = pcc_layout_xattr_set(pcci, 0);
2414         if (rc) {
2415                 if (!pcci->pcci_unlinked)
2416                         (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
2417                 pcc_inode_put(pcci);
2418                 GOTO(out_unlock, rc);
2419         }
2420
2421         /* Set the layout generation of newly created file with 0 */
2422         pcc_layout_gen_set(pcci, 0);
2423
2424 out_put:
2425         if (rc) {
2426                 (void) pcc_inode_remove(inode, pcc_dentry);
2427                 dput(pcc_dentry);
2428
2429                 if (pcci)
2430                         OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
2431         }
2432 out_unlock:
2433         pcc_inode_unlock(inode);
2434         revert_creds(old_cred);
2435 out_dataset_put:
2436         pcc_dataset_put(pca->pca_dataset);
2437         RETURN(rc);
2438 }
2439
2440 void pcc_create_attach_cleanup(struct super_block *sb,
2441                                struct pcc_create_attach *pca)
2442 {
2443         if (!pca->pca_dataset)
2444                 return;
2445
2446         if (pca->pca_dentry) {
2447                 struct dentry *parent;
2448                 struct inode *i_dir;
2449                 const struct cred *old_cred;
2450                 int rc;
2451
2452                 old_cred = override_creds(pcc_super_cred(sb));
2453                 parent = dget_parent(pca->pca_dentry);
2454                 i_dir = d_inode(parent);
2455                 rc = vfs_unlink(&nop_mnt_idmap, i_dir, pca->pca_dentry);
2456                 dput(parent);
2457                 if (rc)
2458                         CWARN("%s: failed to unlink PCC file %pd: rc = %d\n",
2459                               ll_s2sbi(sb)->ll_fsname, pca->pca_dentry, rc);
2460                 /* ignore the unlink failure */
2461                 revert_creds(old_cred);
2462                 dput(pca->pca_dentry);
2463         }
2464
2465         pcc_dataset_put(pca->pca_dataset);
2466 }
2467
2468 static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
2469                           loff_t *offset)
2470 {
2471         while (count > 0) {
2472                 ssize_t size;
2473
2474                 size = cfs_kernel_write(filp, buf, count, offset);
2475                 if (size < 0)
2476                         return size;
2477                 count -= size;
2478                 buf += size;
2479         }
2480         return 0;
2481 }
2482
2483 static ssize_t pcc_copy_data(struct file *src, struct file *dst)
2484 {
2485         ssize_t rc = 0;
2486         ssize_t rc2;
2487         loff_t pos, offset = 0;
2488         size_t buf_len = 1048576;
2489         void *buf;
2490
2491         ENTRY;
2492
2493         OBD_ALLOC_LARGE(buf, buf_len);
2494         if (buf == NULL)
2495                 RETURN(-ENOMEM);
2496
2497         while (1) {
2498                 if (signal_pending(current))
2499                         GOTO(out_free, rc = -EINTR);
2500
2501                 pos = offset;
2502                 rc2 = cfs_kernel_read(src, buf, buf_len, &pos);
2503                 if (rc2 < 0)
2504                         GOTO(out_free, rc = rc2);
2505                 else if (rc2 == 0)
2506                         break;
2507
2508                 pos = offset;
2509                 rc = pcc_filp_write(dst, buf, rc2, &pos);
2510                 if (rc < 0)
2511                         GOTO(out_free, rc);
2512                 offset += rc2;
2513         }
2514
2515         rc = offset;
2516 out_free:
2517         OBD_FREE_LARGE(buf, buf_len);
2518         RETURN(rc);
2519 }
2520
2521 static int pcc_attach_allowed_check(struct inode *inode)
2522 {
2523         struct ll_inode_info *lli = ll_i2info(inode);
2524         struct pcc_inode *pcci;
2525         int rc = 0;
2526
2527         ENTRY;
2528
2529         pcc_inode_lock(inode);
2530         if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
2531                 GOTO(out_unlock, rc = -EBUSY);
2532
2533         pcci = ll_i2pcci(inode);
2534         if (pcci && pcc_inode_has_layout(pcci))
2535                 GOTO(out_unlock, rc = -EEXIST);
2536
2537         lli->lli_pcc_state |= PCC_STATE_FL_ATTACHING;
2538 out_unlock:
2539         pcc_inode_unlock(inode);
2540         RETURN(rc);
2541 }
2542
2543 static int pcc_attach_data_archive(struct file *file, struct inode *inode,
2544                                    struct pcc_dataset *dataset,
2545                                    struct dentry **dentry)
2546 {
2547         const struct cred *old_cred;
2548         struct file *pcc_filp;
2549         struct path path;
2550         ssize_t ret;
2551         int rc;
2552
2553         ENTRY;
2554
2555         old_cred = override_creds(pcc_super_cred(inode->i_sb));
2556         rc = __pcc_inode_create(dataset, &ll_i2info(inode)->lli_fid, dentry);
2557         if (rc)
2558                 GOTO(out_cred, rc);
2559
2560         path.mnt = dataset->pccd_path.mnt;
2561         path.dentry = *dentry;
2562         pcc_filp = dentry_open(&path, O_WRONLY | O_LARGEFILE, current_cred());
2563         if (IS_ERR_OR_NULL(pcc_filp)) {
2564                 rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
2565                 GOTO(out_dentry, rc);
2566         }
2567
2568         rc = pcc_inode_reset_iattr(*dentry, ATTR_UID | ATTR_GID,
2569                                    old_cred->uid, old_cred->gid, 0);
2570         if (rc)
2571                 GOTO(out_fput, rc);
2572
2573         ret = pcc_copy_data(file, pcc_filp);
2574         if (ret < 0)
2575                 GOTO(out_fput, rc = ret);
2576
2577         /*
2578          * It must to truncate the PCC copy to the same size of the Lustre
2579          * copy after copy data. Otherwise, it may get wrong file size after
2580          * re-attach a file. See LU-13023 for details.
2581          */
2582         rc = pcc_inode_reset_iattr(*dentry, ATTR_SIZE, KUIDT_INIT(0),
2583                                    KGIDT_INIT(0), ret);
2584 out_fput:
2585         fput(pcc_filp);
2586 out_dentry:
2587         if (rc) {
2588                 pcc_inode_remove(inode, *dentry);
2589                 dput(*dentry);
2590         }
2591 out_cred:
2592         revert_creds(old_cred);
2593         RETURN(rc);
2594 }
2595
2596 int pcc_readwrite_attach(struct file *file, struct inode *inode,
2597                          __u32 archive_id)
2598 {
2599         struct pcc_dataset *dataset;
2600         struct ll_inode_info *lli = ll_i2info(inode);
2601         struct pcc_super *super = ll_i2pccs(inode);
2602         struct pcc_inode *pcci;
2603         struct dentry *dentry;
2604         int rc;
2605
2606         ENTRY;
2607
2608         rc = pcc_attach_allowed_check(inode);
2609         if (rc)
2610                 RETURN(rc);
2611
2612         dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super,
2613                                   LU_PCC_READWRITE, archive_id);
2614         if (dataset == NULL)
2615                 RETURN(-ENOENT);
2616
2617         rc = pcc_attach_data_archive(file, inode, dataset, &dentry);
2618         if (rc)
2619                 GOTO(out_dataset_put, rc);
2620
2621         /* Pause to allow for a race with concurrent HSM remove */
2622         CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
2623
2624         pcc_inode_lock(inode);
2625         pcci = ll_i2pcci(inode);
2626         LASSERT(!pcci);
2627         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2628         if (pcci == NULL)
2629                 GOTO(out_unlock, rc = -ENOMEM);
2630
2631         pcc_inode_attach_set(super, dataset, lli, pcci,
2632                              dentry, LU_PCC_READWRITE);
2633 out_unlock:
2634         pcc_inode_unlock(inode);
2635         if (rc) {
2636                 const struct cred *old_cred;
2637
2638                 old_cred = override_creds(pcc_super_cred(inode->i_sb));
2639                 (void) pcc_inode_remove(inode, dentry);
2640                 revert_creds(old_cred);
2641                 dput(dentry);
2642         }
2643 out_dataset_put:
2644         pcc_dataset_put(dataset);
2645
2646         RETURN(rc);
2647 }
2648
2649 int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
2650                               __u32 gen, bool lease_broken, int rc,
2651                               bool attached)
2652 {
2653         struct ll_inode_info *lli = ll_i2info(inode);
2654         const struct cred *old_cred;
2655         struct pcc_inode *pcci;
2656         __u32 gen2;
2657
2658         ENTRY;
2659
2660         old_cred = override_creds(pcc_super_cred(inode->i_sb));
2661         pcc_inode_lock(inode);
2662         pcci = ll_i2pcci(inode);
2663         if (rc || lease_broken) {
2664                 if (attached && pcci)
2665                         pcc_inode_put(pcci);
2666
2667                 GOTO(out_unlock, rc);
2668         }
2669
2670         /* PCC inode may be released due to layout lock revocatioin */
2671         if (!pcci)
2672                 GOTO(out_unlock, rc = -ESTALE);
2673
2674         LASSERT(attached);
2675         rc = pcc_layout_xattr_set(pcci, gen);
2676         if (rc)
2677                 GOTO(out_put, rc);
2678
2679         LASSERT(lli->lli_pcc_state & PCC_STATE_FL_ATTACHING);
2680         rc = ll_layout_refresh(inode, &gen2);
2681         if (!rc) {
2682                 if (gen2 == gen) {
2683                         pcc_layout_gen_set(pcci, gen);
2684                 } else {
2685                         CDEBUG(D_CACHE,
2686                                DFID" layout changed from %d to %d.\n",
2687                                PFID(ll_inode2fid(inode)), gen, gen2);
2688                         GOTO(out_put, rc = -ESTALE);
2689                 }
2690         }
2691
2692 out_put:
2693         if (rc) {
2694                 if (!pcci->pcci_unlinked)
2695                         (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
2696                 pcc_inode_put(pcci);
2697         }
2698 out_unlock:
2699         lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
2700         pcc_inode_unlock(inode);
2701         revert_creds(old_cred);
2702         RETURN(rc);
2703 }
2704
2705 static int pcc_layout_rdonly_set(struct inode *inode, __u32 *gen)
2706
2707 {
2708         struct ll_inode_info *lli = ll_i2info(inode);
2709         struct lu_extent ext = {
2710                 .e_start = 0,
2711                 .e_end = OBD_OBJECT_EOF,
2712         };
2713         struct cl_layout clt = {
2714                 .cl_layout_gen = 0,
2715                 .cl_is_released = false,
2716                 .cl_is_rdonly = false,
2717         };
2718         int retries = 0;
2719         int rc;
2720
2721         ENTRY;
2722
2723 repeat:
2724         rc = pcc_get_layout_info(inode, &clt);
2725         if (rc)
2726                 RETURN(rc);
2727
2728         /*
2729          * For the HSM released file, restore the data first.
2730          */
2731         if (clt.cl_is_released) {
2732                 retries++;
2733                 if (retries > 2)
2734                         RETURN(-EBUSY);
2735
2736                 if (ll_layout_version_get(lli) != CL_LAYOUT_GEN_NONE) {
2737                         rc = ll_layout_restore(inode, 0, OBD_OBJECT_EOF);
2738                         if (rc) {
2739                                 CDEBUG(D_CACHE, DFID" RESTORE failure: %d\n",
2740                                        PFID(&lli->lli_fid), rc);
2741                                 RETURN(rc);
2742                         }
2743                 }
2744                 rc = ll_layout_refresh(inode, gen);
2745                 if (rc)
2746                         RETURN(rc);
2747
2748                 goto repeat;
2749         }
2750
2751
2752         if (!clt.cl_is_rdonly) {
2753                 rc = ll_layout_write_intent(inode, LAYOUT_INTENT_PCCRO_SET,
2754                                             &ext);
2755                 if (rc)
2756                         RETURN(rc);
2757
2758                 rc = ll_layout_refresh(inode, gen);
2759                 if (rc)
2760                         RETURN(rc);
2761         } else { /* Readonly layout */
2762                 *gen = clt.cl_layout_gen;
2763         }
2764
2765         RETURN(rc);
2766 }
2767
2768 static int pcc_readonly_ioctl_attach(struct file *file,
2769                                      struct inode *inode,
2770                                      struct lu_pcc_attach *attach)
2771 {
2772         struct ll_sb_info *sbi = ll_i2sbi(inode);
2773         struct pcc_super *super = ll_i2pccs(inode);
2774         struct ll_inode_info *lli = ll_i2info(inode);
2775         const struct cred *old_cred;
2776         struct pcc_dataset *dataset;
2777         struct pcc_inode *pcci;
2778         struct dentry *dentry;
2779         bool attached = false;
2780         bool unlinked = false;
2781         __u32 gen;
2782         int rc;
2783
2784         ENTRY;
2785
2786         if (!test_bit(LL_SBI_LAYOUT_LOCK, sbi->ll_flags))
2787                 RETURN(-EOPNOTSUPP);
2788
2789         rc = pcc_attach_allowed_check(inode);
2790         if (rc)
2791                 RETURN(rc);
2792
2793         rc = pcc_layout_rdonly_set(inode, &gen);
2794         if (rc)
2795                 RETURN(rc);
2796
2797         dataset = pcc_dataset_get(&ll_s2sbi(inode->i_sb)->ll_pcc_super,
2798                                   LU_PCC_READONLY, attach->pcca_id);
2799         if (dataset == NULL)
2800                 RETURN(-ENOENT);
2801
2802         rc = pcc_attach_data_archive(file, inode, dataset, &dentry);
2803         if (rc)
2804                 GOTO(out_dataset_put, rc);
2805
2806         mutex_lock(&lli->lli_layout_mutex);
2807         pcc_inode_lock(inode);
2808         old_cred = override_creds(super->pccs_cred);
2809         lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
2810         if (gen != ll_layout_version_get(lli))
2811                 GOTO(out_put_unlock, rc = -ESTALE);
2812
2813         pcci = ll_i2pcci(inode);
2814         if (!pcci) {
2815                 OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2816                 if (pcci == NULL)
2817                         GOTO(out_put_unlock, rc = -ENOMEM);
2818
2819                 pcc_inode_attach_set(super, dataset, lli, pcci,
2820                                      dentry, LU_PCC_READONLY);
2821         } else {
2822                 atomic_inc(&pcci->pcci_refcount);
2823                 path_put(&pcci->pcci_path);
2824                 pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
2825                 pcci->pcci_path.dentry = dentry;
2826                 pcci->pcci_type = LU_PCC_READONLY;
2827         }
2828         attached = true;
2829         rc = pcc_layout_xattr_set(pcci, gen);
2830         if (rc) {
2831                 pcci->pcci_type = LU_PCC_NONE;
2832                 unlinked = pcci->pcci_unlinked;
2833                 GOTO(out_put_unlock, rc);
2834         }
2835
2836         pcc_layout_gen_set(pcci, gen);
2837 out_put_unlock:
2838         if (rc) {
2839                 if (!unlinked)
2840                         (void) pcc_inode_remove(inode, dentry);
2841                 if (attached)
2842                         pcc_inode_put(pcci);
2843                 else
2844                         dput(dentry);
2845         }
2846         revert_creds(old_cred);
2847         pcc_inode_unlock(inode);
2848         mutex_unlock(&lli->lli_layout_mutex);
2849 out_dataset_put:
2850         pcc_dataset_put(dataset);
2851
2852         RETURN(rc);
2853 }
2854
2855 int pcc_ioctl_attach(struct file *file, struct inode *inode,
2856                      struct lu_pcc_attach *attach)
2857 {
2858         int rc = 0;
2859
2860         ENTRY;
2861
2862         switch (attach->pcca_type) {
2863         case LU_PCC_READWRITE:
2864                 rc = -EOPNOTSUPP;
2865                 break;
2866         case LU_PCC_READONLY:
2867                 rc = pcc_readonly_ioctl_attach(file, inode, attach);
2868                 break;
2869         default:
2870                 rc = -EINVAL;
2871                 break;
2872         }
2873
2874         RETURN(rc);
2875 }
2876
2877 static int pcc_hsm_remove(struct inode *inode)
2878 {
2879         struct hsm_user_request *hur;
2880         __u32 gen;
2881         int len;
2882         int rc;
2883
2884         ENTRY;
2885
2886         rc = ll_layout_restore(inode, 0, OBD_OBJECT_EOF);
2887         if (rc) {
2888                 CDEBUG(D_CACHE, DFID" RESTORE failure: %d\n",
2889                        PFID(&ll_i2info(inode)->lli_fid), rc);
2890                 RETURN(rc);
2891         }
2892
2893         ll_layout_refresh(inode, &gen);
2894
2895         len = sizeof(struct hsm_user_request) +
2896               sizeof(struct hsm_user_item);
2897         OBD_ALLOC(hur, len);
2898         if (hur == NULL)
2899                 RETURN(-ENOMEM);
2900
2901         hur->hur_request.hr_action = HUA_REMOVE;
2902         hur->hur_request.hr_archive_id = 0;
2903         hur->hur_request.hr_flags = 0;
2904         memcpy(&hur->hur_user_item[0].hui_fid, &ll_i2info(inode)->lli_fid,
2905                sizeof(hur->hur_user_item[0].hui_fid));
2906         hur->hur_user_item[0].hui_extent.offset = 0;
2907         hur->hur_user_item[0].hui_extent.length = OBD_OBJECT_EOF;
2908         hur->hur_request.hr_itemcount = 1;
2909         rc = obd_iocontrol(LL_IOC_HSM_REQUEST, ll_i2sbi(inode)->ll_md_exp,
2910                            len, hur, NULL);
2911         if (rc)
2912                 CDEBUG(D_CACHE, DFID" HSM REMOVE failure: %d\n",
2913                        PFID(&ll_i2info(inode)->lli_fid), rc);
2914
2915         OBD_FREE(hur, len);
2916         RETURN(rc);
2917 }
2918
2919 int pcc_ioctl_detach(struct inode *inode, __u32 opt)
2920 {
2921         struct ll_inode_info *lli = ll_i2info(inode);
2922         struct pcc_inode *pcci;
2923         const struct cred *old_cred;
2924         bool hsm_remove = false;
2925         int rc = 0;
2926
2927         ENTRY;
2928
2929         pcc_inode_lock(inode);
2930         pcci = lli->lli_pcc_inode;
2931         if (!pcci || lli->lli_pcc_state & PCC_STATE_FL_ATTACHING ||
2932             !pcc_inode_has_layout(pcci))
2933                 GOTO(out_unlock, rc = 0);
2934
2935         LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
2936
2937         if (pcci->pcci_type == LU_PCC_READWRITE) {
2938                 if (opt == PCC_DETACH_OPT_UNCACHE) {
2939                         hsm_remove = true;
2940                         /*
2941                          * The file will be removed from PCC, set the flags
2942                          * with PCC_DATASET_NONE even the later removal of the
2943                          * PCC copy fails.
2944                          */
2945                         lli->lli_pcc_dsflags = PCC_DATASET_NONE;
2946                 }
2947
2948                 __pcc_layout_invalidate(pcci);
2949                 pcc_inode_put(pcci);
2950         } else if (pcci->pcci_type == LU_PCC_READONLY) {
2951                 __pcc_layout_invalidate(pcci);
2952
2953                 if (opt == PCC_DETACH_OPT_UNCACHE && !pcci->pcci_unlinked) {
2954                         old_cred =  override_creds(pcc_super_cred(inode->i_sb));
2955                         rc = pcc_inode_remove(inode, pcci->pcci_path.dentry);
2956                         revert_creds(old_cred);
2957                         if (!rc)
2958                                 pcci->pcci_unlinked = true;
2959                 }
2960
2961                 pcc_inode_put(pcci);
2962         } else {
2963                 rc = -EOPNOTSUPP;
2964         }
2965
2966 out_unlock:
2967         pcc_inode_unlock(inode);
2968         if (hsm_remove) {
2969                 old_cred = override_creds(pcc_super_cred(inode->i_sb));
2970                 rc = pcc_hsm_remove(inode);
2971                 revert_creds(old_cred);
2972         }
2973
2974         RETURN(rc);
2975 }
2976
2977 int pcc_ioctl_state(struct file *file, struct inode *inode,
2978                     struct lu_pcc_state *state)
2979 {
2980         int rc = 0;
2981         int count;
2982         char *buf;
2983         char *path;
2984         int buf_len = sizeof(state->pccs_path);
2985         struct ll_file_data *fd = file->private_data;
2986         struct pcc_file *pccf = &fd->fd_pcc_file;
2987         struct pcc_inode *pcci;
2988
2989         ENTRY;
2990
2991         if (buf_len <= 0)
2992                 RETURN(-EINVAL);
2993
2994         OBD_ALLOC(buf, buf_len);
2995         if (buf == NULL)
2996                 RETURN(-ENOMEM);
2997
2998         pcc_inode_lock(inode);
2999         pcci = ll_i2pcci(inode);
3000         if (pcci == NULL) {
3001                 state->pccs_type = LU_PCC_NONE;
3002                 GOTO(out_unlock, rc = 0);
3003         }
3004
3005         count = atomic_read(&pcci->pcci_refcount);
3006         if (count == 0) {
3007                 state->pccs_type = LU_PCC_NONE;
3008                 state->pccs_open_count = 0;
3009                 GOTO(out_unlock, rc = 0);
3010         }
3011
3012         if (pcc_inode_has_layout(pcci))
3013                 count--;
3014         if (pccf->pccf_file != NULL)
3015                 count--;
3016         state->pccs_type = pcci->pcci_type;
3017         state->pccs_open_count = count;
3018         state->pccs_flags = ll_i2info(inode)->lli_pcc_state;
3019         path = dentry_path_raw(pcci->pcci_path.dentry, buf, buf_len);
3020         if (IS_ERR(path))
3021                 GOTO(out_unlock, rc = PTR_ERR(path));
3022
3023         if (strscpy(state->pccs_path, path, buf_len) < 0)
3024                 GOTO(out_unlock, rc = -ENAMETOOLONG);
3025
3026 out_unlock:
3027         pcc_inode_unlock(inode);
3028         OBD_FREE(buf, buf_len);
3029         RETURN(rc);
3030 }