Whamcloud - gitweb
e6f9d8fb33a3a4f55040d978e25dac558c070605
[fs/lustre-release.git] / lustre / llite / pcc.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, DDN Storage Corporation.
24  */
25 /*
26  * Persistent Client Cache
27  *
28  * PCC is a new framework which provides a group of local cache on Lustre
29  * client side. It works in two modes: RW-PCC enables a read-write cache on the
30  * local SSDs of a single client; RO-PCC provides a read-only cache on the
31  * local SSDs of multiple clients. Less overhead is visible to the applications
32  * and network latencies and lock conflicts can be significantly reduced.
33  *
34  * For RW-PCC, no global namespace will be provided. Each client uses its own
35  * local storage as a cache for itself. Local file system is used to manage
36  * the data on local caches. Cached I/O is directed to local file system while
37  * normal I/O is directed to OSTs. RW-PCC uses HSM for data synchronization.
38  * It uses HSM copytool to restore file from local caches to Lustre OSTs. Each
39  * PCC has a copytool instance running with unique archive number. Any remote
40  * access from another Lustre client would trigger the data synchronization. If
41  * a client with RW-PCC goes offline, the cached data becomes inaccessible for
42  * other client temporarily. And after the RW-PCC client reboots and the
43  * copytool restarts, the data will be accessible again.
44  *
45  * Following is what will happen in different conditions for RW-PCC:
46  *
47  * > When file is being created on RW-PCC
48  *
49  * A normal HSM released file is created on MDT;
50  * An empty mirror file is created on local cache;
51  * The HSM status of the Lustre file will be set to archived and released;
52  * The archive number will be set to the proper value.
53  *
54  * > When file is being prefetched to RW-PCC
55  *
56  * A file is copied to the local cache;
57  * The HSM status of the Lustre file will be set to archived and released;
58  * The archive number will be set to the proper value.
59  *
60  * > When file is being accessed from PCC
61  *
62  * Data will be read directly from local cache;
63  * Metadata will be read from MDT, except file size;
64  * File size will be got from local cache.
65  *
66  * > When PCC cached file is being accessed on another client
67  *
68  * RW-PCC cached files are automatically restored when a process on another
69  * client tries to read or modify them. The corresponding I/O will block
70  * waiting for the released file to be restored. This is transparent to the
71  * process.
72  *
73  * For RW-PCC, when a file is being created, a rule-based policy is used to
74  * determine whether it will be cached. Rule-based caching of newly created
75  * files can determine which file can use a cache on PCC directly without any
76  * admission control.
77  *
78  * RW-PCC design can accelerate I/O intensive applications with one-to-one
79  * mappings between files and accessing clients. However, in several use cases,
80  * files will never be updated, but need to be read simultaneously from many
81  * clients. RO-PCC implements a read-only caching on Lustre clients using
82  * SSDs. RO-PCC is based on the same framework as RW-PCC, except
83  * that no HSM mechanism is used.
84  *
85  * The main advantages to use this SSD cache on the Lustre clients via PCC
86  * is that:
87  * - The I/O stack becomes much simpler for the cached data, as there is no
88  *   interference with I/Os from other clients, which enables easier
89  *   performance optimizations;
90  * - The requirements on the HW inside the client nodes are small, any kind of
91  *   SSDs or even HDDs can be used as cache devices;
92  * - Caching reduces the pressure on the object storage targets (OSTs), as
93  *   small or random I/Os can be regularized to big sequential I/Os and
94  *   temporary files do not even need to be flushed to OSTs.
95  *
96  * PCC can accelerate applications with certain I/O patterns:
97  * - small-sized random writes (< 1MB) from a single client
98  * - repeated read of data that is larger than RAM
99  * - clients with high network latency
100  *
101  * Author: Li Xi <lixi@ddn.com>
102  * Author: Qian Yingjin <qian@ddn.com>
103  */
104
105 #define DEBUG_SUBSYSTEM S_LLITE
106
107 #include "pcc.h"
108 #include <linux/namei.h>
109 #include <linux/file.h>
110 #include <lustre_compat.h>
111 #include "llite_internal.h"
112
113 struct kmem_cache *pcc_inode_slab;
114
115 int pcc_super_init(struct pcc_super *super)
116 {
117         struct cred *cred;
118
119         super->pccs_cred = cred = prepare_creds();
120         if (!cred)
121                 return -ENOMEM;
122
123         /* Never override disk quota limits or use reserved space */
124         cap_lower(cred->cap_effective, CAP_SYS_RESOURCE);
125         init_rwsem(&super->pccs_rw_sem);
126         INIT_LIST_HEAD(&super->pccs_datasets);
127
128         return 0;
129 }
130
131 /* Rule based auto caching */
132 static void pcc_id_list_free(struct list_head *id_list)
133 {
134         struct pcc_match_id *id, *n;
135
136         list_for_each_entry_safe(id, n, id_list, pmi_linkage) {
137                 list_del_init(&id->pmi_linkage);
138                 OBD_FREE_PTR(id);
139         }
140 }
141
142 static void pcc_fname_list_free(struct list_head *fname_list)
143 {
144         struct pcc_match_fname *fname, *n;
145
146         list_for_each_entry_safe(fname, n, fname_list, pmf_linkage) {
147                 OBD_FREE(fname->pmf_name, strlen(fname->pmf_name) + 1);
148                 list_del_init(&fname->pmf_linkage);
149                 OBD_FREE_PTR(fname);
150         }
151 }
152
153 static void pcc_expression_free(struct pcc_expression *expr)
154 {
155         LASSERT(expr->pe_field >= PCC_FIELD_UID &&
156                 expr->pe_field < PCC_FIELD_MAX);
157         switch (expr->pe_field) {
158         case PCC_FIELD_UID:
159         case PCC_FIELD_GID:
160         case PCC_FIELD_PROJID:
161                 pcc_id_list_free(&expr->pe_cond);
162                 break;
163         case PCC_FIELD_FNAME:
164                 pcc_fname_list_free(&expr->pe_cond);
165                 break;
166         default:
167                 LBUG();
168         }
169         OBD_FREE_PTR(expr);
170 }
171
172 static void pcc_conjunction_free(struct pcc_conjunction *conjunction)
173 {
174         struct pcc_expression *expression, *n;
175
176         LASSERT(list_empty(&conjunction->pc_linkage));
177         list_for_each_entry_safe(expression, n,
178                                  &conjunction->pc_expressions,
179                                  pe_linkage) {
180                 list_del_init(&expression->pe_linkage);
181                 pcc_expression_free(expression);
182         }
183         OBD_FREE_PTR(conjunction);
184 }
185
186 static void pcc_rule_conds_free(struct list_head *cond_list)
187 {
188         struct pcc_conjunction *conjunction, *n;
189
190         list_for_each_entry_safe(conjunction, n, cond_list, pc_linkage) {
191                 list_del_init(&conjunction->pc_linkage);
192                 pcc_conjunction_free(conjunction);
193         }
194 }
195
196 static void pcc_cmd_fini(struct pcc_cmd *cmd)
197 {
198         if (cmd->pccc_cmd == PCC_ADD_DATASET) {
199                 if (!list_empty(&cmd->u.pccc_add.pccc_conds))
200                         pcc_rule_conds_free(&cmd->u.pccc_add.pccc_conds);
201                 if (cmd->u.pccc_add.pccc_conds_str)
202                         OBD_FREE(cmd->u.pccc_add.pccc_conds_str,
203                                  strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
204         }
205 }
206
207 #define PCC_DISJUNCTION_DELIM   (',') /* splits a rule into conjunctions; pcc_cond_match() accepts if any one matches */
208 #define PCC_CONJUNCTION_DELIM   ('&') /* splits a conjunction into expressions; all of them must match */
209 #define PCC_EXPRESSION_DELIM    ('=') /* splits an expression into its field name and "{...}" value list */
210
211 static int
212 pcc_fname_list_add(struct cfs_lstr *id, struct list_head *fname_list)
213 {
214         struct pcc_match_fname *fname;
215
216         OBD_ALLOC(fname, sizeof(struct pcc_match_fname));
217         if (fname == NULL)
218                 return -ENOMEM;
219
220         OBD_ALLOC(fname->pmf_name, id->ls_len + 1);
221         if (fname->pmf_name == NULL) {
222                 OBD_FREE(fname, sizeof(struct pcc_match_fname));
223                 return -ENOMEM;
224         }
225
226         memcpy(fname->pmf_name, id->ls_str, id->ls_len);
227         list_add_tail(&fname->pmf_linkage, fname_list);
228         return 0;
229 }
230
231 static int
232 pcc_fname_list_parse(char *str, int len, struct list_head *fname_list)
233 {
234         struct cfs_lstr src;
235         struct cfs_lstr res;
236         int rc = 0;
237
238         ENTRY;
239
240         src.ls_str = str;
241         src.ls_len = len;
242         INIT_LIST_HEAD(fname_list);
243         while (src.ls_str) {
244                 rc = cfs_gettok(&src, ' ', &res);
245                 if (rc == 0) {
246                         rc = -EINVAL;
247                         break;
248                 }
249                 rc = pcc_fname_list_add(&res, fname_list);
250                 if (rc)
251                         break;
252         }
253         if (rc)
254                 pcc_fname_list_free(fname_list);
255         RETURN(rc);
256 }
257
258 static int
259 pcc_id_list_parse(char *str, int len, struct list_head *id_list,
260                   enum pcc_field type)
261 {
262         struct cfs_lstr src;
263         struct cfs_lstr res;
264         int rc = 0;
265
266         ENTRY;
267
268         if (type != PCC_FIELD_UID && type != PCC_FIELD_GID &&
269             type != PCC_FIELD_PROJID)
270                 RETURN(-EINVAL);
271
272         src.ls_str = str;
273         src.ls_len = len;
274         INIT_LIST_HEAD(id_list);
275         while (src.ls_str) {
276                 struct pcc_match_id *id;
277                 __u32 id_val;
278
279                 if (cfs_gettok(&src, ' ', &res) == 0)
280                         GOTO(out, rc = -EINVAL);
281
282                 if (!cfs_str2num_check(res.ls_str, res.ls_len,
283                                        &id_val, 0, (u32)~0U))
284                         GOTO(out, rc = -EINVAL);
285
286                 OBD_ALLOC_PTR(id);
287                 if (id == NULL)
288                         GOTO(out, rc = -ENOMEM);
289
290                 id->pmi_id = id_val;
291                 list_add_tail(&id->pmi_linkage, id_list);
292         }
293 out:
294         if (rc)
295                 pcc_id_list_free(id_list);
296         RETURN(rc);
297 }
298
299 static inline bool
300 pcc_check_field(struct cfs_lstr *field, char *str)
301 {
302         int len = strlen(str);
303
304         return (field->ls_len == len &&
305                 strncmp(field->ls_str, str, len) == 0);
306 }
307
308 static int
309 pcc_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
310 {
311         struct pcc_expression *expr;
312         struct cfs_lstr field;
313         int rc = 0;
314
315         OBD_ALLOC(expr, sizeof(struct pcc_expression));
316         if (expr == NULL)
317                 return -ENOMEM;
318
319         rc = cfs_gettok(src, PCC_EXPRESSION_DELIM, &field);
320         if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
321             src->ls_str[src->ls_len - 1] != '}')
322                 GOTO(out, rc = -EINVAL);
323
324         /* Skip '{' and '}' */
325         src->ls_str++;
326         src->ls_len -= 2;
327
328         if (pcc_check_field(&field, "uid")) {
329                 if (pcc_id_list_parse(src->ls_str,
330                                       src->ls_len,
331                                       &expr->pe_cond,
332                                       PCC_FIELD_UID) < 0)
333                         GOTO(out, rc = -EINVAL);
334                 expr->pe_field = PCC_FIELD_UID;
335         } else if (pcc_check_field(&field, "gid")) {
336                 if (pcc_id_list_parse(src->ls_str,
337                                       src->ls_len,
338                                       &expr->pe_cond,
339                                       PCC_FIELD_GID) < 0)
340                         GOTO(out, rc = -EINVAL);
341                 expr->pe_field = PCC_FIELD_GID;
342         } else if (pcc_check_field(&field, "projid")) {
343                 if (pcc_id_list_parse(src->ls_str,
344                                       src->ls_len,
345                                       &expr->pe_cond,
346                                       PCC_FIELD_PROJID) < 0)
347                         GOTO(out, rc = -EINVAL);
348                 expr->pe_field = PCC_FIELD_PROJID;
349         } else if (pcc_check_field(&field, "fname")) {
350                 if (pcc_fname_list_parse(src->ls_str,
351                                          src->ls_len,
352                                          &expr->pe_cond) < 0)
353                         GOTO(out, rc = -EINVAL);
354                 expr->pe_field = PCC_FIELD_FNAME;
355         } else {
356                 GOTO(out, rc = -EINVAL);
357         }
358
359         list_add_tail(&expr->pe_linkage, cond_list);
360         return 0;
361 out:
362         OBD_FREE_PTR(expr);
363         return rc;
364 }
365
366 static int
367 pcc_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
368 {
369         struct pcc_conjunction *conjunction;
370         struct cfs_lstr expr;
371         int rc = 0;
372
373         OBD_ALLOC(conjunction, sizeof(struct pcc_conjunction));
374         if (conjunction == NULL)
375                 return -ENOMEM;
376
377         INIT_LIST_HEAD(&conjunction->pc_expressions);
378         list_add_tail(&conjunction->pc_linkage, cond_list);
379
380         while (src->ls_str) {
381                 rc = cfs_gettok(src, PCC_CONJUNCTION_DELIM, &expr);
382                 if (rc == 0) {
383                         rc = -EINVAL;
384                         break;
385                 }
386                 rc = pcc_expression_parse(&expr,
387                                           &conjunction->pc_expressions);
388                 if (rc)
389                         break;
390         }
391         return rc;
392 }
393
394 static int pcc_conds_parse(char *str, int len, struct list_head *cond_list)
395 {
396         struct cfs_lstr src;
397         struct cfs_lstr res;
398         int rc = 0;
399
400         src.ls_str = str;
401         src.ls_len = len;
402         INIT_LIST_HEAD(cond_list);
403         while (src.ls_str) {
404                 rc = cfs_gettok(&src, PCC_DISJUNCTION_DELIM, &res);
405                 if (rc == 0) {
406                         rc = -EINVAL;
407                         break;
408                 }
409                 rc = pcc_conjunction_parse(&res, cond_list);
410                 if (rc)
411                         break;
412         }
413         return rc;
414 }
415
416 static int pcc_id_parse(struct pcc_cmd *cmd, const char *id)
417 {
418         int rc;
419
420         OBD_ALLOC(cmd->u.pccc_add.pccc_conds_str, strlen(id) + 1);
421         if (cmd->u.pccc_add.pccc_conds_str == NULL)
422                 return -ENOMEM;
423
424         memcpy(cmd->u.pccc_add.pccc_conds_str, id, strlen(id));
425
426         rc = pcc_conds_parse(cmd->u.pccc_add.pccc_conds_str,
427                              strlen(cmd->u.pccc_add.pccc_conds_str),
428                              &cmd->u.pccc_add.pccc_conds);
429         if (rc)
430                 pcc_cmd_fini(cmd);
431
432         return rc;
433 }
434
435 static int
436 pcc_parse_value_pair(struct pcc_cmd *cmd, char *buffer)
437 {
438         char *key, *val;
439         unsigned long id;
440         int rc;
441
442         val = buffer;
443         key = strsep(&val, "=");
444         if (val == NULL || strlen(val) == 0)
445                 return -EINVAL;
446
447         /* Key of the value pair */
448         if (strcmp(key, "rwid") == 0) {
449                 rc = kstrtoul(val, 10, &id);
450                 if (rc)
451                         return rc;
452                 if (id <= 0)
453                         return -EINVAL;
454                 cmd->u.pccc_add.pccc_rwid = id;
455         } else if (strcmp(key, "roid") == 0) {
456                 rc = kstrtoul(val, 10, &id);
457                 if (rc)
458                         return rc;
459                 if (id <= 0)
460                         return -EINVAL;
461                 cmd->u.pccc_add.pccc_roid = id;
462         } else if (strcmp(key, "auto_attach") == 0) {
463                 rc = kstrtoul(val, 10, &id);
464                 if (rc)
465                         return rc;
466                 if (id == 0)
467                         cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_AUTO_ATTACH;
468         } else if (strcmp(key, "open_attach") == 0) {
469                 rc = kstrtoul(val, 10, &id);
470                 if (rc)
471                         return rc;
472                 if (id == 0)
473                         cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_OPEN_ATTACH;
474         } else if (strcmp(key, "io_attach") == 0) {
475                 rc = kstrtoul(val, 10, &id);
476                 if (rc)
477                         return rc;
478                 if (id == 0)
479                         cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_IO_ATTACH;
480         } else if (strcmp(key, "stat_attach") == 0) {
481                 rc = kstrtoul(val, 10, &id);
482                 if (rc)
483                         return rc;
484                 if (id == 0)
485                         cmd->u.pccc_add.pccc_flags &= ~PCC_DATASET_STAT_ATTACH;
486         } else if (strcmp(key, "rwpcc") == 0) {
487                 rc = kstrtoul(val, 10, &id);
488                 if (rc)
489                         return rc;
490                 if (id > 0)
491                         cmd->u.pccc_add.pccc_flags |= PCC_DATASET_RWPCC;
492         } else if (strcmp(key, "ropcc") == 0) {
493                 rc = kstrtoul(val, 10, &id);
494                 if (rc)
495                         return rc;
496                 if (id > 0)
497                         cmd->u.pccc_add.pccc_flags |= PCC_DATASET_ROPCC;
498         } else {
499                 return -EINVAL;
500         }
501
502         return 0;
503 }
504
505 static int
506 pcc_parse_value_pairs(struct pcc_cmd *cmd, char *buffer)
507 {
508         char *val;
509         char *token;
510         int rc;
511
512         switch (cmd->pccc_cmd) {
513         case PCC_ADD_DATASET:
514                 /* Enable auto attach by default */
515                 cmd->u.pccc_add.pccc_flags |= PCC_DATASET_AUTO_ATTACH;
516                 break;
517         case PCC_DEL_DATASET:
518         case PCC_CLEAR_ALL:
519                 break;
520         default:
521                 return -EINVAL;
522         }
523
524         val = buffer;
525         while (val != NULL && strlen(val) != 0) {
526                 token = strsep(&val, " ");
527                 rc = pcc_parse_value_pair(cmd, token);
528                 if (rc)
529                         return rc;
530         }
531
532         switch (cmd->pccc_cmd) {
533         case PCC_ADD_DATASET:
534                 if (cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC &&
535                     cmd->u.pccc_add.pccc_flags & PCC_DATASET_ROPCC)
536                         return -EINVAL;
537                 /*
538                  * By default, a PCC backend can provide caching service for
539                  * both RW-PCC and RO-PCC.
540                  */
541                 if ((cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCC_ALL) == 0)
542                         cmd->u.pccc_add.pccc_flags |= PCC_DATASET_PCC_ALL;
543                 break;
544         case PCC_DEL_DATASET:
545         case PCC_CLEAR_ALL:
546                 break;
547         default:
548                 return -EINVAL;
549         }
550         return 0;
551 }
552
553 static void
554 pcc_dataset_rule_fini(struct pcc_match_rule *rule)
555 {
556         if (!list_empty(&rule->pmr_conds))
557                 pcc_rule_conds_free(&rule->pmr_conds);
558         LASSERT(rule->pmr_conds_str != NULL);
559         OBD_FREE(rule->pmr_conds_str, strlen(rule->pmr_conds_str) + 1);
560 }
561
562 static int
563 pcc_dataset_rule_init(struct pcc_match_rule *rule, struct pcc_cmd *cmd)
564 {
565         int rc = 0;
566
567         LASSERT(cmd->u.pccc_add.pccc_conds_str);
568         OBD_ALLOC(rule->pmr_conds_str,
569                   strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
570         if (rule->pmr_conds_str == NULL)
571                 return -ENOMEM;
572
573         memcpy(rule->pmr_conds_str,
574                cmd->u.pccc_add.pccc_conds_str,
575                strlen(cmd->u.pccc_add.pccc_conds_str));
576
577         INIT_LIST_HEAD(&rule->pmr_conds);
578         if (!list_empty(&cmd->u.pccc_add.pccc_conds))
579                 rc = pcc_conds_parse(rule->pmr_conds_str,
580                                           strlen(rule->pmr_conds_str),
581                                           &rule->pmr_conds);
582
583         if (rc)
584                 pcc_dataset_rule_fini(rule);
585
586         return rc;
587 }
588
589 /* Rule Matching */
590 static int
591 pcc_id_list_match(struct list_head *id_list, __u32 id_val)
592 {
593         struct pcc_match_id *id;
594
595         list_for_each_entry(id, id_list, pmi_linkage) {
596                 if (id->pmi_id == id_val)
597                         return 1;
598         }
599         return 0;
600 }
601
/**
 * cfs_match_wildcard - match a string against a pattern containing '*'
 *
 * @pattern:	NUL-terminated pattern; every '*' matches any run of
 *		characters, including an empty one. All other characters
 *		match only themselves.
 * @content:	NUL-terminated string to test
 *
 * The whole of @content has to be covered by @pattern.
 *
 * A '*' in @pattern is always handled as a wildcard, even when @content
 * has a literal '*' at the same position, so "*" matches "*x" and "a*b"
 * still matches "a*b". Runs of '*' behave like a single one, so "**"
 * matches the empty string.
 *
 * The match is done iteratively: on a mismatch it falls back to the most
 * recent '*' and lets it absorb one more character. This needs no
 * recursion and at most strlen(pattern) * strlen(content) steps.
 *
 * Return: true if @content matches @pattern, false otherwise.
 */
static bool
cfs_match_wildcard(const char *pattern, const char *content)
{
	const char *star = NULL;	/* most recent '*' seen in @pattern */
	const char *retry = NULL;	/* where that '*' currently stops in @content */

	while (*content != '\0') {
		if (*pattern == '*') {
			/* start with the '*' matching nothing */
			star = pattern++;
			retry = content;
		} else if (*pattern == *content) {
			/* *pattern cannot be NUL here since *content is not */
			pattern++;
			content++;
		} else if (star != NULL) {
			/* let the last '*' swallow one more character */
			pattern = star + 1;
			content = ++retry;
		} else {
			return false;
		}
	}

	/* @content is used up: only trailing '*' may remain in @pattern */
	while (*pattern == '*')
		pattern++;

	return *pattern == '\0';
}
628
629 static int
630 pcc_fname_list_match(struct list_head *fname_list, const char *name)
631 {
632         struct pcc_match_fname *fname;
633
634         list_for_each_entry(fname, fname_list, pmf_linkage) {
635                 if (cfs_match_wildcard(fname->pmf_name, name))
636                         return 1;
637         }
638         return 0;
639 }
640
641 static int
642 pcc_expression_match(struct pcc_expression *expr, struct pcc_matcher *matcher)
643 {
644         switch (expr->pe_field) {
645         case PCC_FIELD_UID:
646                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_uid);
647         case PCC_FIELD_GID:
648                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_gid);
649         case PCC_FIELD_PROJID:
650                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_projid);
651         case PCC_FIELD_FNAME:
652                 return pcc_fname_list_match(&expr->pe_cond,
653                                             matcher->pm_name->name);
654         default:
655                 return 0;
656         }
657 }
658
659 static int
660 pcc_conjunction_match(struct pcc_conjunction *conjunction,
661                       struct pcc_matcher *matcher)
662 {
663         struct pcc_expression *expr;
664         int matched;
665
666         list_for_each_entry(expr, &conjunction->pc_expressions, pe_linkage) {
667                 matched = pcc_expression_match(expr, matcher);
668                 if (!matched)
669                         return 0;
670         }
671
672         return 1;
673 }
674
675 static int
676 pcc_cond_match(struct pcc_match_rule *rule, struct pcc_matcher *matcher)
677 {
678         struct pcc_conjunction *conjunction;
679         int matched;
680
681         list_for_each_entry(conjunction, &rule->pmr_conds, pc_linkage) {
682                 matched = pcc_conjunction_match(conjunction, matcher);
683                 if (matched)
684                         return 1;
685         }
686
687         return 0;
688 }
689
690 struct pcc_dataset*
691 pcc_dataset_match_get(struct pcc_super *super, struct pcc_matcher *matcher)
692 {
693         struct pcc_dataset *dataset;
694         struct pcc_dataset *selected = NULL;
695
696         down_read(&super->pccs_rw_sem);
697         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
698                 if (!(dataset->pccd_flags & PCC_DATASET_RWPCC))
699                         continue;
700
701                 if (pcc_cond_match(&dataset->pccd_rule, matcher)) {
702                         atomic_inc(&dataset->pccd_refcount);
703                         selected = dataset;
704                         break;
705                 }
706         }
707         up_read(&super->pccs_rw_sem);
708         if (selected)
709                 CDEBUG(D_CACHE, "PCC create, matched %s - %d:%d:%d:%s\n",
710                        dataset->pccd_rule.pmr_conds_str,
711                        matcher->pm_uid, matcher->pm_gid,
712                        matcher->pm_projid, matcher->pm_name->name);
713
714         return selected;
715 }
716
717 /**
718  * pcc_dataset_add - Add a Cache policy to control which files need be
719  * cached and where it will be cached.
720  *
721  * @super:      superblock of pcc
722  * @cmd:        pcc command
723  */
724 static int
725 pcc_dataset_add(struct pcc_super *super, struct pcc_cmd *cmd)
726 {
727         char *pathname = cmd->pccc_pathname;
728         struct pcc_dataset *dataset;
729         struct pcc_dataset *tmp;
730         bool found = false;
731         int rc;
732
733         OBD_ALLOC_PTR(dataset);
734         if (dataset == NULL)
735                 return -ENOMEM;
736
737         rc = kern_path(pathname, LOOKUP_DIRECTORY, &dataset->pccd_path);
738         if (unlikely(rc)) {
739                 OBD_FREE_PTR(dataset);
740                 return rc;
741         }
742         strncpy(dataset->pccd_pathname, pathname, PATH_MAX);
743         dataset->pccd_rwid = cmd->u.pccc_add.pccc_rwid;
744         dataset->pccd_roid = cmd->u.pccc_add.pccc_roid;
745         dataset->pccd_flags = cmd->u.pccc_add.pccc_flags;
746         atomic_set(&dataset->pccd_refcount, 1);
747
748         rc = pcc_dataset_rule_init(&dataset->pccd_rule, cmd);
749         if (rc) {
750                 pcc_dataset_put(dataset);
751                 return rc;
752         }
753
754         down_write(&super->pccs_rw_sem);
755         list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) {
756                 if (strcmp(tmp->pccd_pathname, pathname) == 0 ||
757                     (dataset->pccd_rwid != 0 &&
758                      dataset->pccd_rwid == tmp->pccd_rwid) ||
759                     (dataset->pccd_roid != 0 &&
760                      dataset->pccd_roid == tmp->pccd_roid)) {
761                         found = true;
762                         break;
763                 }
764         }
765         if (!found)
766                 list_add(&dataset->pccd_linkage, &super->pccs_datasets);
767         up_write(&super->pccs_rw_sem);
768
769         if (found) {
770                 pcc_dataset_put(dataset);
771                 rc = -EEXIST;
772         }
773
774         return rc;
775 }
776
777 struct pcc_dataset *
778 pcc_dataset_get(struct pcc_super *super, enum lu_pcc_type type, __u32 id)
779 {
780         struct pcc_dataset *dataset;
781         struct pcc_dataset *selected = NULL;
782
783         if (id == 0)
784                 return NULL;
785
786         /*
787          * archive ID (read-write ID) or read-only ID is unique in the list,
788          * we just return last added one as first priority.
789          */
790         down_read(&super->pccs_rw_sem);
791         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
792                 if (type == LU_PCC_READWRITE && (dataset->pccd_rwid != id ||
793                     !(dataset->pccd_flags & PCC_DATASET_RWPCC)))
794                         continue;
795                 atomic_inc(&dataset->pccd_refcount);
796                 selected = dataset;
797                 break;
798         }
799         up_read(&super->pccs_rw_sem);
800         if (selected)
801                 CDEBUG(D_CACHE, "matched id %u, PCC mode %d\n", id, type);
802
803         return selected;
804 }
805
806 void
807 pcc_dataset_put(struct pcc_dataset *dataset)
808 {
809         if (atomic_dec_and_test(&dataset->pccd_refcount)) {
810                 pcc_dataset_rule_fini(&dataset->pccd_rule);
811                 path_put(&dataset->pccd_path);
812                 OBD_FREE_PTR(dataset);
813         }
814 }
815
816 static int
817 pcc_dataset_del(struct pcc_super *super, char *pathname)
818 {
819         struct list_head *l, *tmp;
820         struct pcc_dataset *dataset;
821         int rc = -ENOENT;
822
823         down_write(&super->pccs_rw_sem);
824         list_for_each_safe(l, tmp, &super->pccs_datasets) {
825                 dataset = list_entry(l, struct pcc_dataset, pccd_linkage);
826                 if (strcmp(dataset->pccd_pathname, pathname) == 0) {
827                         list_del_init(&dataset->pccd_linkage);
828                         pcc_dataset_put(dataset);
829                         rc = 0;
830                         break;
831                 }
832         }
833         up_write(&super->pccs_rw_sem);
834         return rc;
835 }
836
837 static void
838 pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
839 {
840         seq_printf(m, "%s:\n", dataset->pccd_pathname);
841         seq_printf(m, "  rwid: %u\n", dataset->pccd_rwid);
842         seq_printf(m, "  flags: %x\n", dataset->pccd_flags);
843         seq_printf(m, "  autocache: %s\n", dataset->pccd_rule.pmr_conds_str);
844 }
845
846 int
847 pcc_super_dump(struct pcc_super *super, struct seq_file *m)
848 {
849         struct pcc_dataset *dataset;
850
851         down_read(&super->pccs_rw_sem);
852         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
853                 pcc_dataset_dump(dataset, m);
854         }
855         up_read(&super->pccs_rw_sem);
856         return 0;
857 }
858
859 static void pcc_remove_datasets(struct pcc_super *super)
860 {
861         struct pcc_dataset *dataset, *tmp;
862
863         down_write(&super->pccs_rw_sem);
864         list_for_each_entry_safe(dataset, tmp,
865                                  &super->pccs_datasets, pccd_linkage) {
866                 list_del(&dataset->pccd_linkage);
867                 pcc_dataset_put(dataset);
868         }
869         up_write(&super->pccs_rw_sem);
870 }
871
872 void pcc_super_fini(struct pcc_super *super)
873 {
874         pcc_remove_datasets(super);
875         put_cred(super->pccs_cred);
876 }
877
/*
 * Return true if @pathname is a usable dataset path: not NULL, starting
 * with '/' (which also rules out the empty string) and shorter than
 * PATH_MAX bytes.
 */
