Whamcloud - gitweb
LU-10092 pcc: Non-blocking PCC caching
[fs/lustre-release.git] / lustre / llite / pcc.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, DDN Storage Corporation.
24  */
25 /*
26  * Persistent Client Cache
27  *
28  * PCC is a new framework which provides a group of local cache on Lustre
29  * client side. It works in two modes: RW-PCC enables a read-write cache on the
30  * local SSDs of a single client; RO-PCC provides a read-only cache on the
31  * local SSDs of multiple clients. Less overhead is visible to the applications
32  * and network latencies and lock conflicts can be significantly reduced.
33  *
34  * For RW-PCC, no global namespace will be provided. Each client uses its own
35  * local storage as a cache for itself. Local file system is used to manage
36  * the data on local caches. Cached I/O is directed to local file system while
37  * normal I/O is directed to OSTs. RW-PCC uses HSM for data synchronization.
38  * It uses HSM copytool to restore file from local caches to Lustre OSTs. Each
39  * PCC has a copytool instance running with unique archive number. Any remote
40  * access from another Lustre client would trigger the data synchronization. If
41  * a client with RW-PCC goes offline, the cached data becomes inaccessible for
42  * other client temporarily. And after the RW-PCC client reboots and the
43  * copytool restarts, the data will be accessible again.
44  *
45  * Following is what will happen in different conditions for RW-PCC:
46  *
47  * > When file is being created on RW-PCC
48  *
49  * A normal HSM released file is created on MDT;
50  * An empty mirror file is created on local cache;
51  * The HSM status of the Lustre file will be set to archived and released;
52  * The archive number will be set to the proper value.
53  *
54  * > When file is being prefetched to RW-PCC
55  *
56  * An file is copied to the local cache;
57  * The HSM status of the Lustre file will be set to archived and released;
58  * The archive number will be set to the proper value.
59  *
60  * > When file is being accessed from PCC
61  *
62  * Data will be read directly from local cache;
63  * Metadata will be read from MDT, except file size;
64  * File size will be got from local cache.
65  *
66  * > When PCC cached file is being accessed on another client
67  *
68  * RW-PCC cached files are automatically restored when a process on another
69  * client tries to read or modify them. The corresponding I/O will block
70  * waiting for the released file to be restored. This is transparent to the
71  * process.
72  *
73  * For RW-PCC, when a file is being created, a rule-based policy is used to
74  * determine whether it will be cached. Rule-based caching of newly created
75  * files can determine which file can use a cache on PCC directly without any
76  * admission control.
77  *
78  * RW-PCC design can accelerate I/O intensive applications with one-to-one
79  * mappings between files and accessing clients. However, in several use cases,
80  * files will never be updated, but need to be read simultaneously from many
81  * clients. RO-PCC implements a read-only caching on Lustre clients using
82  * SSDs. RO-PCC is based on the same framework as RW-PCC, expect
83  * that no HSM mechanism is used.
84  *
85  * The main advantages to use this SSD cache on the Lustre clients via PCC
86  * is that:
87  * - The I/O stack becomes much simpler for the cached data, as there is no
88  *   interference with I/Os from other clients, which enables easier
89  *   performance optimizations;
90  * - The requirements on the HW inside the client nodes are small, any kind of
91  *   SSDs or even HDDs can be used as cache devices;
92  * - Caching reduces the pressure on the object storage targets (OSTs), as
93  *   small or random I/Os can be regularized to big sequential I/Os and
94  *   temporary files do not even need to be flushed to OSTs.
95  *
96  * PCC can accelerate applications with certain I/O patterns:
97  * - small-sized random writes (< 1MB) from a single client
98  * - repeated read of data that is larger than RAM
99  * - clients with high network latency
100  *
101  * Author: Li Xi <lixi@ddn.com>
102  * Author: Qian Yingjin <qian@ddn.com>
103  */
104
105 #define DEBUG_SUBSYSTEM S_LLITE
106
107 #include "pcc.h"
108 #include <linux/namei.h>
109 #include <linux/file.h>
110 #include <lustre_compat.h>
111 #include "llite_internal.h"
112
113 struct kmem_cache *pcc_inode_slab;
114
115 void pcc_super_init(struct pcc_super *super)
116 {
117         spin_lock_init(&super->pccs_lock);
118         INIT_LIST_HEAD(&super->pccs_datasets);
119 }
120
121 /**
122  * pcc_dataset_add - Add a Cache policy to control which files need be
123  * cached and where it will be cached.
124  *
125  * @super: superblock of pcc
126  * @pathname: root path of pcc
127  * @id: HSM archive ID
128  * @projid: files with specified project ID will be cached.
129  */
130 static int
131 pcc_dataset_add(struct pcc_super *super, const char *pathname,
132                 __u32 archive_id, __u32 projid)
133 {
134         int rc;
135         struct pcc_dataset *dataset;
136         struct pcc_dataset *tmp;
137         bool found = false;
138
139         OBD_ALLOC_PTR(dataset);
140         if (dataset == NULL)
141                 return -ENOMEM;
142
143         rc = kern_path(pathname, LOOKUP_DIRECTORY, &dataset->pccd_path);
144         if (unlikely(rc)) {
145                 OBD_FREE_PTR(dataset);
146                 return rc;
147         }
148         strncpy(dataset->pccd_pathname, pathname, PATH_MAX);
149         dataset->pccd_id = archive_id;
150         dataset->pccd_projid = projid;
151         atomic_set(&dataset->pccd_refcount, 1);
152
153         spin_lock(&super->pccs_lock);
154         list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) {
155                 if (tmp->pccd_id == archive_id) {
156                         found = true;
157                         break;
158                 }
159         }
160         if (!found)
161                 list_add(&dataset->pccd_linkage, &super->pccs_datasets);
162         spin_unlock(&super->pccs_lock);
163
164         if (found) {
165                 pcc_dataset_put(dataset);
166                 rc = -EEXIST;
167         }
168
169         return rc;
170 }
171
172 struct pcc_dataset *
173 pcc_dataset_get(struct pcc_super *super, __u32 projid, __u32 archive_id)
174 {
175         struct pcc_dataset *dataset;
176         struct pcc_dataset *selected = NULL;
177
178         if (projid == 0 && archive_id == 0)
179                 return NULL;
180
181         /*
182          * archive ID is unique in the list, projid might be duplicate,
183          * we just return last added one as first priority.
184          */
185         spin_lock(&super->pccs_lock);
186         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
187                 if (projid && dataset->pccd_projid != projid)
188                         continue;
189                 if (archive_id && dataset->pccd_id != archive_id)
190                         continue;
191                 atomic_inc(&dataset->pccd_refcount);
192                 selected = dataset;
193                 break;
194         }
195         spin_unlock(&super->pccs_lock);
196         if (selected)
197                 CDEBUG(D_CACHE, "matched projid %u, PCC create\n",
198                        selected->pccd_projid);
199         return selected;
200 }
201
202 void
203 pcc_dataset_put(struct pcc_dataset *dataset)
204 {
205         if (atomic_dec_and_test(&dataset->pccd_refcount)) {
206                 path_put(&dataset->pccd_path);
207                 OBD_FREE_PTR(dataset);
208         }
209 }
210
211 static int
212 pcc_dataset_del(struct pcc_super *super, char *pathname)
213 {
214         struct list_head *l, *tmp;
215         struct pcc_dataset *dataset;
216         int rc = -ENOENT;
217
218         spin_lock(&super->pccs_lock);
219         list_for_each_safe(l, tmp, &super->pccs_datasets) {
220                 dataset = list_entry(l, struct pcc_dataset, pccd_linkage);
221                 if (strcmp(dataset->pccd_pathname, pathname) == 0) {
222                         list_del(&dataset->pccd_linkage);
223                         pcc_dataset_put(dataset);
224                         rc = 0;
225                         break;
226                 }
227         }
228         spin_unlock(&super->pccs_lock);
229         return rc;
230 }
231
232 static void
233 pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
234 {
235         seq_printf(m, "%s:\n", dataset->pccd_pathname);
236         seq_printf(m, "  rwid: %u\n", dataset->pccd_id);
237         seq_printf(m, "  autocache: projid=%u\n", dataset->pccd_projid);
238 }
239
240 int
241 pcc_super_dump(struct pcc_super *super, struct seq_file *m)
242 {
243         struct pcc_dataset *dataset;
244
245         spin_lock(&super->pccs_lock);
246         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
247                 pcc_dataset_dump(dataset, m);
248         }
249         spin_unlock(&super->pccs_lock);
250         return 0;
251 }
252
253 void pcc_super_fini(struct pcc_super *super)
254 {
255         struct pcc_dataset *dataset, *tmp;
256
257         list_for_each_entry_safe(dataset, tmp,
258                                  &super->pccs_datasets, pccd_linkage) {
259                 list_del(&dataset->pccd_linkage);
260                 pcc_dataset_put(dataset);
261         }
262 }
263
264
265 static bool pathname_is_valid(const char *pathname)
266 {
267         /* Needs to be absolute path */
268         if (pathname == NULL || strlen(pathname) == 0 ||
269             strlen(pathname) >= PATH_MAX || pathname[0] != '/')
270                 return false;
271         return true;
272 }
273
274 static struct pcc_cmd *
275 pcc_cmd_parse(char *buffer, unsigned long count)
276 {
277         static struct pcc_cmd *cmd;
278         char *token;
279         char *val;
280         unsigned long tmp;
281         int rc = 0;
282
283         OBD_ALLOC_PTR(cmd);
284         if (cmd == NULL)
285                 GOTO(out, rc = -ENOMEM);
286
287         /* clear all setting */
288         if (strncmp(buffer, "clear", 5) == 0) {
289                 cmd->pccc_cmd = PCC_CLEAR_ALL;
290                 GOTO(out, rc = 0);
291         }
292
293         val = buffer;
294         token = strsep(&val, " ");
295         if (val == NULL || strlen(val) == 0)
296                 GOTO(out_free_cmd, rc = -EINVAL);
297
298         /* Type of the command */
299         if (strcmp(token, "add") == 0)
300                 cmd->pccc_cmd = PCC_ADD_DATASET;
301         else if (strcmp(token, "del") == 0)
302                 cmd->pccc_cmd = PCC_DEL_DATASET;
303         else
304                 GOTO(out_free_cmd, rc = -EINVAL);
305
306         /* Pathname of the dataset */
307         token = strsep(&val, " ");
308         if ((val == NULL && cmd->pccc_cmd != PCC_DEL_DATASET) ||
309             !pathname_is_valid(token))
310                 GOTO(out_free_cmd, rc = -EINVAL);
311         cmd->pccc_pathname = token;
312
313         if (cmd->pccc_cmd == PCC_ADD_DATASET) {
314                 /* archive ID */
315                 token = strsep(&val, " ");
316                 if (val == NULL)
317                         GOTO(out_free_cmd, rc = -EINVAL);
318
319                 rc = kstrtoul(token, 10, &tmp);
320                 if (rc != 0)
321                         GOTO(out_free_cmd, rc = -EINVAL);
322                 if (tmp == 0)
323                         GOTO(out_free_cmd, rc = -EINVAL);
324                 cmd->u.pccc_add.pccc_id = tmp;
325
326                 token = val;
327                 rc = kstrtoul(token, 10, &tmp);
328                 if (rc != 0)
329                         GOTO(out_free_cmd, rc = -EINVAL);
330                 if (tmp == 0)
331                         GOTO(out_free_cmd, rc = -EINVAL);
332                 cmd->u.pccc_add.pccc_projid = tmp;
333         }
334
335         goto out;
336 out_free_cmd:
337         OBD_FREE_PTR(cmd);
338 out:
339         if (rc)
340                 cmd = ERR_PTR(rc);
341         return cmd;
342 }
343
344 int pcc_cmd_handle(char *buffer, unsigned long count,
345                    struct pcc_super *super)
346 {
347         int rc = 0;
348         struct pcc_cmd *cmd;
349
350         cmd = pcc_cmd_parse(buffer, count);
351         if (IS_ERR(cmd))
352                 return PTR_ERR(cmd);
353
354         switch (cmd->pccc_cmd) {
355         case PCC_ADD_DATASET:
356                 rc = pcc_dataset_add(super, cmd->pccc_pathname,
357                                       cmd->u.pccc_add.pccc_id,
358                                       cmd->u.pccc_add.pccc_projid);
359                 break;
360         case PCC_DEL_DATASET:
361                 rc = pcc_dataset_del(super, cmd->pccc_pathname);
362                 break;
363         case PCC_CLEAR_ALL:
364                 pcc_super_fini(super);
365                 break;
366         default:
367                 rc = -EINVAL;
368                 break;
369         }
370
371         OBD_FREE_PTR(cmd);
372         return rc;
373 }
374
375 static inline void pcc_inode_lock(struct inode *inode)
376 {
377         mutex_lock(&ll_i2info(inode)->lli_pcc_lock);
378 }
379
380 static inline void pcc_inode_unlock(struct inode *inode)
381 {
382         mutex_unlock(&ll_i2info(inode)->lli_pcc_lock);
383 }
384
385 static void pcc_inode_init(struct pcc_inode *pcci, struct ll_inode_info *lli)
386 {
387         pcci->pcci_lli = lli;
388         lli->lli_pcc_inode = pcci;
389         atomic_set(&pcci->pcci_refcount, 0);
390         pcci->pcci_type = LU_PCC_NONE;
391         pcci->pcci_layout_gen = CL_LAYOUT_GEN_NONE;
392         atomic_set(&pcci->pcci_active_ios, 0);
393         init_waitqueue_head(&pcci->pcci_waitq);
394 }
395
396 static void pcc_inode_fini(struct pcc_inode *pcci)
397 {
398         struct ll_inode_info *lli = pcci->pcci_lli;
399
400         path_put(&pcci->pcci_path);
401         pcci->pcci_type = LU_PCC_NONE;
402         OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
403         lli->lli_pcc_inode = NULL;
404 }
405
406 static void pcc_inode_get(struct pcc_inode *pcci)
407 {
408         atomic_inc(&pcci->pcci_refcount);
409 }
410
411 static void pcc_inode_put(struct pcc_inode *pcci)
412 {
413         if (atomic_dec_and_test(&pcci->pcci_refcount))
414                 pcc_inode_fini(pcci);
415 }
416
417 void pcc_inode_free(struct inode *inode)
418 {
419         struct pcc_inode *pcci = ll_i2pcci(inode);
420
421         if (pcci) {
422                 WARN_ON(atomic_read(&pcci->pcci_refcount) > 1);
423                 pcc_inode_put(pcci);
424         }
425 }
426
427 /*
428  * TODO:
429  * As Andreas suggested, we'd better use new layout to
430  * reduce overhead:
431  * (fid->f_oid >> 16 & oxFFFF)/FID
432  */
433 #define MAX_PCC_DATABASE_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
434 static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid)
435 {
436         return snprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
437                         DFID_NOBRACE,
438                         (fid)->f_oid       & 0xFFFF,
439                         (fid)->f_oid >> 16 & 0xFFFF,
440                         (unsigned int)((fid)->f_seq       & 0xFFFF),
441                         (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
442                         (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
443                         (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
444                         PFID(fid));
445 }
446
447 void pcc_file_init(struct pcc_file *pccf)
448 {
449         pccf->pccf_file = NULL;
450         pccf->pccf_type = LU_PCC_NONE;
451 }
452
453 static inline bool pcc_inode_has_layout(struct pcc_inode *pcci)
454 {
455         return pcci->pcci_layout_gen != CL_LAYOUT_GEN_NONE;
456 }
457
458 int pcc_file_open(struct inode *inode, struct file *file)
459 {
460         struct pcc_inode *pcci;
461         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
462         struct pcc_file *pccf = &fd->fd_pcc_file;
463         struct file *pcc_file;
464         struct path *path;
465         struct qstr *dname;
466         int rc = 0;
467
468         ENTRY;
469
470         if (!S_ISREG(inode->i_mode))
471                 RETURN(0);
472
473         pcc_inode_lock(inode);
474         pcci = ll_i2pcci(inode);
475         if (!pcci)
476                 GOTO(out_unlock, rc = 0);
477
478         if (atomic_read(&pcci->pcci_refcount) == 0 ||
479             !pcc_inode_has_layout(pcci))
480                 GOTO(out_unlock, rc = 0);
481
482         pcc_inode_get(pcci);
483         WARN_ON(pccf->pccf_file);
484
485         path = &pcci->pcci_path;
486         dname = &path->dentry->d_name;
487         CDEBUG(D_CACHE, "opening pcc file '%.*s'\n", dname->len,
488                dname->name);
489 #ifdef HAVE_DENTRY_OPEN_USE_PATH
490         pcc_file = dentry_open(path, file->f_flags, current_cred());
491 #else
492         pcc_file = dentry_open(path->dentry, path->mnt,
493                                file->f_flags, current_cred());
494 #endif
495         if (IS_ERR_OR_NULL(pcc_file)) {
496                 rc = pcc_file == NULL ? -EINVAL : PTR_ERR(pcc_file);
497                 pcc_inode_put(pcci);
498         } else {
499                 pccf->pccf_file = pcc_file;
500                 pccf->pccf_type = pcci->pcci_type;
501         }
502
503 out_unlock:
504         pcc_inode_unlock(inode);
505         RETURN(rc);
506 }
507
508 void pcc_file_release(struct inode *inode, struct file *file)
509 {
510         struct pcc_inode *pcci;
511         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
512         struct pcc_file *pccf;
513         struct path *path;
514         struct qstr *dname;
515
516         ENTRY;
517
518         if (!S_ISREG(inode->i_mode) || fd == NULL)
519                 RETURN_EXIT;
520
521         pccf = &fd->fd_pcc_file;
522         pcc_inode_lock(inode);
523         if (pccf->pccf_file == NULL)
524                 goto out;
525
526         pcci = ll_i2pcci(inode);
527         LASSERT(pcci);
528         path = &pcci->pcci_path;
529         dname = &path->dentry->d_name;
530         CDEBUG(D_CACHE, "releasing pcc file \"%.*s\"\n", dname->len,
531                dname->name);
532         pcc_inode_put(pcci);
533         fput(pccf->pccf_file);
534         pccf->pccf_file = NULL;
535 out:
536         pcc_inode_unlock(inode);
537         RETURN_EXIT;
538 }
539
540 static inline void pcc_layout_gen_set(struct pcc_inode *pcci,
541                                       __u32 gen)
542 {
543         pcci->pcci_layout_gen = gen;
544 }
545
546 static void pcc_io_init(struct inode *inode, bool *cached)
547 {
548         struct pcc_inode *pcci;
549
550         pcc_inode_lock(inode);
551         pcci = ll_i2pcci(inode);
552         if (pcci && pcc_inode_has_layout(pcci)) {
553                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
554                 atomic_inc(&pcci->pcci_active_ios);
555                 *cached = true;
556         } else {
557                 *cached = false;
558         }
559         pcc_inode_unlock(inode);
560 }
561
562 static void pcc_io_fini(struct inode *inode)
563 {
564         struct pcc_inode *pcci = ll_i2pcci(inode);
565
566         LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0);
567         if (atomic_dec_and_test(&pcci->pcci_active_ios))
568                 wake_up_all(&pcci->pcci_waitq);
569 }
570
571
572 static ssize_t
573 __pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
574 {
575         struct file *file = iocb->ki_filp;
576
577 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
578         return file->f_op->read_iter(iocb, iter);
579 #else
580         struct iovec iov;
581         struct iov_iter i;
582         ssize_t bytes = 0;
583
584         iov_for_each(iov, i, *iter) {
585                 ssize_t res;
586
587                 res = file->f_op->aio_read(iocb, &iov, 1, iocb->ki_pos);
588                 if (-EIOCBQUEUED == res)
589                         res = wait_on_sync_kiocb(iocb);
590                 if (res <= 0) {
591                         if (bytes == 0)
592                                 bytes = res;
593                         break;
594                 }
595
596                 bytes += res;
597                 if (res < iov.iov_len)
598                         break;
599         }
600
601         if (bytes > 0)
602                 iov_iter_advance(iter, bytes);
603         return bytes;
604 #endif
605 }
606
607 ssize_t pcc_file_read_iter(struct kiocb *iocb,
608                            struct iov_iter *iter, bool *cached)
609 {
610         struct file *file = iocb->ki_filp;
611         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
612         struct pcc_file *pccf = &fd->fd_pcc_file;
613         struct inode *inode = file_inode(file);
614         ssize_t result;
615
616         ENTRY;
617
618         if (pccf->pccf_file == NULL) {
619                 *cached = false;
620                 RETURN(0);
621         }
622
623         pcc_io_init(inode, cached);
624         if (!*cached)
625                 RETURN(0);
626
627         iocb->ki_filp = pccf->pccf_file;
628         /* generic_file_aio_read does not support ext4-dax,
629          * __pcc_file_read_iter uses ->aio_read hook directly
630          * to add support for ext4-dax.
631          */
632         result = __pcc_file_read_iter(iocb, iter);
633         iocb->ki_filp = file;
634
635         pcc_io_fini(inode);
636         RETURN(result);
637 }
638
639 static ssize_t
640 __pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
641 {
642         struct file *file = iocb->ki_filp;
643
644 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
645         return file->f_op->write_iter(iocb, iter);
646 #else
647         struct iovec iov;
648         struct iov_iter i;
649         ssize_t bytes = 0;
650
651         iov_for_each(iov, i, *iter) {
652                 ssize_t res;
653
654                 res = file->f_op->aio_write(iocb, &iov, 1, iocb->ki_pos);
655                 if (-EIOCBQUEUED == res)
656                         res = wait_on_sync_kiocb(iocb);
657                 if (res <= 0) {
658                         if (bytes == 0)
659                                 bytes = res;
660                         break;
661                 }
662
663                 bytes += res;
664                 if (res < iov.iov_len)
665                         break;
666         }
667
668         if (bytes > 0)
669                 iov_iter_advance(iter, bytes);
670         return bytes;
671 #endif
672 }
673
674 ssize_t pcc_file_write_iter(struct kiocb *iocb,
675                             struct iov_iter *iter, bool *cached)
676 {
677         struct file *file = iocb->ki_filp;
678         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
679         struct pcc_file *pccf = &fd->fd_pcc_file;
680         struct inode *inode = file_inode(file);
681         ssize_t result;
682
683         ENTRY;
684
685         if (pccf->pccf_file == NULL) {
686                 *cached = false;
687                 RETURN(0);
688         }
689
690         if (pccf->pccf_type != LU_PCC_READWRITE) {
691                 *cached = false;
692                 RETURN(-EAGAIN);
693         }
694
695         pcc_io_init(inode, cached);
696         if (!*cached)
697                 RETURN(0);
698
699         if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
700                 GOTO(out, result = -ENOSPC);
701
702         iocb->ki_filp = pccf->pccf_file;
703
704         /* Since __pcc_file_write_iter makes write calls via
705          * the normal vfs interface to the local PCC file system,
706          * the inode lock is not needed.
707          */
708         result = __pcc_file_write_iter(iocb, iter);
709         iocb->ki_filp = file;
710 out:
711         pcc_io_fini(inode);
712         RETURN(result);
713 }
714
715 int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
716                       bool *cached)
717 {
718         int rc;
719         struct iattr attr2 = *attr;
720         struct dentry *pcc_dentry;
721         struct pcc_inode *pcci;
722
723         ENTRY;
724
725         if (!S_ISREG(inode->i_mode)) {
726                 *cached = false;
727                 RETURN(0);
728         }
729
730         pcc_io_init(inode, cached);
731         if (!*cached)
732                 RETURN(0);
733
734         attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME |
735                          ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET |
736                          ATTR_CTIME);
737         pcci = ll_i2pcci(inode);
738         pcc_dentry = pcci->pcci_path.dentry;
739         inode_lock(pcc_dentry->d_inode);
740         rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
741         inode_unlock(pcc_dentry->d_inode);
742
743         pcc_io_fini(inode);
744         RETURN(rc);
745 }
746
747 int pcc_inode_getattr(struct inode *inode, bool *cached)
748 {
749         struct ll_inode_info *lli = ll_i2info(inode);
750         struct kstat stat;
751         s64 atime;
752         s64 mtime;
753         s64 ctime;
754         int rc;
755
756         ENTRY;
757
758         if (!S_ISREG(inode->i_mode)) {
759                 *cached = false;
760                 RETURN(0);
761         }
762
763         pcc_io_init(inode, cached);
764         if (!*cached)
765                 RETURN(0);
766
767         rc = ll_vfs_getattr(&ll_i2pcci(inode)->pcci_path, &stat);
768         if (rc)
769                 GOTO(out, rc);
770
771         ll_inode_size_lock(inode);
772         if (inode->i_atime.tv_sec < lli->lli_atime ||
773             lli->lli_update_atime) {
774                 inode->i_atime.tv_sec = lli->lli_atime;
775                 lli->lli_update_atime = 0;
776         }
777         inode->i_mtime.tv_sec = lli->lli_mtime;
778         inode->i_ctime.tv_sec = lli->lli_ctime;
779
780         atime = inode->i_atime.tv_sec;
781         mtime = inode->i_mtime.tv_sec;
782         ctime = inode->i_ctime.tv_sec;
783
784         if (atime < stat.atime.tv_sec)
785                 atime = stat.atime.tv_sec;
786
787         if (ctime < stat.ctime.tv_sec)
788                 ctime = stat.ctime.tv_sec;
789
790         if (mtime < stat.mtime.tv_sec)
791                 mtime = stat.mtime.tv_sec;
792
793         i_size_write(inode, stat.size);
794         inode->i_blocks = stat.blocks;
795
796         inode->i_atime.tv_sec = atime;
797         inode->i_mtime.tv_sec = mtime;
798         inode->i_ctime.tv_sec = ctime;
799
800         ll_inode_size_unlock(inode);
801 out:
802         pcc_io_fini(inode);
803         RETURN(rc);
804 }
805
806 ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos,
807                              struct pipe_inode_info *pipe,
808                              size_t count, unsigned int flags,
809                              bool *cached)
810 {
811         struct inode *inode = file_inode(in_file);
812         struct ll_file_data *fd = LUSTRE_FPRIVATE(in_file);
813         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
814         ssize_t result;
815
816         ENTRY;
817
818         *cached = false;
819         if (!pcc_file)
820                 RETURN(0);
821
822         if (!file_inode(pcc_file)->i_fop->splice_read)
823                 RETURN(-ENOTSUPP);
824
825         pcc_io_init(inode, cached);
826         if (!*cached)
827                 RETURN(0);
828
829         result = file_inode(pcc_file)->i_fop->splice_read(pcc_file,
830                                                           ppos, pipe, count,
831                                                           flags);
832
833         pcc_io_fini(inode);
834         RETURN(result);
835 }
836
837 int pcc_fsync(struct file *file, loff_t start, loff_t end,
838               int datasync, bool *cached)
839 {
840         struct inode *inode = file_inode(file);
841         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
842         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
843         int rc;
844
845         ENTRY;
846
847         if (!pcc_file) {
848                 *cached = false;
849                 RETURN(0);
850         }
851
852         pcc_io_init(inode, cached);
853         if (!*cached)
854                 RETURN(0);
855
856 #ifdef HAVE_FILE_FSYNC_4ARGS
857         rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
858                                                 start, end, datasync);
859 #elif defined(HAVE_FILE_FSYNC_2ARGS)
860         rc = file_inode(pcc_file)->i_fop->fsync(pcc_file, datasync);
861 #else
862         rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
863                                 file_dentry(dentry), datasync);
864 #endif
865
866         pcc_io_fini(inode);
867         RETURN(rc);
868 }
869
870 int pcc_file_mmap(struct file *file, struct vm_area_struct *vma,
871                   bool *cached)
872 {
873         struct inode *inode = file_inode(file);
874         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
875         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
876         struct pcc_inode *pcci;
877         int rc = 0;
878
879         ENTRY;
880
881         if (!pcc_file || !file_inode(pcc_file)->i_fop->mmap) {
882                 *cached = false;
883                 RETURN(0);
884         }
885
886         pcc_inode_lock(inode);
887         pcci = ll_i2pcci(inode);
888         if (pcci && pcc_inode_has_layout(pcci)) {
889                 LASSERT(atomic_read(&pcci->pcci_refcount) > 1);
890                 *cached = true;
891                 vma->vm_file = pcc_file;
892                 rc = file_inode(pcc_file)->i_fop->mmap(pcc_file, vma);
893                 vma->vm_file = file;
894                 /* Save the vm ops of backend PCC */
895                 vma->vm_private_data = (void *)vma->vm_ops;
896         } else {
897                 *cached = false;
898         }
899         pcc_inode_unlock(inode);
900
901         RETURN(rc);
902 }
903
904 void pcc_vm_open(struct vm_area_struct *vma)
905 {
906         struct pcc_inode *pcci;
907         struct file *file = vma->vm_file;
908         struct inode *inode = file_inode(file);
909         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
910         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
911         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
912
913         ENTRY;
914
915         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->open)
916                 RETURN_EXIT;
917
918         pcc_inode_lock(inode);
919         pcci = ll_i2pcci(inode);
920         if (pcci && pcc_inode_has_layout(pcci)) {
921                 vma->vm_file = pcc_file;
922                 pcc_vm_ops->open(vma);
923                 vma->vm_file = file;
924         }
925         pcc_inode_unlock(inode);
926         EXIT;
927 }
928
929 void pcc_vm_close(struct vm_area_struct *vma)
930 {
931         struct file *file = vma->vm_file;
932         struct inode *inode = file_inode(file);
933         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
934         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
935         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
936
937         ENTRY;
938
939         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->close)
940                 RETURN_EXIT;
941
942         pcc_inode_lock(inode);
943         /* Layout lock maybe revoked here */
944         vma->vm_file = pcc_file;
945         pcc_vm_ops->close(vma);
946         vma->vm_file = file;
947         pcc_inode_unlock(inode);
948         EXIT;
949 }
950
951 int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
952                      bool *cached)
953 {
954         struct page *page = vmf->page;
955         struct mm_struct *mm = vma->vm_mm;
956         struct file *file = vma->vm_file;
957         struct inode *inode = file_inode(file);
958         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
959         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
960         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
961         int rc;
962
963         ENTRY;
964
965         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->page_mkwrite) {
966                 *cached = false;
967                 RETURN(0);
968         }
969
970         /* Pause to allow for a race with concurrent detach */
971         OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
972
973         pcc_io_init(inode, cached);
974         if (!*cached) {
975                 /* This happens when the file is detached from PCC after got
976                  * the fault page via ->fault() on the inode of the PCC copy.
977                  * Here it can not simply fall back to normal Lustre I/O path.
978                  * The reason is that the address space of fault page used by
979                  * ->page_mkwrite() is still the one of PCC inode. In the
980                  * normal Lustre ->page_mkwrite() I/O path, it will be wrongly
981                  * handled as the address space of the fault page is not
982                  * consistent with the one of the Lustre inode (though the
983                  * fault page was truncated).
984                  * As the file is detached from PCC, the fault page must
985                  * be released frist, and retry the mmap write (->fault() and
986                  * ->page_mkwrite).
987                  * We use an ugly and tricky method by returning
988                  * VM_FAULT_NOPAGE | VM_FAULT_RETRY to the caller
989                  * __do_page_fault and retry the memory fault handling.
990                  */
991                 if (page->mapping == file_inode(pcc_file)->i_mapping) {
992                         *cached = true;
993                         up_read(&mm->mmap_sem);
994                         RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
995                 }
996
997                 RETURN(0);
998         }
999
1000         /*
1001          * This fault injection can also be used to simulate -ENOSPC and
1002          * -EDQUOT failure of underlying PCC backend fs.
1003          */
1004         if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE)) {
1005                 pcc_io_fini(inode);
1006                 pcc_ioctl_detach(inode);
1007                 up_read(&mm->mmap_sem);
1008                 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1009         }
1010
1011         vma->vm_file = pcc_file;
1012 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
1013         rc = pcc_vm_ops->page_mkwrite(vmf);
1014 #else
1015         rc = pcc_vm_ops->page_mkwrite(vma, vmf);
1016 #endif
1017         vma->vm_file = file;
1018
1019         pcc_io_fini(inode);
1020         RETURN(rc);
1021 }
1022
1023 int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
1024               bool *cached)
1025 {
1026         struct file *file = vma->vm_file;
1027         struct inode *inode = file_inode(file);
1028         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1029         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1030         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1031         int rc;
1032
1033         ENTRY;
1034
1035         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->fault) {
1036                 *cached = false;
1037                 RETURN(0);
1038         }
1039
1040         pcc_io_init(inode, cached);
1041         if (!*cached)
1042                 RETURN(0);
1043
1044         vma->vm_file = pcc_file;
1045 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
1046         rc = pcc_vm_ops->fault(vmf);
1047 #else
1048         rc = pcc_vm_ops->fault(vma, vmf);
1049 #endif
1050         vma->vm_file = file;
1051
1052         pcc_io_fini(inode);
1053         RETURN(rc);
1054 }
1055
1056 static void pcc_layout_wait(struct pcc_inode *pcci)
1057 {
1058         struct l_wait_info lwi = { 0 };
1059
1060         while (atomic_read(&pcci->pcci_active_ios) > 0) {
1061                 CDEBUG(D_CACHE, "Waiting for IO completion: %d\n",
1062                        atomic_read(&pcci->pcci_active_ios));
1063                 l_wait_event(pcci->pcci_waitq,
1064                              atomic_read(&pcci->pcci_active_ios) == 0, &lwi);
1065         }
1066 }
1067
1068 static void __pcc_layout_invalidate(struct pcc_inode *pcci)
1069 {
1070         pcci->pcci_type = LU_PCC_NONE;
1071         pcc_layout_gen_set(pcci, CL_LAYOUT_GEN_NONE);
1072         pcc_layout_wait(pcci);
1073 }
1074
1075 void pcc_layout_invalidate(struct inode *inode)
1076 {
1077         struct pcc_inode *pcci;
1078
1079         pcc_inode_lock(inode);
1080         pcci = ll_i2pcci(inode);
1081         if (pcci && pcc_inode_has_layout(pcci)) {
1082                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1083                 __pcc_layout_invalidate(pcci);
1084
1085                 CDEBUG(D_CACHE, "Invalidate "DFID" layout gen %d\n",
1086                        PFID(&ll_i2info(inode)->lli_fid), pcci->pcci_layout_gen);
1087
1088                 pcc_inode_put(pcci);
1089         }
1090         pcc_inode_unlock(inode);
1091 }
1092
1093 static int pcc_inode_remove(struct pcc_inode *pcci)
1094 {
1095         struct dentry *dentry;
1096         int rc;
1097
1098         dentry = pcci->pcci_path.dentry;
1099         rc = ll_vfs_unlink(dentry->d_parent->d_inode, dentry);
1100         if (rc)
1101                 CWARN("failed to unlink cached file, rc = %d\n", rc);
1102
1103         return rc;
1104 }
1105
1106 /* Create directory under base if directory does not exist */
1107 static struct dentry *
1108 pcc_mkdir(struct dentry *base, const char *name, umode_t mode)
1109 {
1110         int rc;
1111         struct dentry *dentry;
1112         struct inode *dir = base->d_inode;
1113
1114         inode_lock(dir);
1115         dentry = lookup_one_len(name, base, strlen(name));
1116         if (IS_ERR(dentry))
1117                 goto out;
1118
1119         if (d_is_positive(dentry))
1120                 goto out;
1121
1122         rc = vfs_mkdir(dir, dentry, mode);
1123         if (rc) {
1124                 dput(dentry);
1125                 dentry = ERR_PTR(rc);
1126                 goto out;
1127         }
1128 out:
1129         inode_unlock(dir);
1130         return dentry;
1131 }
1132
1133 static struct dentry *
1134 pcc_mkdir_p(struct dentry *root, char *path, umode_t mode)
1135 {
1136         char *ptr, *entry_name;
1137         struct dentry *parent;
1138         struct dentry *child = ERR_PTR(-EINVAL);
1139
1140         ptr = path;
1141         while (*ptr == '/')
1142                 ptr++;
1143
1144         entry_name = ptr;
1145         parent = dget(root);
1146         while ((ptr = strchr(ptr, '/')) != NULL) {
1147                 *ptr = '\0';
1148                 child = pcc_mkdir(parent, entry_name, mode);
1149                 *ptr = '/';
1150                 dput(parent);
1151                 if (IS_ERR(child))
1152                         break;
1153
1154                 parent = child;
1155                 ptr++;
1156                 entry_name = ptr;
1157         }
1158
1159         return child;
1160 }
1161
1162 /* Create file under base. If file already exist, return failure */
1163 static struct dentry *
1164 pcc_create(struct dentry *base, const char *name, umode_t mode)
1165 {
1166         int rc;
1167         struct dentry *dentry;
1168         struct inode *dir = base->d_inode;
1169
1170         inode_lock(dir);
1171         dentry = lookup_one_len(name, base, strlen(name));
1172         if (IS_ERR(dentry))
1173                 goto out;
1174
1175         if (d_is_positive(dentry))
1176                 goto out;
1177
1178         rc = vfs_create(dir, dentry, mode, LL_VFS_CREATE_FALSE);
1179         if (rc) {
1180                 dput(dentry);
1181                 dentry = ERR_PTR(rc);
1182                 goto out;
1183         }
1184 out:
1185         inode_unlock(dir);
1186         return dentry;
1187 }
1188
1189 /* Must be called with pcci->pcci_lock held */
1190 static void pcc_inode_attach_init(struct pcc_dataset *dataset,
1191                                   struct pcc_inode *pcci,
1192                                   struct dentry *dentry,
1193                                   enum lu_pcc_type type)
1194 {
1195         pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
1196         pcci->pcci_path.dentry = dentry;
1197         LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
1198         atomic_set(&pcci->pcci_refcount, 1);
1199         pcci->pcci_type = type;
1200         pcci->pcci_attr_valid = false;
1201 }
1202
1203 static int __pcc_inode_create(struct pcc_dataset *dataset,
1204                               struct lu_fid *fid,
1205                               struct dentry **dentry)
1206 {
1207         char *path;
1208         struct dentry *base;
1209         struct dentry *child;
1210         int rc = 0;
1211
1212         OBD_ALLOC(path, MAX_PCC_DATABASE_PATH);
1213         if (path == NULL)
1214                 return -ENOMEM;
1215
1216         pcc_fid2dataset_path(path, MAX_PCC_DATABASE_PATH, fid);
1217
1218         base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0700);
1219         if (IS_ERR(base)) {
1220                 rc = PTR_ERR(base);
1221                 GOTO(out, rc);
1222         }
1223
1224         snprintf(path, MAX_PCC_DATABASE_PATH, DFID_NOBRACE, PFID(fid));
1225         child = pcc_create(base, path, 0600);
1226         if (IS_ERR(child)) {
1227                 rc = PTR_ERR(child);
1228                 GOTO(out_base, rc);
1229         }
1230         *dentry = child;
1231
1232 out_base:
1233         dput(base);
1234 out:
1235         OBD_FREE(path, MAX_PCC_DATABASE_PATH);
1236         return rc;
1237 }
1238
1239 int pcc_inode_create(struct pcc_dataset *dataset, struct lu_fid *fid,
1240                      struct dentry **pcc_dentry)
1241 {
1242         return __pcc_inode_create(dataset, fid, pcc_dentry);
1243 }
1244
1245 int pcc_inode_create_fini(struct pcc_dataset *dataset, struct inode *inode,
1246                           struct dentry *pcc_dentry)
1247 {
1248         struct pcc_inode *pcci;
1249         int rc = 0;
1250
1251         ENTRY;
1252
1253         pcc_inode_lock(inode);
1254         LASSERT(ll_i2pcci(inode) == NULL);
1255         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
1256         if (pcci == NULL)
1257                 GOTO(out_unlock, rc = -ENOMEM);
1258
1259         pcc_inode_init(pcci, ll_i2info(inode));
1260         pcc_inode_attach_init(dataset, pcci, pcc_dentry, LU_PCC_READWRITE);
1261         /* Set the layout generation of newly created file with 0 */
1262         pcc_layout_gen_set(pcci, 0);
1263
1264 out_unlock:
1265         if (rc) {
1266                 int rc2;
1267
1268                 rc2 = ll_vfs_unlink(pcc_dentry->d_parent->d_inode, pcc_dentry);
1269                 if (rc2)
1270                         CWARN("failed to unlink PCC file, rc = %d\n", rc2);
1271
1272                 dput(pcc_dentry);
1273         }
1274
1275         pcc_inode_unlock(inode);
1276         RETURN(rc);
1277 }
1278
1279 static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
1280                           loff_t *offset)
1281 {
1282         while (count > 0) {
1283                 ssize_t size;
1284
1285                 size = vfs_write(filp, (const void __user *)buf, count, offset);
1286                 if (size < 0)
1287                         return size;
1288                 count -= size;
1289                 buf += size;
1290         }
1291         return 0;
1292 }
1293
1294 static int pcc_copy_data(struct file *src, struct file *dst)
1295 {
1296         int rc = 0;
1297         ssize_t rc2;
1298         mm_segment_t oldfs;
1299         loff_t pos, offset = 0;
1300         size_t buf_len = 1048576;
1301         void *buf;
1302
1303         ENTRY;
1304
1305         OBD_ALLOC_LARGE(buf, buf_len);
1306         if (buf == NULL)
1307                 RETURN(-ENOMEM);
1308
1309         oldfs = get_fs();
1310         set_fs(KERNEL_DS);
1311         while (1) {
1312                 pos = offset;
1313                 rc2 = vfs_read(src, (void __user *)buf, buf_len, &pos);
1314                 if (rc2 < 0)
1315                         GOTO(out_fs, rc = rc2);
1316                 else if (rc2 == 0)
1317                         break;
1318
1319                 pos = offset;
1320                 rc = pcc_filp_write(dst, buf, rc2, &pos);
1321                 if (rc < 0)
1322                         GOTO(out_fs, rc);
1323                 offset += rc2;
1324         }
1325
1326 out_fs:
1327         set_fs(oldfs);
1328         OBD_FREE_LARGE(buf, buf_len);
1329         RETURN(rc);
1330 }
1331
1332 static int pcc_attach_allowed_check(struct inode *inode)
1333 {
1334         struct ll_inode_info *lli = ll_i2info(inode);
1335         struct pcc_inode *pcci;
1336         int rc = 0;
1337
1338         ENTRY;
1339
1340         pcc_inode_lock(inode);
1341         if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
1342                 GOTO(out_unlock, rc = -EBUSY);
1343
1344         pcci = ll_i2pcci(inode);
1345         if (pcci && pcc_inode_has_layout(pcci))
1346                 GOTO(out_unlock, rc = -EEXIST);
1347
1348         lli->lli_pcc_state |= PCC_STATE_FL_ATTACHING;
1349 out_unlock:
1350         pcc_inode_unlock(inode);
1351         RETURN(rc);
1352 }
1353
1354 int pcc_readwrite_attach(struct file *file, struct inode *inode,
1355                          __u32 archive_id)
1356 {
1357         struct pcc_dataset *dataset;
1358         struct ll_inode_info *lli = ll_i2info(inode);
1359         struct pcc_inode *pcci;
1360         struct dentry *dentry;
1361         struct file *pcc_filp;
1362         struct path path;
1363         int rc;
1364
1365         ENTRY;
1366
1367         rc = pcc_attach_allowed_check(inode);
1368         if (rc)
1369                 RETURN(rc);
1370
1371         dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super, 0,
1372                                   archive_id);
1373         if (dataset == NULL)
1374                 RETURN(-ENOENT);
1375
1376         rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry);
1377         if (rc)
1378                 GOTO(out_dataset_put, rc);
1379
1380         path.mnt = dataset->pccd_path.mnt;
1381         path.dentry = dentry;
1382 #ifdef HAVE_DENTRY_OPEN_USE_PATH
1383         pcc_filp = dentry_open(&path, O_TRUNC | O_WRONLY | O_LARGEFILE,
1384                                current_cred());
1385 #else
1386         pcc_filp = dentry_open(path.dentry, path.mnt,
1387                                O_TRUNC | O_WRONLY | O_LARGEFILE,
1388                                current_cred());
1389 #endif
1390         if (IS_ERR_OR_NULL(pcc_filp)) {
1391                 rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
1392                 GOTO(out_dentry, rc);
1393         }
1394
1395         rc = pcc_copy_data(file, pcc_filp);
1396         if (rc)
1397                 GOTO(out_fput, rc);
1398
1399         /* Pause to allow for a race with concurrent HSM remove */
1400         OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
1401
1402         pcc_inode_lock(inode);
1403         pcci = ll_i2pcci(inode);
1404         LASSERT(!pcci);
1405         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
1406         if (pcci == NULL)
1407                 GOTO(out_unlock, rc = -ENOMEM);
1408
1409         pcc_inode_init(pcci, lli);
1410         pcc_inode_attach_init(dataset, pcci, dentry, LU_PCC_READWRITE);
1411 out_unlock:
1412         pcc_inode_unlock(inode);
1413 out_fput:
1414         fput(pcc_filp);
1415 out_dentry:
1416         if (rc) {
1417                 int rc2;
1418
1419                 rc2 = ll_vfs_unlink(dentry->d_parent->d_inode, dentry);
1420                 if (rc2)
1421                         CWARN("failed to unlink PCC file, rc = %d\n", rc2);
1422
1423                 dput(dentry);
1424         }
1425 out_dataset_put:
1426         pcc_dataset_put(dataset);
1427         RETURN(rc);
1428 }
1429
1430 int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
1431                               __u32 gen, bool lease_broken, int rc,
1432                               bool attached)
1433 {
1434         struct ll_inode_info *lli = ll_i2info(inode);
1435         struct pcc_inode *pcci;
1436         __u32 gen2;
1437
1438         ENTRY;
1439
1440         pcc_inode_lock(inode);
1441         pcci = ll_i2pcci(inode);
1442         lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
1443         if ((rc || lease_broken)) {
1444                 if (attached && pcci)
1445                         pcc_inode_put(pcci);
1446
1447                 GOTO(out_unlock, rc);
1448         }
1449
1450         /* PCC inode may be released due to layout lock revocatioin */
1451         if (!pcci)
1452                 GOTO(out_unlock, rc = -ESTALE);
1453
1454         LASSERT(attached);
1455         rc = ll_layout_refresh(inode, &gen2);
1456         if (!rc) {
1457                 if (gen2 == gen) {
1458                         pcc_layout_gen_set(pcci, gen);
1459                 } else {
1460                         CDEBUG(D_CACHE,
1461                                DFID" layout changed from %d to %d.\n",
1462                                PFID(ll_inode2fid(inode)), gen, gen2);
1463                         GOTO(out_put, rc = -ESTALE);
1464                 }
1465         }
1466
1467 out_put:
1468         if (rc) {
1469                 pcc_inode_remove(pcci);
1470                 pcc_inode_put(pcci);
1471         }
1472 out_unlock:
1473         pcc_inode_unlock(inode);
1474         RETURN(rc);
1475 }
1476
1477 int pcc_ioctl_detach(struct inode *inode)
1478 {
1479         struct ll_inode_info *lli = ll_i2info(inode);
1480         struct pcc_inode *pcci;
1481         int rc = 0;
1482
1483         ENTRY;
1484
1485         pcc_inode_lock(inode);
1486         pcci = lli->lli_pcc_inode;
1487         if (!pcci || lli->lli_pcc_state & PCC_STATE_FL_ATTACHING ||
1488             !pcc_inode_has_layout(pcci))
1489                 GOTO(out_unlock, rc = 0);
1490
1491         __pcc_layout_invalidate(pcci);
1492         pcc_inode_put(pcci);
1493
1494 out_unlock:
1495         pcc_inode_unlock(inode);
1496         RETURN(rc);
1497 }
1498
1499 int pcc_ioctl_state(struct file *file, struct inode *inode,
1500                     struct lu_pcc_state *state)
1501 {
1502         int rc = 0;
1503         int count;
1504         char *buf;
1505         char *path;
1506         int buf_len = sizeof(state->pccs_path);
1507         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1508         struct pcc_file *pccf = &fd->fd_pcc_file;
1509         struct pcc_inode *pcci;
1510
1511         ENTRY;
1512
1513         if (buf_len <= 0)
1514                 RETURN(-EINVAL);
1515
1516         OBD_ALLOC(buf, buf_len);
1517         if (buf == NULL)
1518                 RETURN(-ENOMEM);
1519
1520         pcc_inode_lock(inode);
1521         pcci = ll_i2pcci(inode);
1522         if (pcci == NULL) {
1523                 state->pccs_type = LU_PCC_NONE;
1524                 GOTO(out_unlock, rc = 0);
1525         }
1526
1527         count = atomic_read(&pcci->pcci_refcount);
1528         if (count == 0) {
1529                 state->pccs_type = LU_PCC_NONE;
1530                 state->pccs_open_count = 0;
1531                 GOTO(out_unlock, rc = 0);
1532         }
1533
1534         if (pcc_inode_has_layout(pcci))
1535                 count--;
1536         if (pccf->pccf_file != NULL)
1537                 count--;
1538         state->pccs_type = pcci->pcci_type;
1539         state->pccs_open_count = count;
1540         state->pccs_flags = ll_i2info(inode)->lli_pcc_state;
1541 #ifdef HAVE_DENTRY_PATH_RAW
1542         path = dentry_path_raw(pcci->pcci_path.dentry, buf, buf_len);
1543         if (IS_ERR(path))
1544                 GOTO(out_unlock, rc = PTR_ERR(path));
1545 #else
1546         path = "UNKNOWN";
1547 #endif
1548
1549         if (strlcpy(state->pccs_path, path, buf_len) >= buf_len)
1550                 GOTO(out_unlock, rc = -ENAMETOOLONG);
1551
1552 out_unlock:
1553         pcc_inode_unlock(inode);
1554         OBD_FREE(buf, buf_len);
1555         RETURN(rc);
1556 }