4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2017, DDN Storage Corporation.
26 * Persistent Client Cache
28 * PCC is a new framework which provides a group of local caches on the Lustre
29 * client side. It works in two modes: RW-PCC enables a read-write cache on the
30 * local SSDs of a single client; RO-PCC provides a read-only cache on the
31 * local SSDs of multiple clients. Less overhead is visible to the applications
32 * and network latencies and lock conflicts can be significantly reduced.
34 * For RW-PCC, no global namespace will be provided. Each client uses its own
35 * local storage as a cache for itself. Local file system is used to manage
36 * the data on local caches. Cached I/O is directed to local file system while
37 * normal I/O is directed to OSTs. RW-PCC uses HSM for data synchronization.
38 * It uses HSM copytool to restore file from local caches to Lustre OSTs. Each
39 * PCC has a copytool instance running with unique archive number. Any remote
40 * access from another Lustre client would trigger the data synchronization. If
41 * a client with RW-PCC goes offline, the cached data becomes inaccessible to
42 * other clients temporarily. After the RW-PCC client reboots and the
43 * copytool restarts, the data will be accessible again.
45 * Following is what will happen in different conditions for RW-PCC:
47 * > When file is being created on RW-PCC
49 * A normal HSM released file is created on MDT;
50 * An empty mirror file is created on local cache;
51 * The HSM status of the Lustre file will be set to archived and released;
52 * The archive number will be set to the proper value.
54 * > When file is being prefetched to RW-PCC
56 * A file is copied to the local cache;
57 * The HSM status of the Lustre file will be set to archived and released;
58 * The archive number will be set to the proper value.
60 * > When file is being accessed from PCC
62 * Data will be read directly from local cache;
63 * Metadata will be read from MDT, except file size;
64 * File size will be obtained from the local cache.
66 * > When PCC cached file is being accessed on another client
68 * RW-PCC cached files are automatically restored when a process on another
69 * client tries to read or modify them. The corresponding I/O will block
70 * waiting for the released file to be restored. This is transparent to the
73 * For RW-PCC, when a file is being created, a rule-based policy is used to
74 * determine whether it will be cached. Rule-based caching of newly created
75 * files can determine which file can use a cache on PCC directly without any
78 * RW-PCC design can accelerate I/O intensive applications with one-to-one
79 * mappings between files and accessing clients. However, in several use cases,
80 * files will never be updated, but need to be read simultaneously from many
81 * clients. RO-PCC implements a read-only caching on Lustre clients using
82 * SSDs. RO-PCC is based on the same framework as RW-PCC, except
83 * that no HSM mechanism is used.
85 * The main advantages to use this SSD cache on the Lustre clients via PCC
87 * - The I/O stack becomes much simpler for the cached data, as there is no
88 * interference with I/Os from other clients, which enables easier
89 * performance optimizations;
90 * - The requirements on the HW inside the client nodes are small, any kind of
91 * SSDs or even HDDs can be used as cache devices;
92 * - Caching reduces the pressure on the object storage targets (OSTs), as
93 * small or random I/Os can be regularized to big sequential I/Os and
94 * temporary files do not even need to be flushed to OSTs.
96 * PCC can accelerate applications with certain I/O patterns:
97 * - small-sized random writes (< 1MB) from a single client
98 * - repeated read of data that is larger than RAM
99 * - clients with high network latency
101 * Author: Li Xi <lixi@ddn.com>
102 * Author: Qian Yingjin <qian@ddn.com>
105 #define DEBUG_SUBSYSTEM S_LLITE
108 #include <linux/namei.h>
109 #include <linux/file.h>
110 #include <lustre_compat.h>
111 #include "llite_internal.h"
/* Slab cache backing struct pcc_inode objects; it is the cache passed to the
 * OBD_SLAB_ALLOC_PTR_GFP()/OBD_SLAB_FREE_PTR() calls in this file.
 */
113 struct kmem_cache *pcc_inode_slab;
115 void pcc_super_init(struct pcc_super *super)
117 spin_lock_init(&super->pccs_lock);
118 INIT_LIST_HEAD(&super->pccs_datasets);
122 * pcc_dataset_add - Add a cache policy to control which files need to be
123 * cached and where they will be cached.
125 * @super: superblock of pcc
126 * @pathname: root path of pcc
127 * @archive_id: HSM archive ID
128 * @projid: files with specified project ID will be cached.
131 pcc_dataset_add(struct pcc_super *super, const char *pathname,
132 __u32 archive_id, __u32 projid)
135 struct pcc_dataset *dataset;
136 struct pcc_dataset *tmp;
139 OBD_ALLOC_PTR(dataset);
/* Resolve @pathname, which must name a directory, into pccd_path. */
143 rc = kern_path(pathname, LOOKUP_DIRECTORY, &dataset->pccd_path);
145 OBD_FREE_PTR(dataset);
/* NOTE(review): strncpy() leaves the destination unterminated when the
 * source is PATH_MAX bytes or longer. The command path bounds the length
 * through pathname_is_valid(); confirm that holds for every caller.
 */
