Whamcloud - gitweb
fcebed4ffbfb3ba5796e14c822e65a21e9f3d16c
[fs/lustre-release.git] / lustre / llite / pcc.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, DDN Storage Corporation.
24  */
25 /*
26  * Persistent Client Cache
27  *
28  * PCC is a new framework which provides a group of local cache on Lustre
29  * client side. It works in two modes: RW-PCC enables a read-write cache on the
30  * local SSDs of a single client; RO-PCC provides a read-only cache on the
31  * local SSDs of multiple clients. Less overhead is visible to the applications
32  * and network latencies and lock conflicts can be significantly reduced.
33  *
34  * For RW-PCC, no global namespace will be provided. Each client uses its own
35  * local storage as a cache for itself. Local file system is used to manage
36  * the data on local caches. Cached I/O is directed to local file system while
37  * normal I/O is directed to OSTs. RW-PCC uses HSM for data synchronization.
38  * It uses HSM copytool to restore file from local caches to Lustre OSTs. Each
39  * PCC has a copytool instance running with unique archive number. Any remote
40  * access from another Lustre client would trigger the data synchronization. If
41  * a client with RW-PCC goes offline, the cached data becomes inaccessible for
42  * other client temporarily. And after the RW-PCC client reboots and the
43  * copytool restarts, the data will be accessible again.
44  *
45  * Following is what will happen in different conditions for RW-PCC:
46  *
47  * > When file is being created on RW-PCC
48  *
49  * A normal HSM released file is created on MDT;
50  * An empty mirror file is created on local cache;
51  * The HSM status of the Lustre file will be set to archived and released;
52  * The archive number will be set to the proper value.
53  *
54  * > When file is being prefetched to RW-PCC
55  *
56  * A file is copied to the local cache;
57  * The HSM status of the Lustre file will be set to archived and released;
58  * The archive number will be set to the proper value.
59  *
60  * > When file is being accessed from PCC
61  *
62  * Data will be read directly from local cache;
63  * Metadata will be read from MDT, except file size;
64  * File size will be got from local cache.
65  *
66  * > When PCC cached file is being accessed on another client
67  *
68  * RW-PCC cached files are automatically restored when a process on another
69  * client tries to read or modify them. The corresponding I/O will block
70  * waiting for the released file to be restored. This is transparent to the
71  * process.
72  *
73  * For RW-PCC, when a file is being created, a rule-based policy is used to
74  * determine whether it will be cached. Rule-based caching of newly created
75  * files can determine which file can use a cache on PCC directly without any
76  * admission control.
77  *
78  * RW-PCC design can accelerate I/O intensive applications with one-to-one
79  * mappings between files and accessing clients. However, in several use cases,
80  * files will never be updated, but need to be read simultaneously from many
81  * clients. RO-PCC implements a read-only caching on Lustre clients using
82  * SSDs. RO-PCC is based on the same framework as RW-PCC, except
83  * that no HSM mechanism is used.
84  *
85  * The main advantages to use this SSD cache on the Lustre clients via PCC
86  * is that:
87  * - The I/O stack becomes much simpler for the cached data, as there is no
88  *   interference with I/Os from other clients, which enables easier
89  *   performance optimizations;
90  * - The requirements on the HW inside the client nodes are small, any kind of
91  *   SSDs or even HDDs can be used as cache devices;
92  * - Caching reduces the pressure on the object storage targets (OSTs), as
93  *   small or random I/Os can be regularized to big sequential I/Os and
94  *   temporary files do not even need to be flushed to OSTs.
95  *
96  * PCC can accelerate applications with certain I/O patterns:
97  * - small-sized random writes (< 1MB) from a single client
98  * - repeated read of data that is larger than RAM
99  * - clients with high network latency
100  *
101  * Author: Li Xi <lixi@ddn.com>
102  * Author: Qian Yingjin <qian@ddn.com>
103  */
104
105 #define DEBUG_SUBSYSTEM S_LLITE
106
107 #include "pcc.h"
108 #include <linux/namei.h>
109 #include <linux/file.h>
110 #include <lustre_compat.h>
111 #include "llite_internal.h"
112
113 struct kmem_cache *pcc_inode_slab;
114
115 int pcc_super_init(struct pcc_super *super)
116 {
117         struct cred *cred;
118
119         super->pccs_cred = cred = prepare_creds();
120         if (!cred)
121                 return -ENOMEM;
122
123         /* Never override disk quota limits or use reserved space */
124         cap_lower(cred->cap_effective, CAP_SYS_RESOURCE);
125         init_rwsem(&super->pccs_rw_sem);
126         INIT_LIST_HEAD(&super->pccs_datasets);
127
128         return 0;
129 }
130
131 /* Rule based auto caching */
132 static void pcc_id_list_free(struct list_head *id_list)
133 {
134         struct pcc_match_id *id, *n;
135
136         list_for_each_entry_safe(id, n, id_list, pmi_linkage) {
137                 list_del_init(&id->pmi_linkage);
138                 OBD_FREE_PTR(id);
139         }
140 }
141
142 static void pcc_fname_list_free(struct list_head *fname_list)
143 {
144         struct pcc_match_fname *fname, *n;
145
146         list_for_each_entry_safe(fname, n, fname_list, pmf_linkage) {
147                 OBD_FREE(fname->pmf_name, strlen(fname->pmf_name) + 1);
148                 list_del_init(&fname->pmf_linkage);
149                 OBD_FREE_PTR(fname);
150         }
151 }
152
153 static void pcc_expression_free(struct pcc_expression *expr)
154 {
155         LASSERT(expr->pe_field >= PCC_FIELD_UID &&
156                 expr->pe_field < PCC_FIELD_MAX);
157         switch (expr->pe_field) {
158         case PCC_FIELD_UID:
159         case PCC_FIELD_GID:
160         case PCC_FIELD_PROJID:
161                 pcc_id_list_free(&expr->pe_cond);
162                 break;
163         case PCC_FIELD_FNAME:
164                 pcc_fname_list_free(&expr->pe_cond);
165                 break;
166         default:
167                 LBUG();
168         }
169         OBD_FREE_PTR(expr);
170 }
171
172 static void pcc_conjunction_free(struct pcc_conjunction *conjunction)
173 {
174         struct pcc_expression *expression, *n;
175
176         LASSERT(list_empty(&conjunction->pc_linkage));
177         list_for_each_entry_safe(expression, n,
178                                  &conjunction->pc_expressions,
179                                  pe_linkage) {
180                 list_del_init(&expression->pe_linkage);
181                 pcc_expression_free(expression);
182         }
183         OBD_FREE_PTR(conjunction);
184 }
185
186 static void pcc_rule_conds_free(struct list_head *cond_list)
187 {
188         struct pcc_conjunction *conjunction, *n;
189
190         list_for_each_entry_safe(conjunction, n, cond_list, pc_linkage) {
191                 list_del_init(&conjunction->pc_linkage);
192                 pcc_conjunction_free(conjunction);
193         }
194 }
195
196 static void pcc_cmd_fini(struct pcc_cmd *cmd)
197 {
198         if (cmd->pccc_cmd == PCC_ADD_DATASET) {
199                 if (!list_empty(&cmd->u.pccc_add.pccc_conds))
200                         pcc_rule_conds_free(&cmd->u.pccc_add.pccc_conds);
201                 if (cmd->u.pccc_add.pccc_conds_str)
202                         OBD_FREE(cmd->u.pccc_add.pccc_conds_str,
203                                  strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
204         }
205 }
206
/* Separators used by the rule parser below. */
#define PCC_DISJUNCTION_DELIM	','	/* between conjunctions */
#define PCC_CONJUNCTION_DELIM	'&'	/* between expressions */
#define PCC_EXPRESSION_DELIM	'='	/* between field name and {values} */
210
211 static int
212 pcc_fname_list_add(struct cfs_lstr *id, struct list_head *fname_list)
213 {
214         struct pcc_match_fname *fname;
215
216         OBD_ALLOC(fname, sizeof(struct pcc_match_fname));
217         if (fname == NULL)
218                 return -ENOMEM;
219
220         OBD_ALLOC(fname->pmf_name, id->ls_len + 1);
221         if (fname->pmf_name == NULL) {
222                 OBD_FREE(fname, sizeof(struct pcc_match_fname));
223                 return -ENOMEM;
224         }
225
226         memcpy(fname->pmf_name, id->ls_str, id->ls_len);
227         list_add_tail(&fname->pmf_linkage, fname_list);
228         return 0;
229 }
230
231 static int
232 pcc_fname_list_parse(char *str, int len, struct list_head *fname_list)
233 {
234         struct cfs_lstr src;
235         struct cfs_lstr res;
236         int rc = 0;
237
238         ENTRY;
239
240         src.ls_str = str;
241         src.ls_len = len;
242         INIT_LIST_HEAD(fname_list);
243         while (src.ls_str) {
244                 rc = cfs_gettok(&src, ' ', &res);
245                 if (rc == 0) {
246                         rc = -EINVAL;
247                         break;
248                 }
249                 rc = pcc_fname_list_add(&res, fname_list);
250                 if (rc)
251                         break;
252         }
253         if (rc)
254                 pcc_fname_list_free(fname_list);
255         RETURN(rc);
256 }
257
258 static int
259 pcc_id_list_parse(char *str, int len, struct list_head *id_list,
260                   enum pcc_field type)
261 {
262         struct cfs_lstr src;
263         struct cfs_lstr res;
264         int rc = 0;
265
266         ENTRY;
267
268         if (type != PCC_FIELD_UID && type != PCC_FIELD_GID &&
269             type != PCC_FIELD_PROJID)
270                 RETURN(-EINVAL);
271
272         src.ls_str = str;
273         src.ls_len = len;
274         INIT_LIST_HEAD(id_list);
275         while (src.ls_str) {
276                 struct pcc_match_id *id;
277                 __u32 id_val;
278
279                 if (cfs_gettok(&src, ' ', &res) == 0)
280                         GOTO(out, rc = -EINVAL);
281
282                 if (!cfs_str2num_check(res.ls_str, res.ls_len,
283                                        &id_val, 0, (u32)~0U))
284                         GOTO(out, rc = -EINVAL);
285
286                 OBD_ALLOC_PTR(id);
287                 if (id == NULL)
288                         GOTO(out, rc = -ENOMEM);
289
290                 id->pmi_id = id_val;
291                 list_add_tail(&id->pmi_linkage, id_list);
292         }
293 out:
294         if (rc)
295                 pcc_id_list_free(id_list);
296         RETURN(rc);
297 }
298
299 static inline bool
300 pcc_check_field(struct cfs_lstr *field, char *str)
301 {
302         int len = strlen(str);
303
304         return (field->ls_len == len &&
305                 strncmp(field->ls_str, str, len) == 0);
306 }
307
308 static int
309 pcc_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
310 {
311         struct pcc_expression *expr;
312         struct cfs_lstr field;
313         int rc = 0;
314
315         OBD_ALLOC(expr, sizeof(struct pcc_expression));
316         if (expr == NULL)
317                 return -ENOMEM;
318
319         rc = cfs_gettok(src, PCC_EXPRESSION_DELIM, &field);
320         if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
321             src->ls_str[src->ls_len - 1] != '}')
322                 GOTO(out, rc = -EINVAL);
323
324         /* Skip '{' and '}' */
325         src->ls_str++;
326         src->ls_len -= 2;
327
328         if (pcc_check_field(&field, "uid")) {
329                 if (pcc_id_list_parse(src->ls_str,
330                                       src->ls_len,
331                                       &expr->pe_cond,
332                                       PCC_FIELD_UID) < 0)
333                         GOTO(out, rc = -EINVAL);
334                 expr->pe_field = PCC_FIELD_UID;
335         } else if (pcc_check_field(&field, "gid")) {
336                 if (pcc_id_list_parse(src->ls_str,
337                                       src->ls_len,
338                                       &expr->pe_cond,
339                                       PCC_FIELD_GID) < 0)
340                         GOTO(out, rc = -EINVAL);
341                 expr->pe_field = PCC_FIELD_GID;
342         } else if (pcc_check_field(&field, "projid")) {
343                 if (pcc_id_list_parse(src->ls_str,
344                                       src->ls_len,
345                                       &expr->pe_cond,
346                                       PCC_FIELD_PROJID) < 0)
347                         GOTO(out, rc = -EINVAL);
348                 expr->pe_field = PCC_FIELD_PROJID;
349         } else if (pcc_check_field(&field, "fname")) {
350                 if (pcc_fname_list_parse(src->ls_str,
351                                          src->ls_len,
352                                          &expr->pe_cond) < 0)
353                         GOTO(out, rc = -EINVAL);
354                 expr->pe_field = PCC_FIELD_FNAME;
355         } else {
356                 GOTO(out, rc = -EINVAL);
357         }
358
359         list_add_tail(&expr->pe_linkage, cond_list);
360         return 0;
361 out:
362         OBD_FREE_PTR(expr);
363         return rc;
364 }
365
366 static int
367 pcc_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
368 {
369         struct pcc_conjunction *conjunction;
370         struct cfs_lstr expr;
371         int rc = 0;
372
373         OBD_ALLOC(conjunction, sizeof(struct pcc_conjunction));
374         if (conjunction == NULL)
375                 return -ENOMEM;
376
377         INIT_LIST_HEAD(&conjunction->pc_expressions);
378         list_add_tail(&conjunction->pc_linkage, cond_list);
379
380         while (src->ls_str) {
381                 rc = cfs_gettok(src, PCC_CONJUNCTION_DELIM, &expr);
382                 if (rc == 0) {
383                         rc = -EINVAL;
384                         break;
385                 }
386                 rc = pcc_expression_parse(&expr,
387                                           &conjunction->pc_expressions);
388                 if (rc)
389                         break;
390         }
391         return rc;
392 }
393
394 static int pcc_conds_parse(char *str, int len, struct list_head *cond_list)
395 {
396         struct cfs_lstr src;
397         struct cfs_lstr res;
398         int rc = 0;
399
400         src.ls_str = str;
401         src.ls_len = len;
402         INIT_LIST_HEAD(cond_list);
403         while (src.ls_str) {
404                 rc = cfs_gettok(&src, PCC_DISJUNCTION_DELIM, &res);
405                 if (rc == 0) {
406                         rc = -EINVAL;
407                         break;
408                 }
409                 rc = pcc_conjunction_parse(&res, cond_list);
410                 if (rc)
411                         break;
412         }
413         return rc;
414 }
415
416 static int pcc_id_parse(struct pcc_cmd *cmd, const char *id)
417 {
418         int rc;
419
420         OBD_ALLOC(cmd->u.pccc_add.pccc_conds_str, strlen(id) + 1);
421         if (cmd->u.pccc_add.pccc_conds_str == NULL)
422                 return -ENOMEM;
423
424         memcpy(cmd->u.pccc_add.pccc_conds_str, id, strlen(id));
425
426         rc = pcc_conds_parse(cmd->u.pccc_add.pccc_conds_str,
427                              strlen(cmd->u.pccc_add.pccc_conds_str),
428                              &cmd->u.pccc_add.pccc_conds);
429         if (rc)
430                 pcc_cmd_fini(cmd);
431
432         return rc;
433 }
434
435 static int
436 pcc_parse_value_pair(struct pcc_cmd *cmd, char *buffer)
437 {
438         char *key, *val;
439         unsigned long id;
440         int rc;
441
442         val = buffer;
443         key = strsep(&val, "=");
444         if (val == NULL || strlen(val) == 0)
445                 return -EINVAL;
446
447         /* Key of the value pair */
448         if (strcmp(key, "rwid") == 0) {
449                 rc = kstrtoul(val, 10, &id);
450                 if (rc)
451                         return rc;
452                 if (id <= 0)
453                         return -EINVAL;
454                 cmd->u.pccc_add.pccc_rwid = id;
455         } else if (strcmp(key, "roid") == 0) {
456                 rc = kstrtoul(val, 10, &id);
457                 if (rc)
458                         return rc;
459                 if (id <= 0)
460                         return -EINVAL;
461                 cmd->u.pccc_add.pccc_roid = id;
462         } else if (strcmp(key, "open_attach") == 0) {
463                 rc = kstrtoul(val, 10, &id);
464                 if (rc)
465                         return rc;
466                 if (id > 0)
467                         cmd->u.pccc_add.pccc_flags |= PCC_DATASET_OPEN_ATTACH;
468         } else if (strcmp(key, "rwpcc") == 0) {
469                 rc = kstrtoul(val, 10, &id);
470                 if (rc)
471                         return rc;
472                 if (id > 0)
473                         cmd->u.pccc_add.pccc_flags |= PCC_DATASET_RWPCC;
474         } else if (strcmp(key, "ropcc") == 0) {
475                 rc = kstrtoul(val, 10, &id);
476                 if (rc)
477                         return rc;
478                 if (id > 0)
479                         cmd->u.pccc_add.pccc_flags |= PCC_DATASET_ROPCC;
480         } else {
481                 return -EINVAL;
482         }
483
484         return 0;
485 }
486
487 static int
488 pcc_parse_value_pairs(struct pcc_cmd *cmd, char *buffer)
489 {
490         char *val;
491         char *token;
492         int rc;
493
494         val = buffer;
495         while (val != NULL && strlen(val) != 0) {
496                 token = strsep(&val, " ");
497                 rc = pcc_parse_value_pair(cmd, token);
498                 if (rc)
499                         return rc;
500         }
501
502         switch (cmd->pccc_cmd) {
503         case PCC_ADD_DATASET:
504                 if (cmd->u.pccc_add.pccc_flags & PCC_DATASET_RWPCC &&
505                     cmd->u.pccc_add.pccc_flags & PCC_DATASET_ROPCC)
506                         return -EINVAL;
507                 /*
508                  * By default, a PCC backend can provide caching service for
509                  * both RW-PCC and RO-PCC.
510                  */
511                 if ((cmd->u.pccc_add.pccc_flags & PCC_DATASET_PCC_ALL) == 0)
512                         cmd->u.pccc_add.pccc_flags |= PCC_DATASET_PCC_ALL;
513                 break;
514         case PCC_DEL_DATASET:
515         case PCC_CLEAR_ALL:
516                 break;
517         default:
518                 return -EINVAL;
519         }
520         return 0;
521 }
522
523 static void
524 pcc_dataset_rule_fini(struct pcc_match_rule *rule)
525 {
526         if (!list_empty(&rule->pmr_conds))
527                 pcc_rule_conds_free(&rule->pmr_conds);
528         LASSERT(rule->pmr_conds_str != NULL);
529         OBD_FREE(rule->pmr_conds_str, strlen(rule->pmr_conds_str) + 1);
530 }
531
532 static int
533 pcc_dataset_rule_init(struct pcc_match_rule *rule, struct pcc_cmd *cmd)
534 {
535         int rc = 0;
536
537         LASSERT(cmd->u.pccc_add.pccc_conds_str);
538         OBD_ALLOC(rule->pmr_conds_str,
539                   strlen(cmd->u.pccc_add.pccc_conds_str) + 1);
540         if (rule->pmr_conds_str == NULL)
541                 return -ENOMEM;
542
543         memcpy(rule->pmr_conds_str,
544                cmd->u.pccc_add.pccc_conds_str,
545                strlen(cmd->u.pccc_add.pccc_conds_str));
546
547         INIT_LIST_HEAD(&rule->pmr_conds);
548         if (!list_empty(&cmd->u.pccc_add.pccc_conds))
549                 rc = pcc_conds_parse(rule->pmr_conds_str,
550                                           strlen(rule->pmr_conds_str),
551                                           &rule->pmr_conds);
552
553         if (rc)
554                 pcc_dataset_rule_fini(rule);
555
556         return rc;
557 }
558
559 /* Rule Matching */
560 static int
561 pcc_id_list_match(struct list_head *id_list, __u32 id_val)
562 {
563         struct pcc_match_id *id;
564
565         list_for_each_entry(id, id_list, pmi_linkage) {
566                 if (id->pmi_id == id_val)
567                         return 1;
568         }
569         return 0;
570 }
571
572 static bool
573 cfs_match_wildcard(const char *pattern, const char *content)
574 {
575         if (*pattern == '\0' && *content == '\0')
576                 return true;
577
578         if (*pattern == '*' && *(pattern + 1) != '\0' && *content == '\0')
579                 return false;
580
581         while (*pattern == *content) {
582                 pattern++;
583                 content++;
584                 if (*pattern == '\0' && *content == '\0')
585                         return true;
586
587                 if (*pattern == '*' && *(pattern + 1) != '\0' &&
588                     *content == '\0')
589                         return false;
590         }
591
592         if (*pattern == '*')
593                 return (cfs_match_wildcard(pattern + 1, content) ||
594                         cfs_match_wildcard(pattern, content + 1));
595
596         return false;
597 }
598
599 static int
600 pcc_fname_list_match(struct list_head *fname_list, const char *name)
601 {
602         struct pcc_match_fname *fname;
603
604         list_for_each_entry(fname, fname_list, pmf_linkage) {
605                 if (cfs_match_wildcard(fname->pmf_name, name))
606                         return 1;
607         }
608         return 0;
609 }
610
611 static int
612 pcc_expression_match(struct pcc_expression *expr, struct pcc_matcher *matcher)
613 {
614         switch (expr->pe_field) {
615         case PCC_FIELD_UID:
616                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_uid);
617         case PCC_FIELD_GID:
618                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_gid);
619         case PCC_FIELD_PROJID:
620                 return pcc_id_list_match(&expr->pe_cond, matcher->pm_projid);
621         case PCC_FIELD_FNAME:
622                 return pcc_fname_list_match(&expr->pe_cond,
623                                             matcher->pm_name->name);
624         default:
625                 return 0;
626         }
627 }
628
629 static int
630 pcc_conjunction_match(struct pcc_conjunction *conjunction,
631                       struct pcc_matcher *matcher)
632 {
633         struct pcc_expression *expr;
634         int matched;
635
636         list_for_each_entry(expr, &conjunction->pc_expressions, pe_linkage) {
637                 matched = pcc_expression_match(expr, matcher);
638                 if (!matched)
639                         return 0;
640         }
641
642         return 1;
643 }
644
645 static int
646 pcc_cond_match(struct pcc_match_rule *rule, struct pcc_matcher *matcher)
647 {
648         struct pcc_conjunction *conjunction;
649         int matched;
650
651         list_for_each_entry(conjunction, &rule->pmr_conds, pc_linkage) {
652                 matched = pcc_conjunction_match(conjunction, matcher);
653                 if (matched)
654                         return 1;
655         }
656
657         return 0;
658 }
659
660 struct pcc_dataset*
661 pcc_dataset_match_get(struct pcc_super *super, struct pcc_matcher *matcher)
662 {
663         struct pcc_dataset *dataset;
664         struct pcc_dataset *selected = NULL;
665
666         down_read(&super->pccs_rw_sem);
667         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
668                 if (!(dataset->pccd_flags & PCC_DATASET_RWPCC))
669                         continue;
670
671                 if (pcc_cond_match(&dataset->pccd_rule, matcher)) {
672                         atomic_inc(&dataset->pccd_refcount);
673                         selected = dataset;
674                         break;
675                 }
676         }
677         up_read(&super->pccs_rw_sem);
678         if (selected)
679                 CDEBUG(D_CACHE, "PCC create, matched %s - %d:%d:%d:%s\n",
680                        dataset->pccd_rule.pmr_conds_str,
681                        matcher->pm_uid, matcher->pm_gid,
682                        matcher->pm_projid, matcher->pm_name->name);
683
684         return selected;
685 }
686
687 /**
688  * pcc_dataset_add - Add a Cache policy to control which files need be
689  * cached and where it will be cached.
690  *
691  * @super:      superblock of pcc
692  * @cmd:        pcc command
693  */
694 static int
695 pcc_dataset_add(struct pcc_super *super, struct pcc_cmd *cmd)
696 {
697         char *pathname = cmd->pccc_pathname;
698         struct pcc_dataset *dataset;
699         struct pcc_dataset *tmp;
700         bool found = false;
701         int rc;
702
703         OBD_ALLOC_PTR(dataset);
704         if (dataset == NULL)
705                 return -ENOMEM;
706
707         rc = kern_path(pathname, LOOKUP_DIRECTORY, &dataset->pccd_path);
708         if (unlikely(rc)) {
709                 OBD_FREE_PTR(dataset);
710                 return rc;
711         }
712         strncpy(dataset->pccd_pathname, pathname, PATH_MAX);
713         dataset->pccd_rwid = cmd->u.pccc_add.pccc_rwid;
714         dataset->pccd_roid = cmd->u.pccc_add.pccc_roid;
715         dataset->pccd_flags = cmd->u.pccc_add.pccc_flags;
716         atomic_set(&dataset->pccd_refcount, 1);
717
718         rc = pcc_dataset_rule_init(&dataset->pccd_rule, cmd);
719         if (rc) {
720                 pcc_dataset_put(dataset);
721                 return rc;
722         }
723
724         down_write(&super->pccs_rw_sem);
725         list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) {
726                 if (strcmp(tmp->pccd_pathname, pathname) == 0 ||
727                     (dataset->pccd_rwid != 0 &&
728                      dataset->pccd_rwid == tmp->pccd_rwid) ||
729                     (dataset->pccd_roid != 0 &&
730                      dataset->pccd_roid == tmp->pccd_roid)) {
731                         found = true;
732                         break;
733                 }
734         }
735         if (!found)
736                 list_add(&dataset->pccd_linkage, &super->pccs_datasets);
737         up_write(&super->pccs_rw_sem);
738
739         if (found) {
740                 pcc_dataset_put(dataset);
741                 rc = -EEXIST;
742         }
743
744         return rc;
745 }
746
747 struct pcc_dataset *
748 pcc_dataset_get(struct pcc_super *super, enum lu_pcc_type type, __u32 id)
749 {
750         struct pcc_dataset *dataset;
751         struct pcc_dataset *selected = NULL;
752
753         if (id == 0)
754                 return NULL;
755
756         /*
757          * archive ID (read-write ID) or read-only ID is unique in the list,
758          * we just return last added one as first priority.
759          */
760         down_read(&super->pccs_rw_sem);
761         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
762                 if (type == LU_PCC_READWRITE && (dataset->pccd_rwid != id ||
763                     !(dataset->pccd_flags & PCC_DATASET_RWPCC)))
764                         continue;
765                 atomic_inc(&dataset->pccd_refcount);
766                 selected = dataset;
767                 break;
768         }
769         up_read(&super->pccs_rw_sem);
770         if (selected)
771                 CDEBUG(D_CACHE, "matched id %u, PCC mode %d\n", id, type);
772
773         return selected;
774 }
775
776 void
777 pcc_dataset_put(struct pcc_dataset *dataset)
778 {
779         if (atomic_dec_and_test(&dataset->pccd_refcount)) {
780                 pcc_dataset_rule_fini(&dataset->pccd_rule);
781                 path_put(&dataset->pccd_path);
782                 OBD_FREE_PTR(dataset);
783         }
784 }
785
786 static int
787 pcc_dataset_del(struct pcc_super *super, char *pathname)
788 {
789         struct list_head *l, *tmp;
790         struct pcc_dataset *dataset;
791         int rc = -ENOENT;
792
793         down_write(&super->pccs_rw_sem);
794         list_for_each_safe(l, tmp, &super->pccs_datasets) {
795                 dataset = list_entry(l, struct pcc_dataset, pccd_linkage);
796                 if (strcmp(dataset->pccd_pathname, pathname) == 0) {
797                         list_del_init(&dataset->pccd_linkage);
798                         pcc_dataset_put(dataset);
799                         rc = 0;
800                         break;
801                 }
802         }
803         up_write(&super->pccs_rw_sem);
804         return rc;
805 }
806
807 static void
808 pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
809 {
810         seq_printf(m, "%s:\n", dataset->pccd_pathname);
811         seq_printf(m, "  rwid: %u\n", dataset->pccd_rwid);
812         seq_printf(m, "  flags: %x\n", dataset->pccd_flags);
813         seq_printf(m, "  autocache: %s\n", dataset->pccd_rule.pmr_conds_str);
814 }
815
816 int
817 pcc_super_dump(struct pcc_super *super, struct seq_file *m)
818 {
819         struct pcc_dataset *dataset;
820
821         down_read(&super->pccs_rw_sem);
822         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
823                 pcc_dataset_dump(dataset, m);
824         }
825         up_read(&super->pccs_rw_sem);
826         return 0;
827 }
828
829 static void pcc_remove_datasets(struct pcc_super *super)
830 {
831         struct pcc_dataset *dataset, *tmp;
832
833         down_write(&super->pccs_rw_sem);
834         list_for_each_entry_safe(dataset, tmp,
835                                  &super->pccs_datasets, pccd_linkage) {
836                 list_del(&dataset->pccd_linkage);
837                 pcc_dataset_put(dataset);
838         }
839         up_write(&super->pccs_rw_sem);
840 }
841
842 void pcc_super_fini(struct pcc_super *super)
843 {
844         pcc_remove_datasets(super);
845         put_cred(super->pccs_cred);
846 }
847
/*
 * Check a dataset pathname: it must be non-NULL, start with '/' (absolute
 * path) and be shorter than PATH_MAX.
 */