static bool pathname_is_valid(const char *pathname)
{
	if (pathname == NULL)
		return false;

	/* Needs to be absolute path */
	if (pathname[0] != '/')
		return false;

	return strlen(pathname) < PATH_MAX;
}
886
887 static struct pcc_cmd *
888 pcc_cmd_parse(char *buffer, unsigned long count)
889 {
890         static struct pcc_cmd *cmd;
891         char *token;
892         char *val;
893         int rc = 0;
894
895         OBD_ALLOC_PTR(cmd);
896         if (cmd == NULL)
897                 GOTO(out, rc = -ENOMEM);
898
899         /* clear all setting */
900         if (strncmp(buffer, "clear", 5) == 0) {
901                 cmd->pccc_cmd = PCC_CLEAR_ALL;
902                 GOTO(out, rc = 0);
903         }
904
905         val = buffer;
906         token = strsep(&val, " ");
907         if (val == NULL || strlen(val) == 0)
908                 GOTO(out_free_cmd, rc = -EINVAL);
909
910         /* Type of the command */
911         if (strcmp(token, "add") == 0)
912                 cmd->pccc_cmd = PCC_ADD_DATASET;
913         else if (strcmp(token, "del") == 0)
914                 cmd->pccc_cmd = PCC_DEL_DATASET;
915         else
916                 GOTO(out_free_cmd, rc = -EINVAL);
917
918         /* Pathname of the dataset */
919         token = strsep(&val, " ");
920         if ((val == NULL && cmd->pccc_cmd != PCC_DEL_DATASET) ||
921             !pathname_is_valid(token))
922                 GOTO(out_free_cmd, rc = -EINVAL);
923         cmd->pccc_pathname = token;
924
925         if (cmd->pccc_cmd == PCC_ADD_DATASET) {
926                 /* List of ID */
927                 LASSERT(val);
928                 token = val;
929                 val = strrchr(token, '}');
930                 if (!val)
931                         GOTO(out_free_cmd, rc = -EINVAL);
932
933                 /* Skip '}' */
934                 val++;
935                 if (*val == '\0') {
936                         val = NULL;
937                 } else if (*val == ' ') {
938                         *val = '\0';
939                         val++;
940                 } else {
941                         GOTO(out_free_cmd, rc = -EINVAL);
942                 }
943
944                 rc = pcc_id_parse(cmd, token);
945                 if (rc)
946                         GOTO(out_free_cmd, rc);
947
948                 rc = pcc_parse_value_pairs(cmd, val);
949                 if (rc)
950                         GOTO(out_cmd_fini, rc = -EINVAL);
951         }
952         goto out;
953 out_cmd_fini:
954         pcc_cmd_fini(cmd);
955 out_free_cmd:
956         OBD_FREE_PTR(cmd);
957 out:
958         if (rc)
959                 cmd = ERR_PTR(rc);
960         return cmd;
961 }
962
963 int pcc_cmd_handle(char *buffer, unsigned long count,
964                    struct pcc_super *super)
965 {
966         int rc = 0;
967         struct pcc_cmd *cmd;
968
969         cmd = pcc_cmd_parse(buffer, count);
970         if (IS_ERR(cmd))
971                 return PTR_ERR(cmd);
972
973         switch (cmd->pccc_cmd) {
974         case PCC_ADD_DATASET:
975                 rc = pcc_dataset_add(super, cmd);
976                 break;
977         case PCC_DEL_DATASET:
978                 rc = pcc_dataset_del(super, cmd->pccc_pathname);
979                 break;
980         case PCC_CLEAR_ALL:
981                 pcc_remove_datasets(super);
982                 break;
983         default:
984                 rc = -EINVAL;
985                 break;
986         }
987
988         pcc_cmd_fini(cmd);
989         OBD_FREE_PTR(cmd);
990         return rc;
991 }
992
993 static inline void pcc_inode_lock(struct inode *inode)
994 {
995         mutex_lock(&ll_i2info(inode)->lli_pcc_lock);
996 }
997
998 static inline void pcc_inode_unlock(struct inode *inode)
999 {
1000         mutex_unlock(&ll_i2info(inode)->lli_pcc_lock);
1001 }
1002
1003 static void pcc_inode_init(struct pcc_inode *pcci, struct ll_inode_info *lli)
1004 {
1005         pcci->pcci_lli = lli;
1006         lli->lli_pcc_inode = pcci;
1007         atomic_set(&pcci->pcci_refcount, 0);
1008         pcci->pcci_type = LU_PCC_NONE;
1009         pcci->pcci_layout_gen = CL_LAYOUT_GEN_NONE;
1010         atomic_set(&pcci->pcci_active_ios, 0);
1011         init_waitqueue_head(&pcci->pcci_waitq);
1012 }
1013
1014 static void pcc_inode_fini(struct pcc_inode *pcci)
1015 {
1016         struct ll_inode_info *lli = pcci->pcci_lli;
1017
1018         path_put(&pcci->pcci_path);
1019         pcci->pcci_type = LU_PCC_NONE;
1020         OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
1021         lli->lli_pcc_inode = NULL;
1022 }
1023
1024 static void pcc_inode_get(struct pcc_inode *pcci)
1025 {
1026         atomic_inc(&pcci->pcci_refcount);
1027 }
1028
1029 static void pcc_inode_put(struct pcc_inode *pcci)
1030 {
1031         if (atomic_dec_and_test(&pcci->pcci_refcount))
1032                 pcc_inode_fini(pcci);
1033 }
1034
1035 void pcc_inode_free(struct inode *inode)
1036 {
1037         struct pcc_inode *pcci = ll_i2pcci(inode);
1038
1039         if (pcci) {
1040                 WARN_ON(atomic_read(&pcci->pcci_refcount) > 1);
1041                 pcc_inode_put(pcci);
1042         }
1043 }
1044
1045 /*
1046  * TODO:
1047  * As Andreas suggested, we'd better use new layout to
1048  * reduce overhead:
1049  * (fid->f_oid >> 16 & oxFFFF)/FID
1050  */
1051 #define MAX_PCC_DATABASE_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
1052 static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid)
1053 {
1054         return snprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
1055                         DFID_NOBRACE,
1056                         (fid)->f_oid       & 0xFFFF,
1057                         (fid)->f_oid >> 16 & 0xFFFF,
1058                         (unsigned int)((fid)->f_seq       & 0xFFFF),
1059                         (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
1060                         (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
1061                         (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
1062                         PFID(fid));
1063 }
1064
1065 static inline const struct cred *pcc_super_cred(struct super_block *sb)
1066 {
1067         return ll_s2sbi(sb)->ll_pcc_super.pccs_cred;
1068 }
1069
1070 void pcc_file_init(struct pcc_file *pccf)
1071 {
1072         pccf->pccf_file = NULL;
1073         pccf->pccf_type = LU_PCC_NONE;
1074 }
1075
1076 static inline bool pcc_auto_attach_enabled(struct pcc_dataset *dataset)
1077 {
1078         return dataset->pccd_flags & PCC_DATASET_AUTO_ATTACH;
1079 }
1080
1081 static const char pcc_xattr_layout[] = XATTR_USER_PREFIX "PCC.layout";
1082
1083 static int pcc_layout_xattr_set(struct pcc_inode *pcci, __u32 gen)
1084 {
1085         struct dentry *pcc_dentry = pcci->pcci_path.dentry;
1086         struct ll_inode_info *lli = pcci->pcci_lli;
1087         int rc;
1088
1089         ENTRY;
1090
1091         if (!(lli->lli_pcc_state & PCC_STATE_FL_AUTO_ATTACH))
1092                 RETURN(0);
1093
1094 #ifndef HAVE_VFS_SETXATTR
1095         if (!pcc_dentry->d_inode->i_op->setxattr)
1096                 RETURN(-ENOTSUPP);
1097
1098         rc = pcc_dentry->d_inode->i_op->setxattr(pcc_dentry, pcc_xattr_layout,
1099                                                  &gen, sizeof(gen), 0);
1100 #else
1101         rc = __vfs_setxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
1102                             &gen, sizeof(gen), 0);
1103 #endif
1104         RETURN(rc);
1105 }
1106
1107 static int pcc_get_layout_info(struct inode *inode, struct cl_layout *clt)
1108 {
1109         struct lu_env *env;
1110         struct ll_inode_info *lli = ll_i2info(inode);
1111         __u16 refcheck;
1112         int rc;
1113
1114         ENTRY;
1115
1116         if (!lli->lli_clob)
1117                 RETURN(-EINVAL);
1118
1119         env = cl_env_get(&refcheck);
1120         if (IS_ERR(env))
1121                 RETURN(PTR_ERR(env));
1122
1123         rc = cl_object_layout_get(env, lli->lli_clob, clt);
1124         if (rc)
1125                 CDEBUG(D_INODE, "Cannot get layout for "DFID"\n",
1126                        PFID(ll_inode2fid(inode)));
1127
1128         cl_env_put(env, &refcheck);
1129         RETURN(rc);
1130 }
1131
1132 static int pcc_fid2dataset_fullpath(char *buf, int sz, struct lu_fid *fid,
1133                                     struct pcc_dataset *dataset)
1134 {
1135         return snprintf(buf, sz, "%s/%04x/%04x/%04x/%04x/%04x/%04x/"
1136                         DFID_NOBRACE,
1137                         dataset->pccd_pathname,
1138                         (fid)->f_oid       & 0xFFFF,
1139                         (fid)->f_oid >> 16 & 0xFFFF,
1140                         (unsigned int)((fid)->f_seq       & 0xFFFF),
1141                         (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
1142                         (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
1143                         (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
1144                         PFID(fid));
1145 }
1146
1147 /* Must be called with pcci->pcci_lock held */
1148 static void pcc_inode_attach_init(struct pcc_dataset *dataset,
1149                                   struct pcc_inode *pcci,
1150                                   struct dentry *dentry,
1151                                   enum lu_pcc_type type)
1152 {
1153         struct ll_inode_info *lli = pcci->pcci_lli;
1154
1155         pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
1156         pcci->pcci_path.dentry = dentry;
1157         LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
1158         atomic_set(&pcci->pcci_refcount, 1);
1159         pcci->pcci_type = type;
1160         pcci->pcci_attr_valid = false;
1161
1162         if (dataset->pccd_flags & PCC_DATASET_OPEN_ATTACH)
1163                 lli->lli_pcc_state |= PCC_STATE_FL_OPEN_ATTACH;
1164         if (dataset->pccd_flags & PCC_DATASET_IO_ATTACH)
1165                 lli->lli_pcc_state |= PCC_STATE_FL_IO_ATTACH;
1166         if (dataset->pccd_flags & PCC_DATASET_STAT_ATTACH)
1167                 lli->lli_pcc_state |= PCC_STATE_FL_STAT_ATTACH;
1168 }
1169
1170 static inline void pcc_layout_gen_set(struct pcc_inode *pcci,
1171                                       __u32 gen)
1172 {
1173         pcci->pcci_layout_gen = gen;
1174 }
1175
1176 static inline bool pcc_inode_has_layout(struct pcc_inode *pcci)
1177 {
1178         return pcci->pcci_layout_gen != CL_LAYOUT_GEN_NONE;
1179 }
1180
1181 static int pcc_try_dataset_attach(struct inode *inode, __u32 gen,
1182                                   enum lu_pcc_type type,
1183                                   struct pcc_dataset *dataset,
1184                                   bool *cached)
1185 {
1186         struct ll_inode_info *lli = ll_i2info(inode);
1187         struct pcc_inode *pcci = lli->lli_pcc_inode;
1188         const struct cred *old_cred;
1189         struct dentry *pcc_dentry;
1190         struct path path;
1191         char *pathname;
1192         __u32 pcc_gen;
1193         int rc;
1194
1195         ENTRY;
1196
1197         if (type == LU_PCC_READWRITE &&
1198             !(dataset->pccd_flags & PCC_DATASET_RWPCC))
1199                 RETURN(0);
1200
1201         OBD_ALLOC(pathname, PATH_MAX);
1202         if (pathname == NULL)
1203                 RETURN(-ENOMEM);
1204
1205         pcc_fid2dataset_fullpath(pathname, PATH_MAX, &lli->lli_fid, dataset);
1206
1207         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1208         rc = kern_path(pathname, LOOKUP_FOLLOW, &path);
1209         if (rc)
1210                 /* ignore this error */
1211                 GOTO(out, rc = 0);
1212
1213         pcc_dentry = path.dentry;
1214 #ifndef HAVE_VFS_SETXATTR
1215         if (!pcc_dentry->d_inode->i_op->getxattr)
1216                 /* ignore this error */
1217                 GOTO(out_put_path, rc = 0);
1218
1219         rc = pcc_dentry->d_inode->i_op->getxattr(pcc_dentry, pcc_xattr_layout,
1220                                                  &pcc_gen, sizeof(pcc_gen));
1221 #else
1222         rc = __vfs_getxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
1223                             &pcc_gen, sizeof(pcc_gen));
1224 #endif
1225
1226         if (rc < 0)
1227                 /* ignore this error */
1228                 GOTO(out_put_path, rc = 0);
1229
1230         rc = 0;
1231         /* The file is still valid cached in PCC, attach it immediately. */
1232         if (pcc_gen == gen) {
1233                 CDEBUG(D_CACHE, DFID" L.Gen (%d) consistent, auto attached.\n",
1234                        PFID(&lli->lli_fid), gen);
1235                 if (!pcci) {
1236                         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
1237                         if (pcci == NULL)
1238                                 GOTO(out_put_path, rc = -ENOMEM);
1239
1240                         pcc_inode_init(pcci, lli);
1241                         dget(pcc_dentry);
1242                         pcc_inode_attach_init(dataset, pcci, pcc_dentry, type);
1243                 } else {
1244                         /*
1245                          * This happened when a file was once attached into
1246                          * PCC, and some processes keep this file opened
1247                          * (pcci->refcount > 1) and corresponding PCC file
1248                          * without any I/O activity, and then this file was
1249                          * detached by the manual detach command or the
1250                          * revocation of the layout lock (i.e. cached LRU lock
1251                          * shrinking).
1252                          */
1253                         pcc_inode_get(pcci);
1254                         pcci->pcci_type = type;
1255                 }
1256                 pcc_layout_gen_set(pcci, gen);
1257                 *cached = true;
1258         }
1259 out_put_path:
1260         path_put(&path);
1261 out:
1262         revert_creds(old_cred);
1263         OBD_FREE(pathname, PATH_MAX);
1264         RETURN(rc);
1265 }
1266
1267 static int pcc_try_datasets_attach(struct inode *inode, __u32 gen,
1268                                    enum lu_pcc_type type, bool *cached)
1269 {
1270         struct pcc_dataset *dataset, *tmp;
1271         struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
1272         int rc = 0;
1273
1274         ENTRY;
1275
1276         down_read(&super->pccs_rw_sem);
1277         list_for_each_entry_safe(dataset, tmp,
1278                                  &super->pccs_datasets, pccd_linkage) {
1279                 if (!pcc_auto_attach_enabled(dataset))
1280                         continue;
1281                 rc = pcc_try_dataset_attach(inode, gen, type, dataset, cached);
1282                 if (rc < 0 || (!rc && *cached))
1283                         break;
1284         }
1285         up_read(&super->pccs_rw_sem);
1286
1287         RETURN(rc);
1288 }
1289
1290 static int pcc_try_auto_attach(struct inode *inode, bool *cached, bool is_open)
1291 {
1292         struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
1293         struct cl_layout clt = {
1294                 .cl_layout_gen = 0,
1295                 .cl_is_released = false,
1296         };
1297         struct ll_inode_info *lli = ll_i2info(inode);
1298         __u32 gen;
1299         int rc;
1300
1301         ENTRY;
1302
1303         /*
1304          * Quick check whether there is PCC device.
1305          */
1306         if (list_empty(&super->pccs_datasets))
1307                 RETURN(0);
1308
1309         /*
1310          * The file layout lock was cancelled. And this open does not
1311          * obtain valid layout lock from MDT (i.e. the file is being
1312          * HSM restoring).
1313          */
1314         if (is_open) {
1315                 if (ll_layout_version_get(lli) == CL_LAYOUT_GEN_NONE)
1316                         RETURN(0);
1317         } else {
1318                 rc = ll_layout_refresh(inode, &gen);
1319                 if (rc)
1320                         RETURN(rc);
1321         }
1322
1323         rc = pcc_get_layout_info(inode, &clt);
1324         if (rc)
1325                 RETURN(rc);
1326
1327         if (!is_open && gen != clt.cl_layout_gen) {
1328                 CDEBUG(D_CACHE, DFID" layout changed from %d to %d.\n",
1329                        PFID(ll_inode2fid(inode)), gen, clt.cl_layout_gen);
1330                 RETURN(-EINVAL);
1331         }
1332
1333         if (clt.cl_is_released)
1334                 rc = pcc_try_datasets_attach(inode, clt.cl_layout_gen,
1335                                              LU_PCC_READWRITE, cached);
1336
1337         RETURN(rc);
1338 }
1339
1340 int pcc_file_open(struct inode *inode, struct file *file)
1341 {
1342         struct pcc_inode *pcci;
1343         struct ll_inode_info *lli = ll_i2info(inode);
1344         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1345         struct pcc_file *pccf = &fd->fd_pcc_file;
1346         struct file *pcc_file;
1347         struct path *path;
1348         struct qstr *dname;
1349         bool cached = false;
1350         int rc = 0;
1351
1352         ENTRY;
1353
1354         if (!S_ISREG(inode->i_mode))
1355                 RETURN(0);
1356
1357         pcc_inode_lock(inode);
1358         pcci = ll_i2pcci(inode);
1359
1360         if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
1361                 GOTO(out_unlock, rc = 0);
1362
1363         if (!pcci || !pcc_inode_has_layout(pcci)) {
1364                 if (lli->lli_pcc_state & PCC_STATE_FL_OPEN_ATTACH)
1365                         rc = pcc_try_auto_attach(inode, &cached, true);
1366
1367                 if (rc < 0 || !cached)
1368                         GOTO(out_unlock, rc);
1369
1370                 if (!pcci)
1371                         pcci = ll_i2pcci(inode);
1372         }
1373
1374         pcc_inode_get(pcci);
1375         WARN_ON(pccf->pccf_file);
1376
1377         path = &pcci->pcci_path;
1378         dname = &path->dentry->d_name;
1379         CDEBUG(D_CACHE, "opening pcc file '%.*s'\n", dname->len,
1380                dname->name);
1381
1382 #ifdef HAVE_DENTRY_OPEN_USE_PATH
1383         pcc_file = dentry_open(path, file->f_flags,
1384                                pcc_super_cred(inode->i_sb));
1385 #else
1386         pcc_file = dentry_open(path->dentry, path->mnt, file->f_flags,
1387                                pcc_super_cred(inode->i_sb));
1388 #endif
1389         if (IS_ERR_OR_NULL(pcc_file)) {
1390                 rc = pcc_file == NULL ? -EINVAL : PTR_ERR(pcc_file);
1391                 pcc_inode_put(pcci);
1392         } else {
1393                 pccf->pccf_file = pcc_file;
1394                 pccf->pccf_type = pcci->pcci_type;
1395         }
1396
1397 out_unlock:
1398         pcc_inode_unlock(inode);
1399         RETURN(rc);
1400 }
1401
1402 void pcc_file_release(struct inode *inode, struct file *file)
1403 {
1404         struct pcc_inode *pcci;
1405         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1406         struct pcc_file *pccf;
1407         struct path *path;
1408         struct qstr *dname;
1409
1410         ENTRY;
1411
1412         if (!S_ISREG(inode->i_mode) || fd == NULL)
1413                 RETURN_EXIT;
1414
1415         pccf = &fd->fd_pcc_file;
1416         pcc_inode_lock(inode);
1417         if (pccf->pccf_file == NULL)
1418                 goto out;
1419
1420         pcci = ll_i2pcci(inode);
1421         LASSERT(pcci);
1422         path = &pcci->pcci_path;
1423         dname = &path->dentry->d_name;
1424         CDEBUG(D_CACHE, "releasing pcc file \"%.*s\"\n", dname->len,
1425                dname->name);
1426         pcc_inode_put(pcci);
1427         fput(pccf->pccf_file);
1428         pccf->pccf_file = NULL;
1429 out:
1430         pcc_inode_unlock(inode);
1431         RETURN_EXIT;
1432 }
1433
1434 static void pcc_io_init(struct inode *inode, enum pcc_io_type iot, bool *cached)
1435 {
1436         struct ll_inode_info *lli = ll_i2info(inode);
1437         struct pcc_inode *pcci;
1438
1439         pcc_inode_lock(inode);
1440         pcci = ll_i2pcci(inode);
1441         if (pcci && pcc_inode_has_layout(pcci)) {
1442                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1443                 atomic_inc(&pcci->pcci_active_ios);
1444                 *cached = true;
1445         } else {
1446                 *cached = false;
1447                 if ((lli->lli_pcc_state & PCC_STATE_FL_IO_ATTACH &&
1448                      iot != PIT_GETATTR) ||
1449                     (iot == PIT_GETATTR &&
1450                      lli->lli_pcc_state & PCC_STATE_FL_STAT_ATTACH)) {
1451                         (void) pcc_try_auto_attach(inode, cached, false);
1452                         if (*cached) {
1453                                 pcci = ll_i2pcci(inode);
1454                                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1455                                 atomic_inc(&pcci->pcci_active_ios);
1456                         }
1457                 }
1458         }
1459         pcc_inode_unlock(inode);
1460 }
1461
1462 static void pcc_io_fini(struct inode *inode)
1463 {
1464         struct pcc_inode *pcci = ll_i2pcci(inode);
1465
1466         LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0);
1467         if (atomic_dec_and_test(&pcci->pcci_active_ios))
1468                 wake_up_all(&pcci->pcci_waitq);
1469 }
1470
1471
1472 static ssize_t
1473 __pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
1474 {
1475         struct file *file = iocb->ki_filp;
1476
1477 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1478         return file->f_op->read_iter(iocb, iter);
1479 #else
1480         struct iovec iov;
1481         struct iov_iter i;
1482         ssize_t bytes = 0;
1483
1484         iov_for_each(iov, i, *iter) {
1485                 ssize_t res;
1486
1487                 res = file->f_op->aio_read(iocb, &iov, 1, iocb->ki_pos);
1488                 if (-EIOCBQUEUED == res)
1489                         res = wait_on_sync_kiocb(iocb);
1490                 if (res <= 0) {
1491                         if (bytes == 0)
1492                                 bytes = res;
1493                         break;
1494                 }
1495
1496                 bytes += res;
1497                 if (res < iov.iov_len)
1498                         break;
1499         }
1500
1501         if (bytes > 0)
1502                 iov_iter_advance(iter, bytes);
1503         return bytes;
1504 #endif
1505 }
1506
1507 ssize_t pcc_file_read_iter(struct kiocb *iocb,
1508                            struct iov_iter *iter, bool *cached)
1509 {
1510         struct file *file = iocb->ki_filp;
1511         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1512         struct pcc_file *pccf = &fd->fd_pcc_file;
1513         struct inode *inode = file_inode(file);
1514         ssize_t result;
1515
1516         ENTRY;
1517
1518         if (pccf->pccf_file == NULL) {
1519                 *cached = false;
1520                 RETURN(0);
1521         }
1522
1523         pcc_io_init(inode, PIT_READ, cached);
1524         if (!*cached)
1525                 RETURN(0);
1526
1527         iocb->ki_filp = pccf->pccf_file;
1528         /* generic_file_aio_read does not support ext4-dax,
1529          * __pcc_file_read_iter uses ->aio_read hook directly
1530          * to add support for ext4-dax.
1531          */
1532         result = __pcc_file_read_iter(iocb, iter);
1533         iocb->ki_filp = file;
1534
1535         pcc_io_fini(inode);
1536         RETURN(result);
1537 }
1538
1539 static ssize_t
1540 __pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
1541 {
1542         struct file *file = iocb->ki_filp;
1543
1544 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1545         return file->f_op->write_iter(iocb, iter);
1546 #else
1547         struct iovec iov;
1548         struct iov_iter i;
1549         ssize_t bytes = 0;
1550
1551         iov_for_each(iov, i, *iter) {
1552                 ssize_t res;
1553
1554                 res = file->f_op->aio_write(iocb, &iov, 1, iocb->ki_pos);
1555                 if (-EIOCBQUEUED == res)
1556                         res = wait_on_sync_kiocb(iocb);
1557                 if (res <= 0) {
1558                         if (bytes == 0)
1559                                 bytes = res;
1560                         break;
1561                 }
1562
1563                 bytes += res;
1564                 if (res < iov.iov_len)
1565                         break;
1566         }
1567
1568         if (bytes > 0)
1569                 iov_iter_advance(iter, bytes);
1570         return bytes;
1571 #endif
1572 }
1573
1574 ssize_t pcc_file_write_iter(struct kiocb *iocb,
1575                             struct iov_iter *iter, bool *cached)
1576 {
1577         struct file *file = iocb->ki_filp;
1578         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1579         struct pcc_file *pccf = &fd->fd_pcc_file;
1580         struct inode *inode = file_inode(file);
1581         ssize_t result;
1582
1583         ENTRY;
1584
1585         if (pccf->pccf_file == NULL) {
1586                 *cached = false;
1587                 RETURN(0);
1588         }
1589
1590         if (pccf->pccf_type != LU_PCC_READWRITE) {
1591                 *cached = false;
1592                 RETURN(-EAGAIN);
1593         }
1594
1595         pcc_io_init(inode, PIT_WRITE, cached);
1596         if (!*cached)
1597                 RETURN(0);
1598
1599         if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
1600                 GOTO(out, result = -ENOSPC);
1601
1602         iocb->ki_filp = pccf->pccf_file;
1603
1604         /* Since __pcc_file_write_iter makes write calls via
1605          * the normal vfs interface to the local PCC file system,
1606          * the inode lock is not needed.
1607          */
1608         result = __pcc_file_write_iter(iocb, iter);
1609         iocb->ki_filp = file;
1610 out:
1611         pcc_io_fini(inode);
1612         RETURN(result);
1613 }
1614
1615 int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
1616                       bool *cached)
1617 {
1618         int rc;
1619         const struct cred *old_cred;
1620         struct iattr attr2 = *attr;
1621         struct dentry *pcc_dentry;
1622         struct pcc_inode *pcci;
1623
1624         ENTRY;
1625
1626         if (!S_ISREG(inode->i_mode)) {
1627                 *cached = false;
1628                 RETURN(0);
1629         }
1630
1631         pcc_io_init(inode, PIT_SETATTR, cached);
1632         if (!*cached)
1633                 RETURN(0);
1634
1635         attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME |
1636                          ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET |
1637                          ATTR_CTIME | ATTR_UID | ATTR_GID);
1638         pcci = ll_i2pcci(inode);
1639         pcc_dentry = pcci->pcci_path.dentry;
1640         inode_lock(pcc_dentry->d_inode);
1641         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1642         rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
1643         revert_creds(old_cred);
1644         inode_unlock(pcc_dentry->d_inode);
1645
1646         pcc_io_fini(inode);
1647         RETURN(rc);
1648 }
1649
1650 int pcc_inode_getattr(struct inode *inode, bool *cached)
1651 {
1652         struct ll_inode_info *lli = ll_i2info(inode);
1653         const struct cred *old_cred;
1654         struct kstat stat;
1655         s64 atime;
1656         s64 mtime;
1657         s64 ctime;
1658         int rc;
1659
1660         ENTRY;
1661
1662         if (!S_ISREG(inode->i_mode)) {
1663                 *cached = false;
1664                 RETURN(0);
1665         }
1666
1667         pcc_io_init(inode, PIT_GETATTR, cached);
1668         if (!*cached)
1669                 RETURN(0);
1670
1671         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1672         rc = ll_vfs_getattr(&ll_i2pcci(inode)->pcci_path, &stat);
1673         revert_creds(old_cred);
1674         if (rc)
1675                 GOTO(out, rc);
1676
1677         ll_inode_size_lock(inode);
1678         if (inode->i_atime.tv_sec < lli->lli_atime ||
1679             lli->lli_update_atime) {
1680                 inode->i_atime.tv_sec = lli->lli_atime;
1681                 lli->lli_update_atime = 0;
1682         }
1683         inode->i_mtime.tv_sec = lli->lli_mtime;
1684         inode->i_ctime.tv_sec = lli->lli_ctime;
1685
1686         atime = inode->i_atime.tv_sec;
1687         mtime = inode->i_mtime.tv_sec;
1688         ctime = inode->i_ctime.tv_sec;
1689
1690         if (atime < stat.atime.tv_sec)
1691                 atime = stat.atime.tv_sec;
1692
1693         if (ctime < stat.ctime.tv_sec)
1694                 ctime = stat.ctime.tv_sec;
1695
1696         if (mtime < stat.mtime.tv_sec)
1697                 mtime = stat.mtime.tv_sec;
1698
1699         i_size_write(inode, stat.size);
1700         inode->i_blocks = stat.blocks;
1701
1702         inode->i_atime.tv_sec = atime;
1703         inode->i_mtime.tv_sec = mtime;
1704         inode->i_ctime.tv_sec = ctime;
1705
1706         ll_inode_size_unlock(inode);
1707 out:
1708         pcc_io_fini(inode);
1709         RETURN(rc);
1710 }
1711
1712 ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos,
1713                              struct pipe_inode_info *pipe,
1714                              size_t count, unsigned int flags,
1715                              bool *cached)
1716 {
1717         struct inode *inode = file_inode(in_file);
1718         struct ll_file_data *fd = LUSTRE_FPRIVATE(in_file);
1719         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1720         ssize_t result;
1721
1722         ENTRY;
1723
1724         *cached = false;
1725         if (!pcc_file)
1726                 RETURN(0);
1727
1728         if (!file_inode(pcc_file)->i_fop->splice_read)
1729                 RETURN(-ENOTSUPP);
1730
1731         pcc_io_init(inode, PIT_SPLICE_READ, cached);
1732         if (!*cached)
1733                 RETURN(0);
1734
1735         result = file_inode(pcc_file)->i_fop->splice_read(pcc_file,
1736                                                           ppos, pipe, count,
1737                                                           flags);
1738
1739         pcc_io_fini(inode);
1740         RETURN(result);
1741 }
1742
1743 int pcc_fsync(struct file *file, loff_t start, loff_t end,
1744               int datasync, bool *cached)
1745 {
1746         struct inode *inode = file_inode(file);
1747         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1748         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1749         int rc;
1750
1751         ENTRY;
1752
1753         if (!pcc_file) {
1754                 *cached = false;
1755                 RETURN(0);
1756         }
1757
1758         pcc_io_init(inode, PIT_FSYNC, cached);
1759         if (!*cached)
1760                 RETURN(0);
1761
1762         rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
1763                                                 start, end, datasync);
1764
1765         pcc_io_fini(inode);
1766         RETURN(rc);
1767 }
1768
1769 int pcc_file_mmap(struct file *file, struct vm_area_struct *vma,
1770                   bool *cached)
1771 {
1772         struct inode *inode = file_inode(file);
1773         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1774         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1775         struct pcc_inode *pcci;
1776         int rc = 0;
1777
1778         ENTRY;
1779
1780         if (!pcc_file || !file_inode(pcc_file)->i_fop->mmap) {
1781                 *cached = false;
1782                 RETURN(0);
1783         }
1784
1785         pcc_inode_lock(inode);
1786         pcci = ll_i2pcci(inode);
1787         if (pcci && pcc_inode_has_layout(pcci)) {
1788                 LASSERT(atomic_read(&pcci->pcci_refcount) > 1);
1789                 *cached = true;
1790                 vma->vm_file = pcc_file;
1791                 rc = file_inode(pcc_file)->i_fop->mmap(pcc_file, vma);
1792                 vma->vm_file = file;
1793                 /* Save the vm ops of backend PCC */
1794                 vma->vm_private_data = (void *)vma->vm_ops;
1795         } else {
1796                 *cached = false;
1797         }
1798         pcc_inode_unlock(inode);
1799
1800         RETURN(rc);
1801 }
1802
1803 void pcc_vm_open(struct vm_area_struct *vma)
1804 {
1805         struct pcc_inode *pcci;
1806         struct file *file = vma->vm_file;
1807         struct inode *inode = file_inode(file);
1808         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1809         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1810         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1811
1812         ENTRY;
1813
1814         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->open)
1815                 RETURN_EXIT;
1816
1817         pcc_inode_lock(inode);
1818         pcci = ll_i2pcci(inode);
1819         if (pcci && pcc_inode_has_layout(pcci)) {
1820                 vma->vm_file = pcc_file;
1821                 pcc_vm_ops->open(vma);
1822                 vma->vm_file = file;
1823         }
1824         pcc_inode_unlock(inode);
1825         EXIT;
1826 }
1827
1828 void pcc_vm_close(struct vm_area_struct *vma)
1829 {
1830         struct file *file = vma->vm_file;
1831         struct inode *inode = file_inode(file);
1832         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1833         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1834         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1835
1836         ENTRY;
1837
1838         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->close)
1839                 RETURN_EXIT;
1840
1841         pcc_inode_lock(inode);
1842         /* Layout lock maybe revoked here */
1843         vma->vm_file = pcc_file;
1844         pcc_vm_ops->close(vma);
1845         vma->vm_file = file;
1846         pcc_inode_unlock(inode);
1847         EXIT;
1848 }
1849
1850 int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
1851                      bool *cached)
1852 {
1853         struct page *page = vmf->page;
1854         struct mm_struct *mm = vma->vm_mm;
1855         struct file *file = vma->vm_file;
1856         struct inode *inode = file_inode(file);
1857         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1858         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1859         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1860         int rc;
1861
1862         ENTRY;
1863
1864         if (!pcc_file || !pcc_vm_ops) {
1865                 *cached = false;
1866                 RETURN(0);
1867         }
1868
1869         if (!pcc_vm_ops->page_mkwrite &&
1870             page->mapping == pcc_file->f_mapping) {
1871                 CDEBUG(D_MMAP,
1872                        "%s: PCC backend fs not support ->page_mkwrite()\n",
1873                        ll_i2sbi(inode)->ll_fsname);
1874                 pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
1875                 up_read(&mm->mmap_sem);
1876                 *cached = true;
1877                 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1878         }
1879         /* Pause to allow for a race with concurrent detach */
1880         OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
1881
1882         pcc_io_init(inode, PIT_PAGE_MKWRITE, cached);
1883         if (!*cached) {
1884                 /* This happens when the file is detached from PCC after got
1885                  * the fault page via ->fault() on the inode of the PCC copy.
1886                  * Here it can not simply fall back to normal Lustre I/O path.
1887                  * The reason is that the address space of fault page used by
1888                  * ->page_mkwrite() is still the one of PCC inode. In the
1889                  * normal Lustre ->page_mkwrite() I/O path, it will be wrongly
1890                  * handled as the address space of the fault page is not
1891                  * consistent with the one of the Lustre inode (though the
1892                  * fault page was truncated).
1893                  * As the file is detached from PCC, the fault page must
1894                  * be released frist, and retry the mmap write (->fault() and
1895                  * ->page_mkwrite).
1896                  * We use an ugly and tricky method by returning
1897                  * VM_FAULT_NOPAGE | VM_FAULT_RETRY to the caller
1898                  * __do_page_fault and retry the memory fault handling.
1899                  */
1900                 if (page->mapping == pcc_file->f_mapping) {
1901                         *cached = true;
1902                         up_read(&mm->mmap_sem);
1903                         RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1904                 }
1905
1906                 RETURN(0);
1907         }
1908
1909         /*
1910          * This fault injection can also be used to simulate -ENOSPC and
1911          * -EDQUOT failure of underlying PCC backend fs.
1912          */
1913         if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE)) {
1914                 pcc_io_fini(inode);
1915                 pcc_ioctl_detach(inode, PCC_DETACH_OPT_UNCACHE);
1916                 up_read(&mm->mmap_sem);
1917                 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1918         }
1919
1920         vma->vm_file = pcc_file;
1921 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
1922         rc = pcc_vm_ops->page_mkwrite(vmf);
1923 #else
1924         rc = pcc_vm_ops->page_mkwrite(vma, vmf);
1925 #endif
1926         vma->vm_file = file;
1927
1928         pcc_io_fini(inode);
1929         RETURN(rc);
1930 }
1931
1932 int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
1933               bool *cached)
1934 {
1935         struct file *file = vma->vm_file;
1936         struct inode *inode = file_inode(file);
1937         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1938         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1939         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1940         int rc;
1941
1942         ENTRY;
1943
1944         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->fault) {
1945                 *cached = false;
1946                 RETURN(0);
1947         }
1948
1949         pcc_io_init(inode, PIT_FAULT, cached);
1950         if (!*cached)
1951                 RETURN(0);
1952
1953         vma->vm_file = pcc_file;
1954 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
1955         rc = pcc_vm_ops->fault(vmf);
1956 #else
1957         rc = pcc_vm_ops->fault(vma, vmf);
1958 #endif
1959         vma->vm_file = file;
1960
1961         pcc_io_fini(inode);
1962         RETURN(rc);
1963 }
1964
1965 static void pcc_layout_wait(struct pcc_inode *pcci)
1966 {
1967         struct l_wait_info lwi = { 0 };
1968
1969         while (atomic_read(&pcci->pcci_active_ios) > 0) {
1970                 CDEBUG(D_CACHE, "Waiting for IO completion: %d\n",
1971                        atomic_read(&pcci->pcci_active_ios));
1972                 l_wait_event(pcci->pcci_waitq,
1973                              atomic_read(&pcci->pcci_active_ios) == 0, &lwi);
1974         }
1975 }
1976
1977 static void __pcc_layout_invalidate(struct pcc_inode *pcci)
1978 {
1979         pcci->pcci_type = LU_PCC_NONE;
1980         pcc_layout_gen_set(pcci, CL_LAYOUT_GEN_NONE);
1981         pcc_layout_wait(pcci);
1982 }
1983
1984 void pcc_layout_invalidate(struct inode *inode)
1985 {
1986         struct pcc_inode *pcci;
1987
1988         ENTRY;
1989
1990         pcc_inode_lock(inode);
1991         pcci = ll_i2pcci(inode);
1992         if (pcci && pcc_inode_has_layout(pcci)) {
1993                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1994                 __pcc_layout_invalidate(pcci);
1995
1996                 CDEBUG(D_CACHE, "Invalidate "DFID" layout gen %d\n",
1997                        PFID(&ll_i2info(inode)->lli_fid), pcci->pcci_layout_gen);
1998
1999                 pcc_inode_put(pcci);
2000         }
2001         pcc_inode_unlock(inode);
2002
2003         EXIT;
2004 }
2005
2006 static int pcc_inode_remove(struct inode *inode, struct dentry *pcc_dentry)
2007 {
2008         int rc;
2009
2010         rc = ll_vfs_unlink(pcc_dentry->d_parent->d_inode, pcc_dentry);
2011         if (rc)
2012                 CWARN("%s: failed to unlink PCC file %.*s, rc = %d\n",
2013                       ll_i2sbi(inode)->ll_fsname, pcc_dentry->d_name.len,
2014                       pcc_dentry->d_name.name, rc);
2015
2016         return rc;
2017 }
2018
2019 /* Create directory under base if directory does not exist */
2020 static struct dentry *
2021 pcc_mkdir(struct dentry *base, const char *name, umode_t mode)
2022 {
2023         int rc;
2024         struct dentry *dentry;
2025         struct inode *dir = base->d_inode;
2026
2027         inode_lock(dir);
2028         dentry = lookup_one_len(name, base, strlen(name));
2029         if (IS_ERR(dentry))
2030                 goto out;
2031
2032         if (d_is_positive(dentry))
2033                 goto out;
2034
2035         rc = vfs_mkdir(dir, dentry, mode);
2036         if (rc) {
2037                 dput(dentry);
2038                 dentry = ERR_PTR(rc);
2039                 goto out;
2040         }
2041 out:
2042         inode_unlock(dir);
2043         return dentry;
2044 }
2045
2046 static struct dentry *
2047 pcc_mkdir_p(struct dentry *root, char *path, umode_t mode)
2048 {
2049         char *ptr, *entry_name;
2050         struct dentry *parent;
2051         struct dentry *child = ERR_PTR(-EINVAL);
2052
2053         ptr = path;
2054         while (*ptr == '/')
2055                 ptr++;
2056
2057         entry_name = ptr;
2058         parent = dget(root);
2059         while ((ptr = strchr(ptr, '/')) != NULL) {
2060                 *ptr = '\0';
2061                 child = pcc_mkdir(parent, entry_name, mode);
2062                 *ptr = '/';
2063                 dput(parent);
2064                 if (IS_ERR(child))
2065                         break;
2066
2067                 parent = child;
2068                 ptr++;
2069                 entry_name = ptr;
2070         }
2071
2072         return child;
2073 }
2074
2075 /* Create file under base. If file already exist, return failure */
2076 static struct dentry *
2077 pcc_create(struct dentry *base, const char *name, umode_t mode)
2078 {
2079         int rc;
2080         struct dentry *dentry;
2081         struct inode *dir = base->d_inode;
2082
2083         inode_lock(dir);
2084         dentry = lookup_one_len(name, base, strlen(name));
2085         if (IS_ERR(dentry))
2086                 goto out;
2087
2088         if (d_is_positive(dentry))
2089                 goto out;
2090
2091         rc = vfs_create(dir, dentry, mode, LL_VFS_CREATE_FALSE);
2092         if (rc) {
2093                 dput(dentry);
2094                 dentry = ERR_PTR(rc);
2095                 goto out;
2096         }
2097 out:
2098         inode_unlock(dir);
2099         return dentry;
2100 }
2101
2102 static int __pcc_inode_create(struct pcc_dataset *dataset,
2103                               struct lu_fid *fid,
2104                               struct dentry **dentry)
2105 {
2106         char *path;
2107         struct dentry *base;
2108         struct dentry *child;
2109         int rc = 0;
2110
2111         OBD_ALLOC(path, MAX_PCC_DATABASE_PATH);
2112         if (path == NULL)
2113                 return -ENOMEM;
2114
2115         pcc_fid2dataset_path(path, MAX_PCC_DATABASE_PATH, fid);
2116
2117         base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0);
2118         if (IS_ERR(base)) {
2119                 rc = PTR_ERR(base);
2120                 GOTO(out, rc);
2121         }
2122
2123         snprintf(path, MAX_PCC_DATABASE_PATH, DFID_NOBRACE, PFID(fid));
2124         child = pcc_create(base, path, 0);
2125         if (IS_ERR(child)) {
2126                 rc = PTR_ERR(child);
2127                 GOTO(out_base, rc);
2128         }
2129         *dentry = child;
2130
2131 out_base:
2132         dput(base);
2133 out:
2134         OBD_FREE(path, MAX_PCC_DATABASE_PATH);
2135         return rc;
2136 }
2137
2138 /* TODO: Set the project ID for PCC copy */
2139 int pcc_inode_store_ugpid(struct dentry *dentry, kuid_t uid, kgid_t gid)
2140 {
2141         struct inode *inode = dentry->d_inode;
2142         struct iattr attr;
2143         int rc;
2144
2145         ENTRY;
2146
2147         attr.ia_valid = ATTR_UID | ATTR_GID;
2148         attr.ia_uid = uid;
2149         attr.ia_gid = gid;
2150
2151         inode_lock(inode);
2152         rc = notify_change(dentry, &attr, NULL);
2153         inode_unlock(inode);
2154
2155         RETURN(rc);
2156 }
2157
/*
 * Create the PCC copy for @fid in @dataset using the credentials returned by
 * pcc_super_cred(@sb).
 *
 * The caller's credentials are restored before returning.
 *
 * \param[out] pcc_dentry  dentry of the new copy, see __pcc_inode_create()
 *
 * \retval return value of __pcc_inode_create()
 */