148 strncpy(dataset->pccd_pathname, pathname, PATH_MAX);
149 dataset->pccd_id = archive_id;
150 dataset->pccd_projid = projid;
/* The new dataset starts out with a single reference. */
151 atomic_set(&dataset->pccd_refcount, 1);
/* Look for an existing dataset that already uses this archive ID. */
153 spin_lock(&super->pccs_lock);
154 list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) {
155 if (tmp->pccd_id == archive_id) {
161 list_add(&dataset->pccd_linkage, &super->pccs_datasets);
162 spin_unlock(&super->pccs_lock);
/* Drops the initial reference again. NOTE(review): presumably only on the
 * duplicate archive ID path; the controlling condition is not visible here.
 */
165 pcc_dataset_put(dataset);
/* Find a dataset on @super matching @projid and/or @archive_id and take a
 * reference on it. A zero @projid or @archive_id is not compared in the loop
 * below, so it matches any dataset.
 */
173 pcc_dataset_get(struct pcc_super *super, __u32 projid, __u32 archive_id)
175 struct pcc_dataset *dataset;
176 struct pcc_dataset *selected = NULL;
/* With neither key supplied there is nothing to match on. */
178 if (projid == 0 && archive_id == 0)
182 * archive ID is unique in the list, projid might be duplicate,
183 * we just return last added one as first priority.
185 spin_lock(&super->pccs_lock);
186 list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
187 if (projid && dataset->pccd_projid != projid)
189 if (archive_id && dataset->pccd_id != archive_id)
/* Passed both filters: pin the entry while pccs_lock is still held. */
191 atomic_inc(&dataset->pccd_refcount);
195 spin_unlock(&super->pccs_lock);
197 CDEBUG(D_CACHE, "matched projid %u, PCC create\n",
198 selected->pccd_projid);
/* Drop one reference on @dataset. The final put releases the path held in
 * pccd_path and frees the dataset.
 */
203 pcc_dataset_put(struct pcc_dataset *dataset)
205 if (atomic_dec_and_test(&dataset->pccd_refcount)) {
206 path_put(&dataset->pccd_path);
207 OBD_FREE_PTR(dataset);
/* Remove from @super a dataset whose stored pathname equals @pathname:
 * unlink it from pccs_datasets and drop a reference, all under pccs_lock.
 * NOTE(review): pcc_dataset_put() may end up in path_put() while the
 * spinlock is held - confirm that this cannot sleep here.
 */
212 pcc_dataset_del(struct pcc_super *super, char *pathname)
214 struct list_head *l, *tmp;
215 struct pcc_dataset *dataset;
218 spin_lock(&super->pccs_lock);
219 list_for_each_safe(l, tmp, &super->pccs_datasets) {
220 dataset = list_entry(l, struct pcc_dataset, pccd_linkage);
221 if (strcmp(dataset->pccd_pathname, pathname) == 0) {
222 list_del(&dataset->pccd_linkage);
223 pcc_dataset_put(dataset);
228 spin_unlock(&super->pccs_lock);
/* Print one dataset to seq file @m: its pathname, then its archive ID
 * (labelled "rwid") and its project ID (labelled "autocache: projid").
 */
233 pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
235 seq_printf(m, "%s:\n", dataset->pccd_pathname);
236 seq_printf(m, " rwid: %u\n", dataset->pccd_id);
237 seq_printf(m, " autocache: projid=%u\n", dataset->pccd_projid);
/* Dump every dataset registered on @super to seq file @m, holding pccs_lock
 * for the whole walk.
 */
241 pcc_super_dump(struct pcc_super *super, struct seq_file *m)
243 struct pcc_dataset *dataset;
245 spin_lock(&super->pccs_lock);
246 list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
247 pcc_dataset_dump(dataset, m);
249 spin_unlock(&super->pccs_lock);
253 void pcc_super_fini(struct pcc_super *super)
255 struct pcc_dataset *dataset, *tmp;
257 list_for_each_entry_safe(dataset, tmp,
258 &super->pccs_datasets, pccd_linkage) {
259 list_del(&dataset->pccd_linkage);
260 pcc_dataset_put(dataset);
/* Validate a dataset root path: the condition below trips for a NULL or
 * empty string, for a string of PATH_MAX bytes or more, and for a path that
 * does not start with '/'.
 */
265 static bool pathname_is_valid(const char *pathname)
267 /* Needs to be absolute path */
268 if (pathname == NULL || strlen(pathname) == 0 ||
269 strlen(pathname) >= PATH_MAX || pathname[0] != '/')
/* Parse a PCC control command from @buffer into a struct pcc_cmd.
 *
 * Commands recognised in the code below (tokens are split on a single
 * space with strsep()):
 *   buffer starting with "clear" - PCC_CLEAR_ALL
 *   "add <pathname> ..."         - PCC_ADD_DATASET; two decimal values are
 *                                  parsed into pccc_id and pccc_projid
 *   "del <pathname>"             - PCC_DEL_DATASET
 *
 * <pathname> must satisfy pathname_is_valid(). The failures visible here
 * use -ENOMEM or -EINVAL.
 */
274 static struct pcc_cmd *
275 pcc_cmd_parse(char *buffer, unsigned long count)
/* NOTE(review): 'static' makes this pointer shared by every call, which
 * looks unintended for a per-call result - confirm and drop if so.
 */
277 static struct pcc_cmd *cmd;
285 GOTO(out, rc = -ENOMEM);
287 /* clear all setting */
288 if (strncmp(buffer, "clear", 5) == 0) {
289 cmd->pccc_cmd = PCC_CLEAR_ALL;
294 token = strsep(&val, " ");
295 if (val == NULL || strlen(val) == 0)
296 GOTO(out_free_cmd, rc = -EINVAL);
298 /* Type of the command */
299 if (strcmp(token, "add") == 0)
300 cmd->pccc_cmd = PCC_ADD_DATASET;
301 else if (strcmp(token, "del") == 0)
302 cmd->pccc_cmd = PCC_DEL_DATASET;
304 GOTO(out_free_cmd, rc = -EINVAL);
306 /* Pathname of the dataset */
307 token = strsep(&val, " ");
/* Only "del" may end right after the pathname; "add" needs more tokens. */
308 if ((val == NULL && cmd->pccc_cmd != PCC_DEL_DATASET) ||
309 !pathname_is_valid(token))
310 GOTO(out_free_cmd, rc = -EINVAL);
/* The pathname is kept as a pointer to the token, not as a copy. */
311 cmd->pccc_pathname = token;
313 if (cmd->pccc_cmd == PCC_ADD_DATASET) {
315 token = strsep(&val, " ");
317 GOTO(out_free_cmd, rc = -EINVAL);
319 rc = kstrtoul(token, 10, &tmp);
321 GOTO(out_free_cmd, rc = -EINVAL);
323 GOTO(out_free_cmd, rc = -EINVAL);
324 cmd->u.pccc_add.pccc_id = tmp;
327 rc = kstrtoul(token, 10, &tmp);
329 GOTO(out_free_cmd, rc = -EINVAL);
331 GOTO(out_free_cmd, rc = -EINVAL);
332 cmd->u.pccc_add.pccc_projid = tmp;
/* Parse @buffer with pcc_cmd_parse() and apply the resulting command to
 * @super: add a dataset, delete a dataset, or - in the remaining visible
 * branch - drop every dataset through pcc_super_fini().
 */
344 int pcc_cmd_handle(char *buffer, unsigned long count,
345 struct pcc_super *super)
350 cmd = pcc_cmd_parse(buffer, count);
354 switch (cmd->pccc_cmd) {
355 case PCC_ADD_DATASET:
356 rc = pcc_dataset_add(super, cmd->pccc_pathname,
357 cmd->u.pccc_add.pccc_id,
358 cmd->u.pccc_add.pccc_projid);
360 case PCC_DEL_DATASET:
361 rc = pcc_dataset_del(super, cmd->pccc_pathname);
364 pcc_super_fini(super);
375 static inline void pcc_inode_lock(struct inode *inode)
377 mutex_lock(&ll_i2info(inode)->lli_pcc_lock);
380 static inline void pcc_inode_unlock(struct inode *inode)
382 mutex_unlock(&ll_i2info(inode)->lli_pcc_lock);
385 static void pcc_inode_init(struct pcc_inode *pcci, struct ll_inode_info *lli)
387 pcci->pcci_lli = lli;
388 lli->lli_pcc_inode = pcci;
389 atomic_set(&pcci->pcci_refcount, 0);
390 pcci->pcci_type = LU_PCC_NONE;
391 pcci->pcci_layout_gen = CL_LAYOUT_GEN_NONE;
392 atomic_set(&pcci->pcci_active_ios, 0);
393 init_waitqueue_head(&pcci->pcci_waitq);
396 static void pcc_inode_fini(struct pcc_inode *pcci)
398 struct ll_inode_info *lli = pcci->pcci_lli;
400 path_put(&pcci->pcci_path);
401 pcci->pcci_type = LU_PCC_NONE;
402 OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
403 lli->lli_pcc_inode = NULL;
406 static void pcc_inode_get(struct pcc_inode *pcci)
408 atomic_inc(&pcci->pcci_refcount);
411 static void pcc_inode_put(struct pcc_inode *pcci)
413 if (atomic_dec_and_test(&pcci->pcci_refcount))
414 pcc_inode_fini(pcci);
/* Warn if the PCC inode of @inode still has more than one reference.
 * NOTE(review): given the name, the PCC inode is presumably released on
 * lines not visible here - confirm.
 */
417 void pcc_inode_free(struct inode *inode)
419 struct pcc_inode *pcci = ll_i2pcci(inode);
422 WARN_ON(atomic_read(&pcci->pcci_refcount) > 1);
429 * As Andreas suggested, we'd better use new layout to
431 * (fid->f_oid >> 16 & 0xFFFF)/FID
/* Room for six 5-byte path components plus FID_NOBRACE_LEN and one more
 * byte - presumably the terminating NUL; confirm.
 */
433 #define MAX_PCC_DATABASE_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
/* Format into @buf (at most @sz bytes) a relative path derived from @fid.
 * It begins with six 4-digit hex directory components: the low and high
 * 16 bits of f_oid, then the four 16-bit words of f_seq from least to most
 * significant. Returns the snprintf() result.
 */
434 static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid)
436 return snprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
438 (fid)->f_oid & 0xFFFF,
439 (fid)->f_oid >> 16 & 0xFFFF,
440 (unsigned int)((fid)->f_seq & 0xFFFF),
441 (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
442 (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
443 (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
447 void pcc_file_init(struct pcc_file *pccf)
449 pccf->pccf_file = NULL;
450 pccf->pccf_type = LU_PCC_NONE;
453 static inline bool pcc_inode_has_layout(struct pcc_inode *pcci)
455 return pcci->pcci_layout_gen != CL_LAYOUT_GEN_NONE;
/* Open the PCC copy of @inode, if it has one, on behalf of @file and record
 * it in the file's private data (fd->fd_pcc_file). The visible early exits
 * (zero PCC inode refcount or no layout generation) return 0.
 */
458 int pcc_file_open(struct inode *inode, struct file *file)
460 struct pcc_inode *pcci;
461 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
462 struct pcc_file *pccf = &fd->fd_pcc_file;
463 struct file *pcc_file;
/* Non-regular files take an early exit (its body is not visible here). */
470 if (!S_ISREG(inode->i_mode))
473 pcc_inode_lock(inode);
474 pcci = ll_i2pcci(inode);
476 GOTO(out_unlock, rc = 0);
478 if (atomic_read(&pcci->pcci_refcount) == 0 ||
479 !pcc_inode_has_layout(pcci))
480 GOTO(out_unlock, rc = 0);
/* A PCC file already attached to this open file is unexpected. */
483 WARN_ON(pccf->pccf_file);
485 path = &pcci->pcci_path;
486 dname = &path->dentry->d_name;
487 CDEBUG(D_CACHE, "opening pcc file '%.*s'\n", dname->len,
/* Open the cached copy with the Lustre file's flags and the current
 * credentials; the dentry_open() signature depends on the kernel.
 */
489 #ifdef HAVE_DENTRY_OPEN_USE_PATH
490 pcc_file = dentry_open(path, file->f_flags, current_cred());
492 pcc_file = dentry_open(path->dentry, path->mnt,
493 file->f_flags, current_cred());
/* A NULL result is reported as -EINVAL, an error pointer as its errno. */
495 if (IS_ERR_OR_NULL(pcc_file)) {
496 rc = pcc_file == NULL ? -EINVAL : PTR_ERR(pcc_file);
499 pccf->pccf_file = pcc_file;
500 pccf->pccf_type = pcci->pcci_type;
504 pcc_inode_unlock(inode);
/* Release the PCC file attached to @file, if any: under the PCC inode lock
 * the backing file is fput() and pccf_file is cleared.
 */
508 void pcc_file_release(struct inode *inode, struct file *file)
510 struct pcc_inode *pcci;
511 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
512 struct pcc_file *pccf;
/* Non-regular files and files without private data are screened out
 * (the body of this check is not visible here).
 */
518 if (!S_ISREG(inode->i_mode) || fd == NULL)
521 pccf = &fd->fd_pcc_file;
522 pcc_inode_lock(inode);
523 if (pccf->pccf_file == NULL)
526 pcci = ll_i2pcci(inode);
528 path = &pcci->pcci_path;
529 dname = &path->dentry->d_name;
530 CDEBUG(D_CACHE, "releasing pcc file \"%.*s\"\n", dname->len,
533 fput(pccf->pccf_file);
534 pccf->pccf_file = NULL;
536 pcc_inode_unlock(inode);
/* Record @gen as the layout generation of @pcci. */
540 static inline void pcc_layout_gen_set(struct pcc_inode *pcci,
543 pcci->pcci_layout_gen = gen;
/* Begin a PCC I/O on @inode. Under the PCC inode lock: when the inode has a
 * PCC inode with a valid layout generation, pcci_active_ios is incremented
 * (pcc_io_fini() decrements it again).
 * NOTE(review): the lines that store to *@cached are not visible here;
 * presumably true on this branch and false otherwise - confirm.
 */
546 static void pcc_io_init(struct inode *inode, bool *cached)
548 struct pcc_inode *pcci;
550 pcc_inode_lock(inode);
551 pcci = ll_i2pcci(inode);
552 if (pcci && pcc_inode_has_layout(pcci)) {
553 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
554 atomic_inc(&pcci->pcci_active_ios);
559 pcc_inode_unlock(inode);
562 static void pcc_io_fini(struct inode *inode)
564 struct pcc_inode *pcci = ll_i2pcci(inode);
566 LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0);
567 if (atomic_dec_and_test(&pcci->pcci_active_ios))
568 wake_up_all(&pcci->pcci_waitq);
/* Run the read described by @iocb/@iter through the file_operations of
 * iocb->ki_filp: ->read_iter() when the kernel provides it, otherwise
 * ->aio_read() per iovec segment, waiting with wait_on_sync_kiocb()
 * whenever a segment is queued (-EIOCBQUEUED).
 */
573 __pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
575 struct file *file = iocb->ki_filp;
577 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
578 return file->f_op->read_iter(iocb, iter);
584 iov_for_each(iov, i, *iter) {
587 res = file->f_op->aio_read(iocb, &iov, 1, iocb->ki_pos);
588 if (-EIOCBQUEUED == res)
589 res = wait_on_sync_kiocb(iocb);
/* Short transfer on this segment (its handling is not visible here). */
597 if (res < iov.iov_len)
602 iov_iter_advance(iter, bytes);
/* Read through the PCC copy attached to @file. pcc_io_init() is handed
 * @cached; for the call itself iocb->ki_filp is pointed at the PCC file
 * and restored to @file afterwards.
 */
607 ssize_t pcc_file_read_iter(struct kiocb *iocb,
608 struct iov_iter *iter, bool *cached)
610 struct file *file = iocb->ki_filp;
611 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
612 struct pcc_file *pccf = &fd->fd_pcc_file;
613 struct inode *inode = file_inode(file);
/* No PCC file attached to this open file. */
618 if (pccf->pccf_file == NULL) {
623 pcc_io_init(inode, cached);
627 iocb->ki_filp = pccf->pccf_file;
628 /* generic_file_aio_read does not support ext4-dax,
629 * __pcc_file_read_iter uses ->aio_read hook directly
630 * to add support for ext4-dax.
632 result = __pcc_file_read_iter(iocb, iter);
633 iocb->ki_filp = file;
/* Run the write described by @iocb/@iter through the file_operations of
 * iocb->ki_filp: ->write_iter() when the kernel provides it, otherwise
 * ->aio_write() per iovec segment, waiting with wait_on_sync_kiocb()
 * whenever a segment is queued (-EIOCBQUEUED).
 */
640 __pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
642 struct file *file = iocb->ki_filp;
644 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
645 return file->f_op->write_iter(iocb, iter);
651 iov_for_each(iov, i, *iter) {
654 res = file->f_op->aio_write(iocb, &iov, 1, iocb->ki_pos);
655 if (-EIOCBQUEUED == res)
656 res = wait_on_sync_kiocb(iocb);
/* Short transfer on this segment (its handling is not visible here). */
664 if (res < iov.iov_len)
669 iov_iter_advance(iter, bytes);
/* Write through the PCC copy attached to @file. Open files without a PCC
 * file, and PCC files whose type is not LU_PCC_READWRITE, are screened out
 * first (the bodies of those checks are not visible here). For the call
 * itself iocb->ki_filp is pointed at the PCC file and restored to @file
 * afterwards. The OBD_FAIL_LLITE_PCC_FAKE_ERROR fault injection makes the
 * write fail with -ENOSPC.
 */
674 ssize_t pcc_file_write_iter(struct kiocb *iocb,
675 struct iov_iter *iter, bool *cached)
677 struct file *file = iocb->ki_filp;
678 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
679 struct pcc_file *pccf = &fd->fd_pcc_file;
680 struct inode *inode = file_inode(file);
685 if (pccf->pccf_file == NULL) {
690 if (pccf->pccf_type != LU_PCC_READWRITE) {
695 pcc_io_init(inode, cached);
699 if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
700 GOTO(out, result = -ENOSPC);
702 iocb->ki_filp = pccf->pccf_file;
704 /* Since __pcc_file_write_iter makes write calls via
705 * the normal vfs interface to the local PCC file system,
706 * the inode lock is not needed.
708 result = __pcc_file_write_iter(iocb, iter);
709 iocb->ki_filp = file;
/* Apply @attr to the inode of the PCC copy of @inode. A private copy of
 * @attr is used whose ia_valid is masked down to ATTR_SIZE and the
 * atime/mtime bits listed below (the mask continues on a line that is not
 * visible here).
 */
715 int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
719 struct iattr attr2 = *attr;
720 struct dentry *pcc_dentry;
721 struct pcc_inode *pcci;
725 if (!S_ISREG(inode->i_mode)) {
730 pcc_io_init(inode, cached);
734 attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME |
735 ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET |
737 pcci = ll_i2pcci(inode);
738 pcc_dentry = pcci->pcci_path.dentry;
/* Call the backing filesystem's ->setattr() under the PCC inode's lock. */
739 inode_lock(pcc_dentry->d_inode);
740 rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
741 inode_unlock(pcc_dentry->d_inode);
/* Refresh the attributes of @inode from its PCC copy. Size and block count
 * are taken from the PCC file; for atime, mtime and ctime the later of the
 * Lustre-side value and the PCC file's value is kept.
 */
747 int pcc_inode_getattr(struct inode *inode, bool *cached)
749 struct ll_inode_info *lli = ll_i2info(inode);
758 if (!S_ISREG(inode->i_mode)) {
763 pcc_io_init(inode, cached);
/* Stat the cached copy through its path. */
767 rc = ll_vfs_getattr(&ll_i2pcci(inode)->pcci_path, &stat);
/* First bring the inode's times in line with the values kept in lli. */
771 ll_inode_size_lock(inode);
772 if (inode->i_atime.tv_sec < lli->lli_atime ||
773 lli->lli_update_atime) {
774 inode->i_atime.tv_sec = lli->lli_atime;
775 lli->lli_update_atime = 0;
777 inode->i_mtime.tv_sec = lli->lli_mtime;
778 inode->i_ctime.tv_sec = lli->lli_ctime;
780 atime = inode->i_atime.tv_sec;
781 mtime = inode->i_mtime.tv_sec;
782 ctime = inode->i_ctime.tv_sec;
/* Then let any newer timestamp of the PCC file win. */
784 if (atime < stat.atime.tv_sec)
785 atime = stat.atime.tv_sec;
787 if (ctime < stat.ctime.tv_sec)
788 ctime = stat.ctime.tv_sec;
790 if (mtime < stat.mtime.tv_sec)
791 mtime = stat.mtime.tv_sec;
793 i_size_write(inode, stat.size);
794 inode->i_blocks = stat.blocks;
796 inode->i_atime.tv_sec = atime;
797 inode->i_mtime.tv_sec = mtime;
798 inode->i_ctime.tv_sec = ctime;
800 ll_inode_size_unlock(inode);
/* Forward a splice read on @in_file to the ->splice_read() of the PCC file
 * attached to it. A backing file without ->splice_read() is screened out
 * before pcc_io_init() runs (the body of that check is not visible here).
 */
806 ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos,
807 struct pipe_inode_info *pipe,
808 size_t count, unsigned int flags,
811 struct inode *inode = file_inode(in_file);
812 struct ll_file_data *fd = LUSTRE_FPRIVATE(in_file);
813 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
822 if (!file_inode(pcc_file)->i_fop->splice_read)
825 pcc_io_init(inode, cached);
829 result = file_inode(pcc_file)->i_fop->splice_read(pcc_file,
/* Forward an fsync on @file to the ->fsync() of the PCC file attached to
 * it, using whichever ->fsync() signature the kernel provides.
 */
837 int pcc_fsync(struct file *file, loff_t start, loff_t end,
838 int datasync, bool *cached)
840 struct inode *inode = file_inode(file);
841 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
842 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
852 pcc_io_init(inode, cached);
856 #ifdef HAVE_FILE_FSYNC_4ARGS
857 rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
858 start, end, datasync);
859 #elif defined(HAVE_FILE_FSYNC_2ARGS)
860 rc = file_inode(pcc_file)->i_fop->fsync(pcc_file, datasync);
/* NOTE(review): no 'dentry' is declared in the visible part of this
 * function; this fallback presumably wants file_dentry(pcc_file) - confirm.
 */
862 rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
863 file_dentry(dentry), datasync);
/* mmap @file through its PCC copy. When the inode has a PCC inode with a
 * valid layout generation, the vma is pointed at the PCC file, the backing
 * filesystem's ->mmap() is called, and the vm_ops it installed are stashed
 * in vma->vm_private_data for the pcc_vm_*()/pcc_fault() wrappers.
 */
870 int pcc_file_mmap(struct file *file, struct vm_area_struct *vma,
873 struct inode *inode = file_inode(file);
874 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
875 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
876 struct pcc_inode *pcci;
/* No PCC file, or a backing filesystem without ->mmap(). */
881 if (!pcc_file || !file_inode(pcc_file)->i_fop->mmap) {
886 pcc_inode_lock(inode);
887 pcci = ll_i2pcci(inode);
888 if (pcci && pcc_inode_has_layout(pcci)) {
889 LASSERT(atomic_read(&pcci->pcci_refcount) > 1);
891 vma->vm_file = pcc_file;
892 rc = file_inode(pcc_file)->i_fop->mmap(pcc_file, vma);
894 /* Save the vm ops of backend PCC */
895 vma->vm_private_data = (void *)vma->vm_ops;
899 pcc_inode_unlock(inode);
/* ->open() wrapper for a PCC-backed vma: when the inode still has a PCC
 * inode with a valid layout generation, point the vma at the PCC file and
 * call the ->open() of the vm_ops saved in vma->vm_private_data.
 */
904 void pcc_vm_open(struct vm_area_struct *vma)
906 struct pcc_inode *pcci;
907 struct file *file = vma->vm_file;
908 struct inode *inode = file_inode(file);
909 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
910 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
911 struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
/* Nothing to forward to without a PCC file or a saved ->open(). */
915 if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->open)
918 pcc_inode_lock(inode);
919 pcci = ll_i2pcci(inode);
920 if (pcci && pcc_inode_has_layout(pcci)) {
921 vma->vm_file = pcc_file;
922 pcc_vm_ops->open(vma);
925 pcc_inode_unlock(inode);
/* ->close() wrapper for a PCC-backed vma: point the vma at the PCC file and
 * call the ->close() of the vm_ops saved in vma->vm_private_data. Unlike
 * pcc_vm_open(), the visible code does not check the layout generation.
 */
929 void pcc_vm_close(struct vm_area_struct *vma)
931 struct file *file = vma->vm_file;
932 struct inode *inode = file_inode(file);
933 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
934 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
935 struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
/* Nothing to forward to without a PCC file or a saved ->close(). */
939 if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->close)
942 pcc_inode_lock(inode);
943 /* Layout lock maybe revoked here */
944 vma->vm_file = pcc_file;
945 pcc_vm_ops->close(vma);
947 pcc_inode_unlock(inode);
/* ->page_mkwrite() wrapper for a PCC-backed vma: forward the fault to the
 * page_mkwrite handler saved in vma->vm_private_data, with vma->vm_file
 * temporarily switched to the PCC file and restored afterwards.
 *
 * Two visible paths instead release mmap_sem and return
 * VM_FAULT_RETRY | VM_FAULT_NOPAGE so that the fault is retried: when the
 * faulting page still belongs to the PCC inode's mapping (see the
 * explanation in the body), and under the OBD_FAIL_LLITE_PCC_DETACH_MKWRITE
 * fault injection, after detaching the file with pcc_ioctl_detach().
 */
951 int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
954 struct page *page = vmf->page;
955 struct mm_struct *mm = vma->vm_mm;
956 struct file *file = vma->vm_file;
957 struct inode *inode = file_inode(file);
958 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
959 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
960 struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
965 if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->page_mkwrite) {
970 /* Pause to allow for a race with concurrent detach */
971 OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
973 pcc_io_init(inode, cached);
975 /* This happens when the file is detached from PCC after got
976 * the fault page via ->fault() on the inode of the PCC copy.
977 * Here it can not simply fall back to normal Lustre I/O path.
978 * The reason is that the address space of fault page used by
979 * ->page_mkwrite() is still the one of PCC inode. In the
980 * normal Lustre ->page_mkwrite() I/O path, it will be wrongly
981 * handled as the address space of the fault page is not
982 * consistent with the one of the Lustre inode (though the
983 * fault page was truncated).
984 * As the file is detached from PCC, the fault page must
985 * be released frist, and retry the mmap write (->fault() and
987 * We use an ugly and tricky method by returning
988 * VM_FAULT_NOPAGE | VM_FAULT_RETRY to the caller
989 * __do_page_fault and retry the memory fault handling.
991 if (page->mapping == file_inode(pcc_file)->i_mapping) {
993 up_read(&mm->mmap_sem);
994 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1001 * This fault injection can also be used to simulate -ENOSPC and
1002 * -EDQUOT failure of underlying PCC backend fs.
1004 if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE)) {
1006 pcc_ioctl_detach(inode);
1007 up_read(&mm->mmap_sem);
1008 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1011 vma->vm_file = pcc_file;
1012 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
1013 rc = pcc_vm_ops->page_mkwrite(vmf);
1015 rc = pcc_vm_ops->page_mkwrite(vma, vmf);
1017 vma->vm_file = file;
/* ->fault() wrapper for a PCC-backed vma: forward the fault to the fault
 * handler saved in vma->vm_private_data, with vma->vm_file temporarily
 * switched to the PCC file and restored afterwards.
 */
1023 int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
1026 struct file *file = vma->vm_file;
1027 struct inode *inode = file_inode(file);
1028 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1029 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1030 struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
/* Nothing to forward to without a PCC file or a saved ->fault(). */
1035 if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->fault) {
1040 pcc_io_init(inode, cached);
1044 vma->vm_file = pcc_file;
1045 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
1046 rc = pcc_vm_ops->fault(vmf);
1048 rc = pcc_vm_ops->fault(vma, vmf);
1050 vma->vm_file = file;
1056 static void pcc_layout_wait(struct pcc_inode *pcci)
1058 struct l_wait_info lwi = { 0 };
1060 while (atomic_read(&pcci->pcci_active_ios) > 0) {
1061 CDEBUG(D_CACHE, "Waiting for IO completion: %d\n",
1062 atomic_read(&pcci->pcci_active_ios));
1063 l_wait_event(pcci->pcci_waitq,
1064 atomic_read(&pcci->pcci_active_ios) == 0, &lwi);
1068 static void __pcc_layout_invalidate(struct pcc_inode *pcci)
1070 pcci->pcci_type = LU_PCC_NONE;
1071 pcc_layout_gen_set(pcci, CL_LAYOUT_GEN_NONE);
1072 pcc_layout_wait(pcci);
1075 void pcc_layout_invalidate(struct inode *inode)
1077 struct pcc_inode *pcci;
1079 pcc_inode_lock(inode);
1080 pcci = ll_i2pcci(inode);
1081 if (pcci && pcc_inode_has_layout(pcci)) {
1082 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1083 __pcc_layout_invalidate(pcci);
1085 CDEBUG(D_CACHE, "Invalidate "DFID" layout gen %d\n",
1086 PFID(&ll_i2info(inode)->lli_fid), pcci->pcci_layout_gen);
1088 pcc_inode_put(pcci);
1090 pcc_inode_unlock(inode);
/* Unlink the cached copy of @pcci from its parent directory in the PCC
 * backend filesystem; a warning carrying rc is emitted by the CWARN below
 * (presumably only on failure - the guarding condition is not visible here).
 */
1093 static int pcc_inode_remove(struct pcc_inode *pcci)
1095 struct dentry *dentry;
1098 dentry = pcci->pcci_path.dentry;
1099 rc = ll_vfs_unlink(dentry->d_parent->d_inode, dentry);
1101 CWARN("failed to unlink cached file, rc = %d\n", rc);
1106 /* Create directory under base if directory does not exist */
/* Looks @name up under @base; a positive dentry is treated as already
 * existing, otherwise vfs_mkdir() is called with @mode. A mkdir failure is
 * turned into an ERR_PTR() result.
 */
1107 static struct dentry *
1108 pcc_mkdir(struct dentry *base, const char *name, umode_t mode)
1111 struct dentry *dentry;
1112 struct inode *dir = base->d_inode;
1115 dentry = lookup_one_len(name, base, strlen(name));
1119 if (d_is_positive(dentry))
1122 rc = vfs_mkdir(dir, dentry, mode);
1125 dentry = ERR_PTR(rc);
/* Create the directory chain named by @path under @root, one component
 * per '/' found in @path, using pcc_mkdir() with @mode for each component.
 * NOTE(review): how @path is split and which dentry is returned is on lines
 * not visible here; the result starts out as ERR_PTR(-EINVAL).
 */
1133 static struct dentry *
1134 pcc_mkdir_p(struct dentry *root, char *path, umode_t mode)
1136 char *ptr, *entry_name;
1137 struct dentry *parent;
1138 struct dentry *child = ERR_PTR(-EINVAL);
/* Hold a reference on the starting directory for the walk. */
1145 parent = dget(root);
1146 while ((ptr = strchr(ptr, '/')) != NULL) {
1148 child = pcc_mkdir(parent, entry_name, mode);
1162 /* Create file under base. If file already exist, return failure */
/* Looks @name up under @base and, unless the dentry is already positive,
 * creates the file with vfs_create() and @mode. A create failure is turned
 * into an ERR_PTR() result.
 */
1163 static struct dentry *
1164 pcc_create(struct dentry *base, const char *name, umode_t mode)
1167 struct dentry *dentry;
1168 struct inode *dir = base->d_inode;
1171 dentry = lookup_one_len(name, base, strlen(name));
1175 if (d_is_positive(dentry))
1178 rc = vfs_create(dir, dentry, mode, LL_VFS_CREATE_FALSE);
1181 dentry = ERR_PTR(rc);
1189 /* Must be called with pcci->pcci_lock held */
1190 static void pcc_inode_attach_init(struct pcc_dataset *dataset,
1191 struct pcc_inode *pcci,
1192 struct dentry *dentry,
1193 enum lu_pcc_type type)
1195 pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
1196 pcci->pcci_path.dentry = dentry;
1197 LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
1198 atomic_set(&pcci->pcci_refcount, 1);
1199 pcci->pcci_type = type;
1200 pcci->pcci_attr_valid = false;
/* Create the backing file for @fid inside @dataset: build the directory
 * chain from pcc_fid2dataset_path() with mode 0700 under the dataset root,
 * then create a file named after the FID (DFID_NOBRACE) with mode 0600.
 * NOTE(review): the store to *@dentry is not visible here.
 */
1203 static int __pcc_inode_create(struct pcc_dataset *dataset,
1205 struct dentry **dentry)
1208 struct dentry *base;
1209 struct dentry *child;
/* Scratch buffer, used first for the directory path, then the file name. */
1212 OBD_ALLOC(path, MAX_PCC_DATABASE_PATH);
1216 pcc_fid2dataset_path(path, MAX_PCC_DATABASE_PATH, fid);
1218 base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0700);
1224 snprintf(path, MAX_PCC_DATABASE_PATH, DFID_NOBRACE, PFID(fid));
1225 child = pcc_create(base, path, 0600);
1226 if (IS_ERR(child)) {
1227 rc = PTR_ERR(child);
1235 OBD_FREE(path, MAX_PCC_DATABASE_PATH);
/*
 * Public entry point for creating the PCC backing file of @fid in
 * @dataset; all work is delegated to __pcc_inode_create(), whose result is
 * returned unchanged.
 */
int pcc_inode_create(struct pcc_dataset *dataset, struct lu_fid *fid,
		     struct dentry **pcc_dentry)
{
	int rc = __pcc_inode_create(dataset, fid, pcc_dentry);

	return rc;
}
/* Finish creating a file in PCC: under the PCC inode lock, allocate and
 * initialise the PCC inode of @inode, attach it to @pcc_dentry as a
 * LU_PCC_READWRITE copy and give it layout generation 0. The inode must
 * not have a PCC inode yet. The ll_vfs_unlink() below removes the backing
 * file again (presumably on the error path - the condition is not visible
 * here).
 */
1245 int pcc_inode_create_fini(struct pcc_dataset *dataset, struct inode *inode,
1246 struct dentry *pcc_dentry)
1248 struct pcc_inode *pcci;
1253 pcc_inode_lock(inode);
1254 LASSERT(ll_i2pcci(inode) == NULL);
1255 OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
1257 GOTO(out_unlock, rc = -ENOMEM);
1259 pcc_inode_init(pcci, ll_i2info(inode));
1260 pcc_inode_attach_init(dataset, pcci, pcc_dentry, LU_PCC_READWRITE);
1261 /* Set the layout generation of newly created file with 0 */
1262 pcc_layout_gen_set(pcci, 0);
1268 rc2 = ll_vfs_unlink(pcc_dentry->d_parent->d_inode, pcc_dentry);
1270 CWARN("failed to unlink PCC file, rc = %d\n", rc2);
1275 pcc_inode_unlock(inode);
/* Write @count bytes from @buf to @filp at *@offset with vfs_write().
 * @buf is cast to a __user pointer for the call; see the only visible
 * caller, pcc_copy_data(), for the buffer that is passed in.
 */
1279 static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
1285 size = vfs_write(filp, (const void __user *)buf, count, offset);
/* Copy file contents from @src to @dst through a 1 MiB bounce buffer,
 * using vfs_read() on @src and pcc_filp_write() on @dst.
 * NOTE(review): the loop structure and termination are not visible here.
 */
1294 static int pcc_copy_data(struct file *src, struct file *dst)
1299 loff_t pos, offset = 0;
1300 size_t buf_len = 1048576;
1305 OBD_ALLOC_LARGE(buf, buf_len);
1313 rc2 = vfs_read(src, (void __user *)buf, buf_len, &pos);
1315 GOTO(out_fs, rc = rc2);
/* Write out exactly as many bytes as the read returned. */
1320 rc = pcc_filp_write(dst, buf, rc2, &pos);
1328 OBD_FREE_LARGE(buf, buf_len);
/* Gate for starting an attach of @inode. Under the PCC inode lock:
 * -EBUSY when an attach is already in progress (PCC_STATE_FL_ATTACHING
 * set), -EEXIST when the inode already has a PCC inode with a valid layout
 * generation; otherwise PCC_STATE_FL_ATTACHING is set to claim the attach.
 */
1332 static int pcc_attach_allowed_check(struct inode *inode)
1334 struct ll_inode_info *lli = ll_i2info(inode);
1335 struct pcc_inode *pcci;
1340 pcc_inode_lock(inode);
1341 if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
1342 GOTO(out_unlock, rc = -EBUSY);
1344 pcci = ll_i2pcci(inode);
1345 if (pcci && pcc_inode_has_layout(pcci))
1346 GOTO(out_unlock, rc = -EEXIST);
1348 lli->lli_pcc_state |= PCC_STATE_FL_ATTACHING;
1350 pcc_inode_unlock(inode);
/* Attach @inode to PCC in read-write mode. Visible steps: check that an
 * attach is allowed, look up the dataset, create the backing file for the
 * inode's FID, open it write-only with O_TRUNC, copy the data of @file
 * into it, and finally, under the PCC inode lock, set up the PCC inode as
 * a LU_PCC_READWRITE copy. The dataset reference is dropped before return;
 * the ll_vfs_unlink() near the end removes the backing file again
 * (presumably on the error path - the condition is not visible here).
 */
1354 int pcc_readwrite_attach(struct file *file, struct inode *inode,
1357 struct pcc_dataset *dataset;
1358 struct ll_inode_info *lli = ll_i2info(inode);
1359 struct pcc_inode *pcci;
1360 struct dentry *dentry;
1361 struct file *pcc_filp;
1367 rc = pcc_attach_allowed_check(inode);
/* projid 0: the dataset is selected by the remaining argument only. */
1371 dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super, 0,
1373 if (dataset == NULL)
1376 rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry);
1378 GOTO(out_dataset_put, rc);
1380 path.mnt = dataset->pccd_path.mnt;
1381 path.dentry = dentry;
1382 #ifdef HAVE_DENTRY_OPEN_USE_PATH
1383 pcc_filp = dentry_open(&path, O_TRUNC | O_WRONLY | O_LARGEFILE,
1386 pcc_filp = dentry_open(path.dentry, path.mnt,
1387 O_TRUNC | O_WRONLY | O_LARGEFILE,
/* A NULL result is reported as -EINVAL, an error pointer as its errno. */
1390 if (IS_ERR_OR_NULL(pcc_filp)) {
1391 rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
1392 GOTO(out_dentry, rc);
1395 rc = pcc_copy_data(file, pcc_filp);
1399 /* Pause to allow for a race with concurrent HSM remove */
1400 OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
1402 pcc_inode_lock(inode);
1403 pcci = ll_i2pcci(inode);
1405 OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
1407 GOTO(out_unlock, rc = -ENOMEM);
1409 pcc_inode_init(pcci, lli);
1410 pcc_inode_attach_init(dataset, pcci, dentry, LU_PCC_READWRITE);
1412 pcc_inode_unlock(inode);
1419 rc2 = ll_vfs_unlink(dentry->d_parent->d_inode, dentry);
1421 CWARN("failed to unlink PCC file, rc = %d\n", rc2);
1426 pcc_dataset_put(dataset);
/* Complete (or roll back) a read-write attach of @inode. Under the PCC
 * inode lock the PCC_STATE_FL_ATTACHING flag is cleared first. If the
 * attach failed (@rc) or the lease was broken, the reference on an
 * attached PCC inode is dropped. Otherwise the layout is refreshed and
 * @gen is recorded as the PCC layout generation; the visible mismatch path
 * (layout changed from @gen to a different generation) fails with -ESTALE,
 * removes the cached file and drops the PCC inode reference.
 */
1430 int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
1431 __u32 gen, bool lease_broken, int rc,
1434 struct ll_inode_info *lli = ll_i2info(inode);
1435 struct pcc_inode *pcci;
1440 pcc_inode_lock(inode);
1441 pcci = ll_i2pcci(inode);
1442 lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
1443 if ((rc || lease_broken)) {
1444 if (attached && pcci)
1445 pcc_inode_put(pcci);
1447 GOTO(out_unlock, rc);
1450 /* PCC inode may be released due to layout lock revocation */
1452 GOTO(out_unlock, rc = -ESTALE);
1455 rc = ll_layout_refresh(inode, &gen2);
1458 pcc_layout_gen_set(pcci, gen);
1461 DFID" layout changed from %d to %d.\n",
1462 PFID(ll_inode2fid(inode)), gen, gen2);
1463 GOTO(out_put, rc = -ESTALE);
1469 pcc_inode_remove(pcci);
1470 pcc_inode_put(pcci);
1473 pcc_inode_unlock(inode);
/* Detach @inode from PCC. Under the PCC inode lock: nothing is done
 * (rc = 0) when there is no PCC inode, an attach is in progress, or the
 * PCC inode has no valid layout generation; otherwise the layout is
 * invalidated - which waits for in-flight PCC I/O - and one reference on
 * the PCC inode is dropped.
 */
1477 int pcc_ioctl_detach(struct inode *inode)
1479 struct ll_inode_info *lli = ll_i2info(inode);
1480 struct pcc_inode *pcci;
1485 pcc_inode_lock(inode);
1486 pcci = lli->lli_pcc_inode;
1487 if (!pcci || lli->lli_pcc_state & PCC_STATE_FL_ATTACHING ||
1488 !pcc_inode_has_layout(pcci))
1489 GOTO(out_unlock, rc = 0);
1491 __pcc_layout_invalidate(pcci);
1492 pcc_inode_put(pcci);
1495 pcc_inode_unlock(inode);
/* Report the PCC state of @inode into @state. Under the PCC inode lock the
 * type, an open count derived from the PCC inode refcount, the lli PCC
 * state flags and the path of the cached copy are filled in. The visible
 * early exits report LU_PCC_NONE with rc = 0. A path that does not fit
 * into state->pccs_path fails with -ENAMETOOLONG.
 */
1499 int pcc_ioctl_state(struct file *file, struct inode *inode,
1500 struct lu_pcc_state *state)
1506 int buf_len = sizeof(state->pccs_path);
1507 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1508 struct pcc_file *pccf = &fd->fd_pcc_file;
1509 struct pcc_inode *pcci;
/* Scratch buffer in which the dentry path is built. */
1516 OBD_ALLOC(buf, buf_len);
1520 pcc_inode_lock(inode);
1521 pcci = ll_i2pcci(inode);
1523 state->pccs_type = LU_PCC_NONE;
1524 GOTO(out_unlock, rc = 0);
1527 count = atomic_read(&pcci->pcci_refcount);
1529 state->pccs_type = LU_PCC_NONE;
1530 state->pccs_open_count = 0;
1531 GOTO(out_unlock, rc = 0);
/* NOTE(review): the bodies of the next two checks are not visible here;
 * presumably they adjust count for references not held by other opens -
 * confirm.
 */
1534 if (pcc_inode_has_layout(pcci))
1536 if (pccf->pccf_file != NULL)
1538 state->pccs_type = pcci->pcci_type;
1539 state->pccs_open_count = count;
1540 state->pccs_flags = ll_i2info(inode)->lli_pcc_state;
1541 #ifdef HAVE_DENTRY_PATH_RAW
1542 path = dentry_path_raw(pcci->pcci_path.dentry, buf, buf_len);
1544 GOTO(out_unlock, rc = PTR_ERR(path));
/* strlcpy() returns the source length; reaching buf_len means truncation. */
1549 if (strlcpy(state->pccs_path, path, buf_len) >= buf_len)
1550 GOTO(out_unlock, rc = -ENAMETOOLONG);
1553 pcc_inode_unlock(inode);
1554 OBD_FREE(buf, buf_len);