Whamcloud - gitweb
LU-13030 pcc: auto attach not work after client cache clear
[fs/lustre-release.git] / lustre / llite / pcc.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, DDN Storage Corporation.
24  */
25 /*
26  * Persistent Client Cache
27  *
28  * PCC is a new framework which provides a group of local cache on Lustre
29  * client side. It works in two modes: RW-PCC enables a read-write cache on the
30  * local SSDs of a single client; RO-PCC provides a read-only cache on the
31  * local SSDs of multiple clients. Less overhead is visible to the applications
32  * and network latencies and lock conflicts can be significantly reduced.
33  *
34  * For RW-PCC, no global namespace will be provided. Each client uses its own
35  * local storage as a cache for itself. Local file system is used to manage
36  * the data on local caches. Cached I/O is directed to local file system while
37  * normal I/O is directed to OSTs. RW-PCC uses HSM for data synchronization.
38  * It uses HSM copytool to restore file from local caches to Lustre OSTs. Each
39  * PCC has a copytool instance running with unique archive number. Any remote
40  * access from another Lustre client would trigger the data synchronization. If
41  * a client with RW-PCC goes offline, the cached data becomes inaccessible for
42  * other client temporarily. And after the RW-PCC client reboots and the
43  * copytool restarts, the data will be accessible again.
44  *
45  * Following is what will happen in different conditions for RW-PCC:
46  *
47  * > When file is being created on RW-PCC
48  *
49  * A normal HSM released file is created on MDT;
50  * An empty mirror file is created on local cache;
51  * The HSM status of the Lustre file will be set to archived and released;
52  * The archive number will be set to the proper value.
53  *
54  * > When file is being prefetched to RW-PCC
55  *
56  * An file is copied to the local cache;
57  * The HSM status of the Lustre file will be set to archived and released;
58  * The archive number will be set to the proper value.
59  *
60  * > When file is being accessed from PCC
61  *
62  * Data will be read directly from local cache;
63  * Metadata will be read from MDT, except file size;
64  * File size will be got from local cache.
65  *
66  * > When PCC cached file is being accessed on another client
67  *
68  * RW-PCC cached files are automatically restored when a process on another
69  * client tries to read or modify them. The corresponding I/O will block
70  * waiting for the released file to be restored. This is transparent to the
71  * process.
72  *
73  * For RW-PCC, when a file is being created, a rule-based policy is used to
74  * determine whether it will be cached. Rule-based caching of newly created
75  * files can determine which file can use a cache on PCC directly without any
76  * admission control.
77  *
78  * RW-PCC design can accelerate I/O intensive applications with one-to-one
79  * mappings between files and accessing clients. However, in several use cases,
80  * files will never be updated, but need to be read simultaneously from many
81  * clients. RO-PCC implements a read-only caching on Lustre clients using
82  * SSDs. RO-PCC is based on the same framework as RW-PCC, expect
83  * that no HSM mechanism is used.
84  *
85  * The main advantages to use this SSD cache on the Lustre clients via PCC
86  * is that:
87  * - The I/O stack becomes much simpler for the cached data, as there is no
88  *   interference with I/Os from other clients, which enables easier
89  *   performance optimizations;
90  * - The requirements on the HW inside the client nodes are small, any kind of
91  *   SSDs or even HDDs can be used as cache devices;
92  * - Caching reduces the pressure on the object storage targets (OSTs), as
93  *   small or random I/Os can be regularized to big sequential I/Os and
94  *   temporary files do not even need to be flushed to OSTs.
95  *
96  * PCC can accelerate applications with certain I/O patterns:
97  * - small-sized random writes (< 1MB) from a single client
98  * - repeated read of data that is larger than RAM
99  * - clients with high network latency
100  *
101  * Author: Li Xi <lixi@ddn.com>
102  * Author: Qian Yingjin <qian@ddn.com>
103  */
104
105 #define DEBUG_SUBSYSTEM S_LLITE
106
107 #include "pcc.h"
108 #include <linux/namei.h>
109 #include <linux/file.h>
110 #include <lustre_compat.h>
111 #include "llite_internal.h"
112
113 struct kmem_cache *pcc_inode_slab;
114
115 int pcc_super_init(struct pcc_super *super)
116 {
117         struct cred *cred;
118
119         super->pccs_cred = cred = prepare_creds();
120         if (!cred)
121                 return -ENOMEM;
122
123         /* Never override disk quota limits or use reserved space */
124         cap_lower(cred->cap_effective, CAP_SYS_RESOURCE);
125         init_rwsem(&super->pccs_rw_sem);
126         INIT_LIST_HEAD(&super->pccs_datasets);
127         super->pccs_generation = 1;
128
129         return 0;
130 }
131
132 /* Rule based auto caching */
133 static void pcc_id_list_free(struct list_head *id_list)
134 {
135         struct pcc_match_id *id, *n;
136
137         list_for_each_entry_safe(id, n, id_list, pmi_linkage) {
138                 list_del_init(&id->pmi_linkage);
139                 OBD_FREE_PTR(id);
140         }
141 }
142
143 static void pcc_fname_list_free(struct list_head *fname_list)
144 {
145         struct pcc_match_fname *fname, *n;
146
147         list_for_each_entry_safe(fname, n, fname_list, pmf_linkage) {
148                 OBD_FREE(fname->pmf_name, strlen(fname->pmf_name) + 1);
149                 list_del_init(&fname->pmf_linkage);
150                 OBD_FREE_PTR(fname);
151         }
152 }
153
154 static void pcc_expression_free(struct pcc_expression *expr)
155 {
156         LASSERT(expr->pe_field >= PCC_FIELD_UID &&
157                 expr->pe_field < PCC_FIELD_MAX);
158         switch (expr->pe_field) {
159         case PCC_FIELD_UID:
160         case PCC_FIELD_GID:
161         case PCC_FIELD_PROJID:
162                 pcc_id_list_free(&expr->pe_cond);
163                 break;
164         case PCC_FIELD_FNAME:
165                 pcc_fname_list_free(&expr->pe_cond);
166                 break;
167         default:
168                 LBUG();
169         }
170         OBD_FREE_PTR(expr);
171 }
172
173 static void pcc_conjunction_free(struct pcc_conjunction *conjunction)
174 {
175         struct pcc_expression *expression, *n;
176
177         LASSERT(list_empty(&conjunction->pc_linkage));
178         list_for_each_entry_safe(expression, n,
179                                  &conjunction->pc_expressions,
180                                  pe_linkage) {
181                 list_del_init(&expression->pe_linkage);
182                 pcc_expression_free(expression);
183         }
184         OBD_FREE_PTR(conjunction);
185 }
186
187 static void pcc_rule_conds_free(struct list_head *cond_list)
188 {
189         struct pcc_conjunction *conjunction, *n;
190
191         list_for_each_entry_safe(conjunction, n, cond_list, pc_linkage) {
192                 list_del_init(&conjunction->pc_linkage);
193                 pcc_conjunction_free(conjunction);
194         }
195 }
196
197 static void pcc_cmd_fini(struct pcc_cmd *cmd)
198 {
199         if (cmd->pccc_cmd == PCC_ADD_DATASET) {
200                 if (!list_empty(&cmd->u.pccc_add.pccc_conds))
201                         pcc_rule_conds_free(&cmd->u.pccc_add.pccc_conds);
202                 if (cmd->u.pccc_add.pccc_conds_str)
203                         OBD_FREE(cmd->u.pccc_add.pccc_conds_str,
204                                  strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
205         }
206 }
207
208 #define PCC_DISJUNCTION_DELIM   (',')
209 #define PCC_CONJUNCTION_DELIM   ('&')
210 #define PCC_EXPRESSION_DELIM    ('=')
211
212 static int
213 pcc_fname_list_add(struct cfs_lstr *id, struct list_head *fname_list)
214 {
215         struct pcc_match_fname *fname;
216
217         OBD_ALLOC(fname, sizeof(struct pcc_match_fname));
218         if (fname == NULL)
219                 return -ENOMEM;
220
221         OBD_ALLOC(fname->pmf_name, id->ls_len + 1);
222         if (fname->pmf_name == NULL) {
223                 OBD_FREE(fname, sizeof(struct pcc_match_fname));
224                 return -ENOMEM;
225         }
226
227         memcpy(fname->pmf_name, id->ls_str, id->ls_len);
228         list_add_tail(&fname->pmf_linkage, fname_list);
229         return 0;
230 }
231
232 static int
233 pcc_fname_list_parse(char *str, int len, struct list_head *fname_list)
234 {
235         struct cfs_lstr src;
236         struct cfs_lstr res;
237         int rc = 0;
238
239         ENTRY;
240
241         src.ls_str = str;
242         src.ls_len = len;
243         INIT_LIST_HEAD(fname_list);
244         while (src.ls_str) {
245                 rc = cfs_gettok(&src, ' ', &res);
246                 if (rc == 0) {
247                         rc = -EINVAL;
248                         break;
249                 }
250                 rc = pcc_fname_list_add(&res, fname_list);
251                 if (rc)
252                         break;
253         }
254         if (rc)
255                 pcc_fname_list_free(fname_list);
256         RETURN(rc);
257 }
258
259 static int
260 pcc_id_list_parse(char *str, int len, struct list_head *id_list,
261                   enum pcc_field type)
262 {
263         struct cfs_lstr src;
264         struct cfs_lstr res;
265         int rc = 0;
266
267         ENTRY;
268
269         if (type != PCC_FIELD_UID && type != PCC_FIELD_GID &&
270             type != PCC_FIELD_PROJID)
271                 RETURN(-EINVAL);
272
273         src.ls_str = str;
274         src.ls_len = len;
275         INIT_LIST_HEAD(id_list);
276         while (src.ls_str) {
277                 struct pcc_match_id *id;
278                 __u32 id_val;
279
280                 if (cfs_gettok(&src, ' ', &res) == 0)
281                         GOTO(out, rc = -EINVAL);
282
283                 if (!cfs_str2num_check(res.ls_str, res.ls_len,
284                                        &id_val, 0, (u32)~0U))
285                         GOTO(out, rc = -EINVAL);
286
287                 OBD_ALLOC_PTR(id);
288                 if (id == NULL)
289                         GOTO(out, rc = -ENOMEM);
290
291                 id->pmi_id = id_val;
292                 list_add_tail(&id->pmi_linkage, id_list);
293         }
294 out:
295         if (rc)
296                 pcc_id_list_free(id_list);
297         RETURN(rc);
298 }
299
300 static inline bool
301 pcc_check_field(struct cfs_lstr *field, char *str)
302 {
303         int len = strlen(str);
304
305         return (field->ls_len == len &&
306                 strncmp(field->ls_str, str, len) == 0);
307 }
308
309 static int
310 pcc_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
311 {
312         struct pcc_expression *expr;
313         struct cfs_lstr field;
314         int rc = 0;
315
316         OBD_ALLOC(expr, sizeof(struct pcc_expression));
317         if (expr == NULL)
318                 return -ENOMEM;
319
320         rc = cfs_gettok(src, PCC_EXPRESSION_DELIM, &field);
321         if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
322             src->ls_str[src->ls_len - 1] != '}')
323                 GOTO(out, rc = -EINVAL);
324
325         /* Skip '{' and '}' */
326         src->ls_str++;
327         src->ls_len -= 2;
328
329         if (pcc_check_field(&field, "uid")) {
330                 if (pcc_id_list_parse(src->ls_str,
331                                       src->ls_len,
332                                       &expr->pe_cond,
333                                       PCC_FIELD_UID) < 0)
334                         GOTO(out, rc = -EINVAL);
335                 expr->pe_field = PCC_FIELD_UID;
336         } else if (pcc_check_field(&field, "gid")) {
337                 if (pcc_id_list_parse(src->ls_str,
338                                       src->ls_len,
339                                       &expr->pe_cond,
340                                       PCC_FIELD_GID) < 0)
341                         GOTO(out, rc = -EINVAL);
342                 expr->pe_field = PCC_FIELD_GID;
343         } else if (pcc_check_field(&field, "projid")) {
344                 if (pcc_id_list_parse(src->ls_str,
345                                       src->ls_len,
346                                       &expr->pe_cond,
347                                       PCC_FIELD_PROJID) < 0)
348                         GOTO(out, rc = -EINVAL);
349                 expr->pe_field = PCC_FIELD_PROJID;
350         } else if (pcc_check_field(&field, "fname")) {
351                 if (pcc_fname_list_parse(src->ls_str,
352                                          src->ls_len,
353                                          &expr->pe_cond) < 0)
354                         GOTO(out, rc = -EINVAL);
355                 expr->pe_field = PCC_FIELD_FNAME;
356         } else {
357                 GOTO(out, rc = -EINVAL);
358         }
359
360         list_add_tail(&expr->pe_linkage, cond_list);
361         return 0;
362 out:
363         OBD_FREE_PTR(expr);
364         return rc;
365 }
366
367 static int
368 pcc_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
369 {
370         struct pcc_conjunction *conjunction;
371         struct cfs_lstr expr;
372         int rc = 0;
373
374         OBD_ALLOC(conjunction, sizeof(struct pcc_conjunction));
375         if (conjunction == NULL)
376                 return -ENOMEM;
377
378         INIT_LIST_HEAD(&conjunction->pc_expressions);
379         list_add_tail(&conjunction->pc_linkage, cond_list);
380
381         while (src->ls_str) {
382                 rc = cfs_gettok(src, PCC_CONJUNCTION_DELIM, &expr);
383                 if (rc == 0) {
384                         rc = -EINVAL;
385                         break;
386                 }
387                 rc = pcc_expression_parse(&expr,
388                                           &conjunction->pc_expressions);
389                 if (rc)
390                         break;
391         }
392         return rc;
393 }
394
395 static int pcc_conds_parse(char *str, int len, struct list_head *cond_list)
396 {
397         struct cfs_lstr src;
398         struct cfs_lstr res;
399         int rc = 0;
400
401         src.ls_str = str;
402         src.ls_len = len;
403         INIT_LIST_HEAD(cond_list);
404         while (src.ls_str) {
405                 rc = cfs_gettok(&src, PCC_DISJUNCTION_DELIM, &res);
406                 if (rc == 0) {
407                         rc = -EINVAL;
408                         break;
409                 }
410                 rc = pcc_conjunction_parse(&res, cond_list);
411                 if (rc)
412                         break;
413         }
414         return rc;
415 }
416
417 static int pcc_id_parse(struct pcc_cmd *cmd, const char *id)
418 {
419         int rc;
420
421         OBD_ALLOC(cmd->u.pccc_add.pccc_conds_str, strlen(id) + 1);
422         if (cmd->u.pccc_add.pccc_conds_str == NULL)
423                 return -ENOMEM;
424
425         memcpy(cmd->u.pccc_add.pccc_conds_str, id, strlen(id));
426
427         rc = pcc_conds_parse(cmd->u.pccc_add.pccc_conds_str,
428                              strlen(cmd->u.pccc_add.pccc_conds_str),
429                              &cmd->u.pccc_add.pccc_conds);
430         if (rc)
431                 pcc_cmd_fini(cmd);
432
433         return rc;
434 }
435
436 static int
437 pcc_parse_value_pair(struct pcc_cmd *cmd, char *buffer)
438 {
439         char *key, *val;
440         unsigned long id;
441         int rc;
442
443         val = buffer;
444         key = strsep(&val, "=");
445         if (val == NULL || strlen(val) == 0)
446                 return -EINVAL;
447
448         /* Key of the value pair */
449         if (strcmp(key, "rwid") == 0) {
450                 rc = kstrtoul(val, 10, &id);
451                 if (rc)
452                         return rc;
453                 if (id <= 0)
454                         return -EINVAL;
455                 cmd->u.pccc_add.pccc_rwid = id;
456         } else if (strcmp(key, "roid") == 0) {
457                 rc = kstrtoul(val, 10, &id);
458                 if (rc)
459                         return rc;
460                 if (id <= 0)
461                         return -EINVAL;
462                 cmd->u.pccc_add.pccc_roid = id;
463         } else if (strcmp(key, "auto_attach") == 0) {
464                 rc = kstrtoul(val, 10, &id);
465                 if (rc)
466                         return rc;
467                 if (id == 0)
468                         cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_AUTO_ATTACH;
469         } else if (strcmp(key, "open_attach") == 0) {
470                 rc = kstrtoul(val, 10, &id);
471                 if (rc)
472                         return rc;
473                 if (id == 0)
474                         cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_OPEN_ATTACH;
475         } else if (strcmp(key, "io_attach") == 0) {
476                 rc = kstrtoul(val, 10, &id);
477                 if (rc)
478                         return rc;
479                 if (id == 0)
480                         cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_IO_ATTACH;
481         } else if (strcmp(key, "stat_attach") == 0) {
482                 rc = kstrtoul(val, 10, &id);
483                 if (rc)
484                         return rc;
485                 if (id == 0)
486                         cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_STAT_ATTACH;
487         } else if (strcmp(key, "rwpcc") == 0) {
488                 rc = kstrtoul(val, 10, &id);
489                 if (rc)
490                         return rc;
491                 if (id > 0)
492                         cmd->u.pccc_add.pccc_flags |= PCC_DATASET_RWPCC;
493         } else if (strcmp(key, "ropcc") == 0) {
494                 rc = kstrtoul(val, 10, &id);
495                 if (rc)
496                         return rc;
497                 if (id > 0)
498                         cmd->u.pccc_add.pccc_flags |= PCC_DATASET_ROPCC;
499         } else {
500                 return -EINVAL;
501         }
502
503         return 0;
504 }
505
506 static int
507 pcc_parse_value_pairs(struct pcc_cmd *cmd, char *buffer)
508 {
509         char *val;
510         char *token;
511         int rc;
512
513         switch (cmd->pccc_cmd) {
514         case PCC_ADD_DATASET:
515                 /* Enable auto attach by default */
516                 cmd->u.pccc_add.pccc_flags |= PCC_DATASET_AUTO_ATTACH;
517                 break;
518         case PCC_DEL_DATASET:
519         case PCC_CLEAR_ALL:
520                 break;
521         default:
522                 return -EINVAL;
523         }
524
525         val = buffer;
526         while (val != NULL && strlen(val) != 0) {
527                 token = strsep(&val, " ");
528                 rc = pcc_parse_value_pair(cmd, token);
529                 if (rc)
530                         return rc;
531         }
532
533         switch (cmd->pccc_cmd) {
534         case PCC_ADD_DATASET:
535                 if (cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC &&
536                     cmd->u.pccc_add.pccc_flags & PCC_DATASET_ROPCC)
537                         return -EINVAL;
538                 /*
539                  * By default, a PCC backend can provide caching service for
540                  * both RW-PCC and RO-PCC.
541                  */
542                 if ((cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCC_ALL) == 0)
543                         cmd->u.pccc_add.pccc_flags |= PCC_DATASET_PCC_ALL;
544
545                 /* For RW-PCC, the value of @rwid must be non zero. */
546                 if (cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC &&
547                     cmd->u.pccc_add.pccc_rwid == 0)
548                         return -EINVAL;
549
550                 break;
551         case PCC_DEL_DATASET:
552         case PCC_CLEAR_ALL:
553                 break;
554         default:
555                 return -EINVAL;
556         }
557         return 0;
558 }
559
560 static void
561 pcc_dataset_rule_fini(struct pcc_match_rule *rule)
562 {
563         if (!list_empty(&rule->pmr_conds))
564                 pcc_rule_conds_free(&rule->pmr_conds);
565         LASSERT(rule->pmr_conds_str != NULL);
566         OBD_FREE(rule->pmr_conds_str, strlen(rule->pmr_conds_str) + 1);
567 }
568
569 static int
570 pcc_dataset_rule_init(struct pcc_match_rule *rule, struct pcc_cmd *cmd)
571 {
572         int rc = 0;
573
574         LASSERT(cmd->u.pccc_add.pccc_conds_str);
575         OBD_ALLOC(rule->pmr_conds_str,
576                   strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
577         if (rule->pmr_conds_str == NULL)
578                 return -ENOMEM;
579
580         memcpy(rule->pmr_conds_str,
581                cmd->u.pccc_add.pccc_conds_str,
582                strlen(cmd->u.pccc_add.pccc_conds_str));
583
584         INIT_LIST_HEAD(&rule->pmr_conds);
585         if (!list_empty(&cmd->u.pccc_add.pccc_conds))
586                 rc = pcc_conds_parse(rule->pmr_conds_str,
587                                           strlen(rule->pmr_conds_str),
588                                           &rule->pmr_conds);
589
590         if (rc)
591                 pcc_dataset_rule_fini(rule);
592
593         return rc;
594 }
595
596 /* Rule Matching */
597 static int
598 pcc_id_list_match(struct list_head *id_list, __u32 id_val)
599 {
600         struct pcc_match_id *id;
601
602         list_for_each_entry(id, id_list, pmi_linkage) {
603                 if (id->pmi_id == id_val)
604                         return 1;
605         }
606         return 0;
607 }
608
609 static bool
610 cfs_match_wildcard(const char *pattern, const char *content)
611 {
612         if (*pattern == '\0' && *content == '\0')
613                 return true;
614
615         if (*pattern == '*' && *(pattern + 1) != '\0' && *content == '\0')
616                 return false;
617
618         while (*pattern == *content) {
619                 pattern++;
620                 content++;
621                 if (*pattern == '\0' && *content == '\0')
622                         return true;
623
624                 if (*pattern == '*' && *(pattern + 1) != '\0' &&
625                     *content == '\0')
626                         return false;
627         }
628
629         if (*pattern == '*')
630                 return (cfs_match_wildcard(pattern + 1, content) ||
631                         cfs_match_wildcard(pattern, content + 1));
632
633         return false;
634 }
635
636 static int
637 pcc_fname_list_match(struct list_head *fname_list, const char *name)
638 {
639         struct pcc_match_fname *fname;
640
641         list_for_each_entry(fname, fname_list, pmf_linkage) {
642                 if (cfs_match_wildcard(fname->pmf_name, name))
643                         return 1;
644         }
645         return 0;
646 }
647
648 static int
649 pcc_expression_match(struct pcc_expression *expr, struct pcc_matcher *matcher)
650 {
651         switch (expr->pe_field) {
652         case PCC_FIELD_UID:
653                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_uid);
654         case PCC_FIELD_GID:
655                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_gid);
656         case PCC_FIELD_PROJID:
657                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_projid);
658         case PCC_FIELD_FNAME:
659                 return pcc_fname_list_match(&expr->pe_cond,
660                                             matcher->pm_name->name);
661         default:
662                 return 0;
663         }
664 }
665
666 static int
667 pcc_conjunction_match(struct pcc_conjunction *conjunction,
668                       struct pcc_matcher *matcher)
669 {
670         struct pcc_expression *expr;
671         int matched;
672
673         list_for_each_entry(expr, &conjunction->pc_expressions, pe_linkage) {
674                 matched = pcc_expression_match(expr, matcher);
675                 if (!matched)
676                         return 0;
677         }
678
679         return 1;
680 }
681
682 static int
683 pcc_cond_match(struct pcc_match_rule *rule, struct pcc_matcher *matcher)
684 {
685         struct pcc_conjunction *conjunction;
686         int matched;
687
688         list_for_each_entry(conjunction, &rule->pmr_conds, pc_linkage) {
689                 matched = pcc_conjunction_match(conjunction, matcher);
690                 if (matched)
691                         return 1;
692         }
693
694         return 0;
695 }
696
697 struct pcc_dataset*
698 pcc_dataset_match_get(struct pcc_super *super, struct pcc_matcher *matcher)
699 {
700         struct pcc_dataset *dataset;
701         struct pcc_dataset *selected = NULL;
702
703         down_read(&super->pccs_rw_sem);
704         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
705                 if (!(dataset->pccd_flags & PCC_DATASET_RWPCC))
706                         continue;
707
708                 if (pcc_cond_match(&dataset->pccd_rule, matcher)) {
709                         atomic_inc(&dataset->pccd_refcount);
710                         selected = dataset;
711                         break;
712                 }
713         }
714         up_read(&super->pccs_rw_sem);
715         if (selected)
716                 CDEBUG(D_CACHE, "PCC create, matched %s - %d:%d:%d:%s\n",
717                        dataset->pccd_rule.pmr_conds_str,
718                        matcher->pm_uid, matcher->pm_gid,
719                        matcher->pm_projid, matcher->pm_name->name);
720
721         return selected;
722 }
723
724 /**
725  * pcc_dataset_add - Add a Cache policy to control which files need be
726  * cached and where it will be cached.
727  *
728  * @super:      superblock of pcc
729  * @cmd:        pcc command
730  */
731 static int
732 pcc_dataset_add(struct pcc_super *super, struct pcc_cmd *cmd)
733 {
734         char *pathname = cmd->pccc_pathname;
735         struct pcc_dataset *dataset;
736         struct pcc_dataset *tmp;
737         bool found = false;
738         int rc;
739
740         OBD_ALLOC_PTR(dataset);
741         if (dataset == NULL)
742                 return -ENOMEM;
743
744         rc = kern_path(pathname, LOOKUP_DIRECTORY, &dataset->pccd_path);
745         if (unlikely(rc)) {
746                 OBD_FREE_PTR(dataset);
747                 return rc;
748         }
749         strncpy(dataset->pccd_pathname, pathname, PATH_MAX);
750         dataset->pccd_rwid = cmd->u.pccc_add.pccc_rwid;
751         dataset->pccd_roid = cmd->u.pccc_add.pccc_roid;
752         dataset->pccd_flags = cmd->u.pccc_add.pccc_flags;
753         atomic_set(&dataset->pccd_refcount, 1);
754
755         rc = pcc_dataset_rule_init(&dataset->pccd_rule, cmd);
756         if (rc) {
757                 pcc_dataset_put(dataset);
758                 return rc;
759         }
760
761         down_write(&super->pccs_rw_sem);
762         list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) {
763                 if (strcmp(tmp->pccd_pathname, pathname) == 0 ||
764                     (dataset->pccd_rwid != 0 &&
765                      dataset->pccd_rwid == tmp->pccd_rwid) ||
766                     (dataset->pccd_roid != 0 &&
767                      dataset->pccd_roid == tmp->pccd_roid)) {
768                         found = true;
769                         break;
770                 }
771         }
772         if (!found)
773                 list_add(&dataset->pccd_linkage, &super->pccs_datasets);
774         up_write(&super->pccs_rw_sem);
775
776         if (found) {
777                 pcc_dataset_put(dataset);
778                 rc = -EEXIST;
779         }
780
781         return rc;
782 }
783
784 struct pcc_dataset *
785 pcc_dataset_get(struct pcc_super *super, enum lu_pcc_type type, __u32 id)
786 {
787         struct pcc_dataset *dataset;
788         struct pcc_dataset *selected = NULL;
789
790         if (id == 0)
791                 return NULL;
792
793         /*
794          * archive ID (read-write ID) or read-only ID is unique in the list,
795          * we just return last added one as first priority.
796          */
797         down_read(&super->pccs_rw_sem);
798         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
799                 if (type == LU_PCC_READWRITE && (dataset->pccd_rwid != id ||
800                     !(dataset->pccd_flags & PCC_DATASET_RWPCC)))
801                         continue;
802                 atomic_inc(&dataset->pccd_refcount);
803                 selected = dataset;
804                 break;
805         }
806         up_read(&super->pccs_rw_sem);
807         if (selected)
808                 CDEBUG(D_CACHE, "matched id %u, PCC mode %d\n", id, type);
809
810         return selected;
811 }
812
813 void
814 pcc_dataset_put(struct pcc_dataset *dataset)
815 {
816         if (atomic_dec_and_test(&dataset->pccd_refcount)) {
817                 pcc_dataset_rule_fini(&dataset->pccd_rule);
818                 path_put(&dataset->pccd_path);
819                 OBD_FREE_PTR(dataset);
820         }
821 }
822
823 static int
824 pcc_dataset_del(struct pcc_super *super, char *pathname)
825 {
826         struct list_head *l, *tmp;
827         struct pcc_dataset *dataset;
828         int rc = -ENOENT;
829
830         down_write(&super->pccs_rw_sem);
831         list_for_each_safe(l, tmp, &super->pccs_datasets) {
832                 dataset = list_entry(l, struct pcc_dataset, pccd_linkage);
833                 if (strcmp(dataset->pccd_pathname, pathname) == 0) {
834                         list_del_init(&dataset->pccd_linkage);
835                         pcc_dataset_put(dataset);
836                         super->pccs_generation++;
837                         rc = 0;
838                         break;
839                 }
840         }
841         up_write(&super->pccs_rw_sem);
842         return rc;
843 }
844
845 static void
846 pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
847 {
848         seq_printf(m, "%s:\n", dataset->pccd_pathname);
849         seq_printf(m, "  rwid: %u\n", dataset->pccd_rwid);
850         seq_printf(m, "  flags: %x\n", dataset->pccd_flags);
851         seq_printf(m, "  autocache: %s\n", dataset->pccd_rule.pmr_conds_str);
852 }
853
854 int
855 pcc_super_dump(struct pcc_super *super, struct seq_file *m)
856 {
857         struct pcc_dataset *dataset;
858
859         down_read(&super->pccs_rw_sem);
860         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
861                 pcc_dataset_dump(dataset, m);
862         }
863         up_read(&super->pccs_rw_sem);
864         return 0;
865 }
866
867 static void pcc_remove_datasets(struct pcc_super *super)
868 {
869         struct pcc_dataset *dataset, *tmp;
870
871         down_write(&super->pccs_rw_sem);
872         list_for_each_entry_safe(dataset, tmp,
873                                  &super->pccs_datasets, pccd_linkage) {
874                 list_del(&dataset->pccd_linkage);
875                 pcc_dataset_put(dataset);
876         }
877         super->pccs_generation++;
878         up_write(&super->pccs_rw_sem);
879 }
880
881 void pcc_super_fini(struct pcc_super *super)
882 {
883         pcc_remove_datasets(super);
884         put_cred(super->pccs_cred);
885 }
886
887 static bool pathname_is_valid(const char *pathname)
888 {
889         /* Needs to be absolute path */
890         if (pathname == NULL || strlen(pathname) == 0 ||
891             strlen(pathname) >= PATH_MAX || pathname[0] != '/')
892                 return false;
893         return true;
894 }
895
896 static struct pcc_cmd *
897 pcc_cmd_parse(char *buffer, unsigned long count)
898 {
899         static struct pcc_cmd *cmd;
900         char *token;
901         char *val;
902         int rc = 0;
903
904         OBD_ALLOC_PTR(cmd);
905         if (cmd == NULL)
906                 GOTO(out, rc = -ENOMEM);
907
908         /* clear all setting */
909         if (strncmp(buffer, "clear", 5) == 0) {
910                 cmd->pccc_cmd = PCC_CLEAR_ALL;
911                 GOTO(out, rc = 0);
912         }
913
914         val = buffer;
915         token = strsep(&val, " ");
916         if (val == NULL || strlen(val) == 0)
917                 GOTO(out_free_cmd, rc = -EINVAL);
918
919         /* Type of the command */
920         if (strcmp(token, "add") == 0)
921                 cmd->pccc_cmd = PCC_ADD_DATASET;
922         else if (strcmp(token, "del") == 0)
923                 cmd->pccc_cmd = PCC_DEL_DATASET;
924         else
925                 GOTO(out_free_cmd, rc = -EINVAL);
926
927         /* Pathname of the dataset */
928         token = strsep(&val, " ");
929         if ((val == NULL && cmd->pccc_cmd != PCC_DEL_DATASET) ||
930             !pathname_is_valid(token))
931                 GOTO(out_free_cmd, rc = -EINVAL);
932         cmd->pccc_pathname = token;
933
934         if (cmd->pccc_cmd == PCC_ADD_DATASET) {
935                 /* List of ID */
936                 LASSERT(val);
937                 token = val;
938                 val = strrchr(token, '}');
939                 if (!val)
940                         GOTO(out_free_cmd, rc = -EINVAL);
941
942                 /* Skip '}' */
943                 val++;
944                 if (*val == '\0') {
945                         val = NULL;
946                 } else if (*val == ' ') {
947                         *val = '\0';
948                         val++;
949                 } else {
950                         GOTO(out_free_cmd, rc = -EINVAL);
951                 }
952
953                 rc = pcc_id_parse(cmd, token);
954                 if (rc)
955                         GOTO(out_free_cmd, rc);
956
957                 rc = pcc_parse_value_pairs(cmd, val);
958                 if (rc)
959                         GOTO(out_cmd_fini, rc = -EINVAL);
960         }
961         goto out;
962 out_cmd_fini:
963         pcc_cmd_fini(cmd);
964 out_free_cmd:
965         OBD_FREE_PTR(cmd);
966 out:
967         if (rc)
968                 cmd = ERR_PTR(rc);
969         return cmd;
970 }
971
972 int pcc_cmd_handle(char *buffer, unsigned long count,
973                    struct pcc_super *super)
974 {
975         int rc = 0;
976         struct pcc_cmd *cmd;
977
978         cmd = pcc_cmd_parse(buffer, count);
979         if (IS_ERR(cmd))
980                 return PTR_ERR(cmd);
981
982         switch (cmd->pccc_cmd) {
983         case PCC_ADD_DATASET:
984                 rc = pcc_dataset_add(super, cmd);
985                 break;
986         case PCC_DEL_DATASET:
987                 rc = pcc_dataset_del(super, cmd->pccc_pathname);
988                 break;
989         case PCC_CLEAR_ALL:
990                 pcc_remove_datasets(super);
991                 break;
992         default:
993                 rc = -EINVAL;
994                 break;
995         }
996
997         pcc_cmd_fini(cmd);
998         OBD_FREE_PTR(cmd);
999         return rc;
1000 }
1001
1002 static inline void pcc_inode_lock(struct inode *inode)
1003 {
1004         mutex_lock(&ll_i2info(inode)->lli_pcc_lock);
1005 }
1006
1007 static inline void pcc_inode_unlock(struct inode *inode)
1008 {
1009         mutex_unlock(&ll_i2info(inode)->lli_pcc_lock);
1010 }
1011
1012 static void pcc_inode_init(struct pcc_inode *pcci, struct ll_inode_info *lli)
1013 {
1014         pcci->pcci_lli = lli;
1015         lli->lli_pcc_inode = pcci;
1016         atomic_set(&pcci->pcci_refcount, 0);
1017         pcci->pcci_type = LU_PCC_NONE;
1018         pcci->pcci_layout_gen = CL_LAYOUT_GEN_NONE;
1019         atomic_set(&pcci->pcci_active_ios, 0);
1020         init_waitqueue_head(&pcci->pcci_waitq);
1021 }
1022
1023 static void pcc_inode_fini(struct pcc_inode *pcci)
1024 {
1025         struct ll_inode_info *lli = pcci->pcci_lli;
1026
1027         path_put(&pcci->pcci_path);
1028         pcci->pcci_type = LU_PCC_NONE;
1029         OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
1030         lli->lli_pcc_inode = NULL;
1031 }
1032
1033 static void pcc_inode_get(struct pcc_inode *pcci)
1034 {
1035         atomic_inc(&pcci->pcci_refcount);
1036 }
1037
1038 static void pcc_inode_put(struct pcc_inode *pcci)
1039 {
1040         if (atomic_dec_and_test(&pcci->pcci_refcount))
1041                 pcc_inode_fini(pcci);
1042 }
1043
1044 void pcc_inode_free(struct inode *inode)
1045 {
1046         struct pcc_inode *pcci = ll_i2pcci(inode);
1047
1048         if (pcci) {
1049                 WARN_ON(atomic_read(&pcci->pcci_refcount) > 1);
1050                 pcc_inode_put(pcci);
1051         }
1052 }
1053
1054 /*
1055  * TODO:
1056  * As Andreas suggested, we'd better use new layout to
1057  * reduce overhead:
1058  * (fid->f_oid >> 16 & oxFFFF)/FID
1059  */
1060 #define MAX_PCC_DATABASE_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
1061 static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid)
1062 {
1063         return snprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
1064                         DFID_NOBRACE,
1065                         (fid)->f_oid       & 0xFFFF,
1066                         (fid)->f_oid >> 16 & 0xFFFF,
1067                         (unsigned int)((fid)->f_seq       & 0xFFFF),
1068                         (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
1069                         (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
1070                         (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
1071                         PFID(fid));
1072 }
1073
1074 static inline const struct cred *pcc_super_cred(struct super_block *sb)
1075 {
1076         return ll_s2sbi(sb)->ll_pcc_super.pccs_cred;
1077 }
1078
1079 void pcc_file_init(struct pcc_file *pccf)
1080 {
1081         pccf->pccf_file = NULL;
1082         pccf->pccf_type = LU_PCC_NONE;
1083 }
1084
1085 static inline bool pcc_auto_attach_enabled(enum pcc_dataset_flags flags,
1086                                            enum pcc_io_type iot)
1087 {
1088         if (iot == PIT_OPEN)
1089                 return flags & PCC_DATASET_OPEN_ATTACH;
1090         if (iot == PIT_GETATTR)
1091                 return flags & PCC_DATASET_STAT_ATTACH;
1092         else
1093                 return flags & PCC_DATASET_AUTO_ATTACH;
1094 }
1095
1096 static const char pcc_xattr_layout[] = XATTR_USER_PREFIX "PCC.layout";
1097
1098 static int pcc_layout_xattr_set(struct pcc_inode *pcci, __u32 gen)
1099 {
1100         struct dentry *pcc_dentry = pcci->pcci_path.dentry;
1101         struct ll_inode_info *lli = pcci->pcci_lli;
1102         int rc;
1103
1104         ENTRY;
1105
1106         if (!(lli->lli_pcc_dsflags & PCC_DATASET_AUTO_ATTACH))
1107                 RETURN(0);
1108
1109 #ifndef HAVE_VFS_SETXATTR
1110         if (!pcc_dentry->d_inode->i_op->setxattr)
1111                 RETURN(-ENOTSUPP);
1112
1113         rc = pcc_dentry->d_inode->i_op->setxattr(pcc_dentry, pcc_xattr_layout,
1114                                                  &gen, sizeof(gen), 0);
1115 #else
1116         rc = __vfs_setxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
1117                             &gen, sizeof(gen), 0);
1118 #endif
1119         RETURN(rc);
1120 }
1121
1122 static int pcc_get_layout_info(struct inode *inode, struct cl_layout *clt)
1123 {
1124         struct lu_env *env;
1125         struct ll_inode_info *lli = ll_i2info(inode);
1126         __u16 refcheck;
1127         int rc;
1128
1129         ENTRY;
1130
1131         if (!lli->lli_clob)
1132                 RETURN(-EINVAL);
1133
1134         env = cl_env_get(&refcheck);
1135         if (IS_ERR(env))
1136                 RETURN(PTR_ERR(env));
1137
1138         rc = cl_object_layout_get(env, lli->lli_clob, clt);
1139         if (rc)
1140                 CDEBUG(D_INODE, "Cannot get layout for "DFID"\n",
1141                        PFID(ll_inode2fid(inode)));
1142
1143         cl_env_put(env, &refcheck);
1144         RETURN(rc);
1145 }
1146
1147 static int pcc_fid2dataset_fullpath(char *buf, int sz, struct lu_fid *fid,
1148                                     struct pcc_dataset *dataset)
1149 {
1150         return snprintf(buf, sz, "%s/%04x/%04x/%04x/%04x/%04x/%04x/"
1151                         DFID_NOBRACE,
1152                         dataset->pccd_pathname,
1153                         (fid)->f_oid       & 0xFFFF,
1154                         (fid)->f_oid >> 16 & 0xFFFF,
1155                         (unsigned int)((fid)->f_seq       & 0xFFFF),
1156                         (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
1157                         (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
1158                         (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
1159                         PFID(fid));
1160 }
1161
1162 /* Must be called with pcci->pcci_lock held */
1163 static void pcc_inode_attach_init(struct pcc_dataset *dataset,
1164                                   struct pcc_inode *pcci,
1165                                   struct dentry *dentry,
1166                                   enum lu_pcc_type type)
1167 {
1168         pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
1169         pcci->pcci_path.dentry = dentry;
1170         LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
1171         atomic_set(&pcci->pcci_refcount, 1);
1172         pcci->pcci_type = type;
1173         pcci->pcci_attr_valid = false;
1174 }
1175
1176 static inline void pcc_inode_dsflags_set(struct ll_inode_info *lli,
1177                                          struct pcc_dataset *dataset)
1178 {
1179         lli->lli_pcc_generation = ll_info2pccs(lli)->pccs_generation;
1180         lli->lli_pcc_dsflags = dataset->pccd_flags;
1181 }
1182
1183 static void pcc_inode_attach_set(struct pcc_super *super,
1184                                  struct pcc_dataset *dataset,
1185                                  struct ll_inode_info *lli,
1186                                  struct pcc_inode *pcci,
1187                                  struct dentry *dentry,
1188                                  enum lu_pcc_type type)
1189 {
1190         pcc_inode_init(pcci, lli);
1191         pcc_inode_attach_init(dataset, pcci, dentry, type);
1192         down_read(&super->pccs_rw_sem);
1193         pcc_inode_dsflags_set(lli, dataset);
1194         up_read(&super->pccs_rw_sem);
1195 }
1196
1197 static inline void pcc_layout_gen_set(struct pcc_inode *pcci,
1198                                       __u32 gen)
1199 {
1200         pcci->pcci_layout_gen = gen;
1201 }
1202
1203 static inline bool pcc_inode_has_layout(struct pcc_inode *pcci)
1204 {
1205         return pcci->pcci_layout_gen != CL_LAYOUT_GEN_NONE;
1206 }
1207
1208 static int pcc_try_dataset_attach(struct inode *inode, __u32 gen,
1209                                   enum lu_pcc_type type,
1210                                   struct pcc_dataset *dataset,
1211                                   bool *cached)
1212 {
1213         struct ll_inode_info *lli = ll_i2info(inode);
1214         struct pcc_inode *pcci = lli->lli_pcc_inode;
1215         const struct cred *old_cred;
1216         struct dentry *pcc_dentry;
1217         struct path path;
1218         char *pathname;
1219         __u32 pcc_gen;
1220         int rc;
1221
1222         ENTRY;
1223
1224         if (type == LU_PCC_READWRITE &&
1225             !(dataset->pccd_flags & PCC_DATASET_RWPCC))
1226                 RETURN(0);
1227
1228         OBD_ALLOC(pathname, PATH_MAX);
1229         if (pathname == NULL)
1230                 RETURN(-ENOMEM);
1231
1232         pcc_fid2dataset_fullpath(pathname, PATH_MAX, &lli->lli_fid, dataset);
1233
1234         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1235         rc = kern_path(pathname, LOOKUP_FOLLOW, &path);
1236         if (rc)
1237                 /* ignore this error */
1238                 GOTO(out, rc = 0);
1239
1240         pcc_dentry = path.dentry;
1241 #ifndef HAVE_VFS_SETXATTR
1242         if (!pcc_dentry->d_inode->i_op->getxattr)
1243                 /* ignore this error */
1244                 GOTO(out_put_path, rc = 0);
1245
1246         rc = pcc_dentry->d_inode->i_op->getxattr(pcc_dentry, pcc_xattr_layout,
1247                                                  &pcc_gen, sizeof(pcc_gen));
1248 #else
1249         rc = __vfs_getxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
1250                             &pcc_gen, sizeof(pcc_gen));
1251 #endif
1252
1253         if (rc < 0)
1254                 /* ignore this error */
1255                 GOTO(out_put_path, rc = 0);
1256
1257         rc = 0;
1258         /* The file is still valid cached in PCC, attach it immediately. */
1259         if (pcc_gen == gen) {
1260                 CDEBUG(D_CACHE, DFID" L.Gen (%d) consistent, auto attached.\n",
1261                        PFID(&lli->lli_fid), gen);
1262                 if (!pcci) {
1263                         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
1264                         if (pcci == NULL)
1265                                 GOTO(out_put_path, rc = -ENOMEM);
1266
1267                         pcc_inode_init(pcci, lli);
1268                         dget(pcc_dentry);
1269                         pcc_inode_attach_init(dataset, pcci, pcc_dentry, type);
1270                 } else {
1271                         /*
1272                          * This happened when a file was once attached into
1273                          * PCC, and some processes keep this file opened
1274                          * (pcci->refcount > 1) and corresponding PCC file
1275                          * without any I/O activity, and then this file was
1276                          * detached by the manual detach command or the
1277                          * revocation of the layout lock (i.e. cached LRU lock
1278                          * shrinking).
1279                          */
1280                         pcc_inode_get(pcci);
1281                         pcci->pcci_type = type;
1282                 }
1283                 pcc_inode_dsflags_set(lli, dataset);
1284                 pcc_layout_gen_set(pcci, gen);
1285                 *cached = true;
1286         }
1287 out_put_path:
1288         path_put(&path);
1289 out:
1290         revert_creds(old_cred);
1291         OBD_FREE(pathname, PATH_MAX);
1292         RETURN(rc);
1293 }
1294
1295 static int pcc_try_datasets_attach(struct inode *inode, enum pcc_io_type iot,
1296                                    __u32 gen, enum lu_pcc_type type,
1297                                    bool *cached)
1298 {
1299         struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
1300         struct ll_inode_info *lli = ll_i2info(inode);
1301         struct pcc_dataset *dataset = NULL, *tmp;
1302         int rc = 0;
1303
1304         ENTRY;
1305
1306         down_read(&super->pccs_rw_sem);
1307         list_for_each_entry_safe(dataset, tmp,
1308                                  &super->pccs_datasets, pccd_linkage) {
1309                 if (!pcc_auto_attach_enabled(dataset->pccd_flags, iot))
1310                         break;
1311
1312                 rc = pcc_try_dataset_attach(inode, gen, type, dataset, cached);
1313                 if (rc < 0 || (!rc && *cached))
1314                         break;
1315         }
1316
1317         /*
1318          * Update the saved dataset flags for the inode accordingly if failed.
1319          */
1320         if (!rc && !*cached) {
1321                 /*
1322                  * Currently auto attach strategy for a PCC backend is
1323                  * unchangeable once once it was added into the PCC datasets on
1324                  * a client as the support to change auto attach strategy is
1325                  * not implemented yet.
1326                  */
1327                 /*
1328                  * If tried to attach from one PCC backend:
1329                  * @lli_pcc_generation > 0:
1330                  * 1) The file was once attached into PCC, but now the
1331                  * corresponding PCC backend should be removed from the client;
1332                  * 2) The layout generation was changed, the data has been
1333                  * restored;
1334                  * 3) The corresponding PCC copy is not existed on PCC
1335                  * @lli_pcc_generation == 0:
1336                  * The file is never attached into PCC but in a HSM released
1337                  * state, or once attached into PCC but the inode was evicted
1338                  * from icache later.
1339                  * Set the saved dataset flags with PCC_DATASET_NONE. Then this
1340                  * file will skip from the candidates to try auto attach until
1341                  * the file is attached ninto PCC again.
1342                  *
1343                  * If the file was never attached into PCC, or once attached but
1344                  * its inode was evicted from icache (lli_pcc_generation == 0),
1345                  * set the saved dataset flags with PCC_DATASET_NONE.
1346                  *
1347                  * If the file was once attached into PCC but the corresponding
1348                  * dataset was removed from the client, set the saved dataset
1349                  * flags with PCC_DATASET_NONE.
1350                  *
1351                  * TODO: If the file was once attached into PCC but not try to
1352                  * auto attach due to the change of the configuration parameters
1353                  * for this dataset (i.e. change from auto attach enabled to
1354                  * auto attach disabled for this dataset), update the saved
1355                  * dataset flags witha the found one.
1356                  */
1357                 lli->lli_pcc_dsflags = PCC_DATASET_NONE;
1358         }
1359         up_read(&super->pccs_rw_sem);
1360
1361         RETURN(rc);
1362 }
1363
1364 /*
1365  * TODO: For RW-PCC, it is desirable to store HSM info as a layout (LU-10606).
1366  * Thus the client can get archive ID from the layout directly. When try to
1367  * attach the file automatically which is in HSM released state (according to
1368  * LOV_PATTERN_F_RELEASED in the layout), it can determine whether the file is
1369  * valid cached on PCC more precisely according to the @rwid (archive ID) in
1370  * the PCC dataset and the archive ID in HSM attrs.
1371  */
1372 static int pcc_try_auto_attach(struct inode *inode, bool *cached,
1373                                enum pcc_io_type iot)
1374 {
1375         struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
1376         struct cl_layout clt = {
1377                 .cl_layout_gen = 0,
1378                 .cl_is_released = false,
1379         };
1380         struct ll_inode_info *lli = ll_i2info(inode);
1381         __u32 gen;
1382         int rc;
1383
1384         ENTRY;
1385
1386         /*
1387          * Quick check whether there is PCC device.
1388          */
1389         if (list_empty(&super->pccs_datasets))
1390                 RETURN(0);
1391
1392         /*
1393          * The file layout lock was cancelled. And this open does not
1394          * obtain valid layout lock from MDT (i.e. the file is being
1395          * HSM restoring).
1396          */
1397         if (iot == PIT_OPEN) {
1398                 if (ll_layout_version_get(lli) == CL_LAYOUT_GEN_NONE)
1399                         RETURN(0);
1400         } else {
1401                 rc = ll_layout_refresh(inode, &gen);
1402                 if (rc)
1403                         RETURN(rc);
1404         }
1405
1406         rc = pcc_get_layout_info(inode, &clt);
1407         if (rc)
1408                 RETURN(rc);
1409
1410         if (iot != PIT_OPEN && gen != clt.cl_layout_gen) {
1411                 CDEBUG(D_CACHE, DFID" layout changed from %d to %d.\n",
1412                        PFID(ll_inode2fid(inode)), gen, clt.cl_layout_gen);
1413                 RETURN(-EINVAL);
1414         }
1415
1416         if (clt.cl_is_released)
1417                 rc = pcc_try_datasets_attach(inode, iot, clt.cl_layout_gen,
1418                                              LU_PCC_READWRITE, cached);
1419
1420         RETURN(rc);
1421 }
1422
1423 static inline bool pcc_may_auto_attach(struct inode *inode,
1424                                        enum pcc_io_type iot)
1425 {
1426         struct ll_inode_info *lli = ll_i2info(inode);
1427         struct pcc_super *super = ll_i2pccs(inode);
1428
1429         /* Known the file was not in any PCC backend. */
1430         if (lli->lli_pcc_dsflags & PCC_DATASET_NONE)
1431                 return false;
1432
1433         /*
1434          * lli_pcc_generation = 0 means that the file was never attached into
1435          * PCC, or may be once attached into PCC but detached as the inode is
1436          * evicted from icache (i.e. "echo 3 > /proc/sys/vm/drop_caches" or
1437          * icache shrinking due to the memory pressure), which will cause the
1438          * file detach from PCC when releasing the inode from icache.
1439          * In either case, we still try to attach.
1440          */
1441         /* lli_pcc_generation == 0, or the PCC setting was changed,
1442          * or there is no PCC setup on the client and the try will return
1443          * immediately in pcc_try_auto_attch().
1444          */
1445         if (super->pccs_generation != lli->lli_pcc_generation)
1446                 return true;
1447
1448         /* The cached setting @lli_pcc_dsflags is valid */
1449         if (iot == PIT_OPEN)
1450                 return lli->lli_pcc_dsflags & PCC_DATASET_OPEN_ATTACH;
1451
1452         if (iot == PIT_GETATTR)
1453                 return lli->lli_pcc_dsflags & PCC_DATASET_STAT_ATTACH;
1454
1455         return lli->lli_pcc_dsflags & PCC_DATASET_IO_ATTACH;
1456 }
1457
1458 int pcc_file_open(struct inode *inode, struct file *file)
1459 {
1460         struct pcc_inode *pcci;
1461         struct ll_inode_info *lli = ll_i2info(inode);
1462         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1463         struct pcc_file *pccf = &fd->fd_pcc_file;
1464         struct file *pcc_file;
1465         struct path *path;
1466         struct qstr *dname;
1467         bool cached = false;
1468         int rc = 0;
1469
1470         ENTRY;
1471
1472         if (!S_ISREG(inode->i_mode))
1473                 RETURN(0);
1474
1475         pcc_inode_lock(inode);
1476         pcci = ll_i2pcci(inode);
1477
1478         if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
1479                 GOTO(out_unlock, rc = 0);
1480
1481         if (!pcci || !pcc_inode_has_layout(pcci)) {
1482                 if (pcc_may_auto_attach(inode, PIT_OPEN))
1483                         rc = pcc_try_auto_attach(inode, &cached, PIT_OPEN);
1484
1485                 if (rc < 0 || !cached)
1486                         GOTO(out_unlock, rc);
1487
1488                 if (!pcci)
1489                         pcci = ll_i2pcci(inode);
1490         }
1491
1492         pcc_inode_get(pcci);
1493         WARN_ON(pccf->pccf_file);
1494
1495         path = &pcci->pcci_path;
1496         dname = &path->dentry->d_name;
1497         CDEBUG(D_CACHE, "opening pcc file '%.*s'\n", dname->len,
1498                dname->name);
1499
1500 #ifdef HAVE_DENTRY_OPEN_USE_PATH
1501         pcc_file = dentry_open(path, file->f_flags,
1502                                pcc_super_cred(inode->i_sb));
1503 #else
1504         pcc_file = dentry_open(path->dentry, path->mnt, file->f_flags,
1505                                pcc_super_cred(inode->i_sb));
1506 #endif
1507         if (IS_ERR_OR_NULL(pcc_file)) {
1508                 rc = pcc_file == NULL ? -EINVAL : PTR_ERR(pcc_file);
1509                 pcc_inode_put(pcci);
1510         } else {
1511                 pccf->pccf_file = pcc_file;
1512                 pccf->pccf_type = pcci->pcci_type;
1513         }
1514
1515 out_unlock:
1516         pcc_inode_unlock(inode);
1517         RETURN(rc);
1518 }
1519
1520 void pcc_file_release(struct inode *inode, struct file *file)
1521 {
1522         struct pcc_inode *pcci;
1523         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1524         struct pcc_file *pccf;
1525         struct path *path;
1526         struct qstr *dname;
1527
1528         ENTRY;
1529
1530         if (!S_ISREG(inode->i_mode) || fd == NULL)
1531                 RETURN_EXIT;
1532
1533         pccf = &fd->fd_pcc_file;
1534         pcc_inode_lock(inode);
1535         if (pccf->pccf_file == NULL)
1536                 goto out;
1537
1538         pcci = ll_i2pcci(inode);
1539         LASSERT(pcci);
1540         path = &pcci->pcci_path;
1541         dname = &path->dentry->d_name;
1542         CDEBUG(D_CACHE, "releasing pcc file \"%.*s\"\n", dname->len,
1543                dname->name);
1544         pcc_inode_put(pcci);
1545         fput(pccf->pccf_file);
1546         pccf->pccf_file = NULL;
1547 out:
1548         pcc_inode_unlock(inode);
1549         RETURN_EXIT;
1550 }
1551
1552 static void pcc_io_init(struct inode *inode, enum pcc_io_type iot, bool *cached)
1553 {
1554         struct pcc_inode *pcci;
1555
1556         pcc_inode_lock(inode);
1557         pcci = ll_i2pcci(inode);
1558         if (pcci && pcc_inode_has_layout(pcci)) {
1559                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1560                 atomic_inc(&pcci->pcci_active_ios);
1561                 *cached = true;
1562         } else {
1563                 *cached = false;
1564                 if (pcc_may_auto_attach(inode, iot)) {
1565                         (void) pcc_try_auto_attach(inode, cached, iot);
1566                         if (*cached) {
1567                                 pcci = ll_i2pcci(inode);
1568                                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1569                                 atomic_inc(&pcci->pcci_active_ios);
1570                         }
1571                 }
1572         }
1573         pcc_inode_unlock(inode);
1574 }
1575
1576 static void pcc_io_fini(struct inode *inode)
1577 {
1578         struct pcc_inode *pcci = ll_i2pcci(inode);
1579
1580         LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0);
1581         if (atomic_dec_and_test(&pcci->pcci_active_ios))
1582                 wake_up_all(&pcci->pcci_waitq);
1583 }
1584
1585
1586 static ssize_t
1587 __pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
1588 {
1589         struct file *file = iocb->ki_filp;
1590
1591 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1592         return file->f_op->read_iter(iocb, iter);
1593 #else
1594         struct iovec iov;
1595         struct iov_iter i;
1596         ssize_t bytes = 0;
1597
1598         iov_for_each(iov, i, *iter) {
1599                 ssize_t res;
1600
1601                 res = file->f_op->aio_read(iocb, &iov, 1, iocb->ki_pos);
1602                 if (-EIOCBQUEUED == res)
1603                         res = wait_on_sync_kiocb(iocb);
1604                 if (res <= 0) {
1605                         if (bytes == 0)
1606                                 bytes = res;
1607                         break;
1608                 }
1609
1610                 bytes += res;
1611                 if (res < iov.iov_len)
1612                         break;
1613         }
1614
1615         if (bytes > 0)
1616                 iov_iter_advance(iter, bytes);
1617         return bytes;
1618 #endif
1619 }
1620
1621 ssize_t pcc_file_read_iter(struct kiocb *iocb,
1622                            struct iov_iter *iter, bool *cached)
1623 {
1624         struct file *file = iocb->ki_filp;
1625         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1626         struct pcc_file *pccf = &fd->fd_pcc_file;
1627         struct inode *inode = file_inode(file);
1628         ssize_t result;
1629
1630         ENTRY;
1631
1632         if (pccf->pccf_file == NULL) {
1633                 *cached = false;
1634                 RETURN(0);
1635         }
1636
1637         pcc_io_init(inode, PIT_READ, cached);
1638         if (!*cached)
1639                 RETURN(0);
1640
1641         iocb->ki_filp = pccf->pccf_file;
1642         /* generic_file_aio_read does not support ext4-dax,
1643          * __pcc_file_read_iter uses ->aio_read hook directly
1644          * to add support for ext4-dax.
1645          */
1646         result = __pcc_file_read_iter(iocb, iter);
1647         iocb->ki_filp = file;
1648
1649         pcc_io_fini(inode);
1650         RETURN(result);
1651 }
1652
1653 static ssize_t
1654 __pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
1655 {
1656         struct file *file = iocb->ki_filp;
1657
1658 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1659         return file->f_op->write_iter(iocb, iter);
1660 #else
1661         struct iovec iov;
1662         struct iov_iter i;
1663         ssize_t bytes = 0;
1664
1665         iov_for_each(iov, i, *iter) {
1666                 ssize_t res;
1667
1668                 res = file->f_op->aio_write(iocb, &iov, 1, iocb->ki_pos);
1669                 if (-EIOCBQUEUED == res)
1670                         res = wait_on_sync_kiocb(iocb);
1671                 if (res <= 0) {
1672                         if (bytes == 0)
1673                                 bytes = res;
1674                         break;
1675                 }
1676
1677                 bytes += res;
1678                 if (res < iov.iov_len)
1679                         break;
1680         }
1681
1682         if (bytes > 0)
1683                 iov_iter_advance(iter, bytes);
1684         return bytes;
1685 #endif
1686 }
1687
1688 ssize_t pcc_file_write_iter(struct kiocb *iocb,
1689                             struct iov_iter *iter, bool *cached)
1690 {
1691         struct file *file = iocb->ki_filp;
1692         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1693         struct pcc_file *pccf = &fd->fd_pcc_file;
1694         struct inode *inode = file_inode(file);
1695         ssize_t result;
1696
1697         ENTRY;
1698
1699         if (pccf->pccf_file == NULL) {
1700                 *cached = false;
1701                 RETURN(0);
1702         }
1703
1704         if (pccf->pccf_type != LU_PCC_READWRITE) {
1705                 *cached = false;
1706                 RETURN(-EAGAIN);
1707         }
1708
1709         pcc_io_init(inode, PIT_WRITE, cached);
1710         if (!*cached)
1711                 RETURN(0);
1712
1713         if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
1714                 GOTO(out, result = -ENOSPC);
1715
1716         iocb->ki_filp = pccf->pccf_file;
1717
1718         /* Since __pcc_file_write_iter makes write calls via
1719          * the normal vfs interface to the local PCC file system,
1720          * the inode lock is not needed.
1721          */
1722         result = __pcc_file_write_iter(iocb, iter);
1723         iocb->ki_filp = file;
1724 out:
1725         pcc_io_fini(inode);
1726         RETURN(result);
1727 }
1728
1729 int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
1730                       bool *cached)
1731 {
1732         int rc;
1733         const struct cred *old_cred;
1734         struct iattr attr2 = *attr;
1735         struct dentry *pcc_dentry;
1736         struct pcc_inode *pcci;
1737
1738         ENTRY;
1739
1740         if (!S_ISREG(inode->i_mode)) {
1741                 *cached = false;
1742                 RETURN(0);
1743         }
1744
1745         pcc_io_init(inode, PIT_SETATTR, cached);
1746         if (!*cached)
1747                 RETURN(0);
1748
1749         attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME |
1750                          ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET |
1751                          ATTR_CTIME | ATTR_UID | ATTR_GID);
1752         pcci = ll_i2pcci(inode);
1753         pcc_dentry = pcci->pcci_path.dentry;
1754         inode_lock(pcc_dentry->d_inode);
1755         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1756         rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
1757         revert_creds(old_cred);
1758         inode_unlock(pcc_dentry->d_inode);
1759
1760         pcc_io_fini(inode);
1761         RETURN(rc);
1762 }
1763
1764 int pcc_inode_getattr(struct inode *inode, bool *cached)
1765 {
1766         struct ll_inode_info *lli = ll_i2info(inode);
1767         const struct cred *old_cred;
1768         struct kstat stat;
1769         s64 atime;
1770         s64 mtime;
1771         s64 ctime;
1772         int rc;
1773
1774         ENTRY;
1775
1776         if (!S_ISREG(inode->i_mode)) {
1777                 *cached = false;
1778                 RETURN(0);
1779         }
1780
1781         pcc_io_init(inode, PIT_GETATTR, cached);
1782         if (!*cached)
1783                 RETURN(0);
1784
1785         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1786         rc = ll_vfs_getattr(&ll_i2pcci(inode)->pcci_path, &stat);
1787         revert_creds(old_cred);
1788         if (rc)
1789                 GOTO(out, rc);
1790
1791         ll_inode_size_lock(inode);
1792         if (inode->i_atime.tv_sec < lli->lli_atime ||
1793             lli->lli_update_atime) {
1794                 inode->i_atime.tv_sec = lli->lli_atime;
1795                 lli->lli_update_atime = 0;
1796         }
1797         inode->i_mtime.tv_sec = lli->lli_mtime;
1798         inode->i_ctime.tv_sec = lli->lli_ctime;
1799
1800         atime = inode->i_atime.tv_sec;
1801         mtime = inode->i_mtime.tv_sec;
1802         ctime = inode->i_ctime.tv_sec;
1803
1804         if (atime < stat.atime.tv_sec)
1805                 atime = stat.atime.tv_sec;
1806
1807         if (ctime < stat.ctime.tv_sec)
1808                 ctime = stat.ctime.tv_sec;
1809
1810         if (mtime < stat.mtime.tv_sec)
1811                 mtime = stat.mtime.tv_sec;
1812
1813         i_size_write(inode, stat.size);
1814         inode->i_blocks = stat.blocks;
1815
1816         inode->i_atime.tv_sec = atime;
1817         inode->i_mtime.tv_sec = mtime;
1818         inode->i_ctime.tv_sec = ctime;
1819
1820         ll_inode_size_unlock(inode);
1821 out:
1822         pcc_io_fini(inode);
1823         RETURN(rc);
1824 }
1825
1826 ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos,
1827                              struct pipe_inode_info *pipe,
1828                              size_t count, unsigned int flags,
1829                              bool *cached)
1830 {
1831         struct inode *inode = file_inode(in_file);
1832         struct ll_file_data *fd = LUSTRE_FPRIVATE(in_file);
1833         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1834         ssize_t result;
1835
1836         ENTRY;
1837
1838         *cached = false;
1839         if (!pcc_file)
1840                 RETURN(0);
1841
1842         if (!file_inode(pcc_file)->i_fop->splice_read)
1843                 RETURN(-ENOTSUPP);
1844
1845         pcc_io_init(inode, PIT_SPLICE_READ, cached);
1846         if (!*cached)
1847                 RETURN(0);
1848
1849         result = file_inode(pcc_file)->i_fop->splice_read(pcc_file,
1850                                                           ppos, pipe, count,
1851                                                           flags);
1852
1853         pcc_io_fini(inode);
1854         RETURN(result);
1855 }
1856
1857 int pcc_fsync(struct file *file, loff_t start, loff_t end,
1858               int datasync, bool *cached)
1859 {
1860         struct inode *inode = file_inode(file);
1861         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1862         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1863         int rc;
1864
1865         ENTRY;
1866
1867         if (!pcc_file) {
1868                 *cached = false;
1869                 RETURN(0);
1870         }
1871
1872         pcc_io_init(inode, PIT_FSYNC, cached);
1873         if (!*cached)
1874                 RETURN(0);
1875
1876         rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
1877                                                 start, end, datasync);
1878
1879         pcc_io_fini(inode);
1880         RETURN(rc);
1881 }
1882
1883 int pcc_file_mmap(struct file *file, struct vm_area_struct *vma,
1884                   bool *cached)
1885 {
1886         struct inode *inode = file_inode(file);
1887         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1888         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1889         struct pcc_inode *pcci;
1890         int rc = 0;
1891
1892         ENTRY;
1893
1894         if (!pcc_file || !file_inode(pcc_file)->i_fop->mmap) {
1895                 *cached = false;
1896                 RETURN(0);
1897         }
1898
1899         pcc_inode_lock(inode);
1900         pcci = ll_i2pcci(inode);
1901         if (pcci && pcc_inode_has_layout(pcci)) {
1902                 LASSERT(atomic_read(&pcci->pcci_refcount) > 1);
1903                 *cached = true;
1904                 vma->vm_file = pcc_file;
1905                 rc = file_inode(pcc_file)->i_fop->mmap(pcc_file, vma);
1906                 vma->vm_file = file;
1907                 /* Save the vm ops of backend PCC */
1908                 vma->vm_private_data = (void *)vma->vm_ops;
1909         } else {
1910                 *cached = false;
1911         }
1912         pcc_inode_unlock(inode);
1913
1914         RETURN(rc);
1915 }
1916
1917 void pcc_vm_open(struct vm_area_struct *vma)
1918 {
1919         struct pcc_inode *pcci;
1920         struct file *file = vma->vm_file;
1921         struct inode *inode = file_inode(file);
1922         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1923         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1924         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1925
1926         ENTRY;
1927
1928         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->open)
1929                 RETURN_EXIT;
1930
1931         pcc_inode_lock(inode);
1932         pcci = ll_i2pcci(inode);
1933         if (pcci && pcc_inode_has_layout(pcci)) {
1934                 vma->vm_file = pcc_file;
1935                 pcc_vm_ops->open(vma);
1936                 vma->vm_file = file;
1937         }
1938         pcc_inode_unlock(inode);
1939         EXIT;
1940 }
1941
1942 void pcc_vm_close(struct vm_area_struct *vma)
1943 {
1944         struct file *file = vma->vm_file;
1945         struct inode *inode = file_inode(file);
1946         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1947         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1948         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1949
1950         ENTRY;
1951
1952         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->close)
1953                 RETURN_EXIT;
1954
1955         pcc_inode_lock(inode);
1956         /* Layout lock maybe revoked here */
1957         vma->vm_file = pcc_file;
1958         pcc_vm_ops->close(vma);
1959         vma->vm_file = file;
1960         pcc_inode_unlock(inode);
1961         EXIT;
1962 }
1963
1964 int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
1965                      bool *cached)
1966 {
1967         struct page *page = vmf->page;
1968         struct mm_struct *mm = vma->vm_mm;
1969         struct file *file = vma->vm_file;
1970         struct inode *inode = file_inode(file);
1971         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1972         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1973         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1974         int rc;
1975
1976         ENTRY;
1977
1978         if (!pcc_file || !pcc_vm_ops) {
1979                 *cached = false;
1980                 RETURN(0);
1981         }
1982
1983         if (!pcc_vm_ops->page_mkwrite &&
1984             page->mapping == pcc_file->f_mapping) {
1985                 CDEBUG(D_MMAP,
1986                        "%s: PCC backend fs not support ->page_mkwrite()\n",
1987                        ll_i2sbi(inode)->ll_fsname);
1988                 pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
1989                 up_read(&mm->mmap_sem);
1990                 *cached = true;
1991                 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1992         }
1993         /* Pause to allow for a race with concurrent detach */
1994         OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
1995
1996         pcc_io_init(inode, PIT_PAGE_MKWRITE, cached);
1997         if (!*cached) {
1998                 /* This happens when the file is detached from PCC after got
1999                  * the fault page via ->fault() on the inode of the PCC copy.
2000                  * Here it can not simply fall back to normal Lustre I/O path.
2001                  * The reason is that the address space of fault page used by
2002                  * ->page_mkwrite() is still the one of PCC inode. In the
2003                  * normal Lustre ->page_mkwrite() I/O path, it will be wrongly
2004                  * handled as the address space of the fault page is not
2005                  * consistent with the one of the Lustre inode (though the
2006                  * fault page was truncated).
2007                  * As the file is detached from PCC, the fault page must
2008                  * be released frist, and retry the mmap write (->fault() and
2009                  * ->page_mkwrite).
2010                  * We use an ugly and tricky method by returning
2011                  * VM_FAULT_NOPAGE | VM_FAULT_RETRY to the caller
2012                  * __do_page_fault and retry the memory fault handling.
2013                  */
2014                 if (page->mapping == pcc_file->f_mapping) {
2015                         *cached = true;
2016                         up_read(&mm->mmap_sem);
2017                         RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
2018                 }
2019
2020                 RETURN(0);
2021         }
2022
2023         /*
2024          * This fault injection can also be used to simulate -ENOSPC and
2025          * -EDQUOT failure of underlying PCC backend fs.
2026          */
2027         if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE)) {
2028                 pcc_io_fini(inode);
2029                 pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
2030                 up_read(&mm->mmap_sem);
2031                 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
2032         }
2033
2034         vma->vm_file = pcc_file;
2035 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
2036         rc = pcc_vm_ops->page_mkwrite(vmf);
2037 #else
2038         rc = pcc_vm_ops->page_mkwrite(vma, vmf);
2039 #endif
2040         vma->vm_file = file;
2041
2042         pcc_io_fini(inode);
2043         RETURN(rc);
2044 }
2045
2046 int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
2047               bool *cached)
2048 {
2049         struct file *file = vma->vm_file;
2050         struct inode *inode = file_inode(file);
2051         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2052         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
2053         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
2054         int rc;
2055
2056         ENTRY;
2057
2058         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->fault) {
2059                 *cached = false;
2060                 RETURN(0);
2061         }
2062
2063         pcc_io_init(inode, PIT_FAULT, cached);
2064         if (!*cached)
2065                 RETURN(0);
2066
2067         vma->vm_file = pcc_file;
2068 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
2069         rc = pcc_vm_ops->fault(vmf);
2070 #else
2071         rc = pcc_vm_ops->fault(vma, vmf);
2072 #endif
2073         vma->vm_file = file;
2074
2075         pcc_io_fini(inode);
2076         RETURN(rc);
2077 }
2078
2079 static void pcc_layout_wait(struct pcc_inode *pcci)
2080 {
2081         struct l_wait_info lwi = { 0 };
2082
2083         while (atomic_read(&pcci->pcci_active_ios) > 0) {
2084                 CDEBUG(D_CACHE, "Waiting for IO completion: %d\n",
2085                        atomic_read(&pcci->pcci_active_ios));
2086                 l_wait_event(pcci->pcci_waitq,
2087                              atomic_read(&pcci->pcci_active_ios) == 0, &lwi);
2088         }
2089 }
2090
2091 static void __pcc_layout_invalidate(struct pcc_inode *pcci)
2092 {
2093         pcci->pcci_type = LU_PCC_NONE;
2094         pcc_layout_gen_set(pcci, CL_LAYOUT_GEN_NONE);
2095         pcc_layout_wait(pcci);
2096 }
2097
2098 void pcc_layout_invalidate(struct inode *inode)
2099 {
2100         struct pcc_inode *pcci;
2101
2102         ENTRY;
2103
2104         pcc_inode_lock(inode);
2105         pcci = ll_i2pcci(inode);
2106         if (pcci && pcc_inode_has_layout(pcci)) {
2107                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
2108                 __pcc_layout_invalidate(pcci);
2109
2110                 CDEBUG(D_CACHE, "Invalidate "DFID" layout gen %d\n",
2111                        PFID(&ll_i2info(inode)->lli_fid), pcci->pcci_layout_gen);
2112
2113                 pcc_inode_put(pcci);
2114         }
2115         pcc_inode_unlock(inode);
2116
2117         EXIT;
2118 }
2119
2120 static int pcc_inode_remove(struct inode *inode, struct dentry *pcc_dentry)
2121 {
2122         int rc;
2123
2124         rc = ll_vfs_unlink(pcc_dentry->d_parent->d_inode, pcc_dentry);
2125         if (rc)
2126                 CWARN("%s: failed to unlink PCC file %.*s, rc = %d\n",
2127                       ll_i2sbi(inode)->ll_fsname, pcc_dentry->d_name.len,
2128                       pcc_dentry->d_name.name, rc);
2129
2130         return rc;
2131 }
2132
2133 /* Create directory under base if directory does not exist */
2134 static struct dentry *
2135 pcc_mkdir(struct dentry *base, const char *name, umode_t mode)
2136 {
2137         int rc;
2138         struct dentry *dentry;
2139         struct inode *dir = base->d_inode;
2140
2141         inode_lock(dir);
2142         dentry = lookup_one_len(name, base, strlen(name));
2143         if (IS_ERR(dentry))
2144                 goto out;
2145
2146         if (d_is_positive(dentry))
2147                 goto out;
2148
2149         rc = vfs_mkdir(dir, dentry, mode);
2150         if (rc) {
2151                 dput(dentry);
2152                 dentry = ERR_PTR(rc);
2153                 goto out;
2154         }
2155 out:
2156         inode_unlock(dir);
2157         return dentry;
2158 }
2159
2160 static struct dentry *
2161 pcc_mkdir_p(struct dentry *root, char *path, umode_t mode)
2162 {
2163         char *ptr, *entry_name;
2164         struct dentry *parent;
2165         struct dentry *child = ERR_PTR(-EINVAL);
2166
2167         ptr = path;
2168         while (*ptr == '/')
2169                 ptr++;
2170
2171         entry_name = ptr;
2172         parent = dget(root);
2173         while ((ptr = strchr(ptr, '/')) != NULL) {
2174                 *ptr = '\0';
2175                 child = pcc_mkdir(parent, entry_name, mode);
2176                 *ptr = '/';
2177                 dput(parent);
2178                 if (IS_ERR(child))
2179                         break;
2180
2181                 parent = child;
2182                 ptr++;
2183                 entry_name = ptr;
2184         }
2185
2186         return child;
2187 }
2188
2189 /* Create file under base. If file already exist, return failure */
2190 static struct dentry *
2191 pcc_create(struct dentry *base, const char *name, umode_t mode)
2192 {
2193         int rc;
2194         struct dentry *dentry;
2195         struct inode *dir = base->d_inode;
2196
2197         inode_lock(dir);
2198         dentry = lookup_one_len(name, base, strlen(name));
2199         if (IS_ERR(dentry))
2200                 goto out;
2201
2202         if (d_is_positive(dentry))
2203                 goto out;
2204
2205         rc = vfs_create(dir, dentry, mode, LL_VFS_CREATE_FALSE);
2206         if (rc) {
2207                 dput(dentry);
2208                 dentry = ERR_PTR(rc);
2209                 goto out;
2210         }
2211 out:
2212         inode_unlock(dir);
2213         return dentry;
2214 }
2215
2216 static int __pcc_inode_create(struct pcc_dataset *dataset,
2217                               struct lu_fid *fid,
2218                               struct dentry **dentry)
2219 {
2220         char *path;
2221         struct dentry *base;
2222         struct dentry *child;
2223         int rc = 0;
2224
2225         OBD_ALLOC(path, MAX_PCC_DATABASE_PATH);
2226         if (path == NULL)
2227                 return -ENOMEM;
2228
2229         pcc_fid2dataset_path(path, MAX_PCC_DATABASE_PATH, fid);
2230
2231         base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0);
2232         if (IS_ERR(base)) {
2233                 rc = PTR_ERR(base);
2234                 GOTO(out, rc);
2235         }
2236
2237         snprintf(path, MAX_PCC_DATABASE_PATH, DFID_NOBRACE, PFID(fid));
2238         child = pcc_create(base, path, 0);
2239         if (IS_ERR(child)) {
2240                 rc = PTR_ERR(child);
2241                 GOTO(out_base, rc);
2242         }
2243         *dentry = child;
2244
2245 out_base:
2246         dput(base);
2247 out:
2248         OBD_FREE(path, MAX_PCC_DATABASE_PATH);
2249         return rc;
2250 }
2251
2252 /*
2253  * Reset uid, gid or size for the PCC copy masked by @valid.
2254  * TODO: Set the project ID for PCC copy.
2255  */
2256 int pcc_inode_reset_iattr(struct dentry *dentry, unsigned int valid,
2257                           kuid_t uid, kgid_t gid, loff_t size)
2258 {
2259         struct inode *inode = dentry->d_inode;
2260         struct iattr attr;
2261         int rc;
2262
2263         ENTRY;
2264
2265         attr.ia_valid = valid;
2266         attr.ia_uid = uid;
2267         attr.ia_gid = gid;
2268         attr.ia_size = size;
2269
2270         inode_lock(inode);
2271         rc = notify_change(dentry, &attr, NULL);
2272         inode_unlock(inode);
2273
2274         RETURN(rc);
2275 }
2276
2277 int pcc_inode_create(struct super_block *sb, struct pcc_dataset *dataset,
2278                      struct lu_fid *fid, struct dentry **pcc_dentry)
2279 {
2280         const struct cred *old_cred;
2281         int rc;
2282
2283         old_cred = override_creds(pcc_super_cred(sb));
2284         rc = __pcc_inode_create(dataset, fid, pcc_dentry);
2285         revert_creds(old_cred);
2286         return rc;
2287 }
2288
2289 int pcc_inode_create_fini(struct inode *inode, struct pcc_create_attach *pca)
2290 {
2291         struct dentry *pcc_dentry = pca->pca_dentry;
2292         struct pcc_super *super = ll_i2pccs(inode);
2293         const struct cred *old_cred;
2294         struct pcc_inode *pcci;
2295         int rc;
2296
2297         ENTRY;
2298
2299         if (!pca->pca_dataset)
2300                 RETURN(0);
2301
2302         if (!inode)
2303                 GOTO(out_dataset_put, rc = 0);
2304
2305         LASSERT(pcc_dentry);
2306
2307         old_cred = override_creds(super->pccs_cred);
2308         pcc_inode_lock(inode);
2309         LASSERT(ll_i2pcci(inode) == NULL);
2310         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2311         if (pcci == NULL)
2312                 GOTO(out_put, rc = -ENOMEM);
2313
2314         rc = pcc_inode_reset_iattr(pcc_dentry, ATTR_UID | ATTR_GID,
2315                                    old_cred->suid, old_cred->sgid, 0);
2316         if (rc)
2317                 GOTO(out_put, rc);
2318
2319         pcc_inode_attach_set(super, pca->pca_dataset, ll_i2info(inode),
2320                              pcci, pcc_dentry, LU_PCC_READWRITE);
2321
2322         rc = pcc_layout_xattr_set(pcci, 0);
2323         if (rc) {
2324                 (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
2325                 pcc_inode_put(pcci);
2326                 GOTO(out_unlock, rc);
2327         }
2328
2329         /* Set the layout generation of newly created file with 0 */
2330         pcc_layout_gen_set(pcci, 0);
2331
2332 out_put:
2333         if (rc) {
2334                 (void) pcc_inode_remove(inode, pcc_dentry);
2335                 dput(pcc_dentry);
2336
2337                 if (pcci)
2338                         OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
2339         }
2340 out_unlock:
2341         pcc_inode_unlock(inode);
2342         revert_creds(old_cred);
2343 out_dataset_put:
2344         pcc_dataset_put(pca->pca_dataset);
2345         RETURN(rc);
2346 }
2347
2348 void pcc_create_attach_cleanup(struct super_block *sb,
2349                                struct pcc_create_attach *pca)
2350 {
2351         if (!pca->pca_dataset)
2352                 return;
2353
2354         if (pca->pca_dentry) {
2355                 const struct cred *old_cred;
2356                 int rc;
2357
2358                 old_cred = override_creds(pcc_super_cred(sb));
2359                 rc = ll_vfs_unlink(pca->pca_dentry->d_parent->d_inode,
2360                                    pca->pca_dentry);
2361                 if (rc)
2362                         CWARN("failed to unlink PCC file %.*s, rc = %d\n",
2363                               pca->pca_dentry->d_name.len,
2364                               pca->pca_dentry->d_name.name, rc);
2365                 /* ignore the unlink failure */
2366                 revert_creds(old_cred);
2367                 dput(pca->pca_dentry);
2368         }
2369
2370         pcc_dataset_put(pca->pca_dataset);
2371 }
2372
2373 static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
2374                           loff_t *offset)
2375 {
2376         while (count > 0) {
2377                 ssize_t size;
2378
2379                 size = cfs_kernel_write(filp, buf, count, offset);
2380                 if (size < 0)
2381                         return size;
2382                 count -= size;
2383                 buf += size;
2384         }
2385         return 0;
2386 }
2387
2388 static ssize_t pcc_copy_data(struct file *src, struct file *dst)
2389 {
2390         ssize_t rc = 0;
2391         ssize_t rc2;
2392         loff_t pos, offset = 0;
2393         size_t buf_len = 1048576;
2394         void *buf;
2395
2396         ENTRY;
2397
2398         OBD_ALLOC_LARGE(buf, buf_len);
2399         if (buf == NULL)
2400                 RETURN(-ENOMEM);
2401
2402         while (1) {
2403                 if (signal_pending(current))
2404                         GOTO(out_free, rc = -EINTR);
2405
2406                 pos = offset;
2407                 rc2 = cfs_kernel_read(src, buf, buf_len, &pos);
2408                 if (rc2 < 0)
2409                         GOTO(out_free, rc = rc2);
2410                 else if (rc2 == 0)
2411                         break;
2412
2413                 pos = offset;
2414                 rc = pcc_filp_write(dst, buf, rc2, &pos);
2415                 if (rc < 0)
2416                         GOTO(out_free, rc);
2417                 offset += rc2;
2418         }
2419
2420         rc = offset;
2421 out_free:
2422         OBD_FREE_LARGE(buf, buf_len);
2423         RETURN(rc);
2424 }
2425
2426 static int pcc_attach_allowed_check(struct inode *inode)
2427 {
2428         struct ll_inode_info *lli = ll_i2info(inode);
2429         struct pcc_inode *pcci;
2430         int rc = 0;
2431
2432         ENTRY;
2433
2434         pcc_inode_lock(inode);
2435         if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
2436                 GOTO(out_unlock, rc = -EBUSY);
2437
2438         pcci = ll_i2pcci(inode);
2439         if (pcci && pcc_inode_has_layout(pcci))
2440                 GOTO(out_unlock, rc = -EEXIST);
2441
2442         lli->lli_pcc_state |= PCC_STATE_FL_ATTACHING;
2443 out_unlock:
2444         pcc_inode_unlock(inode);
2445         RETURN(rc);
2446 }
2447
2448 int pcc_readwrite_attach(struct file *file, struct inode *inode,
2449                          __u32 archive_id)
2450 {
2451         struct pcc_dataset *dataset;
2452         struct ll_inode_info *lli = ll_i2info(inode);
2453         struct pcc_super *super = ll_i2pccs(inode);
2454         struct pcc_inode *pcci;
2455         const struct cred *old_cred;
2456         struct dentry *dentry;
2457         struct file *pcc_filp;
2458         struct path path;
2459         ssize_t ret;
2460         int rc;
2461
2462         ENTRY;
2463
2464         rc = pcc_attach_allowed_check(inode);
2465         if (rc)
2466                 RETURN(rc);
2467
2468         dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super,
2469                                   LU_PCC_READWRITE, archive_id);
2470         if (dataset == NULL)
2471                 RETURN(-ENOENT);
2472
2473         old_cred = override_creds(super->pccs_cred);
2474         rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry);
2475         if (rc)
2476                 GOTO(out_dataset_put, rc);
2477
2478         path.mnt = dataset->pccd_path.mnt;
2479         path.dentry = dentry;
2480 #ifdef HAVE_DENTRY_OPEN_USE_PATH
2481         pcc_filp = dentry_open(&path, O_WRONLY | O_LARGEFILE, current_cred());
2482 #else
2483         pcc_filp = dentry_open(path.dentry, path.mnt, O_WRONLY | O_LARGEFILE,
2484                                current_cred());
2485 #endif
2486         if (IS_ERR_OR_NULL(pcc_filp)) {
2487                 rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
2488                 GOTO(out_dentry, rc);
2489         }
2490
2491         rc = pcc_inode_reset_iattr(dentry, ATTR_UID | ATTR_GID,
2492                                    old_cred->uid, old_cred->gid, 0);
2493         if (rc)
2494                 GOTO(out_fput, rc);
2495
2496         ret = pcc_copy_data(file, pcc_filp);
2497         if (ret < 0)
2498                 GOTO(out_fput, rc = ret);
2499
2500         /*
2501          * It must to truncate the PCC copy to the same size of the Lustre
2502          * copy after copy data. Otherwise, it may get wrong file size after
2503          * re-attach a file. See LU-13023 for details.
2504          */
2505         rc = pcc_inode_reset_iattr(dentry, ATTR_SIZE, KUIDT_INIT(0),
2506                                    KGIDT_INIT(0), ret);
2507         if (rc)
2508                 GOTO(out_fput, rc);
2509
2510         /* Pause to allow for a race with concurrent HSM remove */
2511         OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
2512
2513         pcc_inode_lock(inode);
2514         pcci = ll_i2pcci(inode);
2515         LASSERT(!pcci);
2516         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2517         if (pcci == NULL)
2518                 GOTO(out_unlock, rc = -ENOMEM);
2519
2520         pcc_inode_attach_set(super, dataset, lli, pcci,
2521                              dentry, LU_PCC_READWRITE);
2522 out_unlock:
2523         pcc_inode_unlock(inode);
2524 out_fput:
2525         fput(pcc_filp);
2526 out_dentry:
2527         if (rc) {
2528                 (void) pcc_inode_remove(inode, dentry);
2529                 dput(dentry);
2530         }
2531 out_dataset_put:
2532         pcc_dataset_put(dataset);
2533         revert_creds(old_cred);
2534
2535         RETURN(rc);
2536 }
2537
2538 int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
2539                               __u32 gen, bool lease_broken, int rc,
2540                               bool attached)
2541 {
2542         struct ll_inode_info *lli = ll_i2info(inode);
2543         const struct cred *old_cred;
2544         struct pcc_inode *pcci;
2545         __u32 gen2;
2546
2547         ENTRY;
2548
2549         old_cred = override_creds(pcc_super_cred(inode->i_sb));
2550         pcc_inode_lock(inode);
2551         pcci = ll_i2pcci(inode);
2552         if (rc || lease_broken) {
2553                 if (attached && pcci)
2554                         pcc_inode_put(pcci);
2555
2556                 GOTO(out_unlock, rc);
2557         }
2558
2559         /* PCC inode may be released due to layout lock revocatioin */
2560         if (!pcci)
2561                 GOTO(out_unlock, rc = -ESTALE);
2562
2563         LASSERT(attached);
2564         rc = pcc_layout_xattr_set(pcci, gen);
2565         if (rc)
2566                 GOTO(out_put, rc);
2567
2568         LASSERT(lli->lli_pcc_state & PCC_STATE_FL_ATTACHING);
2569         rc = ll_layout_refresh(inode, &gen2);
2570         if (!rc) {
2571                 if (gen2 == gen) {
2572                         pcc_layout_gen_set(pcci, gen);
2573                 } else {
2574                         CDEBUG(D_CACHE,
2575                                DFID" layout changed from %d to %d.\n",
2576                                PFID(ll_inode2fid(inode)), gen, gen2);
2577                         GOTO(out_put, rc = -ESTALE);
2578                 }
2579         }
2580
2581 out_put:
2582         if (rc) {
2583                 (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
2584                 pcc_inode_put(pcci);
2585         }
2586 out_unlock:
2587         lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
2588         pcc_inode_unlock(inode);
2589         revert_creds(old_cred);
2590         RETURN(rc);
2591 }
2592
2593 static int pcc_hsm_remove(struct inode *inode)
2594 {
2595         struct hsm_user_request *hur;
2596         __u32 gen;
2597         int len;
2598         int rc;
2599
2600         ENTRY;
2601
2602         rc = ll_layout_restore(inode, 0, OBD_OBJECT_EOF);
2603         if (rc) {
2604                 CDEBUG(D_CACHE, DFID" RESTORE failure: %d\n",
2605                        PFID(&ll_i2info(inode)->lli_fid), rc);
2606                 RETURN(rc);
2607         }
2608
2609         ll_layout_refresh(inode, &gen);
2610
2611         len = sizeof(struct hsm_user_request) +
2612               sizeof(struct hsm_user_item);
2613         OBD_ALLOC(hur, len);
2614         if (hur == NULL)
2615                 RETURN(-ENOMEM);
2616
2617         hur->hur_request.hr_action = HUA_REMOVE;
2618         hur->hur_request.hr_archive_id = 0;
2619         hur->hur_request.hr_flags = 0;
2620         memcpy(&hur->hur_user_item[0].hui_fid, &ll_i2info(inode)->lli_fid,
2621                sizeof(hur->hur_user_item[0].hui_fid));
2622         hur->hur_user_item[0].hui_extent.offset = 0;
2623         hur->hur_user_item[0].hui_extent.length = OBD_OBJECT_EOF;
2624         hur->hur_request.hr_itemcount = 1;
2625         rc = obd_iocontrol(LL_IOC_HSM_REQUEST, ll_i2sbi(inode)->ll_md_exp,
2626                            len, hur, NULL);
2627         if (rc)
2628                 CDEBUG(D_CACHE, DFID" HSM REMOVE failure: %d\n",
2629                        PFID(&ll_i2info(inode)->lli_fid), rc);
2630
2631         OBD_FREE(hur, len);
2632         RETURN(rc);
2633 }
2634
2635 int pcc_ioctl_detach(struct inode *inode, __u32 opt)
2636 {
2637         struct ll_inode_info *lli = ll_i2info(inode);
2638         struct pcc_inode *pcci;
2639         bool hsm_remove = false;
2640         int rc = 0;
2641
2642         ENTRY;
2643
2644         pcc_inode_lock(inode);
2645         pcci = lli->lli_pcc_inode;
2646         if (!pcci || lli->lli_pcc_state & PCC_STATE_FL_ATTACHING ||
2647             !pcc_inode_has_layout(pcci))
2648                 GOTO(out_unlock, rc = 0);
2649
2650         LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
2651
2652         if (pcci->pcci_type == LU_PCC_READWRITE) {
2653                 if (opt == PCC_DETACH_OPT_UNCACHE) {
2654                         hsm_remove = true;
2655                         /*
2656                          * The file will be removed from PCC, set the flags
2657                          * with PCC_DATASET_NONE even the later removal of the
2658                          * PCC copy fails.
2659                          */
2660                         lli->lli_pcc_dsflags = PCC_DATASET_NONE;
2661                 }
2662
2663                 __pcc_layout_invalidate(pcci);
2664                 pcc_inode_put(pcci);
2665         }
2666
2667 out_unlock:
2668         pcc_inode_unlock(inode);
2669         if (hsm_remove) {
2670                 const struct cred *old_cred;
2671
2672                 old_cred = override_creds(pcc_super_cred(inode->i_sb));
2673                 rc = pcc_hsm_remove(inode);
2674                 revert_creds(old_cred);
2675         }
2676
2677         RETURN(rc);
2678 }
2679
2680 int pcc_ioctl_state(struct file *file, struct inode *inode,
2681                     struct lu_pcc_state *state)
2682 {
2683         int rc = 0;
2684         int count;
2685         char *buf;
2686         char *path;
2687         int buf_len = sizeof(state->pccs_path);
2688         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2689         struct pcc_file *pccf = &fd->fd_pcc_file;
2690         struct pcc_inode *pcci;
2691
2692         ENTRY;
2693
2694         if (buf_len <= 0)
2695                 RETURN(-EINVAL);
2696
2697         OBD_ALLOC(buf, buf_len);
2698         if (buf == NULL)
2699                 RETURN(-ENOMEM);
2700
2701         pcc_inode_lock(inode);
2702         pcci = ll_i2pcci(inode);
2703         if (pcci == NULL) {
2704                 state->pccs_type = LU_PCC_NONE;
2705                 GOTO(out_unlock, rc = 0);
2706         }
2707
2708         count = atomic_read(&pcci->pcci_refcount);
2709         if (count == 0) {
2710                 state->pccs_type = LU_PCC_NONE;
2711                 state->pccs_open_count = 0;
2712                 GOTO(out_unlock, rc = 0);
2713         }
2714
2715         if (pcc_inode_has_layout(pcci))
2716                 count--;
2717         if (pccf->pccf_file != NULL)
2718                 count--;
2719         state->pccs_type = pcci->pcci_type;
2720         state->pccs_open_count = count;
2721         state->pccs_flags = ll_i2info(inode)->lli_pcc_state;
2722         path = dentry_path_raw(pcci->pcci_path.dentry, buf, buf_len);
2723         if (IS_ERR(path))
2724                 GOTO(out_unlock, rc = PTR_ERR(path));
2725
2726         if (strlcpy(state->pccs_path, path, buf_len) >= buf_len)
2727                 GOTO(out_unlock, rc = -ENAMETOOLONG);
2728
2729 out_unlock:
2730         pcc_inode_unlock(inode);
2731         OBD_FREE(buf, buf_len);
2732         RETURN(rc);
2733 }