int pcc_inode_create(struct super_block *sb, struct pcc_dataset *dataset,
                     struct lu_fid *fid, struct dentry **pcc_dentry)
{
        const struct cred *saved_cred = override_creds(pcc_super_cred(sb));
        int rc = __pcc_inode_create(dataset, fid, pcc_dentry);

        revert_creds(saved_cred);
        return rc;
}
2169
2170 int pcc_inode_create_fini(struct inode *inode, struct pcc_create_attach *pca)
2171 {
2172         struct dentry *pcc_dentry = pca->pca_dentry;
2173         const struct cred *old_cred;
2174         struct pcc_inode *pcci;
2175         int rc;
2176
2177         ENTRY;
2178
2179         if (!pca->pca_dataset)
2180                 RETURN(0);
2181
2182         if (!inode)
2183                 GOTO(out_dataset_put, rc = 0);
2184
2185         LASSERT(pcc_dentry);
2186
2187         old_cred = override_creds(pcc_super_cred(inode->i_sb));
2188         pcc_inode_lock(inode);
2189         LASSERT(ll_i2pcci(inode) == NULL);
2190         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2191         if (pcci == NULL)
2192                 GOTO(out_put, rc = -ENOMEM);
2193
2194         rc = pcc_inode_store_ugpid(pcc_dentry, old_cred->suid,
2195                                    old_cred->sgid);
2196         if (rc)
2197                 GOTO(out_put, rc);
2198
2199         pcc_inode_init(pcci, ll_i2info(inode));
2200         pcc_inode_attach_init(pca->pca_dataset, pcci, pcc_dentry,
2201                               LU_PCC_READWRITE);
2202
2203         rc = pcc_layout_xattr_set(pcci, 0);
2204         if (rc) {
2205                 (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
2206                 pcc_inode_put(pcci);
2207                 GOTO(out_unlock, rc);
2208         }
2209
2210         /* Set the layout generation of newly created file with 0 */
2211         pcc_layout_gen_set(pcci, 0);
2212
2213 out_put:
2214         if (rc) {
2215                 (void) pcc_inode_remove(inode, pcc_dentry);
2216                 dput(pcc_dentry);
2217
2218                 if (pcci)
2219                         OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
2220         }
2221 out_unlock:
2222         pcc_inode_unlock(inode);
2223         revert_creds(old_cred);
2224 out_dataset_put:
2225         pcc_dataset_put(pca->pca_dataset);
2226         RETURN(rc);
2227 }
2228
2229 void pcc_create_attach_cleanup(struct super_block *sb,
2230                                struct pcc_create_attach *pca)
2231 {
2232         if (!pca->pca_dataset)
2233                 return;
2234
2235         if (pca->pca_dentry) {
2236                 const struct cred *old_cred;
2237                 int rc;
2238
2239                 old_cred = override_creds(pcc_super_cred(sb));
2240                 rc = ll_vfs_unlink(pca->pca_dentry->d_parent->d_inode,
2241                                    pca->pca_dentry);
2242                 if (rc)
2243                         CWARN("failed to unlink PCC file %.*s, rc = %d\n",
2244                               pca->pca_dentry->d_name.len,
2245                               pca->pca_dentry->d_name.name, rc);
2246                 /* ignore the unlink failure */
2247                 revert_creds(old_cred);
2248                 dput(pca->pca_dentry);
2249         }
2250
2251         pcc_dataset_put(pca->pca_dataset);
2252 }
2253
2254 static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
2255                           loff_t *offset)
2256 {
2257         while (count > 0) {
2258                 ssize_t size;
2259
2260                 size = cfs_kernel_write(filp, buf, count, offset);
2261                 if (size < 0)
2262                         return size;
2263                 count -= size;
2264                 buf += size;
2265         }
2266         return 0;
2267 }
2268
2269 static int pcc_copy_data(struct file *src, struct file *dst)
2270 {
2271         int rc = 0;
2272         ssize_t rc2;
2273         loff_t pos, offset = 0;
2274         size_t buf_len = 1048576;
2275         void *buf;
2276
2277         ENTRY;
2278
2279         OBD_ALLOC_LARGE(buf, buf_len);
2280         if (buf == NULL)
2281                 RETURN(-ENOMEM);
2282
2283         while (1) {
2284                 pos = offset;
2285                 rc2 = cfs_kernel_read(src, buf, buf_len, &pos);
2286                 if (rc2 < 0)
2287                         GOTO(out_free, rc = rc2);
2288                 else if (rc2 == 0)
2289                         break;
2290
2291                 pos = offset;
2292                 rc = pcc_filp_write(dst, buf, rc2, &pos);
2293                 if (rc < 0)
2294                         GOTO(out_free, rc);
2295                 offset += rc2;
2296         }
2297
2298 out_free:
2299         OBD_FREE_LARGE(buf, buf_len);
2300         RETURN(rc);
2301 }
2302
2303 static int pcc_attach_allowed_check(struct inode *inode)
2304 {
2305         struct ll_inode_info *lli = ll_i2info(inode);
2306         struct pcc_inode *pcci;
2307         int rc = 0;
2308
2309         ENTRY;
2310
2311         pcc_inode_lock(inode);
2312         if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
2313                 GOTO(out_unlock, rc = -EBUSY);
2314
2315         pcci = ll_i2pcci(inode);
2316         if (pcci && pcc_inode_has_layout(pcci))
2317                 GOTO(out_unlock, rc = -EEXIST);
2318
2319         lli->lli_pcc_state |= PCC_STATE_FL_ATTACHING;
2320 out_unlock:
2321         pcc_inode_unlock(inode);
2322         RETURN(rc);
2323 }
2324
2325 int pcc_readwrite_attach(struct file *file, struct inode *inode,
2326                          __u32 archive_id)
2327 {
2328         struct pcc_dataset *dataset;
2329         struct ll_inode_info *lli = ll_i2info(inode);
2330         struct pcc_inode *pcci;
2331         const struct cred *old_cred;
2332         struct dentry *dentry;
2333         struct file *pcc_filp;
2334         struct path path;
2335         int rc;
2336
2337         ENTRY;
2338
2339         rc = pcc_attach_allowed_check(inode);
2340         if (rc)
2341                 RETURN(rc);
2342
2343         dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super,
2344                                   LU_PCC_READWRITE, archive_id);
2345         if (dataset == NULL)
2346                 RETURN(-ENOENT);
2347
2348         old_cred = override_creds(pcc_super_cred(inode->i_sb));
2349         rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry);
2350         if (rc) {
2351                 revert_creds(old_cred);
2352                 GOTO(out_dataset_put, rc);
2353         }
2354
2355         path.mnt = dataset->pccd_path.mnt;
2356         path.dentry = dentry;
2357 #ifdef HAVE_DENTRY_OPEN_USE_PATH
2358         pcc_filp = dentry_open(&path, O_TRUNC | O_WRONLY | O_LARGEFILE,
2359                                current_cred());
2360 #else
2361         pcc_filp = dentry_open(path.dentry, path.mnt,
2362                                O_TRUNC | O_WRONLY | O_LARGEFILE,
2363                                current_cred());
2364 #endif
2365         if (IS_ERR_OR_NULL(pcc_filp)) {
2366                 rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
2367                 revert_creds(old_cred);
2368                 GOTO(out_dentry, rc);
2369         }
2370
2371         rc = pcc_inode_store_ugpid(dentry, old_cred->uid, old_cred->gid);
2372         revert_creds(old_cred);
2373         if (rc)
2374                 GOTO(out_fput, rc);
2375
2376         rc = pcc_copy_data(file, pcc_filp);
2377         if (rc)
2378                 GOTO(out_fput, rc);
2379
2380         /* Pause to allow for a race with concurrent HSM remove */
2381         OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
2382
2383         pcc_inode_lock(inode);
2384         pcci = ll_i2pcci(inode);
2385         LASSERT(!pcci);
2386         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2387         if (pcci == NULL)
2388                 GOTO(out_unlock, rc = -ENOMEM);
2389
2390         pcc_inode_init(pcci, lli);
2391         pcc_inode_attach_init(dataset, pcci, dentry, LU_PCC_READWRITE);
2392 out_unlock:
2393         pcc_inode_unlock(inode);
2394 out_fput:
2395         fput(pcc_filp);
2396 out_dentry:
2397         if (rc) {
2398                 old_cred = override_creds(pcc_super_cred(inode->i_sb));
2399                 (void) pcc_inode_remove(inode, dentry);
2400                 revert_creds(old_cred);
2401                 dput(dentry);
2402         }
2403 out_dataset_put:
2404         pcc_dataset_put(dataset);
2405         RETURN(rc);
2406 }
2407
2408 int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
2409                               __u32 gen, bool lease_broken, int rc,
2410                               bool attached)
2411 {
2412         struct ll_inode_info *lli = ll_i2info(inode);
2413         const struct cred *old_cred;
2414         struct pcc_inode *pcci;
2415         __u32 gen2;
2416
2417         ENTRY;
2418
2419         old_cred = override_creds(pcc_super_cred(inode->i_sb));
2420         pcc_inode_lock(inode);
2421         pcci = ll_i2pcci(inode);
2422         if (rc || lease_broken) {
2423                 if (attached && pcci)
2424                         pcc_inode_put(pcci);
2425
2426                 GOTO(out_unlock, rc);
2427         }
2428
2429         /* PCC inode may be released due to layout lock revocatioin */
2430         if (!pcci)
2431                 GOTO(out_unlock, rc = -ESTALE);
2432
2433         LASSERT(attached);
2434         rc = pcc_layout_xattr_set(pcci, gen);
2435         if (rc)
2436                 GOTO(out_put, rc);
2437
2438         LASSERT(lli->lli_pcc_state & PCC_STATE_FL_ATTACHING);
2439         rc = ll_layout_refresh(inode, &gen2);
2440         if (!rc) {
2441                 if (gen2 == gen) {
2442                         pcc_layout_gen_set(pcci, gen);
2443                 } else {
2444                         CDEBUG(D_CACHE,
2445                                DFID" layout changed from %d to %d.\n",
2446                                PFID(ll_inode2fid(inode)), gen, gen2);
2447                         GOTO(out_put, rc = -ESTALE);
2448                 }
2449         }
2450
2451 out_put:
2452         if (rc) {
2453                 (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
2454                 pcc_inode_put(pcci);
2455         }
2456 out_unlock:
2457         lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
2458         pcc_inode_unlock(inode);
2459         revert_creds(old_cred);
2460         RETURN(rc);
2461 }
2462
2463 static int pcc_hsm_remove(struct inode *inode)
2464 {
2465         struct hsm_user_request *hur;
2466         __u32 gen;
2467         int len;
2468         int rc;
2469
2470         ENTRY;
2471
2472         rc = ll_layout_restore(inode, 0, OBD_OBJECT_EOF);
2473         if (rc) {
2474                 CDEBUG(D_CACHE, DFID" RESTORE failure: %d\n",
2475                        PFID(&ll_i2info(inode)->lli_fid), rc);
2476                 RETURN(rc);
2477         }
2478
2479         ll_layout_refresh(inode, &gen);
2480
2481         len = sizeof(struct hsm_user_request) +
2482               sizeof(struct hsm_user_item);
2483         OBD_ALLOC(hur, len);
2484         if (hur == NULL)
2485                 RETURN(-ENOMEM);
2486
2487         hur->hur_request.hr_action = HUA_REMOVE;
2488         hur->hur_request.hr_archive_id = 0;
2489         hur->hur_request.hr_flags = 0;
2490         memcpy(&hur->hur_user_item[0].hui_fid, &ll_i2info(inode)->lli_fid,
2491                sizeof(hur->hur_user_item[0].hui_fid));
2492         hur->hur_user_item[0].hui_extent.offset = 0;
2493         hur->hur_user_item[0].hui_extent.length = OBD_OBJECT_EOF;
2494         hur->hur_request.hr_itemcount = 1;
2495         rc = obd_iocontrol(LL_IOC_HSM_REQUEST, ll_i2sbi(inode)->ll_md_exp,
2496                            len, hur, NULL);
2497         if (rc)
2498                 CDEBUG(D_CACHE, DFID" HSM REMOVE failure: %d\n",
2499                        PFID(&ll_i2info(inode)->lli_fid), rc);
2500
2501         OBD_FREE(hur, len);
2502         RETURN(rc);
2503 }
2504
2505 int pcc_ioctl_detach(struct inode *inode, __u32 opt)
2506 {
2507         struct ll_inode_info *lli = ll_i2info(inode);
2508         struct pcc_inode *pcci;
2509         bool hsm_remove = false;
2510         int rc = 0;
2511
2512         ENTRY;
2513
2514         pcc_inode_lock(inode);
2515         pcci = lli->lli_pcc_inode;
2516         if (!pcci || lli->lli_pcc_state & PCC_STATE_FL_ATTACHING ||
2517             !pcc_inode_has_layout(pcci))
2518                 GOTO(out_unlock, rc = 0);
2519
2520         LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
2521
2522         if (pcci->pcci_type == LU_PCC_READWRITE) {
2523                 if (opt == PCC_DETACH_OPT_UNCACHE)
2524                         hsm_remove = true;
2525
2526                 __pcc_layout_invalidate(pcci);
2527                 pcc_inode_put(pcci);
2528         }
2529
2530 out_unlock:
2531         pcc_inode_unlock(inode);
2532         if (hsm_remove) {
2533                 const struct cred *old_cred;
2534
2535                 old_cred = override_creds(pcc_super_cred(inode->i_sb));
2536                 rc = pcc_hsm_remove(inode);
2537                 revert_creds(old_cred);
2538         }
2539
2540         RETURN(rc);
2541 }
2542
2543 int pcc_ioctl_state(struct file *file, struct inode *inode,
2544                     struct lu_pcc_state *state)
2545 {
2546         int rc = 0;
2547         int count;
2548         char *buf;
2549         char *path;
2550         int buf_len = sizeof(state->pccs_path);
2551         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2552         struct pcc_file *pccf = &fd->fd_pcc_file;
2553         struct pcc_inode *pcci;
2554
2555         ENTRY;
2556
2557         if (buf_len <= 0)
2558                 RETURN(-EINVAL);
2559
2560         OBD_ALLOC(buf, buf_len);
2561         if (buf == NULL)
2562                 RETURN(-ENOMEM);
2563
2564         pcc_inode_lock(inode);
2565         pcci = ll_i2pcci(inode);
2566         if (pcci == NULL) {
2567                 state->pccs_type = LU_PCC_NONE;
2568                 GOTO(out_unlock, rc = 0);
2569         }
2570
2571         count = atomic_read(&pcci->pcci_refcount);
2572         if (count == 0) {
2573                 state->pccs_type = LU_PCC_NONE;
2574                 state->pccs_open_count = 0;
2575                 GOTO(out_unlock, rc = 0);
2576         }
2577
2578         if (pcc_inode_has_layout(pcci))
2579                 count--;
2580         if (pccf->pccf_file != NULL)
2581                 count--;
2582         state->pccs_type = pcci->pcci_type;
2583         state->pccs_open_count = count;
2584         state->pccs_flags = ll_i2info(inode)->lli_pcc_state;
2585         path = dentry_path_raw(pcci->pcci_path.dentry, buf, buf_len);
2586         if (IS_ERR(path))
2587                 GOTO(out_unlock, rc = PTR_ERR(path));
2588
2589         if (strlcpy(state->pccs_path, path, buf_len) >= buf_len)
2590                 GOTO(out_unlock, rc = -ENAMETOOLONG);
2591
2592 out_unlock:
2593         pcc_inode_unlock(inode);
2594         OBD_FREE(buf, buf_len);
2595         RETURN(rc);
2596 }