static bool pathname_is_valid(const char *pathname)
{
	/* A NULL or empty string, or a relative path, is rejected here. */
	if (pathname == NULL || pathname[0] != '/')
		return false;

	return strlen(pathname) < PATH_MAX;
}
856
857 static struct pcc_cmd *
858 pcc_cmd_parse(char *buffer, unsigned long count)
859 {
860         static struct pcc_cmd *cmd;
861         char *token;
862         char *val;
863         int rc = 0;
864
865         OBD_ALLOC_PTR(cmd);
866         if (cmd == NULL)
867                 GOTO(out, rc = -ENOMEM);
868
869         /* clear all setting */
870         if (strncmp(buffer, "clear", 5) == 0) {
871                 cmd->pccc_cmd = PCC_CLEAR_ALL;
872                 GOTO(out, rc = 0);
873         }
874
875         val = buffer;
876         token = strsep(&val, " ");
877         if (val == NULL || strlen(val) == 0)
878                 GOTO(out_free_cmd, rc = -EINVAL);
879
880         /* Type of the command */
881         if (strcmp(token, "add") == 0)
882                 cmd->pccc_cmd = PCC_ADD_DATASET;
883         else if (strcmp(token, "del") == 0)
884                 cmd->pccc_cmd = PCC_DEL_DATASET;
885         else
886                 GOTO(out_free_cmd, rc = -EINVAL);
887
888         /* Pathname of the dataset */
889         token = strsep(&val, " ");
890         if ((val == NULL && cmd->pccc_cmd != PCC_DEL_DATASET) ||
891             !pathname_is_valid(token))
892                 GOTO(out_free_cmd, rc = -EINVAL);
893         cmd->pccc_pathname = token;
894
895         if (cmd->pccc_cmd == PCC_ADD_DATASET) {
896                 /* List of ID */
897                 LASSERT(val);
898                 token = val;
899                 val = strrchr(token, '}');
900                 if (!val)
901                         GOTO(out_free_cmd, rc = -EINVAL);
902
903                 /* Skip '}' */
904                 val++;
905                 if (*val == '\0') {
906                         val = NULL;
907                 } else if (*val == ' ') {
908                         *val = '\0';
909                         val++;
910                 } else {
911                         GOTO(out_free_cmd, rc = -EINVAL);
912                 }
913
914                 rc = pcc_id_parse(cmd, token);
915                 if (rc)
916                         GOTO(out_free_cmd, rc);
917
918                 rc = pcc_parse_value_pairs(cmd, val);
919                 if (rc)
920                         GOTO(out_cmd_fini, rc = -EINVAL);
921         }
922         goto out;
923 out_cmd_fini:
924         pcc_cmd_fini(cmd);
925 out_free_cmd:
926         OBD_FREE_PTR(cmd);
927 out:
928         if (rc)
929                 cmd = ERR_PTR(rc);
930         return cmd;
931 }
932
933 int pcc_cmd_handle(char *buffer, unsigned long count,
934                    struct pcc_super *super)
935 {
936         int rc = 0;
937         struct pcc_cmd *cmd;
938
939         cmd = pcc_cmd_parse(buffer, count);
940         if (IS_ERR(cmd))
941                 return PTR_ERR(cmd);
942
943         switch (cmd->pccc_cmd) {
944         case PCC_ADD_DATASET:
945                 rc = pcc_dataset_add(super, cmd);
946                 break;
947         case PCC_DEL_DATASET:
948                 rc = pcc_dataset_del(super, cmd->pccc_pathname);
949                 break;
950         case PCC_CLEAR_ALL:
951                 pcc_remove_datasets(super);
952                 break;
953         default:
954                 rc = -EINVAL;
955                 break;
956         }
957
958         pcc_cmd_fini(cmd);
959         OBD_FREE_PTR(cmd);
960         return rc;
961 }
962
963 static inline void pcc_inode_lock(struct inode *inode)
964 {
965         mutex_lock(&ll_i2info(inode)->lli_pcc_lock);
966 }
967
968 static inline void pcc_inode_unlock(struct inode *inode)
969 {
970         mutex_unlock(&ll_i2info(inode)->lli_pcc_lock);
971 }
972
973 static void pcc_inode_init(struct pcc_inode *pcci, struct ll_inode_info *lli)
974 {
975         pcci->pcci_lli = lli;
976         lli->lli_pcc_inode = pcci;
977         lli->lli_pcc_state = PCC_STATE_FL_NONE;
978         atomic_set(&pcci->pcci_refcount, 0);
979         pcci->pcci_type = LU_PCC_NONE;
980         pcci->pcci_layout_gen = CL_LAYOUT_GEN_NONE;
981         atomic_set(&pcci->pcci_active_ios, 0);
982         init_waitqueue_head(&pcci->pcci_waitq);
983 }
984
985 static void pcc_inode_fini(struct pcc_inode *pcci)
986 {
987         struct ll_inode_info *lli = pcci->pcci_lli;
988
989         path_put(&pcci->pcci_path);
990         pcci->pcci_type = LU_PCC_NONE;
991         OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
992         lli->lli_pcc_inode = NULL;
993 }
994
995 static void pcc_inode_get(struct pcc_inode *pcci)
996 {
997         atomic_inc(&pcci->pcci_refcount);
998 }
999
1000 static void pcc_inode_put(struct pcc_inode *pcci)
1001 {
1002         if (atomic_dec_and_test(&pcci->pcci_refcount))
1003                 pcc_inode_fini(pcci);
1004 }
1005
1006 void pcc_inode_free(struct inode *inode)
1007 {
1008         struct pcc_inode *pcci = ll_i2pcci(inode);
1009
1010         if (pcci) {
1011                 WARN_ON(atomic_read(&pcci->pcci_refcount) > 1);
1012                 pcc_inode_put(pcci);
1013         }
1014 }
1015
1016 /*
1017  * TODO:
1018  * As Andreas suggested, we'd better use new layout to
1019  * reduce overhead:
1020  * (fid->f_oid >> 16 & oxFFFF)/FID
1021  */
1022 #define MAX_PCC_DATABASE_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
1023 static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid)
1024 {
1025         return snprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
1026                         DFID_NOBRACE,
1027                         (fid)->f_oid       & 0xFFFF,
1028                         (fid)->f_oid >> 16 & 0xFFFF,
1029                         (unsigned int)((fid)->f_seq       & 0xFFFF),
1030                         (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
1031                         (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
1032                         (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
1033                         PFID(fid));
1034 }
1035
1036 static inline const struct cred *pcc_super_cred(struct super_block *sb)
1037 {
1038         return ll_s2sbi(sb)->ll_pcc_super.pccs_cred;
1039 }
1040
1041 void pcc_file_init(struct pcc_file *pccf)
1042 {
1043         pccf->pccf_file = NULL;
1044         pccf->pccf_type = LU_PCC_NONE;
1045 }
1046
1047 static inline bool pcc_open_attach_enabled(struct pcc_dataset *dataset)
1048 {
1049         return dataset->pccd_flags & PCC_DATASET_OPEN_ATTACH;
1050 }
1051
/*
 * Name of the extended attribute kept on the PCC copy of a file.  It stores
 * the layout generation (a __u32) written by pcc_layout_xattr_set() and
 * compared against the current generation in pcc_try_dataset_attach().
 */
static const char pcc_xattr_layout[] = XATTR_USER_PREFIX "PCC.layout";
1053
1054 static int pcc_layout_xattr_set(struct pcc_inode *pcci, __u32 gen)
1055 {
1056         struct dentry *pcc_dentry = pcci->pcci_path.dentry;
1057         struct ll_inode_info *lli = pcci->pcci_lli;
1058         int rc;
1059
1060         ENTRY;
1061
1062         if (!(lli->lli_pcc_state & PCC_STATE_FL_OPEN_ATTACH))
1063                 RETURN(0);
1064
1065 #ifndef HAVE_VFS_SETXATTR
1066         if (!pcc_dentry->d_inode->i_op->setxattr)
1067                 RETURN(-ENOTSUPP);
1068
1069         rc = pcc_dentry->d_inode->i_op->setxattr(pcc_dentry, pcc_xattr_layout,
1070                                                  &gen, sizeof(gen), 0);
1071 #else
1072         rc = __vfs_setxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
1073                             &gen, sizeof(gen), 0);
1074 #endif
1075         RETURN(rc);
1076 }
1077
1078 static int pcc_get_layout_info(struct inode *inode, struct cl_layout *clt)
1079 {
1080         struct lu_env *env;
1081         struct ll_inode_info *lli = ll_i2info(inode);
1082         __u16 refcheck;
1083         int rc;
1084
1085         ENTRY;
1086
1087         if (!lli->lli_clob)
1088                 RETURN(-EINVAL);
1089
1090         env = cl_env_get(&refcheck);
1091         if (IS_ERR(env))
1092                 RETURN(PTR_ERR(env));
1093
1094         rc = cl_object_layout_get(env, lli->lli_clob, clt);
1095         if (rc)
1096                 CDEBUG(D_INODE, "Cannot get layout for "DFID"\n",
1097                        PFID(ll_inode2fid(inode)));
1098
1099         cl_env_put(env, &refcheck);
1100         RETURN(rc);
1101 }
1102
1103 static int pcc_fid2dataset_fullpath(char *buf, int sz, struct lu_fid *fid,
1104                                     struct pcc_dataset *dataset)
1105 {
1106         return snprintf(buf, sz, "%s/%04x/%04x/%04x/%04x/%04x/%04x/"
1107                         DFID_NOBRACE,
1108                         dataset->pccd_pathname,
1109                         (fid)->f_oid       & 0xFFFF,
1110                         (fid)->f_oid >> 16 & 0xFFFF,
1111                         (unsigned int)((fid)->f_seq       & 0xFFFF),
1112                         (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
1113                         (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
1114                         (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
1115                         PFID(fid));
1116 }
1117
1118 /* Must be called with pcci->pcci_lock held */
1119 static void pcc_inode_attach_init(struct pcc_dataset *dataset,
1120                                   struct pcc_inode *pcci,
1121                                   struct dentry *dentry,
1122                                   enum lu_pcc_type type)
1123 {
1124         pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
1125         pcci->pcci_path.dentry = dentry;
1126         LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
1127         atomic_set(&pcci->pcci_refcount, 1);
1128         pcci->pcci_type = type;
1129         pcci->pcci_attr_valid = false;
1130
1131         if (pcc_open_attach_enabled(dataset)) {
1132                 struct ll_inode_info *lli = pcci->pcci_lli;
1133
1134                 lli->lli_pcc_state |= PCC_STATE_FL_OPEN_ATTACH;
1135         }
1136 }
1137
1138 static inline void pcc_layout_gen_set(struct pcc_inode *pcci,
1139                                       __u32 gen)
1140 {
1141         pcci->pcci_layout_gen = gen;
1142 }
1143
1144 static inline bool pcc_inode_has_layout(struct pcc_inode *pcci)
1145 {
1146         return pcci->pcci_layout_gen != CL_LAYOUT_GEN_NONE;
1147 }
1148
1149 static int pcc_try_dataset_attach(struct inode *inode, __u32 gen,
1150                                   enum lu_pcc_type type,
1151                                   struct pcc_dataset *dataset,
1152                                   bool *cached)
1153 {
1154         struct ll_inode_info *lli = ll_i2info(inode);
1155         struct pcc_inode *pcci = lli->lli_pcc_inode;
1156         const struct cred *old_cred;
1157         struct dentry *pcc_dentry;
1158         struct path path;
1159         char *pathname;
1160         __u32 pcc_gen;
1161         int rc;
1162
1163         ENTRY;
1164
1165         if (type == LU_PCC_READWRITE &&
1166             !(dataset->pccd_flags & PCC_DATASET_RWPCC))
1167                 RETURN(0);
1168
1169         OBD_ALLOC(pathname, PATH_MAX);
1170         if (pathname == NULL)
1171                 RETURN(-ENOMEM);
1172
1173         pcc_fid2dataset_fullpath(pathname, PATH_MAX, &lli->lli_fid, dataset);
1174
1175         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1176         rc = kern_path(pathname, LOOKUP_FOLLOW, &path);
1177         if (rc)
1178                 /* ignore this error */
1179                 GOTO(out, rc = 0);
1180
1181         pcc_dentry = path.dentry;
1182 #ifndef HAVE_VFS_SETXATTR
1183         if (!pcc_dentry->d_inode->i_op->getxattr)
1184                 /* ignore this error */
1185                 GOTO(out_put_path, rc = 0);
1186
1187         rc = pcc_dentry->d_inode->i_op->getxattr(pcc_dentry, pcc_xattr_layout,
1188                                                  &pcc_gen, sizeof(pcc_gen));
1189 #else
1190         rc = __vfs_getxattr(pcc_dentry, pcc_dentry->d_inode, pcc_xattr_layout,
1191                             &pcc_gen, sizeof(pcc_gen));
1192 #endif
1193
1194         if (rc < 0)
1195                 /* ignore this error */
1196                 GOTO(out_put_path, rc = 0);
1197
1198         rc = 0;
1199         /* The file is still valid cached in PCC, attach it immediately. */
1200         if (pcc_gen == gen) {
1201                 CDEBUG(D_CACHE, DFID" L.Gen (%d) consistent, auto attached.\n",
1202                        PFID(&lli->lli_fid), gen);
1203                 if (!pcci) {
1204                         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
1205                         if (pcci == NULL)
1206                                 GOTO(out_put_path, rc = -ENOMEM);
1207
1208                         pcc_inode_init(pcci, lli);
1209                         dget(pcc_dentry);
1210                         pcc_inode_attach_init(dataset, pcci, pcc_dentry, type);
1211                 } else {
1212                         /*
1213                          * This happened when a file was once attached into
1214                          * PCC, and some processes keep this file opened
1215                          * (pcci->refcount > 1) and corresponding PCC file
1216                          * without any I/O activity, and then this file was
1217                          * detached by the manual detach command or the
1218                          * revocation of the layout lock (i.e. cached LRU lock
1219                          * shrinking).
1220                          */
1221                         pcc_inode_get(pcci);
1222                         pcci->pcci_type = type;
1223                 }
1224                 pcc_layout_gen_set(pcci, gen);
1225                 *cached = true;
1226         }
1227 out_put_path:
1228         path_put(&path);
1229 out:
1230         revert_creds(old_cred);
1231         OBD_FREE(pathname, PATH_MAX);
1232         RETURN(rc);
1233 }
1234
1235 static int pcc_try_datasets_attach(struct inode *inode, __u32 gen,
1236                                    enum lu_pcc_type type, bool *cached)
1237 {
1238         struct pcc_dataset *dataset, *tmp;
1239         struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
1240         int rc = 0;
1241
1242         ENTRY;
1243
1244         down_read(&super->pccs_rw_sem);
1245         list_for_each_entry_safe(dataset, tmp,
1246                                  &super->pccs_datasets, pccd_linkage) {
1247                 if (!pcc_open_attach_enabled(dataset))
1248                         continue;
1249                 rc = pcc_try_dataset_attach(inode, gen, type, dataset, cached);
1250                 if (rc < 0 || (!rc && *cached))
1251                         break;
1252         }
1253         up_read(&super->pccs_rw_sem);
1254
1255         RETURN(rc);
1256 }
1257
1258 static int pcc_try_open_attach(struct inode *inode, bool *cached)
1259 {
1260         struct pcc_super *super = &ll_i2sbi(inode)->ll_pcc_super;
1261         struct cl_layout clt = {
1262                 .cl_layout_gen = 0,
1263                 .cl_is_released = false,
1264         };
1265         int rc;
1266
1267         ENTRY;
1268
1269         /*
1270          * Quick check whether there is PCC device.
1271          */
1272         if (list_empty(&super->pccs_datasets))
1273                 RETURN(0);
1274
1275         /*
1276          * The file layout lock was cancelled. And this open does not
1277          * obtain valid layout lock from MDT (i.e. the file is being
1278          * HSM restoring).
1279          */
1280         if (ll_layout_version_get(ll_i2info(inode)) == CL_LAYOUT_GEN_NONE)
1281                 RETURN(0);
1282
1283         rc = pcc_get_layout_info(inode, &clt);
1284         if (rc)
1285                 RETURN(rc);
1286
1287         if (clt.cl_is_released)
1288                 rc = pcc_try_datasets_attach(inode, clt.cl_layout_gen,
1289                                              LU_PCC_READWRITE, cached);
1290
1291         RETURN(rc);
1292 }
1293
/*
 * Open the PCC copy backing @inode, if any, and remember it in the PCC part
 * of the Lustre file data of @file.
 *
 * Only regular files are considered.  When the inode has no PCC inode, or
 * the PCC inode carries no layout generation, an open-attach is attempted
 * first.  The whole operation runs under the PCC inode lock.
 *
 * Return: 0 when the file is not cached or the PCC file was opened, a
 * negative error from the attach attempt or from dentry_open() otherwise.
 */
int pcc_file_open(struct inode *inode, struct file *file)
{
        struct pcc_inode *pcci;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
        struct pcc_file *pccf = &fd->fd_pcc_file;
        struct file *pcc_file;
        struct path *path;
        struct qstr *dname;
        bool cached = false;
        int rc = 0;

        ENTRY;

        if (!S_ISREG(inode->i_mode))
                RETURN(0);

        pcc_inode_lock(inode);
        pcci = ll_i2pcci(inode);

        /* An attach is in progress: do not use PCC for this open */
        if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
                GOTO(out_unlock, rc = 0);

        if (!pcci || !pcc_inode_has_layout(pcci)) {
                rc = pcc_try_open_attach(inode, &cached);
                if (rc < 0 || !cached)
                        GOTO(out_unlock, rc);

                /* The attach may have just created the PCC inode */
                if (!pcci)
                        pcci = ll_i2pcci(inode);
        }

        /* Reference held for this open; dropped in pcc_file_release() */
        pcc_inode_get(pcci);
        WARN_ON(pccf->pccf_file);

        path = &pcci->pcci_path;
        dname = &path->dentry->d_name;
        CDEBUG(D_CACHE, "opening pcc file '%.*s'\n", dname->len,
               dname->name);

        /*
         * NOTE(review): the legacy dentry_open(dentry, mnt, ...) variant may
         * consume the dentry/mnt references it is given -- confirm whether
         * the #else branch needs its own dget()/mntget().
         */
#ifdef HAVE_DENTRY_OPEN_USE_PATH
        pcc_file = dentry_open(path, file->f_flags,
                               pcc_super_cred(inode->i_sb));
#else
        pcc_file = dentry_open(path->dentry, path->mnt, file->f_flags,
                               pcc_super_cred(inode->i_sb));
#endif
        if (IS_ERR_OR_NULL(pcc_file)) {
                rc = pcc_file == NULL ? -EINVAL : PTR_ERR(pcc_file);
                /* Undo the reference taken above for this open */
                pcc_inode_put(pcci);
        } else {
                pccf->pccf_file = pcc_file;
                pccf->pccf_type = pcci->pcci_type;
        }

out_unlock:
        pcc_inode_unlock(inode);
        RETURN(rc);
}
1353
1354 void pcc_file_release(struct inode *inode, struct file *file)
1355 {
1356         struct pcc_inode *pcci;
1357         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1358         struct pcc_file *pccf;
1359         struct path *path;
1360         struct qstr *dname;
1361
1362         ENTRY;
1363
1364         if (!S_ISREG(inode->i_mode) || fd == NULL)
1365                 RETURN_EXIT;
1366
1367         pccf = &fd->fd_pcc_file;
1368         pcc_inode_lock(inode);
1369         if (pccf->pccf_file == NULL)
1370                 goto out;
1371
1372         pcci = ll_i2pcci(inode);
1373         LASSERT(pcci);
1374         path = &pcci->pcci_path;
1375         dname = &path->dentry->d_name;
1376         CDEBUG(D_CACHE, "releasing pcc file \"%.*s\"\n", dname->len,
1377                dname->name);
1378         pcc_inode_put(pcci);
1379         fput(pccf->pccf_file);
1380         pccf->pccf_file = NULL;
1381 out:
1382         pcc_inode_unlock(inode);
1383         RETURN_EXIT;
1384 }
1385
1386 static void pcc_io_init(struct inode *inode, bool *cached)
1387 {
1388         struct pcc_inode *pcci;
1389
1390         pcc_inode_lock(inode);
1391         pcci = ll_i2pcci(inode);
1392         if (pcci && pcc_inode_has_layout(pcci)) {
1393                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1394                 atomic_inc(&pcci->pcci_active_ios);
1395                 *cached = true;
1396         } else {
1397                 *cached = false;
1398         }
1399         pcc_inode_unlock(inode);
1400 }
1401
1402 static void pcc_io_fini(struct inode *inode)
1403 {
1404         struct pcc_inode *pcci = ll_i2pcci(inode);
1405
1406         LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0);
1407         if (atomic_dec_and_test(&pcci->pcci_active_ios))
1408                 wake_up_all(&pcci->pcci_waitq);
1409 }
1410
1411
1412 static ssize_t
1413 __pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
1414 {
1415         struct file *file = iocb->ki_filp;
1416
1417 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1418         return file->f_op->read_iter(iocb, iter);
1419 #else
1420         struct iovec iov;
1421         struct iov_iter i;
1422         ssize_t bytes = 0;
1423
1424         iov_for_each(iov, i, *iter) {
1425                 ssize_t res;
1426
1427                 res = file->f_op->aio_read(iocb, &iov, 1, iocb->ki_pos);
1428                 if (-EIOCBQUEUED == res)
1429                         res = wait_on_sync_kiocb(iocb);
1430                 if (res <= 0) {
1431                         if (bytes == 0)
1432                                 bytes = res;
1433                         break;
1434                 }
1435
1436                 bytes += res;
1437                 if (res < iov.iov_len)
1438                         break;
1439         }
1440
1441         if (bytes > 0)
1442                 iov_iter_advance(iter, bytes);
1443         return bytes;
1444 #endif
1445 }
1446
1447 ssize_t pcc_file_read_iter(struct kiocb *iocb,
1448                            struct iov_iter *iter, bool *cached)
1449 {
1450         struct file *file = iocb->ki_filp;
1451         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1452         struct pcc_file *pccf = &fd->fd_pcc_file;
1453         struct inode *inode = file_inode(file);
1454         ssize_t result;
1455
1456         ENTRY;
1457
1458         if (pccf->pccf_file == NULL) {
1459                 *cached = false;
1460                 RETURN(0);
1461         }
1462
1463         pcc_io_init(inode, cached);
1464         if (!*cached)
1465                 RETURN(0);
1466
1467         iocb->ki_filp = pccf->pccf_file;
1468         /* generic_file_aio_read does not support ext4-dax,
1469          * __pcc_file_read_iter uses ->aio_read hook directly
1470          * to add support for ext4-dax.
1471          */
1472         result = __pcc_file_read_iter(iocb, iter);
1473         iocb->ki_filp = file;
1474
1475         pcc_io_fini(inode);
1476         RETURN(result);
1477 }
1478
1479 static ssize_t
1480 __pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
1481 {
1482         struct file *file = iocb->ki_filp;
1483
1484 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
1485         return file->f_op->write_iter(iocb, iter);
1486 #else
1487         struct iovec iov;
1488         struct iov_iter i;
1489         ssize_t bytes = 0;
1490
1491         iov_for_each(iov, i, *iter) {
1492                 ssize_t res;
1493
1494                 res = file->f_op->aio_write(iocb, &iov, 1, iocb->ki_pos);
1495                 if (-EIOCBQUEUED == res)
1496                         res = wait_on_sync_kiocb(iocb);
1497                 if (res <= 0) {
1498                         if (bytes == 0)
1499                                 bytes = res;
1500                         break;
1501                 }
1502
1503                 bytes += res;
1504                 if (res < iov.iov_len)
1505                         break;
1506         }
1507
1508         if (bytes > 0)
1509                 iov_iter_advance(iter, bytes);
1510         return bytes;
1511 #endif
1512 }
1513
1514 ssize_t pcc_file_write_iter(struct kiocb *iocb,
1515                             struct iov_iter *iter, bool *cached)
1516 {
1517         struct file *file = iocb->ki_filp;
1518         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1519         struct pcc_file *pccf = &fd->fd_pcc_file;
1520         struct inode *inode = file_inode(file);
1521         ssize_t result;
1522
1523         ENTRY;
1524
1525         if (pccf->pccf_file == NULL) {
1526                 *cached = false;
1527                 RETURN(0);
1528         }
1529
1530         if (pccf->pccf_type != LU_PCC_READWRITE) {
1531                 *cached = false;
1532                 RETURN(-EAGAIN);
1533         }
1534
1535         pcc_io_init(inode, cached);
1536         if (!*cached)
1537                 RETURN(0);
1538
1539         if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
1540                 GOTO(out, result = -ENOSPC);
1541
1542         iocb->ki_filp = pccf->pccf_file;
1543
1544         /* Since __pcc_file_write_iter makes write calls via
1545          * the normal vfs interface to the local PCC file system,
1546          * the inode lock is not needed.
1547          */
1548         result = __pcc_file_write_iter(iocb, iter);
1549         iocb->ki_filp = file;
1550 out:
1551         pcc_io_fini(inode);
1552         RETURN(result);
1553 }
1554
1555 int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
1556                       bool *cached)
1557 {
1558         int rc;
1559         const struct cred *old_cred;
1560         struct iattr attr2 = *attr;
1561         struct dentry *pcc_dentry;
1562         struct pcc_inode *pcci;
1563
1564         ENTRY;
1565
1566         if (!S_ISREG(inode->i_mode)) {
1567                 *cached = false;
1568                 RETURN(0);
1569         }
1570
1571         pcc_io_init(inode, cached);
1572         if (!*cached)
1573                 RETURN(0);
1574
1575         attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME |
1576                          ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET |
1577                          ATTR_CTIME | ATTR_UID | ATTR_GID);
1578         pcci = ll_i2pcci(inode);
1579         pcc_dentry = pcci->pcci_path.dentry;
1580         inode_lock(pcc_dentry->d_inode);
1581         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1582         rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
1583         revert_creds(old_cred);
1584         inode_unlock(pcc_dentry->d_inode);
1585
1586         pcc_io_fini(inode);
1587         RETURN(rc);
1588 }
1589
1590 int pcc_inode_getattr(struct inode *inode, bool *cached)
1591 {
1592         struct ll_inode_info *lli = ll_i2info(inode);
1593         const struct cred *old_cred;
1594         struct kstat stat;
1595         s64 atime;
1596         s64 mtime;
1597         s64 ctime;
1598         int rc;
1599
1600         ENTRY;
1601
1602         if (!S_ISREG(inode->i_mode)) {
1603                 *cached = false;
1604                 RETURN(0);
1605         }
1606
1607         pcc_io_init(inode, cached);
1608         if (!*cached)
1609                 RETURN(0);
1610
1611         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1612         rc = ll_vfs_getattr(&ll_i2pcci(inode)->pcci_path, &stat);
1613         revert_creds(old_cred);
1614         if (rc)
1615                 GOTO(out, rc);
1616
1617         ll_inode_size_lock(inode);
1618         if (inode->i_atime.tv_sec < lli->lli_atime ||
1619             lli->lli_update_atime) {
1620                 inode->i_atime.tv_sec = lli->lli_atime;
1621                 lli->lli_update_atime = 0;
1622         }
1623         inode->i_mtime.tv_sec = lli->lli_mtime;
1624         inode->i_ctime.tv_sec = lli->lli_ctime;
1625
1626         atime = inode->i_atime.tv_sec;
1627         mtime = inode->i_mtime.tv_sec;
1628         ctime = inode->i_ctime.tv_sec;
1629
1630         if (atime < stat.atime.tv_sec)
1631                 atime = stat.atime.tv_sec;
1632
1633         if (ctime < stat.ctime.tv_sec)
1634                 ctime = stat.ctime.tv_sec;
1635
1636         if (mtime < stat.mtime.tv_sec)
1637                 mtime = stat.mtime.tv_sec;
1638
1639         i_size_write(inode, stat.size);
1640         inode->i_blocks = stat.blocks;
1641
1642         inode->i_atime.tv_sec = atime;
1643         inode->i_mtime.tv_sec = mtime;
1644         inode->i_ctime.tv_sec = ctime;
1645
1646         ll_inode_size_unlock(inode);
1647 out:
1648         pcc_io_fini(inode);
1649         RETURN(rc);
1650 }
1651
1652 ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos,
1653                              struct pipe_inode_info *pipe,
1654                              size_t count, unsigned int flags,
1655                              bool *cached)
1656 {
1657         struct inode *inode = file_inode(in_file);
1658         struct ll_file_data *fd = LUSTRE_FPRIVATE(in_file);
1659         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1660         ssize_t result;
1661
1662         ENTRY;
1663
1664         *cached = false;
1665         if (!pcc_file)
1666                 RETURN(0);
1667
1668         if (!file_inode(pcc_file)->i_fop->splice_read)
1669                 RETURN(-ENOTSUPP);
1670
1671         pcc_io_init(inode, cached);
1672         if (!*cached)
1673                 RETURN(0);
1674
1675         result = file_inode(pcc_file)->i_fop->splice_read(pcc_file,
1676                                                           ppos, pipe, count,
1677                                                           flags);
1678
1679         pcc_io_fini(inode);
1680         RETURN(result);
1681 }
1682
1683 int pcc_fsync(struct file *file, loff_t start, loff_t end,
1684               int datasync, bool *cached)
1685 {
1686         struct inode *inode = file_inode(file);
1687         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1688         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1689         int rc;
1690
1691         ENTRY;
1692
1693         if (!pcc_file) {
1694                 *cached = false;
1695                 RETURN(0);
1696         }
1697
1698         pcc_io_init(inode, cached);
1699         if (!*cached)
1700                 RETURN(0);
1701
1702 #ifdef HAVE_FILE_FSYNC_4ARGS
1703         rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
1704                                                 start, end, datasync);
1705 #elif defined(HAVE_FILE_FSYNC_2ARGS)
1706         rc = file_inode(pcc_file)->i_fop->fsync(pcc_file, datasync);
1707 #else
1708         rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
1709                                 file_dentry(dentry), datasync);
1710 #endif
1711
1712         pcc_io_fini(inode);
1713         RETURN(rc);
1714 }
1715
1716 int pcc_file_mmap(struct file *file, struct vm_area_struct *vma,
1717                   bool *cached)
1718 {
1719         struct inode *inode = file_inode(file);
1720         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1721         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1722         struct pcc_inode *pcci;
1723         int rc = 0;
1724
1725         ENTRY;
1726
1727         if (!pcc_file || !file_inode(pcc_file)->i_fop->mmap) {
1728                 *cached = false;
1729                 RETURN(0);
1730         }
1731
1732         pcc_inode_lock(inode);
1733         pcci = ll_i2pcci(inode);
1734         if (pcci && pcc_inode_has_layout(pcci)) {
1735                 LASSERT(atomic_read(&pcci->pcci_refcount) > 1);
1736                 *cached = true;
1737                 vma->vm_file = pcc_file;
1738                 rc = file_inode(pcc_file)->i_fop->mmap(pcc_file, vma);
1739                 vma->vm_file = file;
1740                 /* Save the vm ops of backend PCC */
1741                 vma->vm_private_data = (void *)vma->vm_ops;
1742         } else {
1743                 *cached = false;
1744         }
1745         pcc_inode_unlock(inode);
1746
1747         RETURN(rc);
1748 }
1749
1750 void pcc_vm_open(struct vm_area_struct *vma)
1751 {
1752         struct pcc_inode *pcci;
1753         struct file *file = vma->vm_file;
1754         struct inode *inode = file_inode(file);
1755         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1756         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1757         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1758
1759         ENTRY;
1760
1761         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->open)
1762                 RETURN_EXIT;
1763
1764         pcc_inode_lock(inode);
1765         pcci = ll_i2pcci(inode);
1766         if (pcci && pcc_inode_has_layout(pcci)) {
1767                 vma->vm_file = pcc_file;
1768                 pcc_vm_ops->open(vma);
1769                 vma->vm_file = file;
1770         }
1771         pcc_inode_unlock(inode);
1772         EXIT;
1773 }
1774
1775 void pcc_vm_close(struct vm_area_struct *vma)
1776 {
1777         struct file *file = vma->vm_file;
1778         struct inode *inode = file_inode(file);
1779         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1780         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1781         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1782
1783         ENTRY;
1784
1785         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->close)
1786                 RETURN_EXIT;
1787
1788         pcc_inode_lock(inode);
1789         /* Layout lock maybe revoked here */
1790         vma->vm_file = pcc_file;
1791         pcc_vm_ops->close(vma);
1792         vma->vm_file = file;
1793         pcc_inode_unlock(inode);
1794         EXIT;
1795 }
1796
1797 int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
1798                      bool *cached)
1799 {
1800         struct page *page = vmf->page;
1801         struct mm_struct *mm = vma->vm_mm;
1802         struct file *file = vma->vm_file;
1803         struct inode *inode = file_inode(file);
1804         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1805         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1806         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1807         int rc;
1808
1809         ENTRY;
1810
1811         if (!pcc_file || !pcc_vm_ops) {
1812                 *cached = false;
1813                 RETURN(0);
1814         }
1815
1816         if (!pcc_vm_ops->page_mkwrite &&
1817             page->mapping == pcc_file->f_mapping) {
1818                 CDEBUG(D_MMAP,
1819                        "%s: PCC backend fs not support ->page_mkwrite()\n",
1820                        ll_i2sbi(inode)->ll_fsname);
1821                 pcc_ioctl_detach(inode, PCC_DETACH_OPT_NONE);
1822                 up_read(&mm->mmap_sem);
1823                 *cached = true;
1824                 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1825         }
1826         /* Pause to allow for a race with concurrent detach */
1827         OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
1828
1829         pcc_io_init(inode, cached);
1830         if (!*cached) {
1831                 /* This happens when the file is detached from PCC after got
1832                  * the fault page via ->fault() on the inode of the PCC copy.
1833                  * Here it can not simply fall back to normal Lustre I/O path.
1834                  * The reason is that the address space of fault page used by
1835                  * ->page_mkwrite() is still the one of PCC inode. In the
1836                  * normal Lustre ->page_mkwrite() I/O path, it will be wrongly
1837                  * handled as the address space of the fault page is not
1838                  * consistent with the one of the Lustre inode (though the
1839                  * fault page was truncated).
1840                  * As the file is detached from PCC, the fault page must
1841                  * be released frist, and retry the mmap write (->fault() and
1842                  * ->page_mkwrite).
1843                  * We use an ugly and tricky method by returning
1844                  * VM_FAULT_NOPAGE | VM_FAULT_RETRY to the caller
1845                  * __do_page_fault and retry the memory fault handling.
1846                  */
1847                 if (page->mapping == pcc_file->f_mapping) {
1848                         *cached = true;
1849                         up_read(&mm->mmap_sem);
1850                         RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1851                 }
1852
1853                 RETURN(0);
1854         }
1855
1856         /*
1857          * This fault injection can also be used to simulate -ENOSPC and
1858          * -EDQUOT failure of underlying PCC backend fs.
1859          */
1860         if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE)) {
1861                 pcc_io_fini(inode);
1862                 pcc_ioctl_detach(inode, PCC_DETACH_OPT_NONE);
1863                 up_read(&mm->mmap_sem);
1864                 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1865         }
1866
1867         vma->vm_file = pcc_file;
1868 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
1869         rc = pcc_vm_ops->page_mkwrite(vmf);
1870 #else
1871         rc = pcc_vm_ops->page_mkwrite(vma, vmf);
1872 #endif
1873         vma->vm_file = file;
1874
1875         pcc_io_fini(inode);
1876         RETURN(rc);
1877 }
1878
1879 int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
1880               bool *cached)
1881 {
1882         struct file *file = vma->vm_file;
1883         struct inode *inode = file_inode(file);
1884         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1885         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1886         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1887         int rc;
1888
1889         ENTRY;
1890
1891         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->fault) {
1892                 *cached = false;
1893                 RETURN(0);
1894         }
1895
1896         pcc_io_init(inode, cached);
1897         if (!*cached)
1898                 RETURN(0);
1899
1900         vma->vm_file = pcc_file;
1901 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
1902         rc = pcc_vm_ops->fault(vmf);
1903 #else
1904         rc = pcc_vm_ops->fault(vma, vmf);
1905 #endif
1906         vma->vm_file = file;
1907
1908         pcc_io_fini(inode);
1909         RETURN(rc);
1910 }
1911
1912 static void pcc_layout_wait(struct pcc_inode *pcci)
1913 {
1914         struct l_wait_info lwi = { 0 };
1915
1916         while (atomic_read(&pcci->pcci_active_ios) > 0) {
1917                 CDEBUG(D_CACHE, "Waiting for IO completion: %d\n",
1918                        atomic_read(&pcci->pcci_active_ios));
1919                 l_wait_event(pcci->pcci_waitq,
1920                              atomic_read(&pcci->pcci_active_ios) == 0, &lwi);
1921         }
1922 }
1923
1924 static void __pcc_layout_invalidate(struct pcc_inode *pcci)
1925 {
1926         pcci->pcci_type = LU_PCC_NONE;
1927         pcc_layout_gen_set(pcci, CL_LAYOUT_GEN_NONE);
1928         pcc_layout_wait(pcci);
1929 }
1930
1931 void pcc_layout_invalidate(struct inode *inode)
1932 {
1933         struct pcc_inode *pcci;
1934
1935         ENTRY;
1936
1937         pcc_inode_lock(inode);
1938         pcci = ll_i2pcci(inode);
1939         if (pcci && pcc_inode_has_layout(pcci)) {
1940                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1941                 __pcc_layout_invalidate(pcci);
1942
1943                 CDEBUG(D_CACHE, "Invalidate "DFID" layout gen %d\n",
1944                        PFID(&ll_i2info(inode)->lli_fid), pcci->pcci_layout_gen);
1945
1946                 pcc_inode_put(pcci);
1947         }
1948         pcc_inode_unlock(inode);
1949
1950         EXIT;
1951 }
1952
1953 static int pcc_inode_remove(struct inode *inode, struct dentry *pcc_dentry)
1954 {
1955         int rc;
1956
1957         rc = ll_vfs_unlink(pcc_dentry->d_parent->d_inode, pcc_dentry);
1958         if (rc)
1959                 CWARN("%s: failed to unlink PCC file %.*s, rc = %d\n",
1960                       ll_i2sbi(inode)->ll_fsname, pcc_dentry->d_name.len,
1961                       pcc_dentry->d_name.name, rc);
1962
1963         return rc;
1964 }
1965
1966 /* Create directory under base if directory does not exist */
1967 static struct dentry *
1968 pcc_mkdir(struct dentry *base, const char *name, umode_t mode)
1969 {
1970         int rc;
1971         struct dentry *dentry;
1972         struct inode *dir = base->d_inode;
1973
1974         inode_lock(dir);
1975         dentry = lookup_one_len(name, base, strlen(name));
1976         if (IS_ERR(dentry))
1977                 goto out;
1978
1979         if (d_is_positive(dentry))
1980                 goto out;
1981
1982         rc = vfs_mkdir(dir, dentry, mode);
1983         if (rc) {
1984                 dput(dentry);
1985                 dentry = ERR_PTR(rc);
1986                 goto out;
1987         }
1988 out:
1989         inode_unlock(dir);
1990         return dentry;
1991 }
1992
1993 static struct dentry *
1994 pcc_mkdir_p(struct dentry *root, char *path, umode_t mode)
1995 {
1996         char *ptr, *entry_name;
1997         struct dentry *parent;
1998         struct dentry *child = ERR_PTR(-EINVAL);
1999
2000         ptr = path;
2001         while (*ptr == '/')
2002                 ptr++;
2003
2004         entry_name = ptr;
2005         parent = dget(root);
2006         while ((ptr = strchr(ptr, '/')) != NULL) {
2007                 *ptr = '\0';
2008                 child = pcc_mkdir(parent, entry_name, mode);
2009                 *ptr = '/';
2010                 dput(parent);
2011                 if (IS_ERR(child))
2012                         break;
2013
2014                 parent = child;
2015                 ptr++;
2016                 entry_name = ptr;
2017         }
2018
2019         return child;
2020 }
2021
2022 /* Create file under base. If file already exist, return failure */
2023 static struct dentry *
2024 pcc_create(struct dentry *base, const char *name, umode_t mode)
2025 {
2026         int rc;
2027         struct dentry *dentry;
2028         struct inode *dir = base->d_inode;
2029
2030         inode_lock(dir);
2031         dentry = lookup_one_len(name, base, strlen(name));
2032         if (IS_ERR(dentry))
2033                 goto out;
2034
2035         if (d_is_positive(dentry))
2036                 goto out;
2037
2038         rc = vfs_create(dir, dentry, mode, LL_VFS_CREATE_FALSE);
2039         if (rc) {
2040                 dput(dentry);
2041                 dentry = ERR_PTR(rc);
2042                 goto out;
2043         }
2044 out:
2045         inode_unlock(dir);
2046         return dentry;
2047 }
2048
2049 static int __pcc_inode_create(struct pcc_dataset *dataset,
2050                               struct lu_fid *fid,
2051                               struct dentry **dentry)
2052 {
2053         char *path;
2054         struct dentry *base;
2055         struct dentry *child;
2056         int rc = 0;
2057
2058         OBD_ALLOC(path, MAX_PCC_DATABASE_PATH);
2059         if (path == NULL)
2060                 return -ENOMEM;
2061
2062         pcc_fid2dataset_path(path, MAX_PCC_DATABASE_PATH, fid);
2063
2064         base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0);
2065         if (IS_ERR(base)) {
2066                 rc = PTR_ERR(base);
2067                 GOTO(out, rc);
2068         }
2069
2070         snprintf(path, MAX_PCC_DATABASE_PATH, DFID_NOBRACE, PFID(fid));
2071         child = pcc_create(base, path, 0);
2072         if (IS_ERR(child)) {
2073                 rc = PTR_ERR(child);
2074                 GOTO(out_base, rc);
2075         }
2076         *dentry = child;
2077
2078 out_base:
2079         dput(base);
2080 out:
2081         OBD_FREE(path, MAX_PCC_DATABASE_PATH);
2082         return rc;
2083 }
2084
2085 /* TODO: Set the project ID for PCC copy */
2086 int pcc_inode_store_ugpid(struct dentry *dentry, kuid_t uid, kgid_t gid)
2087 {
2088         struct inode *inode = dentry->d_inode;
2089         struct iattr attr;
2090         int rc;
2091
2092         ENTRY;
2093
2094         attr.ia_valid = ATTR_UID | ATTR_GID;
2095         attr.ia_uid = uid;
2096         attr.ia_gid = gid;
2097
2098         inode_lock(inode);
2099         rc = notify_change(dentry, &attr, NULL);
2100         inode_unlock(inode);
2101
2102         RETURN(rc);
2103 }
2104
/*
 * Create the PCC copy for @fid in @dataset while running with the
 * credentials returned by pcc_super_cred() for @sb; the caller's credentials
 * are restored before returning.
 *
 * Returns the result of __pcc_inode_create(); on success *@pcc_dentry is the
 * dentry of the new PCC file.
 */
