Whamcloud - gitweb
32e5f652b9242dd199d028f4a056d674b8f6552d
[fs/lustre-release.git] / lustre / llite / pcc.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, DDN Storage Corporation.
24  */
25 /*
26  * Persistent Client Cache
27  *
28  * PCC is a new framework which provides a group of local cache on Lustre
29  * client side. It works in two modes: RW-PCC enables a read-write cache on the
30  * local SSDs of a single client; RO-PCC provides a read-only cache on the
31  * local SSDs of multiple clients. Less overhead is visible to the applications
32  * and network latencies and lock conflicts can be significantly reduced.
33  *
34  * For RW-PCC, no global namespace will be provided. Each client uses its own
35  * local storage as a cache for itself. Local file system is used to manage
36  * the data on local caches. Cached I/O is directed to local file system while
37  * normal I/O is directed to OSTs. RW-PCC uses HSM for data synchronization.
38  * It uses HSM copytool to restore file from local caches to Lustre OSTs. Each
39  * PCC has a copytool instance running with unique archive number. Any remote
40  * access from another Lustre client would trigger the data synchronization. If
41  * a client with RW-PCC goes offline, the cached data becomes inaccessible for
42  * other client temporarily. And after the RW-PCC client reboots and the
43  * copytool restarts, the data will be accessible again.
44  *
45  * Following is what will happen in different conditions for RW-PCC:
46  *
47  * > When file is being created on RW-PCC
48  *
49  * A normal HSM released file is created on MDT;
50  * An empty mirror file is created on local cache;
51  * The HSM status of the Lustre file will be set to archived and released;
52  * The archive number will be set to the proper value.
53  *
54  * > When file is being prefetched to RW-PCC
55  *
56  * An file is copied to the local cache;
57  * The HSM status of the Lustre file will be set to archived and released;
58  * The archive number will be set to the proper value.
59  *
60  * > When file is being accessed from PCC
61  *
62  * Data will be read directly from local cache;
63  * Metadata will be read from MDT, except file size;
64  * File size will be got from local cache.
65  *
66  * > When PCC cached file is being accessed on another client
67  *
68  * RW-PCC cached files are automatically restored when a process on another
69  * client tries to read or modify them. The corresponding I/O will block
70  * waiting for the released file to be restored. This is transparent to the
71  * process.
72  *
73  * For RW-PCC, when a file is being created, a rule-based policy is used to
74  * determine whether it will be cached. Rule-based caching of newly created
75  * files can determine which file can use a cache on PCC directly without any
76  * admission control.
77  *
78  * RW-PCC design can accelerate I/O intensive applications with one-to-one
79  * mappings between files and accessing clients. However, in several use cases,
80  * files will never be updated, but need to be read simultaneously from many
81  * clients. RO-PCC implements a read-only caching on Lustre clients using
82  * SSDs. RO-PCC is based on the same framework as RW-PCC, expect
83  * that no HSM mechanism is used.
84  *
85  * The main advantages to use this SSD cache on the Lustre clients via PCC
86  * is that:
87  * - The I/O stack becomes much simpler for the cached data, as there is no
88  *   interference with I/Os from other clients, which enables easier
89  *   performance optimizations;
90  * - The requirements on the HW inside the client nodes are small, any kind of
91  *   SSDs or even HDDs can be used as cache devices;
92  * - Caching reduces the pressure on the object storage targets (OSTs), as
93  *   small or random I/Os can be regularized to big sequential I/Os and
94  *   temporary files do not even need to be flushed to OSTs.
95  *
96  * PCC can accelerate applications with certain I/O patterns:
97  * - small-sized random writes (< 1MB) from a single client
98  * - repeated read of data that is larger than RAM
99  * - clients with high network latency
100  *
101  * Author: Li Xi <lixi@ddn.com>
102  * Author: Qian Yingjin <qian@ddn.com>
103  */
104
105 #define DEBUG_SUBSYSTEM S_LLITE
106
107 #include "pcc.h"
108 #include <linux/namei.h>
109 #include <linux/file.h>
110 #include <lustre_compat.h>
111 #include "llite_internal.h"
112
113 struct kmem_cache *pcc_inode_slab;
114
115 int pcc_super_init(struct pcc_super *super)
116 {
117         struct cred *cred;
118
119         super->pccs_cred = cred = prepare_creds();
120         if (!cred)
121                 return -ENOMEM;
122
123         /* Never override disk quota limits or use reserved space */
124         cap_lower(cred->cap_effective, CAP_SYS_RESOURCE);
125         spin_lock_init(&super->pccs_lock);
126         INIT_LIST_HEAD(&super->pccs_datasets);
127
128         return 0;
129 }
130
131 /* Rule based auto caching */
132 static void pcc_id_list_free(struct list_head *id_list)
133 {
134         struct pcc_match_id *id, *n;
135
136         list_for_each_entry_safe(id, n, id_list, pmi_linkage) {
137                 list_del_init(&id->pmi_linkage);
138                 OBD_FREE_PTR(id);
139         }
140 }
141
142 static void pcc_fname_list_free(struct list_head *fname_list)
143 {
144         struct pcc_match_fname *fname, *n;
145
146         list_for_each_entry_safe(fname, n, fname_list, pmf_linkage) {
147                 OBD_FREE(fname->pmf_name, strlen(fname->pmf_name) + 1);
148                 list_del_init(&fname->pmf_linkage);
149                 OBD_FREE_PTR(fname);
150         }
151 }
152
153 static void pcc_expression_free(struct pcc_expression *expr)
154 {
155         LASSERT(expr->pe_field >= PCC_FIELD_UID &&
156                 expr->pe_field < PCC_FIELD_MAX);
157         switch (expr->pe_field) {
158         case PCC_FIELD_UID:
159         case PCC_FIELD_GID:
160         case PCC_FIELD_PROJID:
161                 pcc_id_list_free(&expr->pe_cond);
162                 break;
163         case PCC_FIELD_FNAME:
164                 pcc_fname_list_free(&expr->pe_cond);
165                 break;
166         default:
167                 LBUG();
168         }
169         OBD_FREE_PTR(expr);
170 }
171
172 static void pcc_conjunction_free(struct pcc_conjunction *conjunction)
173 {
174         struct pcc_expression *expression, *n;
175
176         LASSERT(list_empty(&conjunction->pc_linkage));
177         list_for_each_entry_safe(expression, n,
178                                  &conjunction->pc_expressions,
179                                  pe_linkage) {
180                 list_del_init(&expression->pe_linkage);
181                 pcc_expression_free(expression);
182         }
183         OBD_FREE_PTR(conjunction);
184 }
185
186 static void pcc_rule_conds_free(struct list_head *cond_list)
187 {
188         struct pcc_conjunction *conjunction, *n;
189
190         list_for_each_entry_safe(conjunction, n, cond_list, pc_linkage) {
191                 list_del_init(&conjunction->pc_linkage);
192                 pcc_conjunction_free(conjunction);
193         }
194 }
195
196 static void pcc_cmd_fini(struct pcc_cmd *cmd)
197 {
198         if (cmd->pccc_cmd == PCC_ADD_DATASET) {
199                 if (!list_empty(&cmd->u.pccc_add.pccc_conds))
200                         pcc_rule_conds_free(&cmd->u.pccc_add.pccc_conds);
201                 if (cmd->u.pccc_add.pccc_conds_str)
202                         OBD_FREE(cmd->u.pccc_add.pccc_conds_str,
203                                  strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
204         }
205 }
206
207 #define PCC_DISJUNCTION_DELIM   (',')
208 #define PCC_CONJUNCTION_DELIM   ('&')
209 #define PCC_EXPRESSION_DELIM    ('=')
210
211 static int
212 pcc_fname_list_add(struct cfs_lstr *id, struct list_head *fname_list)
213 {
214         struct pcc_match_fname *fname;
215
216         OBD_ALLOC(fname, sizeof(struct pcc_match_fname));
217         if (fname == NULL)
218                 return -ENOMEM;
219
220         OBD_ALLOC(fname->pmf_name, id->ls_len + 1);
221         if (fname->pmf_name == NULL) {
222                 OBD_FREE(fname, sizeof(struct pcc_match_fname));
223                 return -ENOMEM;
224         }
225
226         memcpy(fname->pmf_name, id->ls_str, id->ls_len);
227         list_add_tail(&fname->pmf_linkage, fname_list);
228         return 0;
229 }
230
231 static int
232 pcc_fname_list_parse(char *str, int len, struct list_head *fname_list)
233 {
234         struct cfs_lstr src;
235         struct cfs_lstr res;
236         int rc = 0;
237
238         ENTRY;
239
240         src.ls_str = str;
241         src.ls_len = len;
242         INIT_LIST_HEAD(fname_list);
243         while (src.ls_str) {
244                 rc = cfs_gettok(&src, ' ', &res);
245                 if (rc == 0) {
246                         rc = -EINVAL;
247                         break;
248                 }
249                 rc = pcc_fname_list_add(&res, fname_list);
250                 if (rc)
251                         break;
252         }
253         if (rc)
254                 pcc_fname_list_free(fname_list);
255         RETURN(rc);
256 }
257
258 static int
259 pcc_id_list_parse(char *str, int len, struct list_head *id_list,
260                   enum pcc_field type)
261 {
262         struct cfs_lstr src;
263         struct cfs_lstr res;
264         int rc = 0;
265
266         ENTRY;
267
268         if (type != PCC_FIELD_UID && type != PCC_FIELD_GID &&
269             type != PCC_FIELD_PROJID)
270                 RETURN(-EINVAL);
271
272         src.ls_str = str;
273         src.ls_len = len;
274         INIT_LIST_HEAD(id_list);
275         while (src.ls_str) {
276                 struct pcc_match_id *id;
277                 __u32 id_val;
278
279                 if (cfs_gettok(&src, ' ', &res) == 0)
280                         GOTO(out, rc = -EINVAL);
281
282                 if (!cfs_str2num_check(res.ls_str, res.ls_len,
283                                        &id_val, 0, (u32)~0U))
284                         GOTO(out, rc = -EINVAL);
285
286                 OBD_ALLOC_PTR(id);
287                 if (id == NULL)
288                         GOTO(out, rc = -ENOMEM);
289
290                 id->pmi_id = id_val;
291                 list_add_tail(&id->pmi_linkage, id_list);
292         }
293 out:
294         if (rc)
295                 pcc_id_list_free(id_list);
296         RETURN(rc);
297 }
298
299 static inline bool
300 pcc_check_field(struct cfs_lstr *field, char *str)
301 {
302         int len = strlen(str);
303
304         return (field->ls_len == len &&
305                 strncmp(field->ls_str, str, len) == 0);
306 }
307
308 static int
309 pcc_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
310 {
311         struct pcc_expression *expr;
312         struct cfs_lstr field;
313         int rc = 0;
314
315         OBD_ALLOC(expr, sizeof(struct pcc_expression));
316         if (expr == NULL)
317                 return -ENOMEM;
318
319         rc = cfs_gettok(src, PCC_EXPRESSION_DELIM, &field);
320         if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
321             src->ls_str[src->ls_len - 1] != '}')
322                 GOTO(out, rc = -EINVAL);
323
324         /* Skip '{' and '}' */
325         src->ls_str++;
326         src->ls_len -= 2;
327
328         if (pcc_check_field(&field, "uid")) {
329                 if (pcc_id_list_parse(src->ls_str,
330                                       src->ls_len,
331                                       &expr->pe_cond,
332                                       PCC_FIELD_UID) < 0)
333                         GOTO(out, rc = -EINVAL);
334                 expr->pe_field = PCC_FIELD_UID;
335         } else if (pcc_check_field(&field, "gid")) {
336                 if (pcc_id_list_parse(src->ls_str,
337                                       src->ls_len,
338                                       &expr->pe_cond,
339                                       PCC_FIELD_GID) < 0)
340                         GOTO(out, rc = -EINVAL);
341                 expr->pe_field = PCC_FIELD_GID;
342         } else if (pcc_check_field(&field, "projid")) {
343                 if (pcc_id_list_parse(src->ls_str,
344                                       src->ls_len,
345                                       &expr->pe_cond,
346                                       PCC_FIELD_PROJID) < 0)
347                         GOTO(out, rc = -EINVAL);
348                 expr->pe_field = PCC_FIELD_PROJID;
349         } else if (pcc_check_field(&field, "fname")) {
350                 if (pcc_fname_list_parse(src->ls_str,
351                                          src->ls_len,
352                                          &expr->pe_cond) < 0)
353                         GOTO(out, rc = -EINVAL);
354                 expr->pe_field = PCC_FIELD_FNAME;
355         } else {
356                 GOTO(out, rc = -EINVAL);
357         }
358
359         list_add_tail(&expr->pe_linkage, cond_list);
360         return 0;
361 out:
362         OBD_FREE_PTR(expr);
363         return rc;
364 }
365
366 static int
367 pcc_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
368 {
369         struct pcc_conjunction *conjunction;
370         struct cfs_lstr expr;
371         int rc = 0;
372
373         OBD_ALLOC(conjunction, sizeof(struct pcc_conjunction));
374         if (conjunction == NULL)
375                 return -ENOMEM;
376
377         INIT_LIST_HEAD(&conjunction->pc_expressions);
378         list_add_tail(&conjunction->pc_linkage, cond_list);
379
380         while (src->ls_str) {
381                 rc = cfs_gettok(src, PCC_CONJUNCTION_DELIM, &expr);
382                 if (rc == 0) {
383                         rc = -EINVAL;
384                         break;
385                 }
386                 rc = pcc_expression_parse(&expr,
387                                           &conjunction->pc_expressions);
388                 if (rc)
389                         break;
390         }
391         return rc;
392 }
393
394 static int pcc_conds_parse(char *str, int len, struct list_head *cond_list)
395 {
396         struct cfs_lstr src;
397         struct cfs_lstr res;
398         int rc = 0;
399
400         src.ls_str = str;
401         src.ls_len = len;
402         INIT_LIST_HEAD(cond_list);
403         while (src.ls_str) {
404                 rc = cfs_gettok(&src, PCC_DISJUNCTION_DELIM, &res);
405                 if (rc == 0) {
406                         rc = -EINVAL;
407                         break;
408                 }
409                 rc = pcc_conjunction_parse(&res, cond_list);
410                 if (rc)
411                         break;
412         }
413         return rc;
414 }
415
416 static int pcc_id_parse(struct pcc_cmd *cmd, const char *id)
417 {
418         int rc;
419
420         OBD_ALLOC(cmd->u.pccc_add.pccc_conds_str, strlen(id) + 1);
421         if (cmd->u.pccc_add.pccc_conds_str == NULL)
422                 return -ENOMEM;
423
424         memcpy(cmd->u.pccc_add.pccc_conds_str, id, strlen(id));
425
426         rc = pcc_conds_parse(cmd->u.pccc_add.pccc_conds_str,
427                              strlen(cmd->u.pccc_add.pccc_conds_str),
428                              &cmd->u.pccc_add.pccc_conds);
429         if (rc)
430                 pcc_cmd_fini(cmd);
431
432         return rc;
433 }
434
435 static int
436 pcc_parse_value_pair(struct pcc_cmd *cmd, char *buffer)
437 {
438         char *key, *val;
439         unsigned long id;
440         int rc;
441
442         val = buffer;
443         key = strsep(&val, "=");
444         if (val == NULL || strlen(val) == 0)
445                 return -EINVAL;
446
447         /* Key of the value pair */
448         if (strcmp(key, "rwid") == 0) {
449                 rc = kstrtoul(val, 10, &id);
450                 if (rc)
451                         return rc;
452                 if (id <= 0)
453                         return -EINVAL;
454                 cmd->u.pccc_add.pccc_rwid = id;
455         } else if (strcmp(key, "roid") == 0) {
456                 rc = kstrtoul(val, 10, &id);
457                 if (rc)
458                         return rc;
459                 if (id <= 0)
460                         return -EINVAL;
461                 cmd->u.pccc_add.pccc_roid = id;
462         } else {
463                 return -EINVAL;
464         }
465
466         return 0;
467 }
468
469 static int
470 pcc_parse_value_pairs(struct pcc_cmd *cmd, char *buffer)
471 {
472         char *val;
473         char *token;
474         int rc;
475
476         val = buffer;
477         while (val != NULL && strlen(val) != 0) {
478                 token = strsep(&val, " ");
479                 rc = pcc_parse_value_pair(cmd, token);
480                 if (rc)
481                         return rc;
482         }
483
484         return 0;
485 }
486
487 static void
488 pcc_dataset_rule_fini(struct pcc_match_rule *rule)
489 {
490         if (!list_empty(&rule->pmr_conds))
491                 pcc_rule_conds_free(&rule->pmr_conds);
492         LASSERT(rule->pmr_conds_str != NULL);
493         OBD_FREE(rule->pmr_conds_str, strlen(rule->pmr_conds_str) + 1);
494 }
495
496 static int
497 pcc_dataset_rule_init(struct pcc_match_rule *rule, struct pcc_cmd *cmd)
498 {
499         int rc = 0;
500
501         LASSERT(cmd->u.pccc_add.pccc_conds_str);
502         OBD_ALLOC(rule->pmr_conds_str,
503                   strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
504         if (rule->pmr_conds_str == NULL)
505                 return -ENOMEM;
506
507         memcpy(rule->pmr_conds_str,
508                cmd->u.pccc_add.pccc_conds_str,
509                strlen(cmd->u.pccc_add.pccc_conds_str));
510
511         INIT_LIST_HEAD(&rule->pmr_conds);
512         if (!list_empty(&cmd->u.pccc_add.pccc_conds))
513                 rc = pcc_conds_parse(rule->pmr_conds_str,
514                                           strlen(rule->pmr_conds_str),
515                                           &rule->pmr_conds);
516
517         if (rc)
518                 pcc_dataset_rule_fini(rule);
519
520         return rc;
521 }
522
523 /* Rule Matching */
524 static int
525 pcc_id_list_match(struct list_head *id_list, __u32 id_val)
526 {
527         struct pcc_match_id *id;
528
529         list_for_each_entry(id, id_list, pmi_linkage) {
530                 if (id->pmi_id == id_val)
531                         return 1;
532         }
533         return 0;
534 }
535
536 static bool
537 cfs_match_wildcard(const char *pattern, const char *content)
538 {
539         if (*pattern == '\0' && *content == '\0')
540                 return true;
541
542         if (*pattern == '*' && *(pattern + 1) != '\0' && *content == '\0')
543                 return false;
544
545         while (*pattern == *content) {
546                 pattern++;
547                 content++;
548                 if (*pattern == '\0' && *content == '\0')
549                         return true;
550
551                 if (*pattern == '*' && *(pattern + 1) != '\0' &&
552                     *content == '\0')
553                         return false;
554         }
555
556         if (*pattern == '*')
557                 return (cfs_match_wildcard(pattern + 1, content) ||
558                         cfs_match_wildcard(pattern, content + 1));
559
560         return false;
561 }
562
563 static int
564 pcc_fname_list_match(struct list_head *fname_list, const char *name)
565 {
566         struct pcc_match_fname *fname;
567
568         list_for_each_entry(fname, fname_list, pmf_linkage) {
569                 if (cfs_match_wildcard(fname->pmf_name, name))
570                         return 1;
571         }
572         return 0;
573 }
574
575 static int
576 pcc_expression_match(struct pcc_expression *expr, struct pcc_matcher *matcher)
577 {
578         switch (expr->pe_field) {
579         case PCC_FIELD_UID:
580                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_uid);
581         case PCC_FIELD_GID:
582                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_gid);
583         case PCC_FIELD_PROJID:
584                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_projid);
585         case PCC_FIELD_FNAME:
586                 return pcc_fname_list_match(&expr->pe_cond,
587                                             matcher->pm_name->name);
588         default:
589                 return 0;
590         }
591 }
592
593 static int
594 pcc_conjunction_match(struct pcc_conjunction *conjunction,
595                       struct pcc_matcher *matcher)
596 {
597         struct pcc_expression *expr;
598         int matched;
599
600         list_for_each_entry(expr, &conjunction->pc_expressions, pe_linkage) {
601                 matched = pcc_expression_match(expr, matcher);
602                 if (!matched)
603                         return 0;
604         }
605
606         return 1;
607 }
608
609 static int
610 pcc_cond_match(struct pcc_match_rule *rule, struct pcc_matcher *matcher)
611 {
612         struct pcc_conjunction *conjunction;
613         int matched;
614
615         list_for_each_entry(conjunction, &rule->pmr_conds, pc_linkage) {
616                 matched = pcc_conjunction_match(conjunction, matcher);
617                 if (matched)
618                         return 1;
619         }
620
621         return 0;
622 }
623
624 struct pcc_dataset*
625 pcc_dataset_match_get(struct pcc_super *super, struct pcc_matcher *matcher)
626 {
627         struct pcc_dataset *dataset;
628         struct pcc_dataset *selected = NULL;
629
630         spin_lock(&super->pccs_lock);
631         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
632                 if (pcc_cond_match(&dataset->pccd_rule, matcher)) {
633                         atomic_inc(&dataset->pccd_refcount);
634                         selected = dataset;
635                         break;
636                 }
637         }
638         spin_unlock(&super->pccs_lock);
639         if (selected)
640                 CDEBUG(D_CACHE, "PCC create, matched %s - %d:%d:%d:%s\n",
641                        dataset->pccd_rule.pmr_conds_str,
642                        matcher->pm_uid, matcher->pm_gid,
643                        matcher->pm_projid, matcher->pm_name->name);
644
645         return selected;
646 }
647
648 /**
649  * pcc_dataset_add - Add a Cache policy to control which files need be
650  * cached and where it will be cached.
651  *
652  * @super:      superblock of pcc
653  * @cmd:        pcc command
654  */
655 static int
656 pcc_dataset_add(struct pcc_super *super, struct pcc_cmd *cmd)
657 {
658         char *pathname = cmd->pccc_pathname;
659         struct pcc_dataset *dataset;
660         struct pcc_dataset *tmp;
661         bool found = false;
662         int rc;
663
664         OBD_ALLOC_PTR(dataset);
665         if (dataset == NULL)
666                 return -ENOMEM;
667
668         rc = kern_path(pathname, LOOKUP_DIRECTORY, &dataset->pccd_path);
669         if (unlikely(rc)) {
670                 OBD_FREE_PTR(dataset);
671                 return rc;
672         }
673         strncpy(dataset->pccd_pathname, pathname, PATH_MAX);
674         dataset->pccd_rwid = cmd->u.pccc_add.pccc_rwid;
675         dataset->pccd_roid = cmd->u.pccc_add.pccc_roid;
676         atomic_set(&dataset->pccd_refcount, 1);
677
678         rc = pcc_dataset_rule_init(&dataset->pccd_rule, cmd);
679         if (rc) {
680                 pcc_dataset_put(dataset);
681                 return rc;
682         }
683
684         spin_lock(&super->pccs_lock);
685         list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) {
686                 if (strcmp(tmp->pccd_pathname, pathname) == 0 ||
687                     (dataset->pccd_rwid != 0 &&
688                      dataset->pccd_rwid == tmp->pccd_rwid) ||
689                     (dataset->pccd_roid != 0 &&
690                      dataset->pccd_roid == tmp->pccd_roid)) {
691                         found = true;
692                         break;
693                 }
694         }
695         if (!found)
696                 list_add(&dataset->pccd_linkage, &super->pccs_datasets);
697         spin_unlock(&super->pccs_lock);
698
699         if (found) {
700                 pcc_dataset_put(dataset);
701                 rc = -EEXIST;
702         }
703
704         return rc;
705 }
706
707 struct pcc_dataset *
708 pcc_dataset_get(struct pcc_super *super, enum lu_pcc_type type, __u32 id)
709 {
710         struct pcc_dataset *dataset;
711         struct pcc_dataset *selected = NULL;
712
713         if (id == 0)
714                 return NULL;
715
716         /*
717          * archive ID (read-write ID) or read-only ID is unique in the list,
718          * we just return last added one as first priority.
719          */
720         spin_lock(&super->pccs_lock);
721         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
722                 if (type == LU_PCC_READWRITE && dataset->pccd_rwid != id)
723                         continue;
724                 atomic_inc(&dataset->pccd_refcount);
725                 selected = dataset;
726                 break;
727         }
728         spin_unlock(&super->pccs_lock);
729         if (selected)
730                 CDEBUG(D_CACHE, "matched id %u, PCC mode %d\n", id, type);
731
732         return selected;
733 }
734
735 void
736 pcc_dataset_put(struct pcc_dataset *dataset)
737 {
738         if (atomic_dec_and_test(&dataset->pccd_refcount)) {
739                 pcc_dataset_rule_fini(&dataset->pccd_rule);
740                 path_put(&dataset->pccd_path);
741                 OBD_FREE_PTR(dataset);
742         }
743 }
744
745 static int
746 pcc_dataset_del(struct pcc_super *super, char *pathname)
747 {
748         struct list_head *l, *tmp;
749         struct pcc_dataset *dataset;
750         int rc = -ENOENT;
751
752         spin_lock(&super->pccs_lock);
753         list_for_each_safe(l, tmp, &super->pccs_datasets) {
754                 dataset = list_entry(l, struct pcc_dataset, pccd_linkage);
755                 if (strcmp(dataset->pccd_pathname, pathname) == 0) {
756                         list_del(&dataset->pccd_linkage);
757                         pcc_dataset_put(dataset);
758                         rc = 0;
759                         break;
760                 }
761         }
762         spin_unlock(&super->pccs_lock);
763         return rc;
764 }
765
766 static void
767 pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
768 {
769         seq_printf(m, "%s:\n", dataset->pccd_pathname);
770         seq_printf(m, "  rwid: %u\n", dataset->pccd_rwid);
771         seq_printf(m, "  autocache: %s\n", dataset->pccd_rule.pmr_conds_str);
772 }
773
774 int
775 pcc_super_dump(struct pcc_super *super, struct seq_file *m)
776 {
777         struct pcc_dataset *dataset;
778
779         spin_lock(&super->pccs_lock);
780         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
781                 pcc_dataset_dump(dataset, m);
782         }
783         spin_unlock(&super->pccs_lock);
784         return 0;
785 }
786
787 static void pcc_remove_datasets(struct pcc_super *super)
788 {
789         struct pcc_dataset *dataset, *tmp;
790
791         list_for_each_entry_safe(dataset, tmp,
792                                  &super->pccs_datasets, pccd_linkage) {
793                 list_del(&dataset->pccd_linkage);
794                 pcc_dataset_put(dataset);
795         }
796 }
797
798 void pcc_super_fini(struct pcc_super *super)
799 {
800         pcc_remove_datasets(super);
801         put_cred(super->pccs_cred);
802 }
803
804 static bool pathname_is_valid(const char *pathname)
805 {
806         /* Needs to be absolute path */
807         if (pathname == NULL || strlen(pathname) == 0 ||
808             strlen(pathname) >= PATH_MAX || pathname[0] != '/')
809                 return false;
810         return true;
811 }
812
813 static struct pcc_cmd *
814 pcc_cmd_parse(char *buffer, unsigned long count)
815 {
816         static struct pcc_cmd *cmd;
817         char *token;
818         char *val;
819         int rc = 0;
820
821         OBD_ALLOC_PTR(cmd);
822         if (cmd == NULL)
823                 GOTO(out, rc = -ENOMEM);
824
825         /* clear all setting */
826         if (strncmp(buffer, "clear", 5) == 0) {
827                 cmd->pccc_cmd = PCC_CLEAR_ALL;
828                 GOTO(out, rc = 0);
829         }
830
831         val = buffer;
832         token = strsep(&val, " ");
833         if (val == NULL || strlen(val) == 0)
834                 GOTO(out_free_cmd, rc = -EINVAL);
835
836         /* Type of the command */
837         if (strcmp(token, "add") == 0)
838                 cmd->pccc_cmd = PCC_ADD_DATASET;
839         else if (strcmp(token, "del") == 0)
840                 cmd->pccc_cmd = PCC_DEL_DATASET;
841         else
842                 GOTO(out_free_cmd, rc = -EINVAL);
843
844         /* Pathname of the dataset */
845         token = strsep(&val, " ");
846         if ((val == NULL && cmd->pccc_cmd != PCC_DEL_DATASET) ||
847             !pathname_is_valid(token))
848                 GOTO(out_free_cmd, rc = -EINVAL);
849         cmd->pccc_pathname = token;
850
851         if (cmd->pccc_cmd == PCC_ADD_DATASET) {
852                 /* List of ID */
853                 LASSERT(val);
854                 token = val;
855                 val = strrchr(token, '}');
856                 if (!val)
857                         GOTO(out_free_cmd, rc = -EINVAL);
858
859                 /* Skip '}' */
860                 val++;
861                 if (*val == '\0') {
862                         val = NULL;
863                 } else if (*val == ' ') {
864                         *val = '\0';
865                         val++;
866                 } else {
867                         GOTO(out_free_cmd, rc = -EINVAL);
868                 }
869
870                 rc = pcc_id_parse(cmd, token);
871                 if (rc)
872                         GOTO(out_free_cmd, rc);
873
874                 rc = pcc_parse_value_pairs(cmd, val);
875                 if (rc)
876                         GOTO(out_cmd_fini, rc = -EINVAL);
877         }
878         goto out;
879 out_cmd_fini:
880         pcc_cmd_fini(cmd);
881 out_free_cmd:
882         OBD_FREE_PTR(cmd);
883 out:
884         if (rc)
885                 cmd = ERR_PTR(rc);
886         return cmd;
887 }
888
889 int pcc_cmd_handle(char *buffer, unsigned long count,
890                    struct pcc_super *super)
891 {
892         int rc = 0;
893         struct pcc_cmd *cmd;
894
895         cmd = pcc_cmd_parse(buffer, count);
896         if (IS_ERR(cmd))
897                 return PTR_ERR(cmd);
898
899         switch (cmd->pccc_cmd) {
900         case PCC_ADD_DATASET:
901                 rc = pcc_dataset_add(super, cmd);
902                 break;
903         case PCC_DEL_DATASET:
904                 rc = pcc_dataset_del(super, cmd->pccc_pathname);
905                 break;
906         case PCC_CLEAR_ALL:
907                 pcc_remove_datasets(super);
908                 break;
909         default:
910                 rc = -EINVAL;
911                 break;
912         }
913
914         pcc_cmd_fini(cmd);
915         OBD_FREE_PTR(cmd);
916         return rc;
917 }
918
919 static inline void pcc_inode_lock(struct inode *inode)
920 {
921         mutex_lock(&ll_i2info(inode)->lli_pcc_lock);
922 }
923
924 static inline void pcc_inode_unlock(struct inode *inode)
925 {
926         mutex_unlock(&ll_i2info(inode)->lli_pcc_lock);
927 }
928
929 static void pcc_inode_init(struct pcc_inode *pcci, struct ll_inode_info *lli)
930 {
931         pcci->pcci_lli = lli;
932         lli->lli_pcc_inode = pcci;
933         atomic_set(&pcci->pcci_refcount, 0);
934         pcci->pcci_type = LU_PCC_NONE;
935         pcci->pcci_layout_gen = CL_LAYOUT_GEN_NONE;
936         atomic_set(&pcci->pcci_active_ios, 0);
937         init_waitqueue_head(&pcci->pcci_waitq);
938 }
939
940 static void pcc_inode_fini(struct pcc_inode *pcci)
941 {
942         struct ll_inode_info *lli = pcci->pcci_lli;
943
944         path_put(&pcci->pcci_path);
945         pcci->pcci_type = LU_PCC_NONE;
946         OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
947         lli->lli_pcc_inode = NULL;
948 }
949
950 static void pcc_inode_get(struct pcc_inode *pcci)
951 {
952         atomic_inc(&pcci->pcci_refcount);
953 }
954
955 static void pcc_inode_put(struct pcc_inode *pcci)
956 {
957         if (atomic_dec_and_test(&pcci->pcci_refcount))
958                 pcc_inode_fini(pcci);
959 }
960
961 void pcc_inode_free(struct inode *inode)
962 {
963         struct pcc_inode *pcci = ll_i2pcci(inode);
964
965         if (pcci) {
966                 WARN_ON(atomic_read(&pcci->pcci_refcount) > 1);
967                 pcc_inode_put(pcci);
968         }
969 }
970
971 /*
972  * TODO:
973  * As Andreas suggested, we'd better use new layout to
974  * reduce overhead:
975  * (fid->f_oid >> 16 & oxFFFF)/FID
976  */
977 #define MAX_PCC_DATABASE_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
978 static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid)
979 {
980         return snprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
981                         DFID_NOBRACE,
982                         (fid)->f_oid       & 0xFFFF,
983                         (fid)->f_oid >> 16 & 0xFFFF,
984                         (unsigned int)((fid)->f_seq       & 0xFFFF),
985                         (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
986                         (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
987                         (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
988                         PFID(fid));
989 }
990
991 static inline const struct cred *pcc_super_cred(struct super_block *sb)
992 {
993         return ll_s2sbi(sb)->ll_pcc_super.pccs_cred;
994 }
995
996 void pcc_file_init(struct pcc_file *pccf)
997 {
998         pccf->pccf_file = NULL;
999         pccf->pccf_type = LU_PCC_NONE;
1000 }
1001
1002 static inline bool pcc_inode_has_layout(struct pcc_inode *pcci)
1003 {
1004         return pcci->pcci_layout_gen != CL_LAYOUT_GEN_NONE;
1005 }
1006
1007 int pcc_file_open(struct inode *inode, struct file *file)
1008 {
1009         struct pcc_inode *pcci;
1010         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1011         struct pcc_file *pccf = &fd->fd_pcc_file;
1012         struct file *pcc_file;
1013         struct path *path;
1014         struct qstr *dname;
1015         int rc = 0;
1016
1017         ENTRY;
1018
1019         if (!S_ISREG(inode->i_mode))
1020                 RETURN(0);
1021
1022         pcc_inode_lock(inode);
1023         pcci = ll_i2pcci(inode);
1024         if (!pcci)
1025                 GOTO(out_unlock, rc = 0);
1026
1027         if (atomic_read(&pcci->pcci_refcount) == 0 ||
1028             !pcc_inode_has_layout(pcci))
1029                 GOTO(out_unlock, rc = 0);
1030
1031         pcc_inode_get(pcci);
1032         WARN_ON(pccf->pccf_file);
1033
1034         path = &pcci->pcci_path;
1035         dname = &path->dentry->d_name;
1036         CDEBUG(D_CACHE, "opening pcc file '%.*s'\n", dname->len,
1037                dname->name);
1038
1039 #ifdef HAVE_DENTRY_OPEN_USE_PATH
1040         pcc_file = dentry_open(path, file->f_flags,
1041                                pcc_super_cred(inode->i_sb));
1042 #else
1043         pcc_file = dentry_open(path->dentry, path->mnt, file->f_flags,
1044                                pcc_super_cred(inode->i_sb));
1045 #endif
1046         if (IS_ERR_OR_NULL(pcc_file)) {
1047                 rc = pcc_file == NULL ? -EINVAL : PTR_ERR(pcc_file);
1048                 pcc_inode_put(pcci);
1049         } else {
1050                 pccf->pccf_file = pcc_file;
1051                 pccf->pccf_type = pcci->pcci_type;
1052         }
1053
1054 out_unlock:
1055         pcc_inode_unlock(inode);
1056         RETURN(rc);
1057 }
1058
1059 void pcc_file_release(struct inode *inode, struct file *file)
1060 {
1061         struct pcc_inode *pcci;
1062         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1063         struct pcc_file *pccf;
1064         struct path *path;
1065         struct qstr *dname;
1066
1067         ENTRY;
1068
1069         if (!S_ISREG(inode->i_mode) || fd == NULL)
1070                 RETURN_EXIT;
1071
1072         pccf = &fd->fd_pcc_file;
1073         pcc_inode_lock(inode);
1074         if (pccf->pccf_file == NULL)
1075                 goto out;
1076
1077         pcci = ll_i2pcci(inode);
1078         LASSERT(pcci);
1079         path = &pcci->pcci_path;
1080         dname = &path->dentry->d_name;
1081         CDEBUG(D_CACHE, "releasing pcc file \"%.*s\"\n", dname->len,
1082                dname->name);
1083         pcc_inode_put(pcci);
1084         fput(pccf->pccf_file);
1085         pccf->pccf_file = NULL;
1086 out:
1087         pcc_inode_unlock(inode);
1088         RETURN_EXIT;
1089 }
1090
1091 static inline void pcc_layout_gen_set(struct pcc_inode *pcci,
1092                                       __u32 gen)
1093 {
1094         pcci->pcci_layout_gen = gen;
1095 }
1096
1097 static void pcc_io_init(struct inode *inode, bool *cached)
1098 {
1099         struct pcc_inode *pcci;
1100
1101         pcc_inode_lock(inode);
1102         pcci = ll_i2pcci(inode);
1103         if (pcci && pcc_inode_has_layout(pcci)) {
1104                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1105                 atomic_inc(&pcci->pcci_active_ios);
1106                 *cached = true;
1107         } else {
1108                 *cached = false;
1109         }
1110         pcc_inode_unlock(inode);
1111 }
1112
1113 static void pcc_io_fini(struct inode *inode)
1114 {
1115         struct pcc_inode *pcci = ll_i2pcci(inode);
1116
1117         LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0);
1118         if (atomic_dec_and_test(&pcci->pcci_active_ios))
1119                 wake_up_all(&pcci->pcci_waitq);
1120 }
1121
1122
1123 static ssize_t
1124 __pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
1125 {
1126         struct file *file = iocb->ki_filp;
1127
1128 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1129         return file->f_op->read_iter(iocb, iter);
1130 #else
1131         struct iovec iov;
1132         struct iov_iter i;
1133         ssize_t bytes = 0;
1134
1135         iov_for_each(iov, i, *iter) {
1136                 ssize_t res;
1137
1138                 res = file->f_op->aio_read(iocb, &iov, 1, iocb->ki_pos);
1139                 if (-EIOCBQUEUED == res)
1140                         res = wait_on_sync_kiocb(iocb);
1141                 if (res <= 0) {
1142                         if (bytes == 0)
1143                                 bytes = res;
1144                         break;
1145                 }
1146
1147                 bytes += res;
1148                 if (res < iov.iov_len)
1149                         break;
1150         }
1151
1152         if (bytes > 0)
1153                 iov_iter_advance(iter, bytes);
1154         return bytes;
1155 #endif
1156 }
1157
1158 ssize_t pcc_file_read_iter(struct kiocb *iocb,
1159                            struct iov_iter *iter, bool *cached)
1160 {
1161         struct file *file = iocb->ki_filp;
1162         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1163         struct pcc_file *pccf = &fd->fd_pcc_file;
1164         struct inode *inode = file_inode(file);
1165         ssize_t result;
1166
1167         ENTRY;
1168
1169         if (pccf->pccf_file == NULL) {
1170                 *cached = false;
1171                 RETURN(0);
1172         }
1173
1174         pcc_io_init(inode, cached);
1175         if (!*cached)
1176                 RETURN(0);
1177
1178         iocb->ki_filp = pccf->pccf_file;
1179         /* generic_file_aio_read does not support ext4-dax,
1180          * __pcc_file_read_iter uses ->aio_read hook directly
1181          * to add support for ext4-dax.
1182          */
1183         result = __pcc_file_read_iter(iocb, iter);
1184         iocb->ki_filp = file;
1185
1186         pcc_io_fini(inode);
1187         RETURN(result);
1188 }
1189
1190 static ssize_t
1191 __pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
1192 {
1193         struct file *file = iocb->ki_filp;
1194
1195 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1196         return file->f_op->write_iter(iocb, iter);
1197 #else
1198         struct iovec iov;
1199         struct iov_iter i;
1200         ssize_t bytes = 0;
1201
1202         iov_for_each(iov, i, *iter) {
1203                 ssize_t res;
1204
1205                 res = file->f_op->aio_write(iocb, &iov, 1, iocb->ki_pos);
1206                 if (-EIOCBQUEUED == res)
1207                         res = wait_on_sync_kiocb(iocb);
1208                 if (res <= 0) {
1209                         if (bytes == 0)
1210                                 bytes = res;
1211                         break;
1212                 }
1213
1214                 bytes += res;
1215                 if (res < iov.iov_len)
1216                         break;
1217         }
1218
1219         if (bytes > 0)
1220                 iov_iter_advance(iter, bytes);
1221         return bytes;
1222 #endif
1223 }
1224
1225 ssize_t pcc_file_write_iter(struct kiocb *iocb,
1226                             struct iov_iter *iter, bool *cached)
1227 {
1228         struct file *file = iocb->ki_filp;
1229         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1230         struct pcc_file *pccf = &fd->fd_pcc_file;
1231         struct inode *inode = file_inode(file);
1232         ssize_t result;
1233
1234         ENTRY;
1235
1236         if (pccf->pccf_file == NULL) {
1237                 *cached = false;
1238                 RETURN(0);
1239         }
1240
1241         if (pccf->pccf_type != LU_PCC_READWRITE) {
1242                 *cached = false;
1243                 RETURN(-EAGAIN);
1244         }
1245
1246         pcc_io_init(inode, cached);
1247         if (!*cached)
1248                 RETURN(0);
1249
1250         if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
1251                 GOTO(out, result = -ENOSPC);
1252
1253         iocb->ki_filp = pccf->pccf_file;
1254
1255         /* Since __pcc_file_write_iter makes write calls via
1256          * the normal vfs interface to the local PCC file system,
1257          * the inode lock is not needed.
1258          */
1259         result = __pcc_file_write_iter(iocb, iter);
1260         iocb->ki_filp = file;
1261 out:
1262         pcc_io_fini(inode);
1263         RETURN(result);
1264 }
1265
1266 int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
1267                       bool *cached)
1268 {
1269         int rc;
1270         const struct cred *old_cred;
1271         struct iattr attr2 = *attr;
1272         struct dentry *pcc_dentry;
1273         struct pcc_inode *pcci;
1274
1275         ENTRY;
1276
1277         if (!S_ISREG(inode->i_mode)) {
1278                 *cached = false;
1279                 RETURN(0);
1280         }
1281
1282         pcc_io_init(inode, cached);
1283         if (!*cached)
1284                 RETURN(0);
1285
1286         attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME |
1287                          ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET |
1288                          ATTR_CTIME | ATTR_UID | ATTR_GID);
1289         pcci = ll_i2pcci(inode);
1290         pcc_dentry = pcci->pcci_path.dentry;
1291         inode_lock(pcc_dentry->d_inode);
1292         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1293         rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
1294         revert_creds(old_cred);
1295         inode_unlock(pcc_dentry->d_inode);
1296
1297         pcc_io_fini(inode);
1298         RETURN(rc);
1299 }
1300
1301 int pcc_inode_getattr(struct inode *inode, bool *cached)
1302 {
1303         struct ll_inode_info *lli = ll_i2info(inode);
1304         const struct cred *old_cred;
1305         struct kstat stat;
1306         s64 atime;
1307         s64 mtime;
1308         s64 ctime;
1309         int rc;
1310
1311         ENTRY;
1312
1313         if (!S_ISREG(inode->i_mode)) {
1314                 *cached = false;
1315                 RETURN(0);
1316         }
1317
1318         pcc_io_init(inode, cached);
1319         if (!*cached)
1320                 RETURN(0);
1321
1322         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1323         rc = ll_vfs_getattr(&ll_i2pcci(inode)->pcci_path, &stat);
1324         revert_creds(old_cred);
1325         if (rc)
1326                 GOTO(out, rc);
1327
1328         ll_inode_size_lock(inode);
1329         if (inode->i_atime.tv_sec < lli->lli_atime ||
1330             lli->lli_update_atime) {
1331                 inode->i_atime.tv_sec = lli->lli_atime;
1332                 lli->lli_update_atime = 0;
1333         }
1334         inode->i_mtime.tv_sec = lli->lli_mtime;
1335         inode->i_ctime.tv_sec = lli->lli_ctime;
1336
1337         atime = inode->i_atime.tv_sec;
1338         mtime = inode->i_mtime.tv_sec;
1339         ctime = inode->i_ctime.tv_sec;
1340
1341         if (atime < stat.atime.tv_sec)
1342                 atime = stat.atime.tv_sec;
1343
1344         if (ctime < stat.ctime.tv_sec)
1345                 ctime = stat.ctime.tv_sec;
1346
1347         if (mtime < stat.mtime.tv_sec)
1348                 mtime = stat.mtime.tv_sec;
1349
1350         i_size_write(inode, stat.size);
1351         inode->i_blocks = stat.blocks;
1352
1353         inode->i_atime.tv_sec = atime;
1354         inode->i_mtime.tv_sec = mtime;
1355         inode->i_ctime.tv_sec = ctime;
1356
1357         ll_inode_size_unlock(inode);
1358 out:
1359         pcc_io_fini(inode);
1360         RETURN(rc);
1361 }
1362
1363 ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos,
1364                              struct pipe_inode_info *pipe,
1365                              size_t count, unsigned int flags,
1366                              bool *cached)
1367 {
1368         struct inode *inode = file_inode(in_file);
1369         struct ll_file_data *fd = LUSTRE_FPRIVATE(in_file);
1370         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1371         ssize_t result;
1372
1373         ENTRY;
1374
1375         *cached = false;
1376         if (!pcc_file)
1377                 RETURN(0);
1378
1379         if (!file_inode(pcc_file)->i_fop->splice_read)
1380                 RETURN(-ENOTSUPP);
1381
1382         pcc_io_init(inode, cached);
1383         if (!*cached)
1384                 RETURN(0);
1385
1386         result = file_inode(pcc_file)->i_fop->splice_read(pcc_file,
1387                                                           ppos, pipe, count,
1388                                                           flags);
1389
1390         pcc_io_fini(inode);
1391         RETURN(result);
1392 }
1393
1394 int pcc_fsync(struct file *file, loff_t start, loff_t end,
1395               int datasync, bool *cached)
1396 {
1397         struct inode *inode = file_inode(file);
1398         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1399         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1400         int rc;
1401
1402         ENTRY;
1403
1404         if (!pcc_file) {
1405                 *cached = false;
1406                 RETURN(0);
1407         }
1408
1409         pcc_io_init(inode, cached);
1410         if (!*cached)
1411                 RETURN(0);
1412
1413 #ifdef HAVE_FILE_FSYNC_4ARGS
1414         rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
1415                                                 start, end, datasync);
1416 #elif defined(HAVE_FILE_FSYNC_2ARGS)
1417         rc = file_inode(pcc_file)->i_fop->fsync(pcc_file, datasync);
1418 #else
1419         rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
1420                                 file_dentry(dentry), datasync);
1421 #endif
1422
1423         pcc_io_fini(inode);
1424         RETURN(rc);
1425 }
1426
1427 int pcc_file_mmap(struct file *file, struct vm_area_struct *vma,
1428                   bool *cached)
1429 {
1430         struct inode *inode = file_inode(file);
1431         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1432         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1433         struct pcc_inode *pcci;
1434         int rc = 0;
1435
1436         ENTRY;
1437
1438         if (!pcc_file || !file_inode(pcc_file)->i_fop->mmap) {
1439                 *cached = false;
1440                 RETURN(0);
1441         }
1442
1443         pcc_inode_lock(inode);
1444         pcci = ll_i2pcci(inode);
1445         if (pcci && pcc_inode_has_layout(pcci)) {
1446                 LASSERT(atomic_read(&pcci->pcci_refcount) > 1);
1447                 *cached = true;
1448                 vma->vm_file = pcc_file;
1449                 rc = file_inode(pcc_file)->i_fop->mmap(pcc_file, vma);
1450                 vma->vm_file = file;
1451                 /* Save the vm ops of backend PCC */
1452                 vma->vm_private_data = (void *)vma->vm_ops;
1453         } else {
1454                 *cached = false;
1455         }
1456         pcc_inode_unlock(inode);
1457
1458         RETURN(rc);
1459 }
1460
1461 void pcc_vm_open(struct vm_area_struct *vma)
1462 {
1463         struct pcc_inode *pcci;
1464         struct file *file = vma->vm_file;
1465         struct inode *inode = file_inode(file);
1466         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1467         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1468         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1469
1470         ENTRY;
1471
1472         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->open)
1473                 RETURN_EXIT;
1474
1475         pcc_inode_lock(inode);
1476         pcci = ll_i2pcci(inode);
1477         if (pcci && pcc_inode_has_layout(pcci)) {
1478                 vma->vm_file = pcc_file;
1479                 pcc_vm_ops->open(vma);
1480                 vma->vm_file = file;
1481         }
1482         pcc_inode_unlock(inode);
1483         EXIT;
1484 }
1485
1486 void pcc_vm_close(struct vm_area_struct *vma)
1487 {
1488         struct file *file = vma->vm_file;
1489         struct inode *inode = file_inode(file);
1490         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1491         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1492         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1493
1494         ENTRY;
1495
1496         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->close)
1497                 RETURN_EXIT;
1498
1499         pcc_inode_lock(inode);
1500         /* Layout lock maybe revoked here */
1501         vma->vm_file = pcc_file;
1502         pcc_vm_ops->close(vma);
1503         vma->vm_file = file;
1504         pcc_inode_unlock(inode);
1505         EXIT;
1506 }
1507
1508 int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
1509                      bool *cached)
1510 {
1511         struct page *page = vmf->page;
1512         struct mm_struct *mm = vma->vm_mm;
1513         struct file *file = vma->vm_file;
1514         struct inode *inode = file_inode(file);
1515         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1516         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1517         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1518         int rc;
1519
1520         ENTRY;
1521
1522         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->page_mkwrite) {
1523                 *cached = false;
1524                 RETURN(0);
1525         }
1526
1527         /* Pause to allow for a race with concurrent detach */
1528         OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
1529
1530         pcc_io_init(inode, cached);
1531         if (!*cached) {
1532                 /* This happens when the file is detached from PCC after got
1533                  * the fault page via ->fault() on the inode of the PCC copy.
1534                  * Here it can not simply fall back to normal Lustre I/O path.
1535                  * The reason is that the address space of fault page used by
1536                  * ->page_mkwrite() is still the one of PCC inode. In the
1537                  * normal Lustre ->page_mkwrite() I/O path, it will be wrongly
1538                  * handled as the address space of the fault page is not
1539                  * consistent with the one of the Lustre inode (though the
1540                  * fault page was truncated).
1541                  * As the file is detached from PCC, the fault page must
1542                  * be released frist, and retry the mmap write (->fault() and
1543                  * ->page_mkwrite).
1544                  * We use an ugly and tricky method by returning
1545                  * VM_FAULT_NOPAGE | VM_FAULT_RETRY to the caller
1546                  * __do_page_fault and retry the memory fault handling.
1547                  */
1548                 if (page->mapping == file_inode(pcc_file)->i_mapping) {
1549                         *cached = true;
1550                         up_read(&mm->mmap_sem);
1551                         RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1552                 }
1553
1554                 RETURN(0);
1555         }
1556
1557         /*
1558          * This fault injection can also be used to simulate -ENOSPC and
1559          * -EDQUOT failure of underlying PCC backend fs.
1560          */
1561         if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE)) {
1562                 pcc_io_fini(inode);
1563                 pcc_ioctl_detach(inode);
1564                 up_read(&mm->mmap_sem);
1565                 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1566         }
1567
1568         vma->vm_file = pcc_file;
1569 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
1570         rc = pcc_vm_ops->page_mkwrite(vmf);
1571 #else
1572         rc = pcc_vm_ops->page_mkwrite(vma, vmf);
1573 #endif
1574         vma->vm_file = file;
1575
1576         pcc_io_fini(inode);
1577         RETURN(rc);
1578 }
1579
1580 int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
1581               bool *cached)
1582 {
1583         struct file *file = vma->vm_file;
1584         struct inode *inode = file_inode(file);
1585         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1586         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1587         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1588         int rc;
1589
1590         ENTRY;
1591
1592         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->fault) {
1593                 *cached = false;
1594                 RETURN(0);
1595         }
1596
1597         pcc_io_init(inode, cached);
1598         if (!*cached)
1599                 RETURN(0);
1600
1601         vma->vm_file = pcc_file;
1602 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
1603         rc = pcc_vm_ops->fault(vmf);
1604 #else
1605         rc = pcc_vm_ops->fault(vma, vmf);
1606 #endif
1607         vma->vm_file = file;
1608
1609         pcc_io_fini(inode);
1610         RETURN(rc);
1611 }
1612
1613 static void pcc_layout_wait(struct pcc_inode *pcci)
1614 {
1615         struct l_wait_info lwi = { 0 };
1616
1617         while (atomic_read(&pcci->pcci_active_ios) > 0) {
1618                 CDEBUG(D_CACHE, "Waiting for IO completion: %d\n",
1619                        atomic_read(&pcci->pcci_active_ios));
1620                 l_wait_event(pcci->pcci_waitq,
1621                              atomic_read(&pcci->pcci_active_ios) == 0, &lwi);
1622         }
1623 }
1624
1625 static void __pcc_layout_invalidate(struct pcc_inode *pcci)
1626 {
1627         pcci->pcci_type = LU_PCC_NONE;
1628         pcc_layout_gen_set(pcci, CL_LAYOUT_GEN_NONE);
1629         pcc_layout_wait(pcci);
1630 }
1631
1632 void pcc_layout_invalidate(struct inode *inode)
1633 {
1634         struct pcc_inode *pcci;
1635
1636         pcc_inode_lock(inode);
1637         pcci = ll_i2pcci(inode);
1638         if (pcci && pcc_inode_has_layout(pcci)) {
1639                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1640                 __pcc_layout_invalidate(pcci);
1641
1642                 CDEBUG(D_CACHE, "Invalidate "DFID" layout gen %d\n",
1643                        PFID(&ll_i2info(inode)->lli_fid), pcci->pcci_layout_gen);
1644
1645                 pcc_inode_put(pcci);
1646         }
1647         pcc_inode_unlock(inode);
1648 }
1649
1650 static int pcc_inode_remove(struct pcc_inode *pcci)
1651 {
1652         struct dentry *dentry;
1653         int rc;
1654
1655         dentry = pcci->pcci_path.dentry;
1656         rc = ll_vfs_unlink(dentry->d_parent->d_inode, dentry);
1657         if (rc)
1658                 CWARN("failed to unlink PCC file %.*s, rc = %d\n",
1659                       dentry->d_name.len, dentry->d_name.name, rc);
1660
1661         return rc;
1662 }
1663
1664 /* Create directory under base if directory does not exist */
1665 static struct dentry *
1666 pcc_mkdir(struct dentry *base, const char *name, umode_t mode)
1667 {
1668         int rc;
1669         struct dentry *dentry;
1670         struct inode *dir = base->d_inode;
1671
1672         inode_lock(dir);
1673         dentry = lookup_one_len(name, base, strlen(name));
1674         if (IS_ERR(dentry))
1675                 goto out;
1676
1677         if (d_is_positive(dentry))
1678                 goto out;
1679
1680         rc = vfs_mkdir(dir, dentry, mode);
1681         if (rc) {
1682                 dput(dentry);
1683                 dentry = ERR_PTR(rc);
1684                 goto out;
1685         }
1686 out:
1687         inode_unlock(dir);
1688         return dentry;
1689 }
1690
1691 static struct dentry *
1692 pcc_mkdir_p(struct dentry *root, char *path, umode_t mode)
1693 {
1694         char *ptr, *entry_name;
1695         struct dentry *parent;
1696         struct dentry *child = ERR_PTR(-EINVAL);
1697
1698         ptr = path;
1699         while (*ptr == '/')
1700                 ptr++;
1701
1702         entry_name = ptr;
1703         parent = dget(root);
1704         while ((ptr = strchr(ptr, '/')) != NULL) {
1705                 *ptr = '\0';
1706                 child = pcc_mkdir(parent, entry_name, mode);
1707                 *ptr = '/';
1708                 dput(parent);
1709                 if (IS_ERR(child))
1710                         break;
1711
1712                 parent = child;
1713                 ptr++;
1714                 entry_name = ptr;
1715         }
1716
1717         return child;
1718 }
1719
1720 /* Create file under base. If file already exist, return failure */
1721 static struct dentry *
1722 pcc_create(struct dentry *base, const char *name, umode_t mode)
1723 {
1724         int rc;
1725         struct dentry *dentry;
1726         struct inode *dir = base->d_inode;
1727
1728         inode_lock(dir);
1729         dentry = lookup_one_len(name, base, strlen(name));
1730         if (IS_ERR(dentry))
1731                 goto out;
1732
1733         if (d_is_positive(dentry))
1734                 goto out;
1735
1736         rc = vfs_create(dir, dentry, mode, LL_VFS_CREATE_FALSE);
1737         if (rc) {
1738                 dput(dentry);
1739                 dentry = ERR_PTR(rc);
1740                 goto out;
1741         }
1742 out:
1743         inode_unlock(dir);
1744         return dentry;
1745 }
1746
1747 /* Must be called with pcci->pcci_lock held */
1748 static void pcc_inode_attach_init(struct pcc_dataset *dataset,
1749                                   struct pcc_inode *pcci,
1750                                   struct dentry *dentry,
1751                                   enum lu_pcc_type type)
1752 {
1753         pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
1754         pcci->pcci_path.dentry = dentry;
1755         LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
1756         atomic_set(&pcci->pcci_refcount, 1);
1757         pcci->pcci_type = type;
1758         pcci->pcci_attr_valid = false;
1759 }
1760
1761 static int __pcc_inode_create(struct pcc_dataset *dataset,
1762                               struct lu_fid *fid,
1763                               struct dentry **dentry)
1764 {
1765         char *path;
1766         struct dentry *base;
1767         struct dentry *child;
1768         int rc = 0;
1769
1770         OBD_ALLOC(path, MAX_PCC_DATABASE_PATH);
1771         if (path == NULL)
1772                 return -ENOMEM;
1773
1774         pcc_fid2dataset_path(path, MAX_PCC_DATABASE_PATH, fid);
1775
1776         base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0);
1777         if (IS_ERR(base)) {
1778                 rc = PTR_ERR(base);
1779                 GOTO(out, rc);
1780         }
1781
1782         snprintf(path, MAX_PCC_DATABASE_PATH, DFID_NOBRACE, PFID(fid));
1783         child = pcc_create(base, path, 0);
1784         if (IS_ERR(child)) {
1785                 rc = PTR_ERR(child);
1786                 GOTO(out_base, rc);
1787         }
1788         *dentry = child;
1789
1790 out_base:
1791         dput(base);
1792 out:
1793         OBD_FREE(path, MAX_PCC_DATABASE_PATH);
1794         return rc;
1795 }
1796
1797 /* TODO: Set the project ID for PCC copy */
1798 int pcc_inode_store_ugpid(struct dentry *dentry, kuid_t uid, kgid_t gid)
1799 {
1800         struct inode *inode = dentry->d_inode;
1801         struct iattr attr;
1802         int rc;
1803
1804         ENTRY;
1805
1806         attr.ia_valid = ATTR_UID | ATTR_GID;
1807         attr.ia_uid = uid;
1808         attr.ia_gid = gid;
1809
1810         inode_lock(inode);
1811         rc = notify_change(dentry, &attr, NULL);
1812         inode_unlock(inode);
1813
1814         RETURN(rc);
1815 }
1816
1817 int pcc_inode_create(struct super_block *sb, struct pcc_dataset *dataset,
1818                      struct lu_fid *fid, struct dentry **pcc_dentry)
1819 {
1820         const struct cred *old_cred;
1821         int rc;
1822
1823         old_cred = override_creds(pcc_super_cred(sb));
1824         rc = __pcc_inode_create(dataset, fid, pcc_dentry);
1825         revert_creds(old_cred);
1826         return rc;
1827 }
1828
1829 int pcc_inode_create_fini(struct pcc_dataset *dataset, struct inode *inode,
1830                           struct dentry *pcc_dentry)
1831 {
1832         const struct cred *old_cred;
1833         struct pcc_inode *pcci;
1834         int rc = 0;
1835
1836         ENTRY;
1837
1838         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1839         pcc_inode_lock(inode);
1840         LASSERT(ll_i2pcci(inode) == NULL);
1841         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
1842         if (pcci == NULL)
1843                 GOTO(out_unlock, rc = -ENOMEM);
1844
1845         rc = pcc_inode_store_ugpid(pcc_dentry, old_cred->suid,
1846                                    old_cred->sgid);
1847         if (rc)
1848                 GOTO(out_unlock, rc);
1849
1850         pcc_inode_init(pcci, ll_i2info(inode));
1851         pcc_inode_attach_init(dataset, pcci, pcc_dentry, LU_PCC_READWRITE);
1852         /* Set the layout generation of newly created file with 0 */
1853         pcc_layout_gen_set(pcci, 0);
1854
1855 out_unlock:
1856         if (rc) {
1857                 int rc2;
1858
1859                 rc2 = ll_vfs_unlink(pcc_dentry->d_parent->d_inode, pcc_dentry);
1860                 if (rc2)
1861                         CWARN("%s: failed to unlink PCC file %.*s, rc = %d\n",
1862                               ll_i2sbi(inode)->ll_fsname,
1863                               pcc_dentry->d_name.len, pcc_dentry->d_name.name,
1864                               rc2);
1865
1866                 dput(pcc_dentry);
1867         }
1868
1869         pcc_inode_unlock(inode);
1870         revert_creds(old_cred);
1871         if (rc && pcci)
1872                 OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
1873
1874         RETURN(rc);
1875 }
1876
1877 static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
1878                           loff_t *offset)
1879 {
1880         while (count > 0) {
1881                 ssize_t size;
1882
1883                 size = vfs_write(filp, (const void __user *)buf, count, offset);
1884                 if (size < 0)
1885                         return size;
1886                 count -= size;
1887                 buf += size;
1888         }
1889         return 0;
1890 }
1891
1892 static int pcc_copy_data(struct file *src, struct file *dst)
1893 {
1894         int rc = 0;
1895         ssize_t rc2;
1896         mm_segment_t oldfs;
1897         loff_t pos, offset = 0;
1898         size_t buf_len = 1048576;
1899         void *buf;
1900
1901         ENTRY;
1902
1903         OBD_ALLOC_LARGE(buf, buf_len);
1904         if (buf == NULL)
1905                 RETURN(-ENOMEM);
1906
1907         oldfs = get_fs();
1908         set_fs(KERNEL_DS);
1909         while (1) {
1910                 pos = offset;
1911                 rc2 = vfs_read(src, (void __user *)buf, buf_len, &pos);
1912                 if (rc2 < 0)
1913                         GOTO(out_fs, rc = rc2);
1914                 else if (rc2 == 0)
1915                         break;
1916
1917                 pos = offset;
1918                 rc = pcc_filp_write(dst, buf, rc2, &pos);
1919                 if (rc < 0)
1920                         GOTO(out_fs, rc);
1921                 offset += rc2;
1922         }
1923
1924 out_fs:
1925         set_fs(oldfs);
1926         OBD_FREE_LARGE(buf, buf_len);
1927         RETURN(rc);
1928 }
1929
1930 static int pcc_attach_allowed_check(struct inode *inode)
1931 {
1932         struct ll_inode_info *lli = ll_i2info(inode);
1933         struct pcc_inode *pcci;
1934         int rc = 0;
1935
1936         ENTRY;
1937
1938         pcc_inode_lock(inode);
1939         if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
1940                 GOTO(out_unlock, rc = -EBUSY);
1941
1942         pcci = ll_i2pcci(inode);
1943         if (pcci && pcc_inode_has_layout(pcci))
1944                 GOTO(out_unlock, rc = -EEXIST);
1945
1946         lli->lli_pcc_state |= PCC_STATE_FL_ATTACHING;
1947 out_unlock:
1948         pcc_inode_unlock(inode);
1949         RETURN(rc);
1950 }
1951
1952 int pcc_readwrite_attach(struct file *file, struct inode *inode,
1953                          __u32 archive_id)
1954 {
1955         struct pcc_dataset *dataset;
1956         struct ll_inode_info *lli = ll_i2info(inode);
1957         struct pcc_inode *pcci;
1958         const struct cred *old_cred;
1959         struct dentry *dentry;
1960         struct file *pcc_filp;
1961         struct path path;
1962         int rc;
1963
1964         ENTRY;
1965
1966         rc = pcc_attach_allowed_check(inode);
1967         if (rc)
1968                 RETURN(rc);
1969
1970         dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super,
1971                                   LU_PCC_READWRITE, archive_id);
1972         if (dataset == NULL)
1973                 RETURN(-ENOENT);
1974
1975         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1976         rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry);
1977         if (rc) {
1978                 revert_creds(old_cred);
1979                 GOTO(out_dataset_put, rc);
1980         }
1981
1982         path.mnt = dataset->pccd_path.mnt;
1983         path.dentry = dentry;
1984 #ifdef HAVE_DENTRY_OPEN_USE_PATH
1985         pcc_filp = dentry_open(&path, O_TRUNC | O_WRONLY | O_LARGEFILE,
1986                                current_cred());
1987 #else
1988         pcc_filp = dentry_open(path.dentry, path.mnt,
1989                                O_TRUNC | O_WRONLY | O_LARGEFILE,
1990                                current_cred());
1991 #endif
1992         if (IS_ERR_OR_NULL(pcc_filp)) {
1993                 rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
1994                 revert_creds(old_cred);
1995                 GOTO(out_dentry, rc);
1996         }
1997
1998         rc = pcc_inode_store_ugpid(dentry, old_cred->uid, old_cred->gid);
1999         revert_creds(old_cred);
2000         if (rc)
2001                 GOTO(out_fput, rc);
2002
2003         rc = pcc_copy_data(file, pcc_filp);
2004         if (rc)
2005                 GOTO(out_fput, rc);
2006
2007         /* Pause to allow for a race with concurrent HSM remove */
2008         OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
2009
2010         pcc_inode_lock(inode);
2011         pcci = ll_i2pcci(inode);
2012         LASSERT(!pcci);
2013         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2014         if (pcci == NULL)
2015                 GOTO(out_unlock, rc = -ENOMEM);
2016
2017         pcc_inode_init(pcci, lli);
2018         pcc_inode_attach_init(dataset, pcci, dentry, LU_PCC_READWRITE);
2019 out_unlock:
2020         pcc_inode_unlock(inode);
2021 out_fput:
2022         fput(pcc_filp);
2023 out_dentry:
2024         if (rc) {
2025                 int rc2;
2026
2027                 old_cred = override_creds(pcc_super_cred(inode->i_sb));
2028                 rc2 = ll_vfs_unlink(dentry->d_parent->d_inode, dentry);
2029                 revert_creds(old_cred);
2030                 if (rc2)
2031                         CWARN("%s: failed to unlink PCC file %.*s, rc = %d\n",
2032                               ll_i2sbi(inode)->ll_fsname, dentry->d_name.len,
2033                               dentry->d_name.name, rc2);
2034
2035                 dput(dentry);
2036         }
2037 out_dataset_put:
2038         pcc_dataset_put(dataset);
2039         RETURN(rc);
2040 }
2041
2042 int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
2043                               __u32 gen, bool lease_broken, int rc,
2044                               bool attached)
2045 {
2046         struct ll_inode_info *lli = ll_i2info(inode);
2047         const struct cred *old_cred;
2048         struct pcc_inode *pcci;
2049         __u32 gen2;
2050
2051         ENTRY;
2052
2053         pcc_inode_lock(inode);
2054         pcci = ll_i2pcci(inode);
2055         lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
2056         if (rc || lease_broken) {
2057                 if (attached && pcci)
2058                         pcc_inode_put(pcci);
2059
2060                 GOTO(out_unlock, rc);
2061         }
2062
2063         /* PCC inode may be released due to layout lock revocatioin */
2064         if (!pcci)
2065                 GOTO(out_unlock, rc = -ESTALE);
2066
2067         LASSERT(attached);
2068         rc = ll_layout_refresh(inode, &gen2);
2069         if (!rc) {
2070                 if (gen2 == gen) {
2071                         pcc_layout_gen_set(pcci, gen);
2072                 } else {
2073                         CDEBUG(D_CACHE,
2074                                DFID" layout changed from %d to %d.\n",
2075                                PFID(ll_inode2fid(inode)), gen, gen2);
2076                         GOTO(out_put, rc = -ESTALE);
2077                 }
2078         }
2079
2080 out_put:
2081         if (rc) {
2082                 old_cred = override_creds(pcc_super_cred(inode->i_sb));
2083                 pcc_inode_remove(pcci);
2084                 revert_creds(old_cred);
2085                 pcc_inode_put(pcci);
2086         }
2087 out_unlock:
2088         pcc_inode_unlock(inode);
2089         RETURN(rc);
2090 }
2091
2092 int pcc_ioctl_detach(struct inode *inode)
2093 {
2094         struct ll_inode_info *lli = ll_i2info(inode);
2095         struct pcc_inode *pcci;
2096         int rc = 0;
2097
2098         ENTRY;
2099
2100         pcc_inode_lock(inode);
2101         pcci = lli->lli_pcc_inode;
2102         if (!pcci || lli->lli_pcc_state & PCC_STATE_FL_ATTACHING ||
2103             !pcc_inode_has_layout(pcci))
2104                 GOTO(out_unlock, rc = 0);
2105
2106         __pcc_layout_invalidate(pcci);
2107         pcc_inode_put(pcci);
2108
2109 out_unlock:
2110         pcc_inode_unlock(inode);
2111         RETURN(rc);
2112 }
2113
2114 int pcc_ioctl_state(struct file *file, struct inode *inode,
2115                     struct lu_pcc_state *state)
2116 {
2117         int rc = 0;
2118         int count;
2119         char *buf;
2120         char *path;
2121         int buf_len = sizeof(state->pccs_path);
2122         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2123         struct pcc_file *pccf = &fd->fd_pcc_file;
2124         struct pcc_inode *pcci;
2125
2126         ENTRY;
2127
2128         if (buf_len <= 0)
2129                 RETURN(-EINVAL);
2130
2131         OBD_ALLOC(buf, buf_len);
2132         if (buf == NULL)
2133                 RETURN(-ENOMEM);
2134
2135         pcc_inode_lock(inode);
2136         pcci = ll_i2pcci(inode);
2137         if (pcci == NULL) {
2138                 state->pccs_type = LU_PCC_NONE;
2139                 GOTO(out_unlock, rc = 0);
2140         }
2141
2142         count = atomic_read(&pcci->pcci_refcount);
2143         if (count == 0) {
2144                 state->pccs_type = LU_PCC_NONE;
2145                 state->pccs_open_count = 0;
2146                 GOTO(out_unlock, rc = 0);
2147         }
2148
2149         if (pcc_inode_has_layout(pcci))
2150                 count--;
2151         if (pccf->pccf_file != NULL)
2152                 count--;
2153         state->pccs_type = pcci->pcci_type;
2154         state->pccs_open_count = count;
2155         state->pccs_flags = ll_i2info(inode)->lli_pcc_state;
2156 #ifdef HAVE_DENTRY_PATH_RAW
2157         path = dentry_path_raw(pcci->pcci_path.dentry, buf, buf_len);
2158         if (IS_ERR(path))
2159                 GOTO(out_unlock, rc = PTR_ERR(path));
2160 #else
2161         path = "UNKNOWN";
2162 #endif
2163
2164         if (strlcpy(state->pccs_path, path, buf_len) >= buf_len)
2165                 GOTO(out_unlock, rc = -ENAMETOOLONG);
2166
2167 out_unlock:
2168         pcc_inode_unlock(inode);
2169         OBD_FREE(buf, buf_len);
2170         RETURN(rc);
2171 }