Whamcloud - gitweb
LU-10092 pcc: security and permission for non-root user access
[fs/lustre-release.git] / lustre / llite / pcc.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, DDN Storage Corporation.
24  */
25 /*
26  * Persistent Client Cache
27  *
28  * PCC is a new framework which provides a group of local cache on Lustre
29  * client side. It works in two modes: RW-PCC enables a read-write cache on the
30  * local SSDs of a single client; RO-PCC provides a read-only cache on the
31  * local SSDs of multiple clients. Less overhead is visible to the applications
32  * and network latencies and lock conflicts can be significantly reduced.
33  *
34  * For RW-PCC, no global namespace will be provided. Each client uses its own
35  * local storage as a cache for itself. Local file system is used to manage
36  * the data on local caches. Cached I/O is directed to local file system while
37  * normal I/O is directed to OSTs. RW-PCC uses HSM for data synchronization.
38  * It uses HSM copytool to restore file from local caches to Lustre OSTs. Each
39  * PCC has a copytool instance running with unique archive number. Any remote
40  * access from another Lustre client would trigger the data synchronization. If
41  * a client with RW-PCC goes offline, the cached data becomes inaccessible for
42  * other client temporarily. And after the RW-PCC client reboots and the
43  * copytool restarts, the data will be accessible again.
44  *
45  * Following is what will happen in different conditions for RW-PCC:
46  *
47  * > When file is being created on RW-PCC
48  *
49  * A normal HSM released file is created on MDT;
50  * An empty mirror file is created on local cache;
51  * The HSM status of the Lustre file will be set to archived and released;
52  * The archive number will be set to the proper value.
53  *
54  * > When file is being prefetched to RW-PCC
55  *
56  * An file is copied to the local cache;
57  * The HSM status of the Lustre file will be set to archived and released;
58  * The archive number will be set to the proper value.
59  *
60  * > When file is being accessed from PCC
61  *
62  * Data will be read directly from local cache;
63  * Metadata will be read from MDT, except file size;
64  * File size will be got from local cache.
65  *
66  * > When PCC cached file is being accessed on another client
67  *
68  * RW-PCC cached files are automatically restored when a process on another
69  * client tries to read or modify them. The corresponding I/O will block
70  * waiting for the released file to be restored. This is transparent to the
71  * process.
72  *
73  * For RW-PCC, when a file is being created, a rule-based policy is used to
74  * determine whether it will be cached. Rule-based caching of newly created
75  * files can determine which file can use a cache on PCC directly without any
76  * admission control.
77  *
78  * RW-PCC design can accelerate I/O intensive applications with one-to-one
79  * mappings between files and accessing clients. However, in several use cases,
80  * files will never be updated, but need to be read simultaneously from many
81  * clients. RO-PCC implements a read-only caching on Lustre clients using
82  * SSDs. RO-PCC is based on the same framework as RW-PCC, expect
83  * that no HSM mechanism is used.
84  *
85  * The main advantages to use this SSD cache on the Lustre clients via PCC
86  * is that:
87  * - The I/O stack becomes much simpler for the cached data, as there is no
88  *   interference with I/Os from other clients, which enables easier
89  *   performance optimizations;
90  * - The requirements on the HW inside the client nodes are small, any kind of
91  *   SSDs or even HDDs can be used as cache devices;
92  * - Caching reduces the pressure on the object storage targets (OSTs), as
93  *   small or random I/Os can be regularized to big sequential I/Os and
94  *   temporary files do not even need to be flushed to OSTs.
95  *
96  * PCC can accelerate applications with certain I/O patterns:
97  * - small-sized random writes (< 1MB) from a single client
98  * - repeated read of data that is larger than RAM
99  * - clients with high network latency
100  *
101  * Author: Li Xi <lixi@ddn.com>
102  * Author: Qian Yingjin <qian@ddn.com>
103  */
104
105 #define DEBUG_SUBSYSTEM S_LLITE
106
107 #include "pcc.h"
108 #include <linux/namei.h>
109 #include <linux/file.h>
110 #include <lustre_compat.h>
111 #include "llite_internal.h"
112
113 struct kmem_cache *pcc_inode_slab;
114
115 int pcc_super_init(struct pcc_super *super)
116 {
117         struct cred *cred;
118
119         super->pccs_cred = cred = prepare_creds();
120         if (!cred)
121                 return -ENOMEM;
122
123         /* Never override disk quota limits or use reserved space */
124         cap_lower(cred->cap_effective, CAP_SYS_RESOURCE);
125         spin_lock_init(&super->pccs_lock);
126         INIT_LIST_HEAD(&super->pccs_datasets);
127
128         return 0;
129 }
130
131 /**
132  * pcc_dataset_add - Add a Cache policy to control which files need be
133  * cached and where it will be cached.
134  *
135  * @super: superblock of pcc
136  * @pathname: root path of pcc
137  * @id: HSM archive ID
138  * @projid: files with specified project ID will be cached.
139  */
140 static int
141 pcc_dataset_add(struct pcc_super *super, const char *pathname,
142                 __u32 archive_id, __u32 projid)
143 {
144         int rc;
145         struct pcc_dataset *dataset;
146         struct pcc_dataset *tmp;
147         bool found = false;
148
149         OBD_ALLOC_PTR(dataset);
150         if (dataset == NULL)
151                 return -ENOMEM;
152
153         rc = kern_path(pathname, LOOKUP_DIRECTORY, &dataset->pccd_path);
154         if (unlikely(rc)) {
155                 OBD_FREE_PTR(dataset);
156                 return rc;
157         }
158         strncpy(dataset->pccd_pathname, pathname, PATH_MAX);
159         dataset->pccd_id = archive_id;
160         dataset->pccd_projid = projid;
161         atomic_set(&dataset->pccd_refcount, 1);
162
163         spin_lock(&super->pccs_lock);
164         list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) {
165                 if (tmp->pccd_id == archive_id) {
166                         found = true;
167                         break;
168                 }
169         }
170         if (!found)
171                 list_add(&dataset->pccd_linkage, &super->pccs_datasets);
172         spin_unlock(&super->pccs_lock);
173
174         if (found) {
175                 pcc_dataset_put(dataset);
176                 rc = -EEXIST;
177         }
178
179         return rc;
180 }
181
182 struct pcc_dataset *
183 pcc_dataset_get(struct pcc_super *super, __u32 projid, __u32 archive_id)
184 {
185         struct pcc_dataset *dataset;
186         struct pcc_dataset *selected = NULL;
187
188         if (projid == 0 && archive_id == 0)
189                 return NULL;
190
191         /*
192          * archive ID is unique in the list, projid might be duplicate,
193          * we just return last added one as first priority.
194          */
195         spin_lock(&super->pccs_lock);
196         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
197                 if (projid && dataset->pccd_projid != projid)
198                         continue;
199                 if (archive_id && dataset->pccd_id != archive_id)
200                         continue;
201                 atomic_inc(&dataset->pccd_refcount);
202                 selected = dataset;
203                 break;
204         }
205         spin_unlock(&super->pccs_lock);
206         if (selected)
207                 CDEBUG(D_CACHE, "matched projid %u, PCC create\n",
208                        selected->pccd_projid);
209         return selected;
210 }
211
212 void
213 pcc_dataset_put(struct pcc_dataset *dataset)
214 {
215         if (atomic_dec_and_test(&dataset->pccd_refcount)) {
216                 path_put(&dataset->pccd_path);
217                 OBD_FREE_PTR(dataset);
218         }
219 }
220
221 static int
222 pcc_dataset_del(struct pcc_super *super, char *pathname)
223 {
224         struct list_head *l, *tmp;
225         struct pcc_dataset *dataset;
226         int rc = -ENOENT;
227
228         spin_lock(&super->pccs_lock);
229         list_for_each_safe(l, tmp, &super->pccs_datasets) {
230                 dataset = list_entry(l, struct pcc_dataset, pccd_linkage);
231                 if (strcmp(dataset->pccd_pathname, pathname) == 0) {
232                         list_del(&dataset->pccd_linkage);
233                         pcc_dataset_put(dataset);
234                         rc = 0;
235                         break;
236                 }
237         }
238         spin_unlock(&super->pccs_lock);
239         return rc;
240 }
241
242 static void
243 pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
244 {
245         seq_printf(m, "%s:\n", dataset->pccd_pathname);
246         seq_printf(m, "  rwid: %u\n", dataset->pccd_id);
247         seq_printf(m, "  autocache: projid=%u\n", dataset->pccd_projid);
248 }
249
250 int
251 pcc_super_dump(struct pcc_super *super, struct seq_file *m)
252 {
253         struct pcc_dataset *dataset;
254
255         spin_lock(&super->pccs_lock);
256         list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
257                 pcc_dataset_dump(dataset, m);
258         }
259         spin_unlock(&super->pccs_lock);
260         return 0;
261 }
262
263 static void pcc_remove_datasets(struct pcc_super *super)
264 {
265         struct pcc_dataset *dataset, *tmp;
266
267         list_for_each_entry_safe(dataset, tmp,
268                                  &super->pccs_datasets, pccd_linkage) {
269                 list_del(&dataset->pccd_linkage);
270                 pcc_dataset_put(dataset);
271         }
272 }
273
274 void pcc_super_fini(struct pcc_super *super)
275 {
276         pcc_remove_datasets(super);
277         put_cred(super->pccs_cred);
278 }
279
280 static bool pathname_is_valid(const char *pathname)
281 {
282         /* Needs to be absolute path */
283         if (pathname == NULL || strlen(pathname) == 0 ||
284             strlen(pathname) >= PATH_MAX || pathname[0] != '/')
285                 return false;
286         return true;
287 }
288
289 static struct pcc_cmd *
290 pcc_cmd_parse(char *buffer, unsigned long count)
291 {
292         static struct pcc_cmd *cmd;
293         char *token;
294         char *val;
295         unsigned long tmp;
296         int rc = 0;
297
298         OBD_ALLOC_PTR(cmd);
299         if (cmd == NULL)
300                 GOTO(out, rc = -ENOMEM);
301
302         /* clear all setting */
303         if (strncmp(buffer, "clear", 5) == 0) {
304                 cmd->pccc_cmd = PCC_CLEAR_ALL;
305                 GOTO(out, rc = 0);
306         }
307
308         val = buffer;
309         token = strsep(&val, " ");
310         if (val == NULL || strlen(val) == 0)
311                 GOTO(out_free_cmd, rc = -EINVAL);
312
313         /* Type of the command */
314         if (strcmp(token, "add") == 0)
315                 cmd->pccc_cmd = PCC_ADD_DATASET;
316         else if (strcmp(token, "del") == 0)
317                 cmd->pccc_cmd = PCC_DEL_DATASET;
318         else
319                 GOTO(out_free_cmd, rc = -EINVAL);
320
321         /* Pathname of the dataset */
322         token = strsep(&val, " ");
323         if ((val == NULL && cmd->pccc_cmd != PCC_DEL_DATASET) ||
324             !pathname_is_valid(token))
325                 GOTO(out_free_cmd, rc = -EINVAL);
326         cmd->pccc_pathname = token;
327
328         if (cmd->pccc_cmd == PCC_ADD_DATASET) {
329                 /* archive ID */
330                 token = strsep(&val, " ");
331                 if (val == NULL)
332                         GOTO(out_free_cmd, rc = -EINVAL);
333
334                 rc = kstrtoul(token, 10, &tmp);
335                 if (rc != 0)
336                         GOTO(out_free_cmd, rc = -EINVAL);
337                 if (tmp == 0)
338                         GOTO(out_free_cmd, rc = -EINVAL);
339                 cmd->u.pccc_add.pccc_id = tmp;
340
341                 token = val;
342                 rc = kstrtoul(token, 10, &tmp);
343                 if (rc != 0)
344                         GOTO(out_free_cmd, rc = -EINVAL);
345                 if (tmp == 0)
346                         GOTO(out_free_cmd, rc = -EINVAL);
347                 cmd->u.pccc_add.pccc_projid = tmp;
348         }
349
350         goto out;
351 out_free_cmd:
352         OBD_FREE_PTR(cmd);
353 out:
354         if (rc)
355                 cmd = ERR_PTR(rc);
356         return cmd;
357 }
358
359 int pcc_cmd_handle(char *buffer, unsigned long count,
360                    struct pcc_super *super)
361 {
362         int rc = 0;
363         struct pcc_cmd *cmd;
364
365         cmd = pcc_cmd_parse(buffer, count);
366         if (IS_ERR(cmd))
367                 return PTR_ERR(cmd);
368
369         switch (cmd->pccc_cmd) {
370         case PCC_ADD_DATASET:
371                 rc = pcc_dataset_add(super, cmd->pccc_pathname,
372                                       cmd->u.pccc_add.pccc_id,
373                                       cmd->u.pccc_add.pccc_projid);
374                 break;
375         case PCC_DEL_DATASET:
376                 rc = pcc_dataset_del(super, cmd->pccc_pathname);
377                 break;
378         case PCC_CLEAR_ALL:
379                 pcc_remove_datasets(super);
380                 break;
381         default:
382                 rc = -EINVAL;
383                 break;
384         }
385
386         OBD_FREE_PTR(cmd);
387         return rc;
388 }
389
390 static inline void pcc_inode_lock(struct inode *inode)
391 {
392         mutex_lock(&ll_i2info(inode)->lli_pcc_lock);
393 }
394
395 static inline void pcc_inode_unlock(struct inode *inode)
396 {
397         mutex_unlock(&ll_i2info(inode)->lli_pcc_lock);
398 }
399
400 static void pcc_inode_init(struct pcc_inode *pcci, struct ll_inode_info *lli)
401 {
402         pcci->pcci_lli = lli;
403         lli->lli_pcc_inode = pcci;
404         atomic_set(&pcci->pcci_refcount, 0);
405         pcci->pcci_type = LU_PCC_NONE;
406         pcci->pcci_layout_gen = CL_LAYOUT_GEN_NONE;
407         atomic_set(&pcci->pcci_active_ios, 0);
408         init_waitqueue_head(&pcci->pcci_waitq);
409 }
410
411 static void pcc_inode_fini(struct pcc_inode *pcci)
412 {
413         struct ll_inode_info *lli = pcci->pcci_lli;
414
415         path_put(&pcci->pcci_path);
416         pcci->pcci_type = LU_PCC_NONE;
417         OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
418         lli->lli_pcc_inode = NULL;
419 }
420
421 static void pcc_inode_get(struct pcc_inode *pcci)
422 {
423         atomic_inc(&pcci->pcci_refcount);
424 }
425
426 static void pcc_inode_put(struct pcc_inode *pcci)
427 {
428         if (atomic_dec_and_test(&pcci->pcci_refcount))
429                 pcc_inode_fini(pcci);
430 }
431
432 void pcc_inode_free(struct inode *inode)
433 {
434         struct pcc_inode *pcci = ll_i2pcci(inode);
435
436         if (pcci) {
437                 WARN_ON(atomic_read(&pcci->pcci_refcount) > 1);
438                 pcc_inode_put(pcci);
439         }
440 }
441
442 /*
443  * TODO:
444  * As Andreas suggested, we'd better use new layout to
445  * reduce overhead:
446  * (fid->f_oid >> 16 & oxFFFF)/FID
447  */
448 #define MAX_PCC_DATABASE_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
449 static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid)
450 {
451         return snprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
452                         DFID_NOBRACE,
453                         (fid)->f_oid       & 0xFFFF,
454                         (fid)->f_oid >> 16 & 0xFFFF,
455                         (unsigned int)((fid)->f_seq       & 0xFFFF),
456                         (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
457                         (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
458                         (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
459                         PFID(fid));
460 }
461
462 static inline const struct cred *pcc_super_cred(struct super_block *sb)
463 {
464         return ll_s2sbi(sb)->ll_pcc_super.pccs_cred;
465 }
466
467 void pcc_file_init(struct pcc_file *pccf)
468 {
469         pccf->pccf_file = NULL;
470         pccf->pccf_type = LU_PCC_NONE;
471 }
472
473 static inline bool pcc_inode_has_layout(struct pcc_inode *pcci)
474 {
475         return pcci->pcci_layout_gen != CL_LAYOUT_GEN_NONE;
476 }
477
478 int pcc_file_open(struct inode *inode, struct file *file)
479 {
480         struct pcc_inode *pcci;
481         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
482         struct pcc_file *pccf = &fd->fd_pcc_file;
483         struct file *pcc_file;
484         struct path *path;
485         struct qstr *dname;
486         int rc = 0;
487
488         ENTRY;
489
490         if (!S_ISREG(inode->i_mode))
491                 RETURN(0);
492
493         pcc_inode_lock(inode);
494         pcci = ll_i2pcci(inode);
495         if (!pcci)
496                 GOTO(out_unlock, rc = 0);
497
498         if (atomic_read(&pcci->pcci_refcount) == 0 ||
499             !pcc_inode_has_layout(pcci))
500                 GOTO(out_unlock, rc = 0);
501
502         pcc_inode_get(pcci);
503         WARN_ON(pccf->pccf_file);
504
505         path = &pcci->pcci_path;
506         dname = &path->dentry->d_name;
507         CDEBUG(D_CACHE, "opening pcc file '%.*s'\n", dname->len,
508                dname->name);
509
510 #ifdef HAVE_DENTRY_OPEN_USE_PATH
511         pcc_file = dentry_open(path, file->f_flags,
512                                pcc_super_cred(inode->i_sb));
513 #else
514         pcc_file = dentry_open(path->dentry, path->mnt, file->f_flags,
515                                pcc_super_cred(inode->i_sb));
516 #endif
517         if (IS_ERR_OR_NULL(pcc_file)) {
518                 rc = pcc_file == NULL ? -EINVAL : PTR_ERR(pcc_file);
519                 pcc_inode_put(pcci);
520         } else {
521                 pccf->pccf_file = pcc_file;
522                 pccf->pccf_type = pcci->pcci_type;
523         }
524
525 out_unlock:
526         pcc_inode_unlock(inode);
527         RETURN(rc);
528 }
529
530 void pcc_file_release(struct inode *inode, struct file *file)
531 {
532         struct pcc_inode *pcci;
533         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
534         struct pcc_file *pccf;
535         struct path *path;
536         struct qstr *dname;
537
538         ENTRY;
539
540         if (!S_ISREG(inode->i_mode) || fd == NULL)
541                 RETURN_EXIT;
542
543         pccf = &fd->fd_pcc_file;
544         pcc_inode_lock(inode);
545         if (pccf->pccf_file == NULL)
546                 goto out;
547
548         pcci = ll_i2pcci(inode);
549         LASSERT(pcci);
550         path = &pcci->pcci_path;
551         dname = &path->dentry->d_name;
552         CDEBUG(D_CACHE, "releasing pcc file \"%.*s\"\n", dname->len,
553                dname->name);
554         pcc_inode_put(pcci);
555         fput(pccf->pccf_file);
556         pccf->pccf_file = NULL;
557 out:
558         pcc_inode_unlock(inode);
559         RETURN_EXIT;
560 }
561
562 static inline void pcc_layout_gen_set(struct pcc_inode *pcci,
563                                       __u32 gen)
564 {
565         pcci->pcci_layout_gen = gen;
566 }
567
568 static void pcc_io_init(struct inode *inode, bool *cached)
569 {
570         struct pcc_inode *pcci;
571
572         pcc_inode_lock(inode);
573         pcci = ll_i2pcci(inode);
574         if (pcci && pcc_inode_has_layout(pcci)) {
575                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
576                 atomic_inc(&pcci->pcci_active_ios);
577                 *cached = true;
578         } else {
579                 *cached = false;
580         }
581         pcc_inode_unlock(inode);
582 }
583
584 static void pcc_io_fini(struct inode *inode)
585 {
586         struct pcc_inode *pcci = ll_i2pcci(inode);
587
588         LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0);
589         if (atomic_dec_and_test(&pcci->pcci_active_ios))
590                 wake_up_all(&pcci->pcci_waitq);
591 }
592
593
594 static ssize_t
595 __pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
596 {
597         struct file *file = iocb->ki_filp;
598
599 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
600         return file->f_op->read_iter(iocb, iter);
601 #else
602         struct iovec iov;
603         struct iov_iter i;
604         ssize_t bytes = 0;
605
606         iov_for_each(iov, i, *iter) {
607                 ssize_t res;
608
609                 res = file->f_op->aio_read(iocb, &iov, 1, iocb->ki_pos);
610                 if (-EIOCBQUEUED == res)
611                         res = wait_on_sync_kiocb(iocb);
612                 if (res <= 0) {
613                         if (bytes == 0)
614                                 bytes = res;
615                         break;
616                 }
617
618                 bytes += res;
619                 if (res < iov.iov_len)
620                         break;
621         }
622
623         if (bytes > 0)
624                 iov_iter_advance(iter, bytes);
625         return bytes;
626 #endif
627 }
628
629 ssize_t pcc_file_read_iter(struct kiocb *iocb,
630                            struct iov_iter *iter, bool *cached)
631 {
632         struct file *file = iocb->ki_filp;
633         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
634         struct pcc_file *pccf = &fd->fd_pcc_file;
635         struct inode *inode = file_inode(file);
636         ssize_t result;
637
638         ENTRY;
639
640         if (pccf->pccf_file == NULL) {
641                 *cached = false;
642                 RETURN(0);
643         }
644
645         pcc_io_init(inode, cached);
646         if (!*cached)
647                 RETURN(0);
648
649         iocb->ki_filp = pccf->pccf_file;
650         /* generic_file_aio_read does not support ext4-dax,
651          * __pcc_file_read_iter uses ->aio_read hook directly
652          * to add support for ext4-dax.
653          */
654         result = __pcc_file_read_iter(iocb, iter);
655         iocb->ki_filp = file;
656
657         pcc_io_fini(inode);
658         RETURN(result);
659 }
660
661 static ssize_t
662 __pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
663 {
664         struct file *file = iocb->ki_filp;
665
666 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
667         return file->f_op->write_iter(iocb, iter);
668 #else
669         struct iovec iov;
670         struct iov_iter i;
671         ssize_t bytes = 0;
672
673         iov_for_each(iov, i, *iter) {
674                 ssize_t res;
675
676                 res = file->f_op->aio_write(iocb, &iov, 1, iocb->ki_pos);
677                 if (-EIOCBQUEUED == res)
678                         res = wait_on_sync_kiocb(iocb);
679                 if (res <= 0) {
680                         if (bytes == 0)
681                                 bytes = res;
682                         break;
683                 }
684
685                 bytes += res;
686                 if (res < iov.iov_len)
687                         break;
688         }
689
690         if (bytes > 0)
691                 iov_iter_advance(iter, bytes);
692         return bytes;
693 #endif
694 }
695
696 ssize_t pcc_file_write_iter(struct kiocb *iocb,
697                             struct iov_iter *iter, bool *cached)
698 {
699         struct file *file = iocb->ki_filp;
700         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
701         struct pcc_file *pccf = &fd->fd_pcc_file;
702         struct inode *inode = file_inode(file);
703         ssize_t result;
704
705         ENTRY;
706
707         if (pccf->pccf_file == NULL) {
708                 *cached = false;
709                 RETURN(0);
710         }
711
712         if (pccf->pccf_type != LU_PCC_READWRITE) {
713                 *cached = false;
714                 RETURN(-EAGAIN);
715         }
716
717         pcc_io_init(inode, cached);
718         if (!*cached)
719                 RETURN(0);
720
721         if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
722                 GOTO(out, result = -ENOSPC);
723
724         iocb->ki_filp = pccf->pccf_file;
725
726         /* Since __pcc_file_write_iter makes write calls via
727          * the normal vfs interface to the local PCC file system,
728          * the inode lock is not needed.
729          */
730         result = __pcc_file_write_iter(iocb, iter);
731         iocb->ki_filp = file;
732 out:
733         pcc_io_fini(inode);
734         RETURN(result);
735 }
736
737 int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
738                       bool *cached)
739 {
740         int rc;
741         const struct cred *old_cred;
742         struct iattr attr2 = *attr;
743         struct dentry *pcc_dentry;
744         struct pcc_inode *pcci;
745
746         ENTRY;
747
748         if (!S_ISREG(inode->i_mode)) {
749                 *cached = false;
750                 RETURN(0);
751         }
752
753         pcc_io_init(inode, cached);
754         if (!*cached)
755                 RETURN(0);
756
757         attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME |
758                          ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET |
759                          ATTR_CTIME | ATTR_UID | ATTR_GID);
760         pcci = ll_i2pcci(inode);
761         pcc_dentry = pcci->pcci_path.dentry;
762         inode_lock(pcc_dentry->d_inode);
763         old_cred = override_creds(pcc_super_cred(inode->i_sb));
764         rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
765         revert_creds(old_cred);
766         inode_unlock(pcc_dentry->d_inode);
767
768         pcc_io_fini(inode);
769         RETURN(rc);
770 }
771
772 int pcc_inode_getattr(struct inode *inode, bool *cached)
773 {
774         struct ll_inode_info *lli = ll_i2info(inode);
775         const struct cred *old_cred;
776         struct kstat stat;
777         s64 atime;
778         s64 mtime;
779         s64 ctime;
780         int rc;
781
782         ENTRY;
783
784         if (!S_ISREG(inode->i_mode)) {
785                 *cached = false;
786                 RETURN(0);
787         }
788
789         pcc_io_init(inode, cached);
790         if (!*cached)
791                 RETURN(0);
792
793         old_cred = override_creds(pcc_super_cred(inode->i_sb));
794         rc = ll_vfs_getattr(&ll_i2pcci(inode)->pcci_path, &stat);
795         revert_creds(old_cred);
796         if (rc)
797                 GOTO(out, rc);
798
799         ll_inode_size_lock(inode);
800         if (inode->i_atime.tv_sec < lli->lli_atime ||
801             lli->lli_update_atime) {
802                 inode->i_atime.tv_sec = lli->lli_atime;
803                 lli->lli_update_atime = 0;
804         }
805         inode->i_mtime.tv_sec = lli->lli_mtime;
806         inode->i_ctime.tv_sec = lli->lli_ctime;
807
808         atime = inode->i_atime.tv_sec;
809         mtime = inode->i_mtime.tv_sec;
810         ctime = inode->i_ctime.tv_sec;
811
812         if (atime < stat.atime.tv_sec)
813                 atime = stat.atime.tv_sec;
814
815         if (ctime < stat.ctime.tv_sec)
816                 ctime = stat.ctime.tv_sec;
817
818         if (mtime < stat.mtime.tv_sec)
819                 mtime = stat.mtime.tv_sec;
820
821         i_size_write(inode, stat.size);
822         inode->i_blocks = stat.blocks;
823
824         inode->i_atime.tv_sec = atime;
825         inode->i_mtime.tv_sec = mtime;
826         inode->i_ctime.tv_sec = ctime;
827
828         ll_inode_size_unlock(inode);
829 out:
830         pcc_io_fini(inode);
831         RETURN(rc);
832 }
833
834 ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos,
835                              struct pipe_inode_info *pipe,
836                              size_t count, unsigned int flags,
837                              bool *cached)
838 {
839         struct inode *inode = file_inode(in_file);
840         struct ll_file_data *fd = LUSTRE_FPRIVATE(in_file);
841         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
842         ssize_t result;
843
844         ENTRY;
845
846         *cached = false;
847         if (!pcc_file)
848                 RETURN(0);
849
850         if (!file_inode(pcc_file)->i_fop->splice_read)
851                 RETURN(-ENOTSUPP);
852
853         pcc_io_init(inode, cached);
854         if (!*cached)
855                 RETURN(0);
856
857         result = file_inode(pcc_file)->i_fop->splice_read(pcc_file,
858                                                           ppos, pipe, count,
859                                                           flags);
860
861         pcc_io_fini(inode);
862         RETURN(result);
863 }
864
865 int pcc_fsync(struct file *file, loff_t start, loff_t end,
866               int datasync, bool *cached)
867 {
868         struct inode *inode = file_inode(file);
869         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
870         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
871         int rc;
872
873         ENTRY;
874
875         if (!pcc_file) {
876                 *cached = false;
877                 RETURN(0);
878         }
879
880         pcc_io_init(inode, cached);
881         if (!*cached)
882                 RETURN(0);
883
884 #ifdef HAVE_FILE_FSYNC_4ARGS
885         rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
886                                                 start, end, datasync);
887 #elif defined(HAVE_FILE_FSYNC_2ARGS)
888         rc = file_inode(pcc_file)->i_fop->fsync(pcc_file, datasync);
889 #else
890         rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
891                                 file_dentry(dentry), datasync);
892 #endif
893
894         pcc_io_fini(inode);
895         RETURN(rc);
896 }
897
898 int pcc_file_mmap(struct file *file, struct vm_area_struct *vma,
899                   bool *cached)
900 {
901         struct inode *inode = file_inode(file);
902         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
903         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
904         struct pcc_inode *pcci;
905         int rc = 0;
906
907         ENTRY;
908
909         if (!pcc_file || !file_inode(pcc_file)->i_fop->mmap) {
910                 *cached = false;
911                 RETURN(0);
912         }
913
914         pcc_inode_lock(inode);
915         pcci = ll_i2pcci(inode);
916         if (pcci && pcc_inode_has_layout(pcci)) {
917                 LASSERT(atomic_read(&pcci->pcci_refcount) > 1);
918                 *cached = true;
919                 vma->vm_file = pcc_file;
920                 rc = file_inode(pcc_file)->i_fop->mmap(pcc_file, vma);
921                 vma->vm_file = file;
922                 /* Save the vm ops of backend PCC */
923                 vma->vm_private_data = (void *)vma->vm_ops;
924         } else {
925                 *cached = false;
926         }
927         pcc_inode_unlock(inode);
928
929         RETURN(rc);
930 }
931
932 void pcc_vm_open(struct vm_area_struct *vma)
933 {
934         struct pcc_inode *pcci;
935         struct file *file = vma->vm_file;
936         struct inode *inode = file_inode(file);
937         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
938         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
939         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
940
941         ENTRY;
942
943         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->open)
944                 RETURN_EXIT;
945
946         pcc_inode_lock(inode);
947         pcci = ll_i2pcci(inode);
948         if (pcci && pcc_inode_has_layout(pcci)) {
949                 vma->vm_file = pcc_file;
950                 pcc_vm_ops->open(vma);
951                 vma->vm_file = file;
952         }
953         pcc_inode_unlock(inode);
954         EXIT;
955 }
956
957 void pcc_vm_close(struct vm_area_struct *vma)
958 {
959         struct file *file = vma->vm_file;
960         struct inode *inode = file_inode(file);
961         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
962         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
963         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
964
965         ENTRY;
966
967         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->close)
968                 RETURN_EXIT;
969
970         pcc_inode_lock(inode);
971         /* Layout lock maybe revoked here */
972         vma->vm_file = pcc_file;
973         pcc_vm_ops->close(vma);
974         vma->vm_file = file;
975         pcc_inode_unlock(inode);
976         EXIT;
977 }
978
979 int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
980                      bool *cached)
981 {
982         struct page *page = vmf->page;
983         struct mm_struct *mm = vma->vm_mm;
984         struct file *file = vma->vm_file;
985         struct inode *inode = file_inode(file);
986         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
987         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
988         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
989         int rc;
990
991         ENTRY;
992
993         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->page_mkwrite) {
994                 *cached = false;
995                 RETURN(0);
996         }
997
998         /* Pause to allow for a race with concurrent detach */
999         OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
1000
1001         pcc_io_init(inode, cached);
1002         if (!*cached) {
1003                 /* This happens when the file is detached from PCC after got
1004                  * the fault page via ->fault() on the inode of the PCC copy.
1005                  * Here it can not simply fall back to normal Lustre I/O path.
1006                  * The reason is that the address space of fault page used by
1007                  * ->page_mkwrite() is still the one of PCC inode. In the
1008                  * normal Lustre ->page_mkwrite() I/O path, it will be wrongly
1009                  * handled as the address space of the fault page is not
1010                  * consistent with the one of the Lustre inode (though the
1011                  * fault page was truncated).
1012                  * As the file is detached from PCC, the fault page must
1013                  * be released frist, and retry the mmap write (->fault() and
1014                  * ->page_mkwrite).
1015                  * We use an ugly and tricky method by returning
1016                  * VM_FAULT_NOPAGE | VM_FAULT_RETRY to the caller
1017                  * __do_page_fault and retry the memory fault handling.
1018                  */
1019                 if (page->mapping == file_inode(pcc_file)->i_mapping) {
1020                         *cached = true;
1021                         up_read(&mm->mmap_sem);
1022                         RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1023                 }
1024
1025                 RETURN(0);
1026         }
1027
1028         /*
1029          * This fault injection can also be used to simulate -ENOSPC and
1030          * -EDQUOT failure of underlying PCC backend fs.
1031          */
1032         if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE)) {
1033                 pcc_io_fini(inode);
1034                 pcc_ioctl_detach(inode);
1035                 up_read(&mm->mmap_sem);
1036                 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1037         }
1038
1039         vma->vm_file = pcc_file;
1040 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
1041         rc = pcc_vm_ops->page_mkwrite(vmf);
1042 #else
1043         rc = pcc_vm_ops->page_mkwrite(vma, vmf);
1044 #endif
1045         vma->vm_file = file;
1046
1047         pcc_io_fini(inode);
1048         RETURN(rc);
1049 }
1050
1051 int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
1052               bool *cached)
1053 {
1054         struct file *file = vma->vm_file;
1055         struct inode *inode = file_inode(file);
1056         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1057         struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1058         struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1059         int rc;
1060
1061         ENTRY;
1062
1063         if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->fault) {
1064                 *cached = false;
1065                 RETURN(0);
1066         }
1067
1068         pcc_io_init(inode, cached);
1069         if (!*cached)
1070                 RETURN(0);
1071
1072         vma->vm_file = pcc_file;
1073 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
1074         rc = pcc_vm_ops->fault(vmf);
1075 #else
1076         rc = pcc_vm_ops->fault(vma, vmf);
1077 #endif
1078         vma->vm_file = file;
1079
1080         pcc_io_fini(inode);
1081         RETURN(rc);
1082 }
1083
1084 static void pcc_layout_wait(struct pcc_inode *pcci)
1085 {
1086         struct l_wait_info lwi = { 0 };
1087
1088         while (atomic_read(&pcci->pcci_active_ios) > 0) {
1089                 CDEBUG(D_CACHE, "Waiting for IO completion: %d\n",
1090                        atomic_read(&pcci->pcci_active_ios));
1091                 l_wait_event(pcci->pcci_waitq,
1092                              atomic_read(&pcci->pcci_active_ios) == 0, &lwi);
1093         }
1094 }
1095
1096 static void __pcc_layout_invalidate(struct pcc_inode *pcci)
1097 {
1098         pcci->pcci_type = LU_PCC_NONE;
1099         pcc_layout_gen_set(pcci, CL_LAYOUT_GEN_NONE);
1100         pcc_layout_wait(pcci);
1101 }
1102
1103 void pcc_layout_invalidate(struct inode *inode)
1104 {
1105         struct pcc_inode *pcci;
1106
1107         pcc_inode_lock(inode);
1108         pcci = ll_i2pcci(inode);
1109         if (pcci && pcc_inode_has_layout(pcci)) {
1110                 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1111                 __pcc_layout_invalidate(pcci);
1112
1113                 CDEBUG(D_CACHE, "Invalidate "DFID" layout gen %d\n",
1114                        PFID(&ll_i2info(inode)->lli_fid), pcci->pcci_layout_gen);
1115
1116                 pcc_inode_put(pcci);
1117         }
1118         pcc_inode_unlock(inode);
1119 }
1120
1121 static int pcc_inode_remove(struct pcc_inode *pcci)
1122 {
1123         struct dentry *dentry;
1124         int rc;
1125
1126         dentry = pcci->pcci_path.dentry;
1127         rc = ll_vfs_unlink(dentry->d_parent->d_inode, dentry);
1128         if (rc)
1129                 CWARN("failed to unlink cached file, rc = %d\n", rc);
1130
1131         return rc;
1132 }
1133
1134 /* Create directory under base if directory does not exist */
1135 static struct dentry *
1136 pcc_mkdir(struct dentry *base, const char *name, umode_t mode)
1137 {
1138         int rc;
1139         struct dentry *dentry;
1140         struct inode *dir = base->d_inode;
1141
1142         inode_lock(dir);
1143         dentry = lookup_one_len(name, base, strlen(name));
1144         if (IS_ERR(dentry))
1145                 goto out;
1146
1147         if (d_is_positive(dentry))
1148                 goto out;
1149
1150         rc = vfs_mkdir(dir, dentry, mode);
1151         if (rc) {
1152                 dput(dentry);
1153                 dentry = ERR_PTR(rc);
1154                 goto out;
1155         }
1156 out:
1157         inode_unlock(dir);
1158         return dentry;
1159 }
1160
1161 static struct dentry *
1162 pcc_mkdir_p(struct dentry *root, char *path, umode_t mode)
1163 {
1164         char *ptr, *entry_name;
1165         struct dentry *parent;
1166         struct dentry *child = ERR_PTR(-EINVAL);
1167
1168         ptr = path;
1169         while (*ptr == '/')
1170                 ptr++;
1171
1172         entry_name = ptr;
1173         parent = dget(root);
1174         while ((ptr = strchr(ptr, '/')) != NULL) {
1175                 *ptr = '\0';
1176                 child = pcc_mkdir(parent, entry_name, mode);
1177                 *ptr = '/';
1178                 dput(parent);
1179                 if (IS_ERR(child))
1180                         break;
1181
1182                 parent = child;
1183                 ptr++;
1184                 entry_name = ptr;
1185         }
1186
1187         return child;
1188 }
1189
1190 /* Create file under base. If file already exist, return failure */
1191 static struct dentry *
1192 pcc_create(struct dentry *base, const char *name, umode_t mode)
1193 {
1194         int rc;
1195         struct dentry *dentry;
1196         struct inode *dir = base->d_inode;
1197
1198         inode_lock(dir);
1199         dentry = lookup_one_len(name, base, strlen(name));
1200         if (IS_ERR(dentry))
1201                 goto out;
1202
1203         if (d_is_positive(dentry))
1204                 goto out;
1205
1206         rc = vfs_create(dir, dentry, mode, LL_VFS_CREATE_FALSE);
1207         if (rc) {
1208                 dput(dentry);
1209                 dentry = ERR_PTR(rc);
1210                 goto out;
1211         }
1212 out:
1213         inode_unlock(dir);
1214         return dentry;
1215 }
1216
1217 /* Must be called with pcci->pcci_lock held */
1218 static void pcc_inode_attach_init(struct pcc_dataset *dataset,
1219                                   struct pcc_inode *pcci,
1220                                   struct dentry *dentry,
1221                                   enum lu_pcc_type type)
1222 {
1223         pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
1224         pcci->pcci_path.dentry = dentry;
1225         LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
1226         atomic_set(&pcci->pcci_refcount, 1);
1227         pcci->pcci_type = type;
1228         pcci->pcci_attr_valid = false;
1229 }
1230
1231 static int __pcc_inode_create(struct pcc_dataset *dataset,
1232                               struct lu_fid *fid,
1233                               struct dentry **dentry)
1234 {
1235         char *path;
1236         struct dentry *base;
1237         struct dentry *child;
1238         int rc = 0;
1239
1240         OBD_ALLOC(path, MAX_PCC_DATABASE_PATH);
1241         if (path == NULL)
1242                 return -ENOMEM;
1243
1244         pcc_fid2dataset_path(path, MAX_PCC_DATABASE_PATH, fid);
1245
1246         base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0);
1247         if (IS_ERR(base)) {
1248                 rc = PTR_ERR(base);
1249                 GOTO(out, rc);
1250         }
1251
1252         snprintf(path, MAX_PCC_DATABASE_PATH, DFID_NOBRACE, PFID(fid));
1253         child = pcc_create(base, path, 0);
1254         if (IS_ERR(child)) {
1255                 rc = PTR_ERR(child);
1256                 GOTO(out_base, rc);
1257         }
1258         *dentry = child;
1259
1260 out_base:
1261         dput(base);
1262 out:
1263         OBD_FREE(path, MAX_PCC_DATABASE_PATH);
1264         return rc;
1265 }
1266
1267 /* TODO: Set the project ID for PCC copy */
1268 int pcc_inode_store_ugpid(struct dentry *dentry, kuid_t uid, kgid_t gid)
1269 {
1270         struct inode *inode = dentry->d_inode;
1271         struct iattr attr;
1272         int rc;
1273
1274         ENTRY;
1275
1276         attr.ia_valid = ATTR_UID | ATTR_GID;
1277         attr.ia_uid = uid;
1278         attr.ia_gid = gid;
1279
1280         inode_lock(inode);
1281         rc = notify_change(dentry, &attr, NULL);
1282         inode_unlock(inode);
1283
1284         RETURN(rc);
1285 }
1286
1287 int pcc_inode_create(struct super_block *sb, struct pcc_dataset *dataset,
1288                      struct lu_fid *fid, struct dentry **pcc_dentry)
1289 {
1290         const struct cred *old_cred;
1291         int rc;
1292
1293         old_cred = override_creds(pcc_super_cred(sb));
1294         rc = __pcc_inode_create(dataset, fid, pcc_dentry);
1295         revert_creds(old_cred);
1296         return rc;
1297 }
1298
1299 int pcc_inode_create_fini(struct pcc_dataset *dataset, struct inode *inode,
1300                           struct dentry *pcc_dentry)
1301 {
1302         const struct cred *old_cred;
1303         struct pcc_inode *pcci;
1304         int rc = 0;
1305
1306         ENTRY;
1307
1308         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1309         pcc_inode_lock(inode);
1310         LASSERT(ll_i2pcci(inode) == NULL);
1311         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
1312         if (pcci == NULL)
1313                 GOTO(out_unlock, rc = -ENOMEM);
1314
1315         rc = pcc_inode_store_ugpid(pcc_dentry, old_cred->suid,
1316                                    old_cred->sgid);
1317         if (rc)
1318                 GOTO(out_unlock, rc);
1319
1320         pcc_inode_init(pcci, ll_i2info(inode));
1321         pcc_inode_attach_init(dataset, pcci, pcc_dentry, LU_PCC_READWRITE);
1322         /* Set the layout generation of newly created file with 0 */
1323         pcc_layout_gen_set(pcci, 0);
1324
1325 out_unlock:
1326         if (rc) {
1327                 int rc2;
1328
1329                 rc2 = ll_vfs_unlink(pcc_dentry->d_parent->d_inode, pcc_dentry);
1330                 if (rc2)
1331                         CWARN("failed to unlink PCC file, rc = %d\n", rc2);
1332
1333                 dput(pcc_dentry);
1334         }
1335
1336         pcc_inode_unlock(inode);
1337         revert_creds(old_cred);
1338         if (rc && pcci)
1339                 OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
1340
1341         RETURN(rc);
1342 }
1343
1344 static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
1345                           loff_t *offset)
1346 {
1347         while (count > 0) {
1348                 ssize_t size;
1349
1350                 size = vfs_write(filp, (const void __user *)buf, count, offset);
1351                 if (size < 0)
1352                         return size;
1353                 count -= size;
1354                 buf += size;
1355         }
1356         return 0;
1357 }
1358
1359 static int pcc_copy_data(struct file *src, struct file *dst)
1360 {
1361         int rc = 0;
1362         ssize_t rc2;
1363         mm_segment_t oldfs;
1364         loff_t pos, offset = 0;
1365         size_t buf_len = 1048576;
1366         void *buf;
1367
1368         ENTRY;
1369
1370         OBD_ALLOC_LARGE(buf, buf_len);
1371         if (buf == NULL)
1372                 RETURN(-ENOMEM);
1373
1374         oldfs = get_fs();
1375         set_fs(KERNEL_DS);
1376         while (1) {
1377                 pos = offset;
1378                 rc2 = vfs_read(src, (void __user *)buf, buf_len, &pos);
1379                 if (rc2 < 0)
1380                         GOTO(out_fs, rc = rc2);
1381                 else if (rc2 == 0)
1382                         break;
1383
1384                 pos = offset;
1385                 rc = pcc_filp_write(dst, buf, rc2, &pos);
1386                 if (rc < 0)
1387                         GOTO(out_fs, rc);
1388                 offset += rc2;
1389         }
1390
1391 out_fs:
1392         set_fs(oldfs);
1393         OBD_FREE_LARGE(buf, buf_len);
1394         RETURN(rc);
1395 }
1396
1397 static int pcc_attach_allowed_check(struct inode *inode)
1398 {
1399         struct ll_inode_info *lli = ll_i2info(inode);
1400         struct pcc_inode *pcci;
1401         int rc = 0;
1402
1403         ENTRY;
1404
1405         pcc_inode_lock(inode);
1406         if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
1407                 GOTO(out_unlock, rc = -EBUSY);
1408
1409         pcci = ll_i2pcci(inode);
1410         if (pcci && pcc_inode_has_layout(pcci))
1411                 GOTO(out_unlock, rc = -EEXIST);
1412
1413         lli->lli_pcc_state |= PCC_STATE_FL_ATTACHING;
1414 out_unlock:
1415         pcc_inode_unlock(inode);
1416         RETURN(rc);
1417 }
1418
1419 int pcc_readwrite_attach(struct file *file, struct inode *inode,
1420                          __u32 archive_id)
1421 {
1422         struct pcc_dataset *dataset;
1423         struct ll_inode_info *lli = ll_i2info(inode);
1424         struct pcc_inode *pcci;
1425         const struct cred *old_cred;
1426         struct dentry *dentry;
1427         struct file *pcc_filp;
1428         struct path path;
1429         int rc;
1430
1431         ENTRY;
1432
1433         rc = pcc_attach_allowed_check(inode);
1434         if (rc)
1435                 RETURN(rc);
1436
1437         dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super, 0,
1438                                   archive_id);
1439         if (dataset == NULL)
1440                 RETURN(-ENOENT);
1441
1442         old_cred = override_creds(pcc_super_cred(inode->i_sb));
1443         rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry);
1444         if (rc) {
1445                 revert_creds(old_cred);
1446                 GOTO(out_dataset_put, rc);
1447         }
1448
1449         path.mnt = dataset->pccd_path.mnt;
1450         path.dentry = dentry;
1451 #ifdef HAVE_DENTRY_OPEN_USE_PATH
1452         pcc_filp = dentry_open(&path, O_TRUNC | O_WRONLY | O_LARGEFILE,
1453                                current_cred());
1454 #else
1455         pcc_filp = dentry_open(path.dentry, path.mnt,
1456                                O_TRUNC | O_WRONLY | O_LARGEFILE,
1457                                current_cred());
1458 #endif
1459         if (IS_ERR_OR_NULL(pcc_filp)) {
1460                 rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
1461                 revert_creds(old_cred);
1462                 GOTO(out_dentry, rc);
1463         }
1464
1465         rc = pcc_inode_store_ugpid(dentry, old_cred->uid, old_cred->gid);
1466         revert_creds(old_cred);
1467         if (rc)
1468                 GOTO(out_fput, rc);
1469
1470         rc = pcc_copy_data(file, pcc_filp);
1471         if (rc)
1472                 GOTO(out_fput, rc);
1473
1474         /* Pause to allow for a race with concurrent HSM remove */
1475         OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
1476
1477         pcc_inode_lock(inode);
1478         pcci = ll_i2pcci(inode);
1479         LASSERT(!pcci);
1480         OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
1481         if (pcci == NULL)
1482                 GOTO(out_unlock, rc = -ENOMEM);
1483
1484         pcc_inode_init(pcci, lli);
1485         pcc_inode_attach_init(dataset, pcci, dentry, LU_PCC_READWRITE);
1486 out_unlock:
1487         pcc_inode_unlock(inode);
1488 out_fput:
1489         fput(pcc_filp);
1490 out_dentry:
1491         if (rc) {
1492                 int rc2;
1493
1494                 old_cred = override_creds(pcc_super_cred(inode->i_sb));
1495                 rc2 = ll_vfs_unlink(dentry->d_parent->d_inode, dentry);
1496                 revert_creds(old_cred);
1497                 if (rc2)
1498                         CWARN("failed to unlink PCC file, rc = %d\n", rc2);
1499
1500                 dput(dentry);
1501         }
1502 out_dataset_put:
1503         pcc_dataset_put(dataset);
1504         RETURN(rc);
1505 }
1506
1507 int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
1508                               __u32 gen, bool lease_broken, int rc,
1509                               bool attached)
1510 {
1511         struct ll_inode_info *lli = ll_i2info(inode);
1512         const struct cred *old_cred;
1513         struct pcc_inode *pcci;
1514         __u32 gen2;
1515
1516         ENTRY;
1517
1518         pcc_inode_lock(inode);
1519         pcci = ll_i2pcci(inode);
1520         lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
1521         if (rc || lease_broken) {
1522                 if (attached && pcci)
1523                         pcc_inode_put(pcci);
1524
1525                 GOTO(out_unlock, rc);
1526         }
1527
1528         /* PCC inode may be released due to layout lock revocatioin */
1529         if (!pcci)
1530                 GOTO(out_unlock, rc = -ESTALE);
1531
1532         LASSERT(attached);
1533         rc = ll_layout_refresh(inode, &gen2);
1534         if (!rc) {
1535                 if (gen2 == gen) {
1536                         pcc_layout_gen_set(pcci, gen);
1537                 } else {
1538                         CDEBUG(D_CACHE,
1539                                DFID" layout changed from %d to %d.\n",
1540                                PFID(ll_inode2fid(inode)), gen, gen2);
1541                         GOTO(out_put, rc = -ESTALE);
1542                 }
1543         }
1544
1545 out_put:
1546         if (rc) {
1547                 old_cred = override_creds(pcc_super_cred(inode->i_sb));
1548                 pcc_inode_remove(pcci);
1549                 revert_creds(old_cred);
1550                 pcc_inode_put(pcci);
1551         }
1552 out_unlock:
1553         pcc_inode_unlock(inode);
1554         RETURN(rc);
1555 }
1556
1557 int pcc_ioctl_detach(struct inode *inode)
1558 {
1559         struct ll_inode_info *lli = ll_i2info(inode);
1560         struct pcc_inode *pcci;
1561         int rc = 0;
1562
1563         ENTRY;
1564
1565         pcc_inode_lock(inode);
1566         pcci = lli->lli_pcc_inode;
1567         if (!pcci || lli->lli_pcc_state & PCC_STATE_FL_ATTACHING ||
1568             !pcc_inode_has_layout(pcci))
1569                 GOTO(out_unlock, rc = 0);
1570
1571         __pcc_layout_invalidate(pcci);
1572         pcc_inode_put(pcci);
1573
1574 out_unlock:
1575         pcc_inode_unlock(inode);
1576         RETURN(rc);
1577 }
1578
1579 int pcc_ioctl_state(struct file *file, struct inode *inode,
1580                     struct lu_pcc_state *state)
1581 {
1582         int rc = 0;
1583         int count;
1584         char *buf;
1585         char *path;
1586         int buf_len = sizeof(state->pccs_path);
1587         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1588         struct pcc_file *pccf = &fd->fd_pcc_file;
1589         struct pcc_inode *pcci;
1590
1591         ENTRY;
1592
1593         if (buf_len <= 0)
1594                 RETURN(-EINVAL);
1595
1596         OBD_ALLOC(buf, buf_len);
1597         if (buf == NULL)
1598                 RETURN(-ENOMEM);
1599
1600         pcc_inode_lock(inode);
1601         pcci = ll_i2pcci(inode);
1602         if (pcci == NULL) {
1603                 state->pccs_type = LU_PCC_NONE;
1604                 GOTO(out_unlock, rc = 0);
1605         }
1606
1607         count = atomic_read(&pcci->pcci_refcount);
1608         if (count == 0) {
1609                 state->pccs_type = LU_PCC_NONE;
1610                 state->pccs_open_count = 0;
1611                 GOTO(out_unlock, rc = 0);
1612         }
1613
1614         if (pcc_inode_has_layout(pcci))
1615                 count--;
1616         if (pccf->pccf_file != NULL)
1617                 count--;
1618         state->pccs_type = pcci->pcci_type;
1619         state->pccs_open_count = count;
1620         state->pccs_flags = ll_i2info(inode)->lli_pcc_state;
1621 #ifdef HAVE_DENTRY_PATH_RAW
1622         path = dentry_path_raw(pcci->pcci_path.dentry, buf, buf_len);
1623         if (IS_ERR(path))
1624                 GOTO(out_unlock, rc = PTR_ERR(path));
1625 #else
1626         path = "UNKNOWN";
1627 #endif
1628
1629         if (strlcpy(state->pccs_path, path, buf_len) >= buf_len)
1630                 GOTO(out_unlock, rc = -ENAMETOOLONG);
1631
1632 out_unlock:
1633         pcc_inode_unlock(inode);
1634         OBD_FREE(buf, buf_len);
1635         RETURN(rc);
1636 }