int pcc_inode_create(struct super_block *sb, struct pcc_dataset *dataset,
                     struct lu_fid *fid, struct dentry **pcc_dentry)
{
        const struct cred *saved_cred = override_creds(pcc_super_cred(sb));
        int rc = __pcc_inode_create(dataset, fid, pcc_dentry);

        revert_creds(saved_cred);
        return rc;
}
2116
2117 int pcc_inode_create_fini(struct pcc_dataset *dataset, struct inode *inode,
2118                           struct dentry *pcc_dentry)
2119 {
2120         const struct cred *old_cred;
2121         struct pcc_inode *pcci;
2122         int rc = 0;
2123
2124         ENTRY;
2125
2126         old_cred = override_creds(pcc_super_cred(inode->i_sb));
2127         pcc_inode_lock(inode);
2128         LASSERT(ll_i2pcci(inode) == NULL);
2129         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2130         if (pcci == NULL)
2131                 GOTO(out_put, rc = -ENOMEM);
2132
2133         rc = pcc_inode_store_ugpid(pcc_dentry, old_cred->suid,
2134                                    old_cred->sgid);
2135         if (rc)
2136                 GOTO(out_put, rc);
2137
2138         pcc_inode_init(pcci, ll_i2info(inode));
2139         pcc_inode_attach_init(dataset, pcci, pcc_dentry, LU_PCC_READWRITE);
2140
2141         rc = pcc_layout_xattr_set(pcci, 0);
2142         if (rc) {
2143                 (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
2144                 pcc_inode_put(pcci);
2145                 GOTO(out_unlock, rc);
2146         }
2147
2148         /* Set the layout generation of newly created file with 0 */
2149         pcc_layout_gen_set(pcci, 0);
2150
2151 out_put:
2152         if (rc) {
2153                 (void) pcc_inode_remove(inode, pcc_dentry);
2154                 dput(pcc_dentry);
2155
2156                 if (pcci)
2157                         OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
2158         }
2159 out_unlock:
2160         pcc_inode_unlock(inode);
2161         revert_creds(old_cred);
2162         RETURN(rc);
2163 }
2164
2165 static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
2166                           loff_t *offset)
2167 {
2168         while (count > 0) {
2169                 ssize_t size;
2170
2171                 size = vfs_write(filp, (const void __user *)buf, count, offset);
2172                 if (size < 0)
2173                         return size;
2174                 count -= size;
2175                 buf += size;
2176         }
2177         return 0;
2178 }
2179
2180 static int pcc_copy_data(struct file *src, struct file *dst)
2181 {
2182         int rc = 0;
2183         ssize_t rc2;
2184         mm_segment_t oldfs;
2185         loff_t pos, offset = 0;
2186         size_t buf_len = 1048576;
2187         void *buf;
2188
2189         ENTRY;
2190
2191         OBD_ALLOC_LARGE(buf, buf_len);
2192         if (buf == NULL)
2193                 RETURN(-ENOMEM);
2194
2195         oldfs = get_fs();
2196         set_fs(KERNEL_DS);
2197         while (1) {
2198                 pos = offset;
2199                 rc2 = vfs_read(src, (void __user *)buf, buf_len, &pos);
2200                 if (rc2 < 0)
2201                         GOTO(out_fs, rc = rc2);
2202                 else if (rc2 == 0)
2203                         break;
2204
2205                 pos = offset;
2206                 rc = pcc_filp_write(dst, buf, rc2, &pos);
2207                 if (rc < 0)
2208                         GOTO(out_fs, rc);
2209                 offset += rc2;
2210         }
2211
2212 out_fs:
2213         set_fs(oldfs);
2214         OBD_FREE_LARGE(buf, buf_len);
2215         RETURN(rc);
2216 }
2217
2218 static int pcc_attach_allowed_check(struct inode *inode)
2219 {
2220         struct ll_inode_info *lli = ll_i2info(inode);
2221         struct pcc_inode *pcci;
2222         int rc = 0;
2223
2224         ENTRY;
2225
2226         pcc_inode_lock(inode);
2227         if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
2228                 GOTO(out_unlock, rc = -EBUSY);
2229
2230         pcci = ll_i2pcci(inode);
2231         if (pcci && pcc_inode_has_layout(pcci))
2232                 GOTO(out_unlock, rc = -EEXIST);
2233
2234         lli->lli_pcc_state |= PCC_STATE_FL_ATTACHING;
2235 out_unlock:
2236         pcc_inode_unlock(inode);
2237         RETURN(rc);
2238 }
2239
2240 int pcc_readwrite_attach(struct file *file, struct inode *inode,
2241                          __u32 archive_id)
2242 {
2243         struct pcc_dataset *dataset;
2244         struct ll_inode_info *lli = ll_i2info(inode);
2245         struct pcc_inode *pcci;
2246         const struct cred *old_cred;
2247         struct dentry *dentry;
2248         struct file *pcc_filp;
2249         struct path path;
2250         int rc;
2251
2252         ENTRY;
2253
2254         rc = pcc_attach_allowed_check(inode);
2255         if (rc)
2256                 RETURN(rc);
2257
2258         dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super,
2259                                   LU_PCC_READWRITE, archive_id);
2260         if (dataset == NULL)
2261                 RETURN(-ENOENT);
2262
2263         old_cred = override_creds(pcc_super_cred(inode->i_sb));
2264         rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry);
2265         if (rc) {
2266                 revert_creds(old_cred);
2267                 GOTO(out_dataset_put, rc);
2268         }
2269
2270         path.mnt = dataset->pccd_path.mnt;
2271         path.dentry = dentry;
2272 #ifdef HAVE_DENTRY_OPEN_USE_PATH
2273         pcc_filp = dentry_open(&path, O_TRUNC | O_WRONLY | O_LARGEFILE,
2274                                current_cred());
2275 #else
2276         pcc_filp = dentry_open(path.dentry, path.mnt,
2277                                O_TRUNC | O_WRONLY | O_LARGEFILE,
2278                                current_cred());
2279 #endif
2280         if (IS_ERR_OR_NULL(pcc_filp)) {
2281                 rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
2282                 revert_creds(old_cred);
2283                 GOTO(out_dentry, rc);
2284         }
2285
2286         rc = pcc_inode_store_ugpid(dentry, old_cred->uid, old_cred->gid);
2287         revert_creds(old_cred);
2288         if (rc)
2289                 GOTO(out_fput, rc);
2290
2291         rc = pcc_copy_data(file, pcc_filp);
2292         if (rc)
2293                 GOTO(out_fput, rc);
2294
2295         /* Pause to allow for a race with concurrent HSM remove */
2296         OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
2297
2298         pcc_inode_lock(inode);
2299         pcci = ll_i2pcci(inode);
2300         LASSERT(!pcci);
2301         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
2302         if (pcci == NULL)
2303                 GOTO(out_unlock, rc = -ENOMEM);
2304
2305         pcc_inode_init(pcci, lli);
2306         pcc_inode_attach_init(dataset, pcci, dentry, LU_PCC_READWRITE);
2307 out_unlock:
2308         pcc_inode_unlock(inode);
2309 out_fput:
2310         fput(pcc_filp);
2311 out_dentry:
2312         if (rc) {
2313                 old_cred = override_creds(pcc_super_cred(inode->i_sb));
2314                 (void) pcc_inode_remove(inode, dentry);
2315                 revert_creds(old_cred);
2316                 dput(dentry);
2317         }
2318 out_dataset_put:
2319         pcc_dataset_put(dataset);
2320         RETURN(rc);
2321 }
2322
2323 int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
2324                               __u32 gen, bool lease_broken, int rc,
2325                               bool attached)
2326 {
2327         struct ll_inode_info *lli = ll_i2info(inode);
2328         const struct cred *old_cred;
2329         struct pcc_inode *pcci;
2330         __u32 gen2;
2331
2332         ENTRY;
2333
2334         old_cred = override_creds(pcc_super_cred(inode->i_sb));
2335         pcc_inode_lock(inode);
2336         pcci = ll_i2pcci(inode);
2337         lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
2338         if (rc || lease_broken) {
2339                 if (attached && pcci)
2340                         pcc_inode_put(pcci);
2341
2342                 GOTO(out_unlock, rc);
2343         }
2344
2345         /* PCC inode may be released due to layout lock revocatioin */
2346         if (!pcci)
2347                 GOTO(out_unlock, rc = -ESTALE);
2348
2349         LASSERT(attached);
2350         rc = pcc_layout_xattr_set(pcci, gen);
2351         if (rc)
2352                 GOTO(out_put, rc);
2353
2354         rc = ll_layout_refresh(inode, &gen2);
2355         if (!rc) {
2356                 if (gen2 == gen) {
2357                         pcc_layout_gen_set(pcci, gen);
2358                 } else {
2359                         CDEBUG(D_CACHE,
2360                                DFID" layout changed from %d to %d.\n",
2361                                PFID(ll_inode2fid(inode)), gen, gen2);
2362                         GOTO(out_put, rc = -ESTALE);
2363                 }
2364         }
2365
2366 out_put:
2367         if (rc) {
2368                 (void) pcc_inode_remove(inode, pcci->pcci_path.dentry);
2369                 pcc_inode_put(pcci);
2370         }
2371 out_unlock:
2372         pcc_inode_unlock(inode);
2373         revert_creds(old_cred);
2374         RETURN(rc);
2375 }
2376
2377 static int pcc_hsm_remove(struct inode *inode)
2378 {
2379         struct hsm_user_request *hur;
2380         __u32 gen;
2381         int len;
2382         int rc;
2383
2384         ENTRY;
2385
2386         rc = ll_layout_restore(inode, 0, OBD_OBJECT_EOF);
2387         if (rc) {
2388                 CDEBUG(D_CACHE, DFID" RESTORE failure: %d\n",
2389                        PFID(&ll_i2info(inode)->lli_fid), rc);
2390                 RETURN(rc);
2391         }
2392
2393         ll_layout_refresh(inode, &gen);
2394
2395         len = sizeof(struct hsm_user_request) +
2396               sizeof(struct hsm_user_item);
2397         OBD_ALLOC(hur, len);
2398         if (hur == NULL)
2399                 RETURN(-ENOMEM);
2400
2401         hur->hur_request.hr_action = HUA_REMOVE;
2402         hur->hur_request.hr_archive_id = 0;
2403         hur->hur_request.hr_flags = 0;
2404         memcpy(&hur->hur_user_item[0].hui_fid, &ll_i2info(inode)->lli_fid,
2405                sizeof(hur->hur_user_item[0].hui_fid));
2406         hur->hur_user_item[0].hui_extent.offset = 0;
2407         hur->hur_user_item[0].hui_extent.length = OBD_OBJECT_EOF;
2408         hur->hur_request.hr_itemcount = 1;
2409         rc = obd_iocontrol(LL_IOC_HSM_REQUEST, ll_i2sbi(inode)->ll_md_exp,
2410                            len, hur, NULL);
2411         if (rc)
2412                 CDEBUG(D_CACHE, DFID" HSM REMOVE failure: %d\n",
2413                        PFID(&ll_i2info(inode)->lli_fid), rc);
2414
2415         OBD_FREE(hur, len);
2416         RETURN(rc);
2417 }
2418
2419 int pcc_ioctl_detach(struct inode *inode, __u32 opt)
2420 {
2421         struct ll_inode_info *lli = ll_i2info(inode);
2422         struct pcc_inode *pcci;
2423         bool hsm_remove = false;
2424         int rc = 0;
2425
2426         ENTRY;
2427
2428         pcc_inode_lock(inode);
2429         pcci = lli->lli_pcc_inode;
2430         if (!pcci || lli->lli_pcc_state & PCC_STATE_FL_ATTACHING ||
2431             !pcc_inode_has_layout(pcci))
2432                 GOTO(out_unlock, rc = 0);
2433
2434         LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
2435
2436         if (pcci->pcci_type == LU_PCC_READWRITE) {
2437                 if (opt == PCC_DETACH_OPT_UNCACHE)
2438                         hsm_remove = true;
2439
2440                 __pcc_layout_invalidate(pcci);
2441                 pcc_inode_put(pcci);
2442         }
2443
2444 out_unlock:
2445         pcc_inode_unlock(inode);
2446         if (hsm_remove) {
2447                 const struct cred *old_cred;
2448
2449                 old_cred = override_creds(pcc_super_cred(inode->i_sb));
2450                 rc = pcc_hsm_remove(inode);
2451                 revert_creds(old_cred);
2452         }
2453
2454         RETURN(rc);
2455 }
2456
2457 int pcc_ioctl_state(struct file *file, struct inode *inode,
2458                     struct lu_pcc_state *state)
2459 {
2460         int rc = 0;
2461         int count;
2462         char *buf;
2463         char *path;
2464         int buf_len = sizeof(state->pccs_path);
2465         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2466         struct pcc_file *pccf = &fd->fd_pcc_file;
2467         struct pcc_inode *pcci;
2468
2469         ENTRY;
2470
2471         if (buf_len <= 0)
2472                 RETURN(-EINVAL);
2473
2474         OBD_ALLOC(buf, buf_len);
2475         if (buf == NULL)
2476                 RETURN(-ENOMEM);
2477
2478         pcc_inode_lock(inode);
2479         pcci = ll_i2pcci(inode);
2480         if (pcci == NULL) {
2481                 state->pccs_type = LU_PCC_NONE;
2482                 GOTO(out_unlock, rc = 0);
2483         }
2484
2485         count = atomic_read(&pcci->pcci_refcount);
2486         if (count == 0) {
2487                 state->pccs_type = LU_PCC_NONE;
2488                 state->pccs_open_count = 0;
2489                 GOTO(out_unlock, rc = 0);
2490         }
2491
2492         if (pcc_inode_has_layout(pcci))
2493                 count--;
2494         if (pccf->pccf_file != NULL)
2495                 count--;
2496         state->pccs_type = pcci->pcci_type;
2497         state->pccs_open_count = count;
2498         state->pccs_flags = ll_i2info(inode)->lli_pcc_state;
2499 #ifdef HAVE_DENTRY_PATH_RAW
2500         path = dentry_path_raw(pcci->pcci_path.dentry, buf, buf_len);
2501         if (IS_ERR(path))
2502                 GOTO(out_unlock, rc = PTR_ERR(path));
2503 #else
2504         path = "UNKNOWN";
2505 #endif
2506
2507         if (strlcpy(state->pccs_path, path, buf_len) >= buf_len)
2508                 GOTO(out_unlock, rc = -ENAMETOOLONG);
2509
2510 out_unlock:
2511         pcc_inode_unlock(inode);
2512         OBD_FREE(buf, buf_len);
2513         RETURN(rc);
2514 }