Whamcloud - gitweb
LU-15535 llite: deadlock on lli_lsm_sem
[fs/lustre-release.git] / lustre / llite / pcc.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, DDN Storage Corporation.
24  */
25 /*
26  * Persistent Client Cache
27  *
28  * PCC is a new framework which provides a group of local cache on Lustre
29  * client side. It works in two modes: RW-PCC enables a read-write cache on the
30  * local SSDs of a single client; RO-PCC provides a read-only cache on the
31  * local SSDs of multiple clients. Less overhead is visible to the applications
32  * and network latencies and lock conflicts can be significantly reduced.
33  *
34  * For RW-PCC, no global namespace will be provided. Each client uses its own
35  * local storage as a cache for itself. Local file system is used to manage
36  * the data on local caches. Cached I/O is directed to local file system while
37  * normal I/O is directed to OSTs. RW-PCC uses HSM for data synchronization.
38  * It uses HSM copytool to restore file from local caches to Lustre OSTs. Each
39  * PCC has a copytool instance running with unique archive number. Any remote
40  * access from another Lustre client would trigger the data synchronization. If
41  * a client with RW-PCC goes offline, the cached data becomes inaccessible for
42  * other client temporarily. And after the RW-PCC client reboots and the
43  * copytool restarts, the data will be accessible again.
44  *
45  * Following is what will happen in different conditions for RW-PCC:
46  *
47  * > When file is being created on RW-PCC
48  *
49  * A normal HSM released file is created on MDT;
50  * An empty mirror file is created on local cache;
51  * The HSM status of the Lustre file will be set to archived and released;
52  * The archive number will be set to the proper value.
53  *
54  * > When file is being prefetched to RW-PCC
55  *
56  * An file is copied to the local cache;
57  * The HSM status of the Lustre file will be set to archived and released;
58  * The archive number will be set to the proper value.
59  *
60  * > When file is being accessed from PCC
61  *
62  * Data will be read directly from local cache;
63  * Metadata will be read from MDT, except file size;
64  * File size will be got from local cache.
65  *
66  * > When PCC cached file is being accessed on another client
67  *
68  * RW-PCC cached files are automatically restored when a process on another
69  * client tries to read or modify them. The corresponding I/O will block
70  * waiting for the released file to be restored. This is transparent to the
71  * process.
72  *
73  * For RW-PCC, when a file is being created, a rule-based policy is used to
74  * determine whether it will be cached. Rule-based caching of newly created
75  * files can determine which file can use a cache on PCC directly without any
76  * admission control.
77  *
78  * RW-PCC design can accelerate I/O intensive applications with one-to-one
79  * mappings between files and accessing clients. However, in several use cases,
80  * files will never be updated, but need to be read simultaneously from many
81  * clients. RO-PCC implements a read-only caching on Lustre clients using
82  * SSDs. RO-PCC is based on the same framework as RW-PCC, expect
83  * that no HSM mechanism is used.
84  *
85  * The main advantages to use this SSD cache on the Lustre clients via PCC
86  * is that:
87  * - The I/O stack becomes much simpler for the cached data, as there is no
88  *   interference with I/Os from other clients, which enables easier
89  *   performance optimizations;
90  * - The requirements on the HW inside the client nodes are small, any kind of
91  *   SSDs or even HDDs can be used as cache devices;
92  * - Caching reduces the pressure on the object storage targets (OSTs), as
93  *   small or random I/Os can be regularized to big sequential I/Os and
94  *   temporary files do not even need to be flushed to OSTs.
95  *
96  * PCC can accelerate applications with certain I/O patterns:
97  * - small-sized random writes (< 1MB) from a single client
98  * - repeated read of data that is larger than RAM
99  * - clients with high network latency
100  *
101  * Author: Li Xi <lixi@ddn.com>
102  * Author: Qian Yingjin <qian@ddn.com>
103  */
104
105 #define DEBUG_SUBSYSTEM S_LLITE
106
107 #include "pcc.h"
108 #include <linux/namei.h>
109 #include <linux/file.h>
110 #include <lustre_compat.h>
111 #include "llite_internal.h"
112
113 struct kmem_cache *pcc_inode_slab;
114
115 int pcc_super_init(struct pcc_super *super)
116 {
117         struct cred *cred;
118
119         super->pccs_cred = cred = prepare_creds();
120         if (!cred)
121                 return -ENOMEM;
122
123         /* Never override disk quota limits or use reserved space */
124         cap_lower(cred->cap_effective, CAP_SYS_RESOURCE);
125         init_rwsem(&super->pccs_rw_sem);
126         INIT_LIST_HEAD(&super->pccs_datasets);
127         super->pccs_generation = 1;
128
129         return 0;
130 }
131
132 /* Rule based auto caching */
133 static void pcc_id_list_free(struct list_head *id_list)
134 {
135         struct pcc_match_id *id, *n;
136
137         list_for_each_entry_safe(id, n, id_list, pmi_linkage) {
138                 list_del_init(&id->pmi_linkage);
139                 OBD_FREE_PTR(id);
140         }
141 }
142
143 static void pcc_fname_list_free(struct list_head *fname_list)
144 {
145         struct pcc_match_fname *fname, *n;
146
147         list_for_each_entry_safe(fname, n, fname_list, pmf_linkage) {
148                 OBD_FREE(fname->pmf_name, strlen(fname->pmf_name) + 1);
149                 list_del_init(&fname->pmf_linkage);
150                 OBD_FREE_PTR(fname);
151         }
152 }
153
154 static void pcc_expression_free(struct pcc_expression *expr)
155 {
156         LASSERT(expr->pe_field >= PCC_FIELD_UID &&
157                 expr->pe_field < PCC_FIELD_MAX);
158         switch (expr->pe_field) {
159         case PCC_FIELD_UID:
160         case PCC_FIELD_GID:
161         case PCC_FIELD_PROJID:
162                 pcc_id_list_free(&expr->pe_cond);
163                 break;
164         case PCC_FIELD_FNAME:
165                 pcc_fname_list_free(&expr->pe_cond);
166                 break;
167         default:
168                 LBUG();
169         }
170         OBD_FREE_PTR(expr);
171 }
172
173 static void pcc_conjunction_free(struct pcc_conjunction *conjunction)
174 {
175         struct pcc_expression *expression, *n;
176
177         LASSERT(list_empty(&conjunction->pc_linkage));
178         list_for_each_entry_safe(expression, n,
179                                  &conjunction->pc_expressions,
180                                  pe_linkage) {
181                 list_del_init(&expression->pe_linkage);
182                 pcc_expression_free(expression);
183         }
184         OBD_FREE_PTR(conjunction);
185 }
186
187 static void pcc_rule_conds_free(struct list_head *cond_list)
188 {
189         struct pcc_conjunction *conjunction, *n;
190
191         list_for_each_entry_safe(conjunction, n, cond_list, pc_linkage) {
192                 list_del_init(&conjunction->pc_linkage);
193                 pcc_conjunction_free(conjunction);
194         }
195 }
196
197 static void pcc_cmd_fini(struct pcc_cmd *cmd)
198 {
199         if (cmd->pccc_cmd == PCC_ADD_DATASET) {
200                 if (!list_empty(&cmd->u.pccc_add.pccc_conds))
201                         pcc_rule_conds_free(&cmd->u.pccc_add.pccc_conds);
202                 if (cmd->u.pccc_add.pccc_conds_str)
203                         OBD_FREE(cmd->u.pccc_add.pccc_conds_str,
204                                  strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
205         }
206 }
207
208 #define PCC_DISJUNCTION_DELIM   (",")
209 #define PCC_CONJUNCTION_DELIM   ("&")
210 #define PCC_EXPRESSION_DELIM    ("=")
211
212 static int
213 pcc_fname_list_add(struct cfs_lstr *id, struct list_head *fname_list)
214 {
215         struct pcc_match_fname *fname;
216
217         OBD_ALLOC_PTR(fname);
218         if (fname == NULL)
219                 return -ENOMEM;
220
221         OBD_ALLOC(fname->pmf_name, id->ls_len + 1);
222         if (fname->pmf_name == NULL) {
223                 OBD_FREE_PTR(fname);
224                 return -ENOMEM;
225         }
226
227         memcpy(fname->pmf_name, id->ls_str, id->ls_len);
228         list_add_tail(&fname->pmf_linkage, fname_list);
229         return 0;
230 }
231
232 static int
233 pcc_fname_list_parse(char *str, int len, struct list_head *fname_list)
234 {
235         struct cfs_lstr src;
236         struct cfs_lstr res;
237         int rc = 0;
238
239         ENTRY;
240
241         src.ls_str = str;
242         src.ls_len = len;
243         INIT_LIST_HEAD(fname_list);
244         while (src.ls_str) {
245                 rc = cfs_gettok(&src, ' ', &res);
246                 if (rc == 0) {
247                         rc = -EINVAL;
248                         break;
249                 }
250                 rc = pcc_fname_list_add(&res, fname_list);
251                 if (rc)
252                         break;
253         }
254         if (rc)
255                 pcc_fname_list_free(fname_list);
256         RETURN(rc);
257 }
258
259 static int
260 pcc_id_list_parse(char *str, struct list_head *id_list,
261                   enum pcc_field type)
262 {
263         int rc = 0;
264
265         ENTRY;
266
267         if (type != PCC_FIELD_UID && type != PCC_FIELD_GID &&
268             type != PCC_FIELD_PROJID)
269                 RETURN(-EINVAL);
270
271         INIT_LIST_HEAD(id_list);
272         while (str) {
273                 char *num;
274                 struct pcc_match_id *id;
275                 unsigned long id_val;
276
277                 num = strsep(&str, " ");
278                 if (!*num)
279                         continue;
280                 rc = kstrtoul(num, 0, &id_val);
281                 if (rc)
282                         GOTO(out, rc);
283
284                 OBD_ALLOC_PTR(id);
285                 if (id == NULL)
286                         GOTO(out, rc = -ENOMEM);
287
288                 id->pmi_id = id_val;
289                 list_add_tail(&id->pmi_linkage, id_list);
290         }
291         if (list_empty(id_list))
292                 rc = -EINVAL;
293 out:
294         if (rc)
295                 pcc_id_list_free(id_list);
296         RETURN(rc);
297 }
298
299 static int
300 pcc_expression_parse(char *str, struct list_head *cond_list)
301 {
302         struct pcc_expression *expr;
303         char *field;
304         int len;
305         int rc = 0;
306
307         OBD_ALLOC_PTR(expr);
308         if (expr == NULL)
309                 return -ENOMEM;
310
311         field = strim(strsep(&str, PCC_EXPRESSION_DELIM));
312         if (!*field || !str)
313                 /* No LHS or no '=' */
314                 GOTO(out, rc = -EINVAL);
315         str = skip_spaces(str);
316         len = strlen(str);
317         if (str[0] != '{' || str[len - 1] != '}')
318                 GOTO(out, rc = -EINVAL);
319
320         /* Skip '{' and '}' */
321         str[len - 1] = '\0';
322         str += 1;
323         len -= 2;
324
325         if (strcmp(field, "uid") == 0) {
326                 if (pcc_id_list_parse(str,
327                                       &expr->pe_cond,
328                                       PCC_FIELD_UID) < 0)
329                         GOTO(out, rc = -EINVAL);
330                 expr->pe_field = PCC_FIELD_UID;
331         } else if (strcmp(field, "gid") == 0) {
332                 if (pcc_id_list_parse(str,
333                                       &expr->pe_cond,
334                                       PCC_FIELD_GID) < 0)
335                         GOTO(out, rc = -EINVAL);
336                 expr->pe_field = PCC_FIELD_GID;
337         } else if (strcmp(field, "projid") == 0) {
338                 if (pcc_id_list_parse(str,
339                                       &expr->pe_cond,
340                                       PCC_FIELD_PROJID) < 0)
341                         GOTO(out, rc = -EINVAL);
342                 expr->pe_field = PCC_FIELD_PROJID;
343         } else if (strcmp(field, "fname") == 0) {
344                 if (pcc_fname_list_parse(str, len,
345                                          &expr->pe_cond) < 0)
346                         GOTO(out, rc = -EINVAL);
347                 expr->pe_field = PCC_FIELD_FNAME;
348         } else {
349                 GOTO(out, rc = -EINVAL);
350         }
351
352         list_add_tail(&expr->pe_linkage, cond_list);
353         return 0;
354 out:
355         OBD_FREE_PTR(expr);
356         return rc;
357 }
358
359 static int
360 pcc_conjunction_parse(char *str, struct list_head *cond_list)
361 {
362         struct pcc_conjunction *conjunction;
363         int rc = 0;
364
365         OBD_ALLOC_PTR(conjunction);
366         if (conjunction == NULL)
367                 return -ENOMEM;
368
369         INIT_LIST_HEAD(&conjunction->pc_expressions);
370         list_add_tail(&conjunction->pc_linkage, cond_list);
371
372         while (rc == 0 && str) {
373                 char *expr = strsep(&str, PCC_CONJUNCTION_DELIM);
374
375                 rc = pcc_expression_parse(expr, &conjunction->pc_expressions);
376         }
377         return rc;
378 }
379
380 static int pcc_conds_parse(char *orig, struct list_head *cond_list)
381 {
382         char *str;
383         int rc = 0;
384
385         orig = kstrdup(orig, GFP_KERNEL);
386         if (!orig)
387                 return -ENOMEM;
388         str = orig;
389
390         INIT_LIST_HEAD(cond_list);
391         while (rc == 0 && str) {
392                 char *term = strsep(&str, PCC_DISJUNCTION_DELIM);
393
394                 rc = pcc_conjunction_parse(term, cond_list);
395         }
396         kfree(orig);
397         return rc;
398 }
399
400 static int pcc_id_parse(struct pcc_cmd *cmd, const char *id)
401 {
402         int rc;
403
404         OBD_ALLOC(cmd->u.pccc_add.pccc_conds_str, strlen(id) + 1);
405         if (cmd->u.pccc_add.pccc_conds_str == NULL)
406                 return -ENOMEM;
407
408         memcpy(cmd->u.pccc_add.pccc_conds_str, id, strlen(id));
409
410         rc = pcc_conds_parse(cmd->u.pccc_add.pccc_conds_str,
411                              &cmd->u.pccc_add.pccc_conds);
412         if (rc)
413                 pcc_cmd_fini(cmd);
414
415         return rc;
416 }
417
418 static int
419 pcc_parse_value_pair(struct pcc_cmd *cmd, char *buffer)
420 {
421         char *key, *val;
422         unsigned long id;
423         int rc;
424
425         val = buffer;
426         key = strsep(&val, "=");
427         if (val == NULL || strlen(val) == 0)
428                 return -EINVAL;
429
430         /* Key of the value pair */
431         if (strcmp(key, "rwid") == 0) {
432                 rc = kstrtoul(val, 10, &id);
433                 if (rc)
434                         return rc;
435                 if (id <= 0)
436                         return -EINVAL;
437                 cmd->u.pccc_add.pccc_rwid = id;
438         } else if (strcmp(key, "roid") == 0) {
439                 rc = kstrtoul(val, 10, &id);
440                 if (rc)
441                         return rc;
442                 if (id <= 0)
443                         return -EINVAL;
444                 cmd->u.pccc_add.pccc_roid = id;
445         } else if (strcmp(key, "auto_attach") == 0) {
446                 rc = kstrtoul(val, 10, &id);
447                 if (rc)
448                         return rc;
449                 if (id == 0)
450                         cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_AUTO_ATTACH;
451         } else if (strcmp(key, "open_attach") == 0) {
452                 rc = kstrtoul(val, 10, &id);
453                 if (rc)
454                         return rc;
455                 if (id == 0)
456                         cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_OPEN_ATTACH;
457         } else if (strcmp(key, "io_attach") == 0) {
458                 rc = kstrtoul(val, 10, &id);
459                 if (rc)
460                         return rc;
461                 if (id == 0)
462                         cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_IO_ATTACH;
463         } else if (strcmp(key, "stat_attach") == 0) {
464                 rc = kstrtoul(val, 10, &id);
465                 if (rc)
466                         return rc;
467                 if (id == 0)
468                         cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_STAT_ATTACH;
469         } else if (strcmp(key, "rwpcc") == 0) {
470                 rc = kstrtoul(val, 10, &id);
471                 if (rc)
472                         return rc;
473                 if (id > 0)
474                         cmd->u.pccc_add.pccc_flags |= PCC_DATASET_RWPCC;
475         } else if (strcmp(key, "ropcc") == 0) {
476                 rc = kstrtoul(val, 10, &id);
477                 if (rc)
478                         return rc;
479                 if (id > 0)
480                         cmd->u.pccc_add.pccc_flags |= PCC_DATASET_ROPCC;
481         } else {
482                 return -EINVAL;
483         }
484
485         return 0;
486 }
487
488 static int
489 pcc_parse_value_pairs(struct pcc_cmd *cmd, char *buffer)
490 {
491         char *val;
492         char *token;
493         int rc;
494
495         switch (cmd->pccc_cmd) {
496         case PCC_ADD_DATASET:
497                 /* Enable auto attach by default */
498                 cmd->u.pccc_add.pccc_flags |= PCC_DATASET_AUTO_ATTACH;
499                 break;
500         case PCC_DEL_DATASET:
501         case PCC_CLEAR_ALL:
502                 break;
503         default:
504                 return -EINVAL;
505         }
506
507         val = buffer;
508         while (val != NULL && strlen(val) != 0) {
509                 token = strsep(&val, " ");
510                 rc = pcc_parse_value_pair(cmd, token);
511                 if (rc)
512                         return rc;
513         }
514
515         switch (cmd->pccc_cmd) {
516         case PCC_ADD_DATASET:
517                 if (cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC &&
518                     cmd->u.pccc_add.pccc_flags & PCC_DATASET_ROPCC)
519                         return -EINVAL;
520                 /*
521                  * By default, a PCC backend can provide caching service for
522                  * both RW-PCC and RO-PCC.
523                  */
524                 if ((cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCC_ALL) == 0)
525                         cmd->u.pccc_add.pccc_flags |= PCC_DATASET_PCC_ALL;
526
527                 /* For RW-PCC, the value of @rwid must be non zero. */
528                 if (cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC &&
529                     cmd->u.pccc_add.pccc_rwid == 0)
530                         return -EINVAL;
531
532                 break;
533         case PCC_DEL_DATASET:
534         case PCC_CLEAR_ALL:
535                 break;
536         default:
537                 return -EINVAL;
538         }
539         return 0;
540 }
541
542 static void
543 pcc_dataset_rule_fini(struct pcc_match_rule *rule)
544 {
545         if (!list_empty(&rule->pmr_conds))
546                 pcc_rule_conds_free(&rule->pmr_conds);
547         LASSERT(rule->pmr_conds_str != NULL);
548         OBD_FREE(rule->pmr_conds_str, strlen(rule->pmr_conds_str) + 1);
549 }
550
551 static int
552 pcc_dataset_rule_init(struct pcc_match_rule *rule, struct pcc_cmd *cmd)
553 {
554         int rc = 0;
555
556         LASSERT(cmd->u.pccc_add.pccc_conds_str);
557         OBD_ALLOC(rule->pmr_conds_str,
558                   strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
559         if (rule->pmr_conds_str == NULL)
560                 return -ENOMEM;
561
562         memcpy(rule->pmr_conds_str,
563                cmd->u.pccc_add.pccc_conds_str,
564                strlen(cmd->u.pccc_add.pccc_conds_str));
565
566         INIT_LIST_HEAD(&rule->pmr_conds);
567         if (!list_empty(&cmd->u.pccc_add.pccc_conds))
568                 rc = pcc_conds_parse(rule->pmr_conds_str,
569                                      &rule->pmr_conds);
570
571         if (rc)
572                 pcc_dataset_rule_fini(rule);
573
574         return rc;
575 }
576
577 /* Rule Matching */
578 static int
579 pcc_id_list_match(struct list_head *id_list, __u32 id_val)
580 {
581         struct pcc_match_id *id;
582
583         list_for_each_entry(id, id_list, pmi_linkage) {
584                 if (id->pmi_id == id_val)
585                         return 1;
586         }
587         return 0;
588 }
589
590 static bool
591 cfs_match_wildcard(const char *pattern, const char *content)
592 {
593         if (*pattern == '\0' && *content == '\0')
594                 return true;
595
596         if (*pattern == '*' && *(pattern + 1) != '\0' && *content == '\0')
597                 return false;
598
599         while (*pattern == *content) {
600                 pattern++;
601                 content++;
602                 if (*pattern == '\0' && *content == '\0')
603                         return true;
604
605                 if (*pattern == '*' && *(pattern + 1) != '\0' &&
606                     *content == '\0')
607                         return false;
608         }
609
610         if (*pattern == '*')
611                 return (cfs_match_wildcard(pattern + 1, content) ||
612                         cfs_match_wildcard(pattern, content + 1));
613
614         return false;
615 }
616
617 static int
618 pcc_fname_list_match(struct list_head *fname_list, const char *name)
619 {
620         struct pcc_match_fname *fname;
621
622         list_for_each_entry(fname, fname_list, pmf_linkage) {
623                 if (cfs_match_wildcard(fname->pmf_name, name))
624                         return 1;
625         }
626         return 0;
627 }
628
629 static int
630 pcc_expression_match(struct pcc_expression *expr, struct pcc_matcher *matcher)
631 {
632         switch (expr->pe_field) {
633         case PCC_FIELD_UID:
634                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_uid);
635         case PCC_FIELD_GID:
636                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_gid);
637         case PCC_FIELD_PROJID:
638                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_projid);
639         case PCC_FIELD_FNAME:
640                 return pcc_fname_list_match(&expr->pe_cond,
641                                             matcher->pm_name->name);
642         default:
643                 return 0;
644         }
645 }
646
647 static int
648 pcc_conjunction_match(struct pcc_conjunction *conjunction,
649                       struct pcc_matcher *matcher)
650 {
651         struct pcc_expression *expr;
652         int matched;
653
654         list_for_each_entry(expr, &conjunction->pc_expressions, pe_linkage) {
655                 matched = pcc_expression_match(expr, matcher);
656                 if (!matched)
657                         return 0;
658         }
659
660         return 1;
661 }
662
663 static int
664 pcc_cond_match(struct pcc_match_rule *rule, struct pcc_matcher *matcher)
665 {
666         struct pcc_conjunction *conjunction;
667         int matched;
668
669         list_for_each_entry(conjunction, &rule->pmr_conds, pc_linkage) {
670                 matched = pcc_conjunction_match(conjunction, matcher);
671                 if (matched)
672                         return 1;
673         }
674
675         return 0;
676 }
677
678 struct pcc_dataset*
679 pcc_dataset_match_get(struct pcc_super *super, struct pcc_matcher *matcher)
680 {
681         struct pcc_dataset *dataset;
682         struct pcc_dataset *selected = NULL;
683
684         down_read(&super->pccs_rw_sem);
685         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
686                 if (!(dataset->pccd_flags & PCC_DATASET_RWPCC))
687                         continue;
688
689                 if (pcc_cond_match(&dataset->pccd_rule, matcher)) {
690                         atomic_inc(&dataset->pccd_refcount);
691                         selected = dataset;
692                         break;
693                 }
694         }
695         up_read(&super->pccs_rw_sem);
696         if (selected)
697                 CDEBUG(D_CACHE, "PCC create, matched %s - %d:%d:%d:%s\n",
698                        dataset->pccd_rule.pmr_conds_str,
699                        matcher->pm_uid, matcher->pm_gid,
700                        matcher->pm_projid, matcher->pm_name->name);
701
702         return selected;
703 }
704
705 /**
706  * pcc_dataset_add - Add a Cache policy to control which files need be
707  * cached and where it will be cached.
708  *
709  * @super:      superblock of pcc
710  * @cmd:        pcc command
711  */
712 static int
713 pcc_dataset_add(struct pcc_super *super, struct pcc_cmd *cmd)
714 {
715         char *pathname = cmd->pccc_pathname;
716         struct pcc_dataset *dataset;
717         struct pcc_dataset *tmp;
718         bool found = false;
719         int rc;
720
721         OBD_ALLOC_PTR(dataset);
722         if (dataset == NULL)
723                 return -ENOMEM;
724
725         rc = kern_path(pathname, LOOKUP_DIRECTORY, &dataset->pccd_path);
726         if (unlikely(rc)) {
727                 OBD_FREE_PTR(dataset);
728                 return rc;
729         }
730         strncpy(dataset->pccd_pathname, pathname, PATH_MAX);
731         dataset->pccd_rwid = cmd->u.pccc_add.pccc_rwid;
732         dataset->pccd_roid = cmd->u.pccc_add.pccc_roid;
733         dataset->pccd_flags = cmd->u.pccc_add.pccc_flags;
734         atomic_set(&dataset->pccd_refcount, 1);
735
736         rc = pcc_dataset_rule_init(&dataset->pccd_rule, cmd);
737         if (rc) {
738                 pcc_dataset_put(dataset);
739                 return rc;
740         }
741
742         down_write(&super->pccs_rw_sem);
743         list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) {
744                 if (strcmp(tmp->pccd_pathname, pathname) == 0 ||
745                     (dataset->pccd_rwid != 0 &&
746                      dataset->pccd_rwid == tmp->pccd_rwid) ||
747                     (dataset->pccd_roid != 0 &&
748                      dataset->pccd_roid == tmp->pccd_roid)) {
749                         found = true;
750                         break;
751                 }
752         }
753         if (!found)
754                 list_add(&dataset->pccd_linkage, &super->pccs_datasets);
755         up_write(&super->pccs_rw_sem);
756
757         if (found) {
758                 pcc_dataset_put(dataset);
759                 rc = -EEXIST;
760         }
761
762         return rc;
763 }
764
765 static struct pcc_dataset *
766 pcc_dataset_get(struct pcc_super *super, enum lu_pcc_type type, __u32 id)
767 {
768         struct pcc_dataset *dataset;
769         struct pcc_dataset *selected = NULL;
770
771         if (id == 0)
772                 return NULL;
773
774         /*
775          * archive ID (read-write ID) or read-only ID is unique in the list,
776          * we just return last added one as first priority.
777          */
778         down_read(&super->pccs_rw_sem);
779         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
780                 if (type == LU_PCC_READWRITE && (dataset->pccd_rwid != id ||
781                     !(dataset->pccd_flags & PCC_DATASET_RWPCC)))
782                         continue;
783                 atomic_inc(&dataset->pccd_refcount);
784                 selected = dataset;
785                 break;
786         }
787         up_read(&super->pccs_rw_sem);
788         if (selected)
789                 CDEBUG(D_CACHE, "matched id %u, PCC mode %d\n", id, type);
790
791         return selected;
792 }
793
794 void
795 pcc_dataset_put(struct pcc_dataset *dataset)
796 {
797         if (atomic_dec_and_test(&dataset->pccd_refcount)) {
798                 pcc_dataset_rule_fini(&dataset->pccd_rule);
799                 path_put(&dataset->pccd_path);
800                 OBD_FREE_PTR(dataset);
801         }
802 }
803
804 static int
805 pcc_dataset_del(struct pcc_super *super, char *pathname)
806 {
807         struct list_head *l, *tmp;
808         struct pcc_dataset *dataset;
809         int rc = -ENOENT;
810
811         down_write(&super->pccs_rw_sem);
812         list_for_each_safe(l, tmp, &super->pccs_datasets) {
813                 dataset = list_entry(l, struct pcc_dataset, pccd_linkage);
814                 if (strcmp(dataset->pccd_pathname, pathname) == 0) {
815                         list_del_init(&dataset->pccd_linkage);
816                         pcc_dataset_put(dataset);
817                         super->pccs_generation++;
818                         rc = 0;
819                         break;
820                 }
821         }
822         up_write(&super->pccs_rw_sem);
823         return rc;
824 }
825
826 static void
827 pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
828 {
829         seq_printf(m, "%s:\n", dataset->pccd_pathname);
830         seq_printf(m, "  rwid: %u\n", dataset->pccd_rwid);
831         seq_printf(m, "  flags: %x\n", dataset->pccd_flags);
832         seq_printf(m, "  autocache: %s\n", dataset->pccd_rule.pmr_conds_str);
833 }
834
835 int
836 pcc_super_dump(struct pcc_super *super, struct seq_file *m)
837 {
838         struct pcc_dataset *dataset;
839
840         down_read(&super->pccs_rw_sem);
841         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
842                 pcc_dataset_dump(dataset, m);
843         }
844         up_read(&super->pccs_rw_sem);
845         return 0;
846 }
847
848 static void pcc_remove_datasets(struct pcc_super *super)
849 {
850         struct pcc_dataset *dataset, *tmp;
851
852         down_write(&super->pccs_rw_sem);
853         list_for_each_entry_safe(dataset, tmp,
854                                  &super->pccs_datasets, pccd_linkage) {
855                 list_del(&dataset->pccd_linkage);
856                 pcc_dataset_put(dataset);
857         }
858         super->pccs_generation++;
859         up_write(&super->pccs_rw_sem);
860 }
861
862 void pcc_super_fini(struct pcc_super *super)
863 {
864         pcc_remove_datasets(super);
865         put_cred(super->pccs_cred);
866 }
867
868 static bool pathname_is_valid(const char *pathname)
869 {
870         /* Needs to be absolute path */
871         if (pathname == NULL || strlen(pathname) == 0 ||
872             strlen(pathname) >= PATH_MAX || pathname[0] != '/')
873                 return false;
874         return true;
875 }
876
877 static struct pcc_cmd *
878 pcc_cmd_parse(char *buffer, unsigned long count)
879 {
880         static struct pcc_cmd *cmd;
881         char *token;
882         char *val;
883         int rc = 0;
884
885         OBD_ALLOC_PTR(cmd);
886         if (cmd == NULL)
887                 GOTO(out, rc = -ENOMEM);
888
889         /* clear all setting */
890         if (strncmp(buffer, "clear", 5) == 0) {
891                 cmd->pccc_cmd = PCC_CLEAR_ALL;
892                 GOTO(out, rc = 0);
893         }
894
895         val = buffer;
896         token = strsep(&val, " ");
897         if (val == NULL || strlen(val) == 0)
898                 GOTO(out_free_cmd, rc = -EINVAL);
899
900         /* Type of the command */
901         if (strcmp(token, "add") == 0)
902                 cmd->pccc_cmd = PCC_ADD_DATASET;
903         else if (strcmp(token, "del") == 0)
904                 cmd->pccc_cmd = PCC_DEL_DATASET;
905         else
906                 GOTO(out_free_cmd, rc = -EINVAL);
907
908         /* Pathname of the dataset */
909         token = strsep(&val, " ");
910         if ((val == NULL && cmd->pccc_cmd != PCC_DEL_DATASET) ||
911             !pathname_is_valid(token))
912                 GOTO(out_free_cmd, rc = -EINVAL);
913         cmd->pccc_pathname = token;
914
915         if (cmd->pccc_cmd == PCC_ADD_DATASET) {
916                 /* List of ID */
917                 LASSERT(val);
918                 token = val;
919                 val = strrchr(token, '}');
920                 if (!val)
921                         GOTO(out_free_cmd, rc = -EINVAL);
922
923                 /* Skip '}' */
924                 val++;
925                 if (*val == '\0') {
926                         val = NULL;
927                 } else if (*val == ' ') {
928                         *val = '\0';
929                         val++;
930                 } else {
931                         GOTO(out_free_cmd, rc = -EINVAL);
932                 }
933
934                 rc = pcc_id_parse(cmd, token);
935                 if (rc)
936                         GOTO(out_free_cmd, rc);
937
938                 rc = pcc_parse_value_pairs(cmd, val);
939                 if (rc)
940                         GOTO(out_cmd_fini, rc = -EINVAL);
941         }
942         goto out;
943 out_cmd_fini:
944         pcc_cmd_fini(cmd);
945 out_free_cmd:
946         OBD_FREE_PTR(cmd);
947 out:
948         if (rc)
949                 cmd = ERR_PTR(rc);
950         return cmd;
951 }
952
953 int pcc_cmd_handle(char *buffer, unsigned long count,
954                    struct pcc_super *super)
955 {
956         int rc = 0;
957         struct pcc_cmd *cmd;
958
959         cmd = pcc_cmd_parse(buffer, count);
960         if (IS_ERR(cmd))
961                 return PTR_ERR(cmd);
962
963         switch (cmd->pccc_cmd) {
964         case PCC_ADD_DATASET:
965                 rc = pcc_dataset_add(super, cmd);
966                 break;
967         case PCC_DEL_DATASET:
968                 rc = pcc_dataset_del(super, cmd->pccc_pathname);
969                 break;
970         case PCC_CLEAR_ALL:
971                 pcc_remove_datasets(super);
972                 break;
973         default:
974                 rc = -EINVAL;
975                 break;
976         }
977
978         pcc_cmd_fini(cmd);
979         OBD_FREE_PTR(cmd);
980         return rc;
981 }
982
983 static inline void pcc_inode_lock(struct inode *inode)
984 {
985         mutex_lock(&ll_i2info(inode)->lli_pcc_lock);
986 }
987
988 static inline void pcc_inode_unlock(struct inode *inode)
989 {
990         mutex_unlock(&ll_i2info(inode)->lli_pcc_lock);
991 }
992
993 static void pcc_inode_init(struct pcc_inode *pcci, struct ll_inode_info *lli)
994 {
995         pcci->pcci_lli = lli;
996         lli->lli_pcc_inode = pcci;
997         atomic_set(&pcci->pcci_refcount, 0);
998         pcci->pcci_type = LU_PCC_NONE;
999         pcci->pcci_layout_gen = CL_LAYOUT_GEN_NONE;
1000         atomic_set(&pcci->pcci_active_ios, 0);
1001         init_waitqueue_head(&pcci->pcci_waitq);
1002 }
1003
1004 static void pcc_inode_fini(struct pcc_inode *pcci)
1005 {
1006         struct ll_inode_info *lli = pcci->pcci_lli;
1007
1008         path_put(&pcci->pcci_path);
1009         pcci->pcci_type = LU_PCC_NONE;
1010         OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
1011         lli->lli_pcc_inode = NULL;
1012 }
1013
1014 static void pcc_inode_get(struct pcc_inode *pcci)
1015 {
1016         atomic_inc(&pcci->pcci_refcount);
1017 }
1018
1019 static void pcc_inode_put(struct pcc_inode *pcci)
1020 {
1021         if (atomic_dec_and_test(&pcci->pcci_refcount))
1022                 pcc_inode_fini(pcci);
1023 }
1024
1025 void pcc_inode_free(struct inode *inode)
1026 {
1027         struct pcc_inode *pcci = ll_i2pcci(inode);
1028
1029         if (pcci) {
1030                 WARN_ON(atomic_read(&pcci->pcci_refcount) > 1);
1031                 pcc_inode_put(pcci);
1032         }
1033 }
1034
1035 /*
1036  * TODO:
1037  * As Andreas suggested, we'd better use new layout to
1038  * reduce overhead:
1039  * (fid->f_oid >> 16 & oxFFFF)/FID
1040  */
1041 #define PCC_DATASET_MAX_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
1042 static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid)
1043 {
1044         return scnprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
1045                          DFID_NOBRACE,
1046                          (fid)->f_oid       & 0xFFFF,
1047                          (fid)->f_oid >> 16 & 0xFFFF,
1048                          (unsigned int)((fid)->f_seq       & 0xFFFF),
1049                          (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
1050                          (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
1051                          (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
1052                          PFID(fid));
1053 }
1054
1055 static inline const struct cred *pcc_super_cred(struct super_block *sb)
1056 {
1057         return ll_s2sbi(sb)->ll_pcc_super.pccs_cred;
1058 }
1059
1060 void pcc_file_init(struct pcc_file *pccf)
1061 {
1062         pccf->pccf_file = NULL;
1063         pccf->pccf_type = LU_PCC_NONE;
1064 }
1065
1066 static inline bool pcc_auto_attach_enabled(enum pcc_dataset_flags flags,
1067                                            enum pcc_io_type iot)
1068 {
1069         if (iot == PIT_OPEN)
1070                 return flags & PCC_DATASET_OPEN_ATTACH;
1071         if (iot == PIT_GETATTR)
1072                 return flags & PCC_DATASET_STAT_ATTACH;
1073         else
1074                 return flags & PCC_DATASET_AUTO_ATTACH;
1075 }
1076
1077 static const char pcc_xattr_layout[] = XATTR_USER_PREFIX "PCC.layout";
1078
1079 static int pcc_layout_xattr_set(struct pcc_inode *pcci, __u32 gen)
1080 {
1081         struct dentry *pcc_dentry = pcci->pcci_path.dentry;
1082         struct ll_inode_info *lli = pcci->pcci_lli;
1083         int rc;
1084
1085         ENTRY;
1086
1087         if (!(lli->lli_pcc_dsflags & PCC_DATASET_AUTO_ATTACH))
1088                 RETURN(0);
1089
1090         rc = ll_vfs_setxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
1091                              &gen, sizeof(gen), 0);
1092
1093         RETURN(rc);
1094 }
1095
1096 static int pcc_get_layout_info(struct inode *inode, struct cl_layout *clt)
1097 {
1098         struct lu_env *env;
1099         struct ll_inode_info *lli = ll_i2info(inode);
1100         __u16 refcheck;
1101         int rc;
1102
1103         ENTRY;
1104
1105         if (!lli->lli_clob)
1106                 RETURN(-EINVAL);
1107
1108         env = cl_env_get(&refcheck);
1109         if (IS_ERR(env))
1110                 RETURN(PTR_ERR(env));
1111
1112         rc = cl_object_layout_get(env, lli->lli_clob, clt);
1113         if (rc < 0)
1114                 CDEBUG(D_INODE, "Cannot get layout for "DFID"\n",
1115                        PFID(ll_inode2fid(inode)));
1116
1117         cl_env_put(env, &refcheck);
1118         RETURN(rc < 0 ? rc : 0);
1119 }
1120
1121 /* Must be called with pcci->pcci_lock held */
1122 static void pcc_inode_attach_init(struct pcc_dataset *dataset,
1123                                   struct pcc_inode *pcci,
1124                                   struct dentry *dentry,
1125                                   enum lu_pcc_type type)
1126 {
1127         pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
1128         pcci->pcci_path.dentry = dentry;
1129         LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
1130         atomic_set(&pcci->pcci_refcount, 1);
1131         pcci->pcci_type = type;
1132         pcci->pcci_attr_valid = false;
1133 }
1134
1135 static inline void pcc_inode_dsflags_set(struct ll_inode_info *lli,
1136                                          struct pcc_dataset *dataset)
1137 {
1138         lli->lli_pcc_generation = ll_info2pccs(lli)->pccs_generation;
1139         lli->lli_pcc_dsflags = dataset->pccd_flags;
1140 }
1141
1142 static void pcc_inode_attach_set(struct pcc_super *super,
1143                                  struct pcc_dataset *dataset,
1144                                  struct ll_inode_info *lli,
1145                                  struct pcc_inode *pcci,
1146                                  struct dentry *dentry,
1147                                  enum lu_pcc_type type)
1148 {
1149         pcc_inode_init(pcci, lli);
1150         pcc_inode_attach_init(dataset, pcci, dentry, type);
1151         down_read(&super->pccs_rw_sem);
1152         pcc_inode_dsflags_set(lli, dataset);
1153         up_read(&super->pccs_rw_sem);
1154 }
1155
1156 static inline void pcc_layout_gen_set(struct pcc_inode *pcci,
1157                                       __u32 gen)
1158 {
1159         pcci->pcci_layout_gen = gen;
1160 }
1161
1162 static inline bool pcc_inode_has_layout(struct pcc_inode *pcci)
1163 {
1164         return pcci->pcci_layout_gen != CL_LAYOUT_GEN_NONE;
1165 }
1166
1167 static struct dentry *pcc_lookup(struct dentry *base, char *pathname)
1168 {
1169         char *ptr = NULL, *component;
1170         struct dentry *parent;
1171         struct dentry *child = ERR_PTR(-ENOENT);
1172
1173         ptr = pathname;
1174
1175         /* move past any initial '/' to the start of the first path component*/
1176         while (*ptr == '/')
1177                 ptr++;
1178
1179         /* store the start of the first path component */
1180         component = ptr;
1181
1182         parent = dget(base);
1183         while (ptr) {
1184                 /* find the start of the next component - if we don't find it,
1185                  * the current component is the last component
1186                  */
1187                 ptr = strchr(ptr, '/');
1188                 /* put a NUL char in place of the '/' before the next compnent
1189                  * so we can treat this component as a string; note the full
1190                  * path string is NUL terminated to this is not needed for the
1191                  * last component
1192                  */
1193                 if (ptr)
1194                         *ptr = '\0';
1195
1196                 /* look up the current component */
1197                 inode_lock(parent->d_inode);
1198                 child = lookup_one_len(component, parent, strlen(component));
1199                 inode_unlock(parent->d_inode);
1200
1201                 /* repair the path string: put '/' back in place of the NUL */
1202                 if (ptr)
1203                         *ptr = '/';
1204
1205                 dput(parent);
1206
1207                 if (IS_ERR_OR_NULL(child))
1208                         break;
1209
1210                 /* we may find a cached negative dentry */
1211                 if (!d_is_positive(child)) {
1212                         dput(child);
1213                         child = NULL;
1214                         break;
1215                 }
1216
1217                 /* descend in to the next level of the path */
1218                 parent = child;
1219
1220                 /* move the pointer past the '/' to the next component */
1221                 if (ptr)
1222                         ptr++;
1223                 component = ptr;
1224         }
1225
1226         /* NULL child means we didn't find anything */
1227         if (!child)
1228                 child = ERR_PTR(-ENOENT);
1229
1230         return child;
1231 }
1232
1233 static int pcc_try_dataset_attach(struct inode *inode, __u32 gen,
1234                                   enum lu_pcc_type type,
1235                                   struct pcc_dataset *dataset,
1236                                   bool *cached)
1237 {
1238         struct ll_inode_info *lli = ll_i2info(inode);
1239         struct pcc_inode *pcci = lli->lli_pcc_inode;
1240         const struct cred *old_cred;
1241         struct dentry *pcc_dentry = NULL;
1242         char pathname[PCC_DATASET_MAX_PATH];
1243         __u32 pcc_gen;
1244         int rc;
1245
1246         ENTRY;
1247
1248         if (type == LU_PCC_READWRITE &&
1249             !(dataset->pccd_flags & PCC_DATASET_RWPCC))
1250                 RETURN(0);
1251
1252         rc = pcc_fid2dataset_path(pathname, PCC_DATASET_MAX_PATH,
1253                                   &lli->lli_fid);
1254
1255         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1256         pcc_dentry = pcc_lookup(dataset->pccd_path.dentry, pathname);
1257         if (IS_ERR(pcc_dentry)) {
1258                 rc = PTR_ERR(pcc_dentry);
1259                 CDEBUG(D_CACHE, "%s: path lookup error on "DFID":%s: rc = %d\n",
1260                        ll_i2sbi(inode)->ll_fsname, PFID(&lli->lli_fid),
1261                        pathname, rc);
1262                 /* ignore this error */
1263                 GOTO(out, rc = 0);
1264         }
1265
1266         rc = ll_vfs_getxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
1267                              &pcc_gen, sizeof(pcc_gen));
1268         if (rc < 0)
1269                 /* ignore this error */
1270                 GOTO(out_put_pcc_dentry, rc = 0);
1271
1272         rc = 0;
1273         /* The file is still valid cached in PCC, attach it immediately. */
1274         if (pcc_gen == gen) {
1275                 CDEBUG(D_CACHE, DFID" L.Gen (%d) consistent, auto attached.\n",
1276                        PFID(&lli->lli_fid), gen);
1277                 if (!pcci) {
1278                         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
1279                         if (pcci == NULL)
1280                                 GOTO(out_put_pcc_dentry, rc = -ENOMEM);
1281
1282                         pcc_inode_init(pcci, lli);
1283                         dget(pcc_dentry);
1284                         pcc_inode_attach_init(dataset, pcci, pcc_dentry, type);
1285                 } else {
1286                         /*
1287                          * This happened when a file was once attached into
1288                          * PCC, and some processes keep this file opened
1289                          * (pcci->refcount > 1) and corresponding PCC file
1290                          * without any I/O activity, and then this file was
1291                          * detached by the manual detach command or the
1292                          * revocation of the layout lock (i.e. cached LRU lock
1293                          * shrinking).
1294                          */
1295                         pcc_inode_get(pcci);
1296                         pcci->pcci_type = type;
1297                 }
1298                 pcc_inode_dsflags_set(lli, dataset);
1299                 pcc_layout_gen_set(pcci, gen);
1300                 *cached = true;
1301         }
1302 out_put_pcc_dentry:
1303         dput(pcc_dentry);
1304 out:
1305         revert_creds(old_cred);
1306         RETURN(rc);
1307 }
1308
1309 static int pcc_try_datasets_attach(struct inode *inode, enum pcc_io_type iot,
1310                                    __u32 gen, enum lu_pcc_type type,
1311                                    bool *cached)
1312 {
1313         struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
1314         struct ll_inode_info *lli = ll_i2info(inode);
1315         struct pcc_dataset *dataset = NULL, *tmp;
1316         int rc = 0;
1317
1318         ENTRY;
1319
1320         down_read(&super->pccs_rw_sem);
1321         list_for_each_entry_safe(dataset, tmp,
1322                                  &super->pccs_datasets, pccd_linkage) {
1323                 if (!pcc_auto_attach_enabled(dataset->pccd_flags, iot))
1324                         break;
1325
1326                 rc = pcc_try_dataset_attach(inode, gen, type, dataset, cached);
1327                 if (rc < 0 || (!rc && *cached))
1328                         break;
1329         }
1330
1331         /*
1332          * Update the saved dataset flags for the inode accordingly if failed.
1333          */
1334         if (!rc && !*cached) {
1335                 /*
1336                  * Currently auto attach strategy for a PCC backend is
1337                  * unchangeable once once it was added into the PCC datasets on
1338                  * a client as the support to change auto attach strategy is
1339                  * not implemented yet.
1340                  */
1341                 /*
1342                  * If tried to attach from one PCC backend:
1343                  * @lli_pcc_generation > 0:
1344                  * 1) The file was once attached into PCC, but now the
1345                  * corresponding PCC backend should be removed from the client;
1346                  * 2) The layout generation was changed, the data has been
1347                  * restored;
1348                  * 3) The corresponding PCC copy is not existed on PCC
1349                  * @lli_pcc_generation == 0:
1350                  * The file is never attached into PCC but in a HSM released
1351                  * state, or once attached into PCC but the inode was evicted
1352                  * from icache later.
1353                  * Set the saved dataset flags with PCC_DATASET_NONE. Then this
1354                  * file will skip from the candidates to try auto attach until
1355                  * the file is attached into PCC again.
1356                  *
1357                  * If the file was never attached into PCC, or once attached but
1358                  * its inode was evicted from icache (lli_pcc_generation == 0),
1359                  * or the corresponding dataset was removed from the client,
1360                  * set the saved dataset flags with PCC_DATASET_NONE.
1361                  *
1362                  * TODO: If the file was once attached into PCC but not try to
1363                  * auto attach due to the change of the configuration parameters
1364                  * for this dataset (i.e. change from auto attach enabled to
1365                  * auto attach disabled for this dataset), update the saved
1366                  * dataset flags with the found one.
1367                  */
1368                 lli->lli_pcc_dsflags = PCC_DATASET_NONE;
1369         }
1370         up_read(&super->pccs_rw_sem);
1371
1372         RETURN(rc);
1373 }
1374
1375 /*
1376  * TODO: For RW-PCC, it is desirable to store HSM info as a layout (LU-10606).
1377  * Thus the client can get archive ID from the layout directly. When try to
1378  * attach the file automatically which is in HSM released state (according to
1379  * LOV_PATTERN_F_RELEASED in the layout), it can determine whether the file is
1380  * valid cached on PCC more precisely according to the @rwid (archive ID) in
1381  * the PCC dataset and the archive ID in HSM attrs.
1382  */
1383 static int pcc_try_auto_attach(struct inode *inode, bool *cached,
1384                                enum pcc_io_type iot)
1385 {
1386         struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
1387         struct cl_layout clt = {
1388                 .cl_layout_gen = 0,
1389                 .cl_is_released = false,
1390         };
1391         struct ll_inode_info *lli = ll_i2info(inode);
1392         __u32 gen;
1393         int rc;
1394
1395         ENTRY;
1396
1397         /*
1398          * Quick check whether there is PCC device.
1399          */
1400         if (list_empty(&super->pccs_datasets))
1401                 RETURN(0);
1402
1403         /*
1404          * The file layout lock was cancelled. And this open does not
1405          * obtain valid layout lock from MDT (i.e. the file is being
1406          * HSM restoring).
1407          */
1408         if (iot == PIT_OPEN) {
1409                 if (ll_layout_version_get(lli) == CL_LAYOUT_GEN_NONE)
1410                         RETURN(0);
1411         } else {
1412                 rc = ll_layout_refresh(inode, &gen);
1413                 if (rc)
1414                         RETURN(rc);
1415         }
1416
1417         rc = pcc_get_layout_info(inode, &clt);
1418         if (rc)
1419                 RETURN(rc);
1420
1421         if (iot != PIT_OPEN && gen != clt.cl_layout_gen) {
1422                 CDEBUG(D_CACHE, DFID" layout changed from %d to %d.\n",
1423                        PFID(ll_inode2fid(inode)), gen, clt.cl_layout_gen);
1424                 RETURN(-EINVAL);
1425         }
1426
1427         if (clt.cl_is_released)
1428                 rc = pcc_try_datasets_attach(inode, iot, clt.cl_layout_gen,
1429                                              LU_PCC_READWRITE, cached);
1430
1431         RETURN(rc);
1432 }
1433
1434 static inline bool pcc_may_auto_attach(struct inode *inode,
1435                                        enum pcc_io_type iot)
1436 {
1437         struct ll_inode_info *lli = ll_i2info(inode);
1438         struct pcc_super *super = ll_i2pccs(inode);
1439
1440         /* Known the file was not in any PCC backend. */
1441         if (lli->lli_pcc_dsflags & PCC_DATASET_NONE)
1442                 return false;
1443
1444         /*
1445          * lli_pcc_generation == 0 means that the file was never attached into
1446          * PCC, or may be once attached into PCC but detached as the inode is
1447          * evicted from icache (i.e. "echo 3 > /proc/sys/vm/drop_caches" or
1448          * icache shrinking due to the memory pressure), which will cause the
1449          * file detach from PCC when releasing the inode from icache.
1450          * In either case, we still try to attach.
1451          */
1452         /* lli_pcc_generation == 0, or the PCC setting was changed,
1453          * or there is no PCC setup on the client and the try will return
1454          * immediately in pcc_try_auto_attach().
1455          */
1456         if (super->pccs_generation != lli->lli_pcc_generation)
1457                 return true;
1458
1459         /* The cached setting @lli_pcc_dsflags is valid */
1460         if (iot == PIT_OPEN)
1461                 return lli->lli_pcc_dsflags & PCC_DATASET_OPEN_ATTACH;
1462
1463         if (iot == PIT_GETATTR)
1464                 return lli->lli_pcc_dsflags & PCC_DATASET_STAT_ATTACH;
1465
1466         return lli->lli_pcc_dsflags & PCC_DATASET_IO_ATTACH;
1467 }
1468
1469 int pcc_file_open(struct inode *inode, struct file *file)
1470 {
1471         struct pcc_inode *pcci;
1472         struct ll_inode_info *lli = ll_i2info(inode);
1473         struct ll_file_data *fd = file->private_data;
1474         struct pcc_file *pccf = &fd->fd_pcc_file;
1475         struct file *pcc_file;
1476         struct path *path;
1477         bool cached = false;
1478         int rc = 0;
1479
1480         ENTRY;
1481
1482         if (!S_ISREG(inode->i_mode))
1483                 RETURN(0);
1484
1485         if (IS_ENCRYPTED(inode))
1486                 RETURN(0);
1487
1488         pcc_inode_lock(inode);
1489         pcci = ll_i2pcci(inode);
1490
1491         if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
1492                 GOTO(out_unlock, rc = 0);
1493
1494         if (!pcci || !pcc_inode_has_layout(pcci)) {
1495                 if (pcc_may_auto_attach(inode, PIT_OPEN))
1496                         rc = pcc_try_auto_attach(inode, &cached, PIT_OPEN);
1497
1498                 if (rc < 0 || !cached)
1499                         GOTO(out_unlock, rc);
1500
1501                 if (!pcci)
1502                         pcci = ll_i2pcci(inode);
1503         }
1504
1505         pcc_inode_get(pcci);
1506         WARN_ON(pccf->pccf_file);
1507
1508         path = &pcci->pcci_path;
1509         CDEBUG(D_CACHE, "opening pcc file '%pd'\n", path->dentry);
1510
1511         pcc_file = dentry_open(path, file->f_flags,
1512                                pcc_super_cred(inode->i_sb));
1513         if (IS_ERR_OR_NULL(pcc_file)) {
1514                 rc = pcc_file == NULL ? -EINVAL : PTR_ERR(pcc_file);
1515                 pcc_inode_put(pcci);
1516         } else {
1517                 pccf->pccf_file = pcc_file;
1518                 pccf->pccf_type = pcci->pcci_type;
1519         }
1520
1521 out_unlock:
1522         pcc_inode_unlock(inode);
1523         RETURN(rc);
1524 }
1525
1526 void pcc_file_release(struct inode *inode, struct file *file)
1527 {
1528         struct pcc_inode *pcci;
1529         struct ll_file_data *fd = file->private_data;
1530         struct pcc_file *pccf;
1531         struct path *path;
1532
1533         ENTRY;
1534
1535         if (!S_ISREG(inode->i_mode) || fd == NULL)
1536                 RETURN_EXIT;
1537
1538         pccf = &fd->fd_pcc_file;
1539         pcc_inode_lock(inode);
1540         if (pccf->pccf_file == NULL)
1541                 goto out;
1542
1543         pcci = ll_i2pcci(inode);
1544         LASSERT(pcci);
1545         path = &pcci->pcci_path;
1546         CDEBUG(D_CACHE, "releasing pcc file \"%pd\"\n", path->dentry);
1547         pcc_inode_put(pcci);
1548         fput(pccf->pccf_file);
1549         pccf->pccf_file = NULL;
1550 out:
1551         pcc_inode_unlock(inode);
1552         RETURN_EXIT;
1553 }
1554
1555 static void pcc_io_init(struct inode *inode, enum pcc_io_type iot, bool *cached)
1556 {
1557         struct pcc_inode *pcci;
1558
1559         pcc_inode_lock(inode);
1560         pcci = ll_i2pcci(inode);
1561         if (pcci && pcc_inode_has_layout(pcci)) {
1562                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1563                 atomic_inc(&pcci->pcci_active_ios);
1564                 *cached = true;
1565         } else {
1566                 *cached = false;
1567                 if (pcc_may_auto_attach(inode, iot)) {
1568                         (void) pcc_try_auto_attach(inode, cached, iot);
1569                         if (*cached) {
1570                                 pcci = ll_i2pcci(inode);
1571                                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1572                                 atomic_inc(&pcci->pcci_active_ios);
1573                         }
1574                 }
1575         }
1576         pcc_inode_unlock(inode);
1577 }
1578
1579 static void pcc_io_fini(struct inode *inode)
1580 {
1581         struct pcc_inode *pcci = ll_i2pcci(inode);
1582
1583         LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0);
1584         if (atomic_dec_and_test(&pcci->pcci_active_ios))
1585                 wake_up(&pcci->pcci_waitq);
1586 }
1587
1588
1589 static ssize_t
1590 __pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
1591 {
1592         struct file *file = iocb->ki_filp;
1593
1594 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1595         return file->f_op->read_iter(iocb, iter);
1596 #else
1597         struct iovec iov;
1598         struct iov_iter i;
1599         ssize_t bytes = 0;
1600
1601         iov_for_each(iov, i, *iter) {
1602                 ssize_t res;
1603
1604                 res = file->f_op->aio_read(iocb, &iov, 1, iocb->ki_pos);
1605                 if (-EIOCBQUEUED == res)
1606                         res = wait_on_sync_kiocb(iocb);
1607                 if (res <= 0) {
1608                         if (bytes == 0)
1609                                 bytes = res;
1610                         break;
1611                 }
1612
1613                 bytes += res;
1614                 if (res < iov.iov_len)
1615                         break;
1616         }
1617
1618         if (bytes > 0)
1619                 iov_iter_advance(iter, bytes);
1620         return bytes;
1621 #endif
1622 }
1623
1624 ssize_t pcc_file_read_iter(struct kiocb *iocb,
1625                            struct iov_iter *iter, bool *cached)
1626 {
1627         struct file *file = iocb->ki_filp;
1628         struct ll_file_data *fd = file->private_data;
1629         struct pcc_file *pccf = &fd->fd_pcc_file;
1630         struct inode *inode = file_inode(file);
1631         ssize_t result;
1632
1633         ENTRY;
1634
1635         if (pccf->pccf_file == NULL) {
1636                 *cached = false;
1637                 RETURN(0);
1638         }
1639
1640         pcc_io_init(inode, PIT_READ, cached);
1641         if (!*cached)
1642                 RETURN(0);
1643
1644         iocb->ki_filp = pccf->pccf_file;
1645         /* generic_file_aio_read does not support ext4-dax,
1646          * __pcc_file_read_iter uses ->aio_read hook directly
1647          * to add support for ext4-dax.
1648          */
1649         result = __pcc_file_read_iter(iocb, iter);
1650         iocb->ki_filp = file;
1651
1652         pcc_io_fini(inode);
1653         RETURN(result);
1654 }
1655
1656 static ssize_t
1657 __pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
1658 {
1659         struct file *file = iocb->ki_filp;
1660
1661 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1662         return file->f_op->write_iter(iocb, iter);
1663 #else
1664         struct iovec iov;
1665         struct iov_iter i;
1666         ssize_t bytes = 0;
1667
1668         iov_for_each(iov, i, *iter) {
1669                 ssize_t res;
1670
1671                 res = file->f_op->aio_write(iocb, &iov, 1, iocb->ki_pos);
1672                 if (-EIOCBQUEUED == res)
1673                         res = wait_on_sync_kiocb(iocb);
1674                 if (res <= 0) {
1675                         if (bytes == 0)
1676                                 bytes = res;
1677                         break;
1678                 }
1679
1680                 bytes += res;
1681                 if (res < iov.iov_len)
1682                         break;
1683         }
1684
1685         if (bytes > 0)
1686                 iov_iter_advance(iter, bytes);
1687         return bytes;
1688 #endif
1689 }
1690
1691 ssize_t pcc_file_write_iter(struct kiocb *iocb,
1692                             struct iov_iter *iter, bool *cached)
1693 {
1694         struct file *file = iocb->ki_filp;
1695         struct ll_file_data *fd = file->private_data;
1696         struct pcc_file *pccf = &fd->fd_pcc_file;
1697         struct inode *inode = file_inode(file);
1698         ssize_t result;
1699
1700         ENTRY;
1701
1702         if (pccf->pccf_file == NULL) {
1703                 *cached = false;
1704                 RETURN(0);
1705         }
1706
1707         if (pccf->pccf_type != LU_PCC_READWRITE) {
1708                 *cached = false;
1709                 RETURN(-EAGAIN);
1710         }
1711
1712         pcc_io_init(inode, PIT_WRITE, cached);
1713         if (!*cached)
1714                 RETURN(0);
1715
1716         if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
1717                 GOTO(out, result = -ENOSPC);
1718
1719         iocb->ki_filp = pccf->pccf_file;
1720
1721         /* Since __pcc_file_write_iter makes write calls via
1722          * the normal vfs interface to the local PCC file system,
1723          * the inode lock is not needed.
1724          */
1725         result = __pcc_file_write_iter(iocb, iter);
1726         iocb->ki_filp = file;
1727 out:
1728         pcc_io_fini(inode);
1729         RETURN(result);
1730 }
1731
1732 int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
1733                       bool *cached)
1734 {
1735         int rc;
1736         const struct cred *old_cred;
1737         struct iattr attr2 = *attr;
1738         struct dentry *pcc_dentry;
1739         struct pcc_inode *pcci;
1740
1741         ENTRY;
1742
1743         if (!S_ISREG(inode->i_mode)) {
1744                 *cached = false;
1745                 RETURN(0);
1746         }
1747
1748         pcc_io_init(inode, PIT_SETATTR, cached);
1749         if (!*cached)
1750                 RETURN(0);
1751
1752         attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME |
1753                          ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET |
1754                          ATTR_CTIME | ATTR_UID | ATTR_GID);
1755         pcci = ll_i2pcci(inode);
1756         pcc_dentry = pcci->pcci_path.dentry;
1757         inode_lock(pcc_dentry->d_inode);
1758         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1759 #ifdef HAVE_USER_NAMESPACE_ARG
1760         rc = pcc_dentry->d_inode->i_op->setattr(&nop_mnt_idmap, pcc_dentry,
1761                                                 &attr2);
1762 #else
1763         rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
1764 #endif
1765         revert_creds(old_cred);
1766         inode_unlock(pcc_dentry->d_inode);
1767
1768         pcc_io_fini(inode);
1769         RETURN(rc);
1770 }
1771
1772 int pcc_inode_getattr(struct inode *inode, u32 request_mask,
1773                       unsigned int flags, bool *cached)
1774 {
1775         struct ll_inode_info *lli = ll_i2info(inode);
1776         const struct cred *old_cred;
1777         struct kstat stat;
1778         s64 atime;
1779         s64 mtime;
1780         s64 ctime;
1781         int rc;
1782
1783         ENTRY;
1784
1785         if (!S_ISREG(inode->i_mode)) {
1786                 *cached = false;
1787                 RETURN(0);
1788         }
1789
1790         pcc_io_init(inode, PIT_GETATTR, cached);
1791         if (!*cached)
1792                 RETURN(0);
1793
1794         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1795         rc = ll_vfs_getattr(&ll_i2pcci(inode)->pcci_path, &stat, request_mask,
1796                             flags);
1797         revert_creds(old_cred);
1798         if (rc)
1799                 GOTO(out, rc);
1800
1801         ll_inode_size_lock(inode);
1802         if (test_and_clear_bit(LLIF_UPDATE_ATIME, &lli->lli_flags) ||
1803             inode->i_atime.tv_sec < lli->lli_atime)
1804                 inode->i_atime.tv_sec = lli->lli_atime;
1805
1806         inode->i_mtime.tv_sec = lli->lli_mtime;
1807         inode->i_ctime.tv_sec = lli->lli_ctime;
1808
1809         atime = inode->i_atime.tv_sec;
1810         mtime = inode->i_mtime.tv_sec;
1811         ctime = inode->i_ctime.tv_sec;
1812
1813         if (atime < stat.atime.tv_sec)
1814                 atime = stat.atime.tv_sec;
1815
1816         if (ctime < stat.ctime.tv_sec)
1817                 ctime = stat.ctime.tv_sec;
1818
1819         if (mtime < stat.mtime.tv_sec)
1820                 mtime = stat.mtime.tv_sec;
1821
1822         i_size_write(inode, stat.size);
1823         inode->i_blocks = stat.blocks;
1824
1825         inode->i_atime.tv_sec = atime;
1826         inode->i_mtime.tv_sec = mtime;
1827         inode->i_ctime.tv_sec = ctime;
1828
1829         ll_inode_size_unlock(inode);
1830 out:
1831         pcc_io_fini(inode);
1832         RETURN(rc);
1833 }
1834
1835 #ifdef HAVE_DEFAULT_FILE_SPLICE_READ_EXPORT
1836 ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos,
1837                              struct pipe_inode_info *pipe,
1838                              size_t count, unsigned int flags)
1839 {
1840         struct inode *inode = file_inode(in_file);
1841         struct ll_file_data *fd = in_file->private_data;
1842         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1843         bool cached = false;
1844         ssize_t result;
1845
1846         ENTRY;
1847
1848         if (!pcc_file)
1849                 RETURN(default_file_splice_read(in_file, ppos, pipe,
1850                                                 count, flags));
1851
1852         pcc_io_init(inode, PIT_SPLICE_READ, &cached);
1853         if (!cached)
1854                 RETURN(default_file_splice_read(in_file, ppos, pipe,
1855                                                 count, flags));
1856
1857         result = default_file_splice_read(pcc_file, ppos, pipe, count, flags);
1858
1859         pcc_io_fini(inode);
1860         RETURN(result);
1861 }
1862 #endif /* HAVE_DEFAULT_FILE_SPLICE_READ_EXPORT */
1863
1864 int pcc_fsync(struct file *file, loff_t start, loff_t end,
1865               int datasync, bool *cached)
1866 {
1867         struct inode *inode = file_inode(file);
1868         struct ll_file_data *fd = file->private_data;
1869         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1870         int rc;
1871
1872         ENTRY;
1873
1874         if (!pcc_file) {
1875                 *cached = false;
1876                 RETURN(0);
1877         }
1878
1879         pcc_io_init(inode, PIT_FSYNC, cached);
1880         if (!*cached)
1881                 RETURN(0);
1882
1883         rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
1884                                                 start, end, datasync);
1885
1886         pcc_io_fini(inode);
1887         RETURN(rc);
1888 }
1889
1890 int pcc_file_mmap(struct file *file, struct vm_area_struct *vma,
1891                   bool *cached)
1892 {
1893         struct inode *inode = file_inode(file);
1894         struct ll_file_data *fd = file->private_data;
1895         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1896         struct pcc_inode *pcci;
1897         int rc = 0;
1898
1899         ENTRY;
1900
1901         if (!pcc_file || !file_inode(pcc_file)->i_fop->mmap) {
1902                 *cached = false;
1903                 RETURN(0);
1904         }
1905
1906         pcc_inode_lock(inode);
1907         pcci = ll_i2pcci(inode);
1908         if (pcci && pcc_inode_has_layout(pcci)) {
1909                 LASSERT(atomic_read(&pcci->pcci_refcount) > 1);
1910                 *cached = true;
1911                 vma->vm_file = pcc_file;
1912                 rc = file_inode(pcc_file)->i_fop->mmap(pcc_file, vma);
1913                 vma->vm_file = file;
1914                 /* Save the vm ops of backend PCC */
1915                 vma->vm_private_data = (void *)vma->vm_ops;
1916         } else {
1917                 *cached = false;
1918         }
1919         pcc_inode_unlock(inode);
1920
1921         RETURN(rc);
1922 }
1923
1924 void pcc_vm_open(struct vm_area_struct *vma)
1925 {
1926         struct pcc_inode *pcci;
1927         struct file *file = vma->vm_file;
1928         struct inode *inode = file_inode(file);
1929         struct ll_file_data *fd = file->private_data;
1930         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1931         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1932
1933         ENTRY;
1934
1935         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->open)
1936                 RETURN_EXIT;
1937
1938         pcc_inode_lock(inode);
1939         pcci = ll_i2pcci(inode);
1940         if (pcci && pcc_inode_has_layout(pcci)) {
1941                 vma->vm_file = pcc_file;
1942                 pcc_vm_ops->open(vma);
1943                 vma->vm_file = file;
1944         }
1945         pcc_inode_unlock(inode);
1946         EXIT;
1947 }
1948
1949 void pcc_vm_close(struct vm_area_struct *vma)
1950 {
1951         struct file *file = vma->vm_file;
1952         struct inode *inode = file_inode(file);
1953         struct ll_file_data *fd = file->private_data;
1954         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1955         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1956
1957         ENTRY;
1958
1959         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->close)
1960                 RETURN_EXIT;
1961
1962         pcc_inode_lock(inode);
1963         /* Layout lock maybe revoked here */
1964         vma->vm_file = pcc_file;
1965         pcc_vm_ops->close(vma);
1966         vma->vm_file = file;
1967         pcc_inode_unlock(inode);
1968         EXIT;
1969 }
1970
1971 int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
1972                      bool *cached)
1973 {
1974         struct page *page = vmf->page;
1975         struct mm_struct *mm = vma->vm_mm;
1976         struct file *file = vma->vm_file;
1977         struct inode *inode = file_inode(file);
1978         struct ll_file_data *fd = file->private_data;
1979         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1980         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1981         int rc;
1982
1983         ENTRY;
1984
1985         if (!pcc_file || !pcc_vm_ops) {
1986                 *cached = false;
1987                 RETURN(0);
1988         }
1989
1990         if (!pcc_vm_ops->page_mkwrite &&
1991             page->mapping == pcc_file->f_mapping) {
1992                 CDEBUG(D_MMAP,
1993                        "%s: PCC backend fs not support ->page_mkwrite()\n",
1994                        ll_i2sbi(inode)->ll_fsname);
1995                 pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
1996                 mmap_read_unlock(mm);
1997                 *cached = true;
1998                 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1999         }
2000         /* Pause to allow for a race with concurrent detach */
2001         CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
2002
2003         pcc_io_init(inode, PIT_PAGE_MKWRITE, cached);
2004         if (!*cached) {
2005                 /* This happens when the file is detached from PCC after got
2006                  * the fault page via ->fault() on the inode of the PCC copy.
2007                  * Here it can not simply fall back to normal Lustre I/O path.
2008                  * The reason is that the address space of fault page used by
2009                  * ->page_mkwrite() is still the one of PCC inode. In the
2010                  * normal Lustre ->page_mkwrite() I/O path, it will be wrongly
2011                  * handled as the address space of the fault page is not
2012                  * consistent with the one of the Lustre inode (though the
2013                  * fault page was truncated).
2014                  * As the file is detached from PCC, the fault page must
2015                  * be released frist, and retry the mmap write (->fault() and
2016                  * ->page_mkwrite).
2017                  * We use an ugly and tricky method by returning
2018                  * VM_FAULT_NOPAGE | VM_FAULT_RETRY to the caller
2019                  * __do_page_fault and retry the memory fault handling.
2020                  */
2021                 if (page->mapping == pcc_file->f_mapping) {
2022                         *cached = true;
2023                         mmap_read_unlock(mm);
2024                         RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
2025                 }
2026
2027                 RETURN(0);
2028         }
2029
2030         /*
2031          * This fault injection can also be used to simulate -ENOSPC and
2032          * -EDQUOT failure of underlying PCC backend fs.
2033          */
2034         if (CFS_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE)) {
2035                 pcc_io_fini(inode);
2036                 pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
2037                 mmap_read_unlock(mm);
2038                 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
2039         }
2040
2041         vma->vm_file = pcc_file;
2042 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
2043         rc = pcc_vm_ops->page_mkwrite(vmf);
2044 #else
2045         rc = pcc_vm_ops->page_mkwrite(vma, vmf);
2046 #endif
2047         vma->vm_file = file;
2048
2049         pcc_io_fini(inode);
2050         RETURN(rc);
2051 }
2052
2053 int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
2054               bool *cached)
2055 {
2056         struct file *file = vma->vm_file;
2057         struct inode *inode = file_inode(file);
2058         struct ll_file_data *fd = file->private_data;
2059         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
2060         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
2061         int rc;
2062
2063         ENTRY;
2064
2065         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->fault) {
2066                 *cached = false;
2067                 RETURN(0);
2068         }
2069
2070         pcc_io_init(inode, PIT_FAULT, cached);
2071         if (!*cached)
2072                 RETURN(0);
2073
2074         vma->vm_file = pcc_file;
2075 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
2076         rc = pcc_vm_ops->fault(vmf);
2077 #else
2078         rc = pcc_vm_ops->fault(vma, vmf);
2079 #endif
2080         vma->vm_file = file;
2081
2082         pcc_io_fini(inode);
2083         RETURN(rc);
2084 }
2085
2086 static void __pcc_layout_invalidate(struct pcc_inode *pcci)
2087 {
2088         pcci->pcci_type = LU_PCC_NONE;
2089         pcc_layout_gen_set(pcci, CL_LAYOUT_GEN_NONE);
2090         if (atomic_read(&pcci->pcci_active_ios) == 0)
2091                 return;
2092
2093         CDEBUG(D_CACHE, "Waiting for IO completion: %d\n",
2094                        atomic_read(&pcci->pcci_active_ios));
2095         wait_event_idle(pcci->pcci_waitq,
2096                         atomic_read(&pcci->pcci_active_ios) == 0);
2097 }
2098
2099 void pcc_layout_invalidate(struct inode *inode)
2100 {
2101         struct pcc_inode *pcci;
2102
2103         ENTRY;
2104
2105         pcc_inode_lock(inode);
2106         pcci = ll_i2pcci(inode);
2107         if (pcci && pcc_inode_has_layout(pcci)) {
2108                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
2109                 __pcc_layout_invalidate(pcci);
2110
2111                 CDEBUG(D_CACHE, "Invalidate "DFID" layout gen %d\n",
2112                        PFID(&ll_i2info(inode)->lli_fid), pcci->pcci_layout_gen);
2113
2114                 pcc_inode_put(pcci);
2115         }
2116         pcc_inode_unlock(inode);
2117
2118         EXIT;
2119 }
2120
2121 static int pcc_inode_remove(struct inode *inode, struct dentry *pcc_dentry)
2122 {
2123         int rc;
2124
2125         rc = vfs_unlink(&nop_mnt_idmap,
2126                         pcc_dentry->d_parent->d_inode, pcc_dentry);
2127         if (rc)
2128                 CWARN("%s: failed to unlink PCC file %pd, rc = %d\n",
2129                       ll_i2sbi(inode)->ll_fsname, pcc_dentry, rc);
2130
2131         return rc;
2132 }
2133
2134 /* Create directory under base if directory does not exist */
2135 static struct dentry *
2136 pcc_mkdir(struct dentry *base, const char *name, umode_t mode)
2137 {
2138         int rc;
2139         struct dentry *dentry;
2140         struct inode *dir = base->d_inode;
2141
2142         inode_lock(dir);
2143         dentry = lookup_one_len(name, base, strlen(name));
2144         if (IS_ERR(dentry))
2145                 goto out;
2146
2147         if (d_is_positive(dentry))
2148                 goto out;
2149
2150         rc = vfs_mkdir(&nop_mnt_idmap, dir, dentry, mode);
2151         if (rc) {
2152                 dput(dentry);
2153                 dentry = ERR_PTR(rc);
2154                 goto out;
2155         }
2156 out:
2157         inode_unlock(dir);
2158         return dentry;
2159 }
2160
2161 static struct dentry *
2162 pcc_mkdir_p(struct dentry *root, char *path, umode_t mode)
2163 {
2164         char *ptr, *entry_name;
2165         struct dentry *parent;
2166         struct dentry *child = ERR_PTR(-EINVAL);
2167
2168         ptr = path;
2169         while (*ptr == '/')
2170                 ptr++;
2171
2172         entry_name = ptr;
2173         parent = dget(root);
2174         while ((ptr = strchr(ptr, '/')) != NULL) {
2175                 *ptr = '\0';
2176                 child = pcc_mkdir(parent, entry_name, mode);
2177                 *ptr = '/';
2178                 dput(parent);
2179                 if (IS_ERR(child))
2180                         break;
2181
2182                 parent = child;
2183                 ptr++;
2184                 entry_name = ptr;
2185         }
2186
2187         return child;
2188 }
2189
2190 /* Create file under base. If file already exist, return failure */
2191 static struct dentry *
2192 pcc_create(struct dentry *base, const char *name, umode_t mode)
2193 {
2194         int rc;
2195         struct dentry *dentry;
2196         struct inode *dir = base->d_inode;
2197
2198         inode_lock(dir);
2199         dentry = lookup_one_len(name, base, strlen(name));
2200         if (IS_ERR(dentry))
2201                 goto out;
2202
2203         if (d_is_positive(dentry))
2204                 goto out;
2205
2206         rc = vfs_create(&nop_mnt_idmap, dir, dentry, mode, false);
2207         if (rc) {
2208                 dput(dentry);
2209                 dentry = ERR_PTR(rc);
2210                 goto out;
2211         }
2212 out:
2213         inode_unlock(dir);
2214         return dentry;
2215 }
2216
2217 static int __pcc_inode_create(struct pcc_dataset *dataset,
2218                               struct lu_fid *fid,
2219                               struct dentry **dentry)
2220 {
2221         char *path;
2222         struct dentry *base;
2223         struct dentry *child;
2224         int rc = 0;
2225
2226         OBD_ALLOC(path, PCC_DATASET_MAX_PATH);
2227         if (path == NULL)
2228                 return -ENOMEM;
2229
2230         pcc_fid2dataset_path(path, PCC_DATASET_MAX_PATH, fid);
2231
2232         base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0);
2233         if (IS_ERR(base)) {
2234                 rc = PTR_ERR(base);
2235                 GOTO(out, rc);
2236         }
2237
2238         snprintf(path, PCC_DATASET_MAX_PATH, DFID_NOBRACE, PFID(fid));
2239         child = pcc_create(base, path, 0);
2240         if (IS_ERR(child)) {
2241                 rc = PTR_ERR(child);
2242                 GOTO(out_base, rc);
2243         }
2244         *dentry = child;
2245
2246 out_base:
2247         dput(base);
2248 out:
2249         OBD_FREE(path, PCC_DATASET_MAX_PATH);
2250         return rc;
2251 }
2252
2253 /*
2254  * Reset uid, gid or size for the PCC copy masked by @valid.
2255  * TODO: Set the project ID for PCC copy.
2256  */
2257 static int pcc_inode_reset_iattr(struct dentry *dentry, unsigned int valid,
2258                                  kuid_t uid, kgid_t gid, loff_t size)
2259 {
2260         struct inode *inode = dentry->d_inode;
2261         struct iattr attr;
2262         int rc;
2263
2264         ENTRY;
2265
2266         attr.ia_valid = valid;
2267         attr.ia_uid = uid;
2268         attr.ia_gid = gid;
2269         attr.ia_size = size;
2270
2271         inode_lock(inode);
2272         rc = notify_change(&nop_mnt_idmap, dentry, &attr, NULL);
2273         inode_unlock(inode);
2274
2275         RETURN(rc);
2276 }
2277
2278 int pcc_inode_create(struct super_block *sb, struct pcc_dataset *dataset,
2279                      struct lu_fid *fid, struct dentry **pcc_dentry)
2280 {
2281         const struct cred *old_cred;
2282         int rc;
2283
2284         old_cred = override_creds(pcc_super_cred(sb));
2285         rc = __pcc_inode_create(dataset, fid, pcc_dentry);
2286         revert_creds(old_cred);
2287         return rc;
2288 }
2289
2290 int pcc_inode_create_fini(struct inode *inode, struct pcc_create_attach *pca)
2291 {
2292         struct dentry *pcc_dentry = pca->pca_dentry;
2293         struct pcc_super *super = ll_i2pccs(inode);
2294         const struct cred *old_cred;
2295         struct pcc_inode *pcci;
2296         int rc;
2297
2298         ENTRY;
2299
2300         if (!pca->pca_dataset)
2301                 RETURN(0);
2302
2303         if (!inode)
2304                 GOTO(out_dataset_put, rc = 0);
2305
2306         LASSERT(pcc_dentry);
2307
2308         old_cred = override_creds(super->pccs_cred);
2309         pcc_inode_lock(inode);
2310         LASSERT(ll_i2pcci(inode) == NULL);
2311         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2312         if (pcci == NULL)
2313                 GOTO(out_put, rc = -ENOMEM);
2314
2315         rc = pcc_inode_reset_iattr(pcc_dentry, ATTR_UID | ATTR_GID,
2316                                    old_cred->suid, old_cred->sgid, 0);
2317         if (rc)
2318                 GOTO(out_put, rc);
2319
2320         pcc_inode_attach_set(super, pca->pca_dataset, ll_i2info(inode),
2321                              pcci, pcc_dentry, LU_PCC_READWRITE);
2322
2323         rc = pcc_layout_xattr_set(pcci, 0);
2324         if (rc) {
2325                 (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
2326                 pcc_inode_put(pcci);
2327                 GOTO(out_unlock, rc);
2328         }
2329
2330         /* Set the layout generation of newly created file with 0 */
2331         pcc_layout_gen_set(pcci, 0);
2332
2333 out_put:
2334         if (rc) {
2335                 (void) pcc_inode_remove(inode, pcc_dentry);
2336                 dput(pcc_dentry);
2337
2338                 if (pcci)
2339                         OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
2340         }
2341 out_unlock:
2342         pcc_inode_unlock(inode);
2343         revert_creds(old_cred);
2344 out_dataset_put:
2345         pcc_dataset_put(pca->pca_dataset);
2346         RETURN(rc);
2347 }
2348
2349 void pcc_create_attach_cleanup(struct super_block *sb,
2350                                struct pcc_create_attach *pca)
2351 {
2352         if (!pca->pca_dataset)
2353                 return;
2354
2355         if (pca->pca_dentry) {
2356                 const struct cred *old_cred;
2357                 int rc;
2358
2359                 old_cred = override_creds(pcc_super_cred(sb));
2360                 rc = vfs_unlink(&nop_mnt_idmap,
2361                                 pca->pca_dentry->d_parent->d_inode,
2362                                 pca->pca_dentry);
2363                 if (rc)
2364                         CWARN("%s: failed to unlink PCC file %pd: rc = %d\n",
2365                               ll_s2sbi(sb)->ll_fsname, pca->pca_dentry, rc);
2366                 /* ignore the unlink failure */
2367                 revert_creds(old_cred);
2368                 dput(pca->pca_dentry);
2369         }
2370
2371         pcc_dataset_put(pca->pca_dataset);
2372 }
2373
2374 static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
2375                           loff_t *offset)
2376 {
2377         while (count > 0) {
2378                 ssize_t size;
2379
2380                 size = cfs_kernel_write(filp, buf, count, offset);
2381                 if (size < 0)
2382                         return size;
2383                 count -= size;
2384                 buf += size;
2385         }
2386         return 0;
2387 }
2388
2389 static ssize_t pcc_copy_data(struct file *src, struct file *dst)
2390 {
2391         ssize_t rc = 0;
2392         ssize_t rc2;
2393         loff_t pos, offset = 0;
2394         size_t buf_len = 1048576;
2395         void *buf;
2396
2397         ENTRY;
2398
2399         OBD_ALLOC_LARGE(buf, buf_len);
2400         if (buf == NULL)
2401                 RETURN(-ENOMEM);
2402
2403         while (1) {
2404                 if (signal_pending(current))
2405                         GOTO(out_free, rc = -EINTR);
2406
2407                 pos = offset;
2408                 rc2 = cfs_kernel_read(src, buf, buf_len, &pos);
2409                 if (rc2 < 0)
2410                         GOTO(out_free, rc = rc2);
2411                 else if (rc2 == 0)
2412                         break;
2413
2414                 pos = offset;
2415                 rc = pcc_filp_write(dst, buf, rc2, &pos);
2416                 if (rc < 0)
2417                         GOTO(out_free, rc);
2418                 offset += rc2;
2419         }
2420
2421         rc = offset;
2422 out_free:
2423         OBD_FREE_LARGE(buf, buf_len);
2424         RETURN(rc);
2425 }
2426
2427 static int pcc_attach_allowed_check(struct inode *inode)
2428 {
2429         struct ll_inode_info *lli = ll_i2info(inode);
2430         struct pcc_inode *pcci;
2431         int rc = 0;
2432
2433         ENTRY;
2434
2435         pcc_inode_lock(inode);
2436         if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
2437                 GOTO(out_unlock, rc = -EBUSY);
2438
2439         pcci = ll_i2pcci(inode);
2440         if (pcci && pcc_inode_has_layout(pcci))
2441                 GOTO(out_unlock, rc = -EEXIST);
2442
2443         lli->lli_pcc_state |= PCC_STATE_FL_ATTACHING;
2444 out_unlock:
2445         pcc_inode_unlock(inode);
2446         RETURN(rc);
2447 }
2448
2449 int pcc_readwrite_attach(struct file *file, struct inode *inode,
2450                          __u32 archive_id)
2451 {
2452         struct pcc_dataset *dataset;
2453         struct ll_inode_info *lli = ll_i2info(inode);
2454         struct pcc_super *super = ll_i2pccs(inode);
2455         struct pcc_inode *pcci;
2456         const struct cred *old_cred;
2457         struct dentry *dentry;
2458         struct file *pcc_filp;
2459         struct path path;
2460         ssize_t ret;
2461         int rc;
2462
2463         ENTRY;
2464
2465         rc = pcc_attach_allowed_check(inode);
2466         if (rc)
2467                 RETURN(rc);
2468
2469         dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super,
2470                                   LU_PCC_READWRITE, archive_id);
2471         if (dataset == NULL)
2472                 RETURN(-ENOENT);
2473
2474         old_cred = override_creds(super->pccs_cred);
2475         rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry);
2476         if (rc)
2477                 GOTO(out_dataset_put, rc);
2478
2479         path.mnt = dataset->pccd_path.mnt;
2480         path.dentry = dentry;
2481         pcc_filp = dentry_open(&path, O_WRONLY | O_LARGEFILE, current_cred());
2482         if (IS_ERR_OR_NULL(pcc_filp)) {
2483                 rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
2484                 GOTO(out_dentry, rc);
2485         }
2486
2487         rc = pcc_inode_reset_iattr(dentry, ATTR_UID | ATTR_GID,
2488                                    old_cred->uid, old_cred->gid, 0);
2489         if (rc)
2490                 GOTO(out_fput, rc);
2491
2492         ret = pcc_copy_data(file, pcc_filp);
2493         if (ret < 0)
2494                 GOTO(out_fput, rc = ret);
2495
2496         /*
2497          * It must to truncate the PCC copy to the same size of the Lustre
2498          * copy after copy data. Otherwise, it may get wrong file size after
2499          * re-attach a file. See LU-13023 for details.
2500          */
2501         rc = pcc_inode_reset_iattr(dentry, ATTR_SIZE, KUIDT_INIT(0),
2502                                    KGIDT_INIT(0), ret);
2503         if (rc)
2504                 GOTO(out_fput, rc);
2505
2506         /* Pause to allow for a race with concurrent HSM remove */
2507         CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
2508
2509         pcc_inode_lock(inode);
2510         pcci = ll_i2pcci(inode);
2511         LASSERT(!pcci);
2512         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2513         if (pcci == NULL)
2514                 GOTO(out_unlock, rc = -ENOMEM);
2515
2516         pcc_inode_attach_set(super, dataset, lli, pcci,
2517                              dentry, LU_PCC_READWRITE);
2518 out_unlock:
2519         pcc_inode_unlock(inode);
2520 out_fput:
2521         fput(pcc_filp);
2522 out_dentry:
2523         if (rc) {
2524                 (void) pcc_inode_remove(inode, dentry);
2525                 dput(dentry);
2526         }
2527 out_dataset_put:
2528         pcc_dataset_put(dataset);
2529         revert_creds(old_cred);
2530
2531         RETURN(rc);
2532 }
2533
2534 int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
2535                               __u32 gen, bool lease_broken, int rc,
2536                               bool attached)
2537 {
2538         struct ll_inode_info *lli = ll_i2info(inode);
2539         const struct cred *old_cred;
2540         struct pcc_inode *pcci;
2541         __u32 gen2;
2542
2543         ENTRY;
2544
2545         old_cred = override_creds(pcc_super_cred(inode->i_sb));
2546         pcc_inode_lock(inode);
2547         pcci = ll_i2pcci(inode);
2548         if (rc || lease_broken) {
2549                 if (attached && pcci)
2550                         pcc_inode_put(pcci);
2551
2552                 GOTO(out_unlock, rc);
2553         }
2554
2555         /* PCC inode may be released due to layout lock revocatioin */
2556         if (!pcci)
2557                 GOTO(out_unlock, rc = -ESTALE);
2558
2559         LASSERT(attached);
2560         rc = pcc_layout_xattr_set(pcci, gen);
2561         if (rc)
2562                 GOTO(out_put, rc);
2563
2564         LASSERT(lli->lli_pcc_state & PCC_STATE_FL_ATTACHING);
2565         rc = ll_layout_refresh(inode, &gen2);
2566         if (!rc) {
2567                 if (gen2 == gen) {
2568                         pcc_layout_gen_set(pcci, gen);
2569                 } else {
2570                         CDEBUG(D_CACHE,
2571                                DFID" layout changed from %d to %d.\n",
2572                                PFID(ll_inode2fid(inode)), gen, gen2);
2573                         GOTO(out_put, rc = -ESTALE);
2574                 }
2575         }
2576
2577 out_put:
2578         if (rc) {
2579                 (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
2580                 pcc_inode_put(pcci);
2581         }
2582 out_unlock:
2583         lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
2584         pcc_inode_unlock(inode);
2585         revert_creds(old_cred);
2586         RETURN(rc);
2587 }
2588
2589 static int pcc_hsm_remove(struct inode *inode)
2590 {
2591         struct hsm_user_request *hur;
2592         __u32 gen;
2593         int len;
2594         int rc;
2595
2596         ENTRY;
2597
2598         rc = ll_layout_restore(inode, 0, OBD_OBJECT_EOF);
2599         if (rc) {
2600                 CDEBUG(D_CACHE, DFID" RESTORE failure: %d\n",
2601                        PFID(&ll_i2info(inode)->lli_fid), rc);
2602                 RETURN(rc);
2603         }
2604
2605         ll_layout_refresh(inode, &gen);
2606
2607         len = sizeof(struct hsm_user_request) +
2608               sizeof(struct hsm_user_item);
2609         OBD_ALLOC(hur, len);
2610         if (hur == NULL)
2611                 RETURN(-ENOMEM);
2612
2613         hur->hur_request.hr_action = HUA_REMOVE;
2614         hur->hur_request.hr_archive_id = 0;
2615         hur->hur_request.hr_flags = 0;
2616         memcpy(&hur->hur_user_item[0].hui_fid, &ll_i2info(inode)->lli_fid,
2617                sizeof(hur->hur_user_item[0].hui_fid));
2618         hur->hur_user_item[0].hui_extent.offset = 0;
2619         hur->hur_user_item[0].hui_extent.length = OBD_OBJECT_EOF;
2620         hur->hur_request.hr_itemcount = 1;
2621         rc = obd_iocontrol(LL_IOC_HSM_REQUEST, ll_i2sbi(inode)->ll_md_exp,
2622                            len, hur, NULL);
2623         if (rc)
2624                 CDEBUG(D_CACHE, DFID" HSM REMOVE failure: %d\n",
2625                        PFID(&ll_i2info(inode)->lli_fid), rc);
2626
2627         OBD_FREE(hur, len);
2628         RETURN(rc);
2629 }
2630
2631 int pcc_ioctl_detach(struct inode *inode, __u32 opt)
2632 {
2633         struct ll_inode_info *lli = ll_i2info(inode);
2634         struct pcc_inode *pcci;
2635         bool hsm_remove = false;
2636         int rc = 0;
2637
2638         ENTRY;
2639
2640         pcc_inode_lock(inode);
2641         pcci = lli->lli_pcc_inode;
2642         if (!pcci || lli->lli_pcc_state & PCC_STATE_FL_ATTACHING ||
2643             !pcc_inode_has_layout(pcci))
2644                 GOTO(out_unlock, rc = 0);
2645
2646         LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
2647
2648         if (pcci->pcci_type == LU_PCC_READWRITE) {
2649                 if (opt == PCC_DETACH_OPT_UNCACHE) {
2650                         hsm_remove = true;
2651                         /*
2652                          * The file will be removed from PCC, set the flags
2653                          * with PCC_DATASET_NONE even the later removal of the
2654                          * PCC copy fails.
2655                          */
2656                         lli->lli_pcc_dsflags = PCC_DATASET_NONE;
2657                 }
2658
2659                 __pcc_layout_invalidate(pcci);
2660                 pcc_inode_put(pcci);
2661         }
2662
2663 out_unlock:
2664         pcc_inode_unlock(inode);
2665         if (hsm_remove) {
2666                 const struct cred *old_cred;
2667
2668                 old_cred = override_creds(pcc_super_cred(inode->i_sb));
2669                 rc = pcc_hsm_remove(inode);
2670                 revert_creds(old_cred);
2671         }
2672
2673         RETURN(rc);
2674 }
2675
2676 int pcc_ioctl_state(struct file *file, struct inode *inode,
2677                     struct lu_pcc_state *state)
2678 {
2679         int rc = 0;
2680         int count;
2681         char *buf;
2682         char *path;
2683         int buf_len = sizeof(state->pccs_path);
2684         struct ll_file_data *fd = file->private_data;
2685         struct pcc_file *pccf = &fd->fd_pcc_file;
2686         struct pcc_inode *pcci;
2687
2688         ENTRY;
2689
2690         if (buf_len <= 0)
2691                 RETURN(-EINVAL);
2692
2693         OBD_ALLOC(buf, buf_len);
2694         if (buf == NULL)
2695                 RETURN(-ENOMEM);
2696
2697         pcc_inode_lock(inode);
2698         pcci = ll_i2pcci(inode);
2699         if (pcci == NULL) {
2700                 state->pccs_type = LU_PCC_NONE;
2701                 GOTO(out_unlock, rc = 0);
2702         }
2703
2704         count = atomic_read(&pcci->pcci_refcount);
2705         if (count == 0) {
2706                 state->pccs_type = LU_PCC_NONE;
2707                 state->pccs_open_count = 0;
2708                 GOTO(out_unlock, rc = 0);
2709         }
2710
2711         if (pcc_inode_has_layout(pcci))
2712                 count--;
2713         if (pccf->pccf_file != NULL)
2714                 count--;
2715         state->pccs_type = pcci->pcci_type;
2716         state->pccs_open_count = count;
2717         state->pccs_flags = ll_i2info(inode)->lli_pcc_state;
2718         path = dentry_path_raw(pcci->pcci_path.dentry, buf, buf_len);
2719         if (IS_ERR(path))
2720                 GOTO(out_unlock, rc = PTR_ERR(path));
2721
2722         if (strlcpy(state->pccs_path, path, buf_len) >= buf_len)
2723                 GOTO(out_unlock, rc = -ENAMETOOLONG);
2724
2725 out_unlock:
2726         pcc_inode_unlock(inode);
2727         OBD_FREE(buf, buf_len);
2728         RETURN(rc);
2729 }