4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2017, DDN Storage Corporation.
26 * Persistent Client Cache
28 * PCC is a new framework which provides a group of local cache on Lustre
29 * client side. It works in two modes: RW-PCC enables a read-write cache on the
30 * local SSDs of a single client; RO-PCC provides a read-only cache on the
31 * local SSDs of multiple clients. Less overhead is visible to the applications
32 * and network latencies and lock conflicts can be significantly reduced.
34 * For RW-PCC, no global namespace will be provided. Each client uses its own
35 * local storage as a cache for itself. Local file system is used to manage
36 * the data on local caches. Cached I/O is directed to local file system while
37 * normal I/O is directed to OSTs. RW-PCC uses HSM for data synchronization.
38 * It uses HSM copytool to restore file from local caches to Lustre OSTs. Each
39 * PCC has a copytool instance running with unique archive number. Any remote
40 * access from another Lustre client would trigger the data synchronization. If
41 * a client with RW-PCC goes offline, the cached data becomes inaccessible for
42 * other clients temporarily. And after the RW-PCC client reboots and the
43 * copytool restarts, the data will be accessible again.
45 * Following is what will happen in different conditions for RW-PCC:
47 * > When file is being created on RW-PCC
49 * A normal HSM released file is created on MDT;
50 * An empty mirror file is created on local cache;
51 * The HSM status of the Lustre file will be set to archived and released;
52 * The archive number will be set to the proper value.
54 * > When file is being prefetched to RW-PCC
56 * A file is copied to the local cache;
57 * The HSM status of the Lustre file will be set to archived and released;
58 * The archive number will be set to the proper value.
60 * > When file is being accessed from PCC
62 * Data will be read directly from local cache;
63 * Metadata will be read from MDT, except file size;
64 * File size will be got from local cache.
66 * > When PCC cached file is being accessed on another client
68 * RW-PCC cached files are automatically restored when a process on another
69 * client tries to read or modify them. The corresponding I/O will block
70 * waiting for the released file to be restored. This is transparent to the
73 * For RW-PCC, when a file is being created, a rule-based policy is used to
74 * determine whether it will be cached. Rule-based caching of newly created
75 * files can determine which file can use a cache on PCC directly without any
78 * RW-PCC design can accelerate I/O intensive applications with one-to-one
79 * mappings between files and accessing clients. However, in several use cases,
80 * files will never be updated, but need to be read simultaneously from many
81 * clients. RO-PCC implements a read-only caching on Lustre clients using
82 * SSDs. RO-PCC is based on the same framework as RW-PCC, except
83 * that no HSM mechanism is used.
85 * The main advantages to use this SSD cache on the Lustre clients via PCC
87 * - The I/O stack becomes much simpler for the cached data, as there is no
88 * interference with I/Os from other clients, which enables easier
89 * performance optimizations;
90 * - The requirements on the HW inside the client nodes are small, any kind of
91 * SSDs or even HDDs can be used as cache devices;
92 * - Caching reduces the pressure on the object storage targets (OSTs), as
93 * small or random I/Os can be regularized to big sequential I/Os and
94 * temporary files do not even need to be flushed to OSTs.
96 * PCC can accelerate applications with certain I/O patterns:
97 * - small-sized random writes (< 1MB) from a single client
98 * - repeated read of data that is larger than RAM
99 * - clients with high network latency
101 * Author: Li Xi <lixi@ddn.com>
102 * Author: Qian Yingjin <qian@ddn.com>
105 #define DEBUG_SUBSYSTEM S_LLITE
108 #include <linux/namei.h>
109 #include <linux/file.h>
110 #include <lustre_compat.h>
111 #include "llite_internal.h"
/* Slab cache that backs the struct pcc_inode allocations made in this file. */
113 struct kmem_cache *pcc_inode_slab;
/*
 * Set up a PCC super: take a fresh credential set from prepare_creds(),
 * drop CAP_SYS_RESOURCE from its effective capabilities, and initialise
 * the dataset list together with the spinlock used around it.
 */
115 int pcc_super_init(struct pcc_super *super)
119 super->pccs_cred = cred = prepare_creds();
123 /* Never override disk quota limits or use reserved space */
124 cap_lower(cred->cap_effective, CAP_SYS_RESOURCE);
125 spin_lock_init(&super->pccs_lock);
126 INIT_LIST_HEAD(&super->pccs_datasets);
132 * pcc_dataset_add - Add a Cache policy to control which files need be
133 * cached and where it will be cached.
135 * @super: superblock of pcc
136 * @pathname: root path of pcc
137 * @archive_id: HSM archive ID
138 * @projid: files with specified project ID will be cached.
/*
 * Register a new dataset rooted at @pathname on @super.
 *
 * @pathname is resolved with kern_path(LOOKUP_DIRECTORY) into pccd_path and
 * the dataset starts with one reference.  Under pccs_lock the list is
 * scanned for an entry that already uses @archive_id before the new
 * dataset is linked in; pcc_dataset_put() releases the new dataset again
 * (presumably on the duplicate path -- confirm against the full source).
 *
 * The pathname is copied with strlcpy() rather than strncpy(): strncpy()
 * leaves the destination without a terminating NUL when the source is
 * PATH_MAX bytes or longer, and this function does not validate the
 * length itself.
 */
141 pcc_dataset_add(struct pcc_super *super, const char *pathname,
142 __u32 archive_id, __u32 projid)
145 struct pcc_dataset *dataset;
146 struct pcc_dataset *tmp;
149 OBD_ALLOC_PTR(dataset);
153 rc = kern_path(pathname, LOOKUP_DIRECTORY, &dataset->pccd_path);
155 OBD_FREE_PTR(dataset);
158 strlcpy(dataset->pccd_pathname, pathname, PATH_MAX);
159 dataset->pccd_id = archive_id;
160 dataset->pccd_projid = projid;
161 atomic_set(&dataset->pccd_refcount, 1);
163 spin_lock(&super->pccs_lock);
164 list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) {
165 if (tmp->pccd_id == archive_id) {
171 list_add(&dataset->pccd_linkage, &super->pccs_datasets);
172 spin_unlock(&super->pccs_lock);
175 pcc_dataset_put(dataset);
/*
 * Find a dataset matching @projid and/or @archive_id and take a reference
 * on it.
 *
 * A zero @projid or @archive_id is not used as a filter (see the
 * "projid &&" / "archive_id &&" guards); the case where both are zero is
 * tested separately before the list is walked.  The walk runs under
 * pccs_lock and the reference is taken with atomic_inc() while the lock is
 * held.  The caller is expected to drop it with pcc_dataset_put(), as
 * pcc_readwrite_attach() does.
 */
183 pcc_dataset_get(struct pcc_super *super, __u32 projid, __u32 archive_id)
185 struct pcc_dataset *dataset;
186 struct pcc_dataset *selected = NULL;
188 if (projid == 0 && archive_id == 0)
192 * archive ID is unique in the list, projid might be duplicate,
193 * we just return last added one as first priority.
195 spin_lock(&super->pccs_lock);
196 list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
197 if (projid && dataset->pccd_projid != projid)
199 if (archive_id && dataset->pccd_id != archive_id)
201 atomic_inc(&dataset->pccd_refcount);
205 spin_unlock(&super->pccs_lock);
207 CDEBUG(D_CACHE, "matched projid %u, PCC create\n",
208 selected->pccd_projid);
/*
 * Drop one reference on @dataset.  When the count reaches zero the path
 * reference held in pccd_path is released and the dataset is freed.
 */
213 pcc_dataset_put(struct pcc_dataset *dataset)
215 if (atomic_dec_and_test(&dataset->pccd_refcount)) {
216 path_put(&dataset->pccd_path);
217 OBD_FREE_PTR(dataset);
/*
 * Remove the dataset whose stored pathname equals @pathname (exact
 * strcmp() match).  Under pccs_lock the entry is unlinked from the super's
 * list and a reference is dropped with pcc_dataset_put().
 */
222 pcc_dataset_del(struct pcc_super *super, char *pathname)
224 struct list_head *l, *tmp;
225 struct pcc_dataset *dataset;
228 spin_lock(&super->pccs_lock);
229 list_for_each_safe(l, tmp, &super->pccs_datasets) {
230 dataset = list_entry(l, struct pcc_dataset, pccd_linkage);
231 if (strcmp(dataset->pccd_pathname, pathname) == 0) {
232 list_del(&dataset->pccd_linkage);
233 pcc_dataset_put(dataset);
238 spin_unlock(&super->pccs_lock);
/*
 * Print one dataset to the seq_file @m: its pathname, then the archive ID
 * (labelled "rwid") and the project ID (labelled "autocache: projid=").
 */
243 pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
245 seq_printf(m, "%s:\n", dataset->pccd_pathname);
246 seq_printf(m, " rwid: %u\n", dataset->pccd_id);
247 seq_printf(m, " autocache: projid=%u\n", dataset->pccd_projid);
/*
 * Dump every dataset registered on @super to the seq_file @m, holding
 * pccs_lock for the duration of the list walk.
 */
251 pcc_super_dump(struct pcc_super *super, struct seq_file *m)
253 struct pcc_dataset *dataset;
255 spin_lock(&super->pccs_lock);
256 list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
257 pcc_dataset_dump(dataset, m);
259 spin_unlock(&super->pccs_lock);
/*
 * Unlink every dataset from @super's list and drop a reference on each.
 *
 * NOTE(review): unlike the other list walkers in this file, no pccs_lock
 * is taken here, yet the function is reachable from pcc_cmd_handle() as
 * well as pcc_super_fini() -- confirm that callers serialise access.
 */
263 static void pcc_remove_datasets(struct pcc_super *super)
265 struct pcc_dataset *dataset, *tmp;
267 list_for_each_entry_safe(dataset, tmp,
268 &super->pccs_datasets, pccd_linkage) {
269 list_del(&dataset->pccd_linkage);
270 pcc_dataset_put(dataset);
/*
 * Tear down a PCC super: remove all datasets, then release the credential
 * stored in pccs_cred by pcc_super_init().
 */
274 void pcc_super_fini(struct pcc_super *super)
276 pcc_remove_datasets(super);
277 put_cred(super->pccs_cred);
/*
 * Check a dataset pathname.  The test fires when @pathname is NULL, empty,
 * PATH_MAX bytes or longer, or does not begin with '/'; presumably that
 * means "invalid" -- the return statements are outside the visible lines.
 */
280 static bool pathname_is_valid(const char *pathname)
282 /* Needs to be absolute path */
283 if (pathname == NULL || strlen(pathname) == 0 ||
284 strlen(pathname) >= PATH_MAX || pathname[0] != '/')
/*
 * Parse a PCC control command from @buffer into a newly allocated
 * struct pcc_cmd.
 *
 * Recognised forms, as far as the visible lines show (tokens are separated
 * by a single space via strsep()):
 *   "clear"                   -> PCC_CLEAR_ALL
 *   "add <pathname> <n> <n>"  -> PCC_ADD_DATASET; the two decimal numbers
 *                                are stored as pccc_id and pccc_projid
 *   "del <pathname>"          -> PCC_DEL_DATASET
 * <pathname> must satisfy pathname_is_valid().  Malformed input ends in
 * -EINVAL and an allocation failure in -ENOMEM.
 *
 * pccc_pathname points into the buffer being tokenised, not at a copy.
 *
 * `cmd` is an ordinary automatic variable.  It was declared `static`,
 * which made the pointer to the per-call allocation shared between
 * concurrent callers, so one caller could overwrite another's pointer.
 */
289 static struct pcc_cmd *
290 pcc_cmd_parse(char *buffer, unsigned long count)
292 struct pcc_cmd *cmd;
300 GOTO(out, rc = -ENOMEM);
302 /* clear all setting */
303 if (strncmp(buffer, "clear", 5) == 0) {
304 cmd->pccc_cmd = PCC_CLEAR_ALL;
309 token = strsep(&val, " ");
310 if (val == NULL || strlen(val) == 0)
311 GOTO(out_free_cmd, rc = -EINVAL);
313 /* Type of the command */
314 if (strcmp(token, "add") == 0)
315 cmd->pccc_cmd = PCC_ADD_DATASET;
316 else if (strcmp(token, "del") == 0)
317 cmd->pccc_cmd = PCC_DEL_DATASET;
319 GOTO(out_free_cmd, rc = -EINVAL);
321 /* Pathname of the dataset */
322 token = strsep(&val, " ");
323 if ((val == NULL && cmd->pccc_cmd != PCC_DEL_DATASET) ||
324 !pathname_is_valid(token))
325 GOTO(out_free_cmd, rc = -EINVAL);
326 cmd->pccc_pathname = token;
328 if (cmd->pccc_cmd == PCC_ADD_DATASET) {
330 token = strsep(&val, " ");
332 GOTO(out_free_cmd, rc = -EINVAL);
334 rc = kstrtoul(token, 10, &tmp);
336 GOTO(out_free_cmd, rc = -EINVAL);
338 GOTO(out_free_cmd, rc = -EINVAL);
339 cmd->u.pccc_add.pccc_id = tmp;
342 rc = kstrtoul(token, 10, &tmp);
344 GOTO(out_free_cmd, rc = -EINVAL);
346 GOTO(out_free_cmd, rc = -EINVAL);
347 cmd->u.pccc_add.pccc_projid = tmp;
/*
 * Parse @buffer with pcc_cmd_parse() and apply the resulting command to
 * @super: PCC_ADD_DATASET calls pcc_dataset_add() and PCC_DEL_DATASET calls
 * pcc_dataset_del().  pcc_remove_datasets() is also dispatched from this
 * switch (presumably for PCC_CLEAR_ALL -- the case label is not visible).
 */
359 int pcc_cmd_handle(char *buffer, unsigned long count,
360 struct pcc_super *super)
365 cmd = pcc_cmd_parse(buffer, count);
369 switch (cmd->pccc_cmd) {
370 case PCC_ADD_DATASET:
371 rc = pcc_dataset_add(super, cmd->pccc_pathname,
372 cmd->u.pccc_add.pccc_id,
373 cmd->u.pccc_add.pccc_projid);
375 case PCC_DEL_DATASET:
376 rc = pcc_dataset_del(super, cmd->pccc_pathname);
379 pcc_remove_datasets(super);
/* Acquire the per-inode PCC mutex (lli_pcc_lock) of @inode. */
390 static inline void pcc_inode_lock(struct inode *inode)
392 mutex_lock(&ll_i2info(inode)->lli_pcc_lock);
/* Release the per-inode PCC mutex taken by pcc_inode_lock(). */
395 static inline void pcc_inode_unlock(struct inode *inode)
397 mutex_unlock(&ll_i2info(inode)->lli_pcc_lock);
/*
 * Initialise a freshly allocated PCC inode and link it with @lli in both
 * directions.  It starts with zero references, no PCC type, no layout
 * generation (CL_LAYOUT_GEN_NONE), no active I/O and an empty wait queue.
 */
400 static void pcc_inode_init(struct pcc_inode *pcci, struct ll_inode_info *lli)
402 pcci->pcci_lli = lli;
403 lli->lli_pcc_inode = pcci;
404 atomic_set(&pcci->pcci_refcount, 0);
405 pcci->pcci_type = LU_PCC_NONE;
406 pcci->pcci_layout_gen = CL_LAYOUT_GEN_NONE;
407 atomic_set(&pcci->pcci_active_ios, 0);
408 init_waitqueue_head(&pcci->pcci_waitq);
/*
 * Tear down a PCC inode: release its path reference, return the object to
 * pcc_inode_slab and clear the owning ll_inode_info's back pointer.  @lli
 * is read up front because @pcci has already been freed by the time the
 * back pointer is cleared.
 */
411 static void pcc_inode_fini(struct pcc_inode *pcci)
413 struct ll_inode_info *lli = pcci->pcci_lli;
415 path_put(&pcci->pcci_path);
416 pcci->pcci_type = LU_PCC_NONE;
417 OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
418 lli->lli_pcc_inode = NULL;
/* Take one reference on @pcci. */
421 static void pcc_inode_get(struct pcc_inode *pcci)
423 atomic_inc(&pcci->pcci_refcount);
/* Drop one reference on @pcci; the last put finalises it via pcc_inode_fini(). */
426 static void pcc_inode_put(struct pcc_inode *pcci)
428 if (atomic_dec_and_test(&pcci->pcci_refcount))
429 pcc_inode_fini(pcci);
/*
 * Release the PCC state attached to @inode.  The visible lines fetch the
 * PCC inode and warn if more than one reference is still held on it.
 * NOTE(review): the actual release step is outside the visible lines.
 */
432 void pcc_inode_free(struct inode *inode)
434 struct pcc_inode *pcci = ll_i2pcci(inode);
437 WARN_ON(atomic_read(&pcci->pcci_refcount) > 1);
444 * As Andreas suggested, we'd better use new layout to
446 * (fid->f_oid >> 16 & 0xFFFF)/FID
/*
 * Buffer size for a dataset-relative path: six directory components of
 * five characters each ("%04x/") followed by the FID string
 * (FID_NOBRACE_LEN); the extra byte is presumably for the trailing NUL.
 */
448 #define MAX_PCC_DATABASE_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
/*
 * Format the directory path for @fid into @buf (at most @sz bytes) and
 * return snprintf()'s result.  The six components are 16-bit slices in
 * hex: the low and high halves of f_oid, then the four 16-bit words of
 * f_seq from least to most significant.
 */
449 static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid)
451 return snprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
453 (fid)->f_oid & 0xFFFF,
454 (fid)->f_oid >> 16 & 0xFFFF,
455 (unsigned int)((fid)->f_seq & 0xFFFF),
456 (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
457 (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
458 (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
/* Return the credential stored in the PCC super that belongs to @sb. */
462 static inline const struct cred *pcc_super_cred(struct super_block *sb)
464 return ll_s2sbi(sb)->ll_pcc_super.pccs_cred;
/* Reset @pccf to the "not cached" state: no backing file, type LU_PCC_NONE. */
467 void pcc_file_init(struct pcc_file *pccf)
469 pccf->pccf_file = NULL;
470 pccf->pccf_type = LU_PCC_NONE;
/* True when @pcci carries a layout generation other than CL_LAYOUT_GEN_NONE. */
473 static inline bool pcc_inode_has_layout(struct pcc_inode *pcci)
475 return pcci->pcci_layout_gen != CL_LAYOUT_GEN_NONE;
/*
 * Open the PCC copy that backs @inode, if any, on behalf of @file.
 *
 * Only regular files are considered.  Under the PCC inode lock the function
 * leaves with rc = 0 when the PCC inode has no references or no valid
 * layout.  Otherwise the cached copy is opened with dentry_open() using the
 * Lustre file's f_flags and the PCC super credential, and the result is
 * recorded in the file's pcc_file together with the PCC type.
 */
478 int pcc_file_open(struct inode *inode, struct file *file)
480 struct pcc_inode *pcci;
481 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
482 struct pcc_file *pccf = &fd->fd_pcc_file;
483 struct file *pcc_file;
490 if (!S_ISREG(inode->i_mode))
493 pcc_inode_lock(inode);
494 pcci = ll_i2pcci(inode);
496 GOTO(out_unlock, rc = 0);
498 if (atomic_read(&pcci->pcci_refcount) == 0 ||
499 !pcc_inode_has_layout(pcci))
500 GOTO(out_unlock, rc = 0);
503 WARN_ON(pccf->pccf_file);
505 path = &pcci->pcci_path;
506 dname = &path->dentry->d_name;
507 CDEBUG(D_CACHE, "opening pcc file '%.*s'\n", dname->len,
510 #ifdef HAVE_DENTRY_OPEN_USE_PATH
511 pcc_file = dentry_open(path, file->f_flags,
512 pcc_super_cred(inode->i_sb));
514 pcc_file = dentry_open(path->dentry, path->mnt, file->f_flags,
515 pcc_super_cred(inode->i_sb));
517 if (IS_ERR_OR_NULL(pcc_file)) {
518 rc = pcc_file == NULL ? -EINVAL : PTR_ERR(pcc_file);
521 pccf->pccf_file = pcc_file;
522 pccf->pccf_type = pcci->pcci_type;
526 pcc_inode_unlock(inode);
/*
 * Close the PCC backing file attached to @file, if there is one.  Only
 * regular files with file private data are considered; under the PCC inode
 * lock the backing file is released with fput() and the pointer cleared.
 */
530 void pcc_file_release(struct inode *inode, struct file *file)
532 struct pcc_inode *pcci;
533 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
534 struct pcc_file *pccf;
540 if (!S_ISREG(inode->i_mode) || fd == NULL)
543 pccf = &fd->fd_pcc_file;
544 pcc_inode_lock(inode);
545 if (pccf->pccf_file == NULL)
548 pcci = ll_i2pcci(inode);
550 path = &pcci->pcci_path;
551 dname = &path->dentry->d_name;
552 CDEBUG(D_CACHE, "releasing pcc file \"%.*s\"\n", dname->len,
555 fput(pccf->pccf_file);
556 pccf->pccf_file = NULL;
558 pcc_inode_unlock(inode);
/* Record @gen as the layout generation of @pcci. */
562 static inline void pcc_layout_gen_set(struct pcc_inode *pcci,
565 pcci->pcci_layout_gen = gen;
/*
 * Begin an I/O against the PCC copy of @inode.  Under the PCC inode lock,
 * if the inode has a PCC inode with a valid layout, pcci_active_ios is
 * incremented so that pcc_layout_wait() can wait for the I/O to finish.
 * @cached is presumably set to tell the caller which path was chosen --
 * the assignments are outside the visible lines.
 */
568 static void pcc_io_init(struct inode *inode, bool *cached)
570 struct pcc_inode *pcci;
572 pcc_inode_lock(inode);
573 pcci = ll_i2pcci(inode);
574 if (pcci && pcc_inode_has_layout(pcci)) {
575 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
576 atomic_inc(&pcci->pcci_active_ios);
581 pcc_inode_unlock(inode);
/*
 * End an I/O started with pcc_io_init(): decrement pcci_active_ios and,
 * when it drops to zero, wake everyone sleeping on pcci_waitq.
 */
584 static void pcc_io_fini(struct inode *inode)
586 struct pcc_inode *pcci = ll_i2pcci(inode);
588 LASSERT(pcci && atomic_read(&pcci->pcci_active_ios) > 0);
589 if (atomic_dec_and_test(&pcci->pcci_active_ios))
590 wake_up_all(&pcci->pcci_waitq);
/*
 * Issue the read against the file currently in iocb->ki_filp.  With
 * HAVE_FILE_OPERATIONS_READ_WRITE_ITER the ->read_iter() method is called
 * directly; the other visible path walks the iovec segments, calls
 * ->aio_read() for each one, waits when it returns -EIOCBQUEUED, and
 * advances the iterator.
 */
595 __pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter)
597 struct file *file = iocb->ki_filp;
599 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
600 return file->f_op->read_iter(iocb, iter);
606 iov_for_each(iov, i, *iter) {
609 res = file->f_op->aio_read(iocb, &iov, 1, iocb->ki_pos);
610 if (-EIOCBQUEUED == res)
611 res = wait_on_sync_kiocb(iocb);
619 if (res < iov.iov_len)
624 iov_iter_advance(iter, bytes);
/*
 * Read through the PCC copy when one is attached to the open file.  The
 * absence of a PCC backing file is checked first; otherwise the I/O is
 * registered with pcc_io_init(), iocb->ki_filp is pointed at the backing
 * file for the duration of __pcc_file_read_iter(), and then restored.
 */
629 ssize_t pcc_file_read_iter(struct kiocb *iocb,
630 struct iov_iter *iter, bool *cached)
632 struct file *file = iocb->ki_filp;
633 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
634 struct pcc_file *pccf = &fd->fd_pcc_file;
635 struct inode *inode = file_inode(file);
640 if (pccf->pccf_file == NULL) {
645 pcc_io_init(inode, cached);
649 iocb->ki_filp = pccf->pccf_file;
650 /* generic_file_aio_read does not support ext4-dax,
651 * __pcc_file_read_iter uses ->aio_read hook directly
652 * to add support for ext4-dax.
654 result = __pcc_file_read_iter(iocb, iter);
655 iocb->ki_filp = file;
/*
 * Issue the write against the file currently in iocb->ki_filp.  With
 * HAVE_FILE_OPERATIONS_READ_WRITE_ITER the ->write_iter() method is called
 * directly; the other visible path walks the iovec segments, calls
 * ->aio_write() for each one, waits when it returns -EIOCBQUEUED, and
 * advances the iterator.
 */
662 __pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
664 struct file *file = iocb->ki_filp;
666 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
667 return file->f_op->write_iter(iocb, iter);
673 iov_for_each(iov, i, *iter) {
676 res = file->f_op->aio_write(iocb, &iov, 1, iocb->ki_pos);
677 if (-EIOCBQUEUED == res)
678 res = wait_on_sync_kiocb(iocb);
686 if (res < iov.iov_len)
691 iov_iter_advance(iter, bytes);
/*
 * Write through the PCC copy when one is attached to the open file.  A
 * missing backing file and a PCC type other than LU_PCC_READWRITE are each
 * checked before the I/O is registered with pcc_io_init().  The
 * OBD_FAIL_LLITE_PCC_FAKE_ERROR fail point makes the write return -ENOSPC.
 * iocb->ki_filp is pointed at the backing file for the duration of
 * __pcc_file_write_iter() and then restored.
 */
696 ssize_t pcc_file_write_iter(struct kiocb *iocb,
697 struct iov_iter *iter, bool *cached)
699 struct file *file = iocb->ki_filp;
700 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
701 struct pcc_file *pccf = &fd->fd_pcc_file;
702 struct inode *inode = file_inode(file);
707 if (pccf->pccf_file == NULL) {
712 if (pccf->pccf_type != LU_PCC_READWRITE) {
717 pcc_io_init(inode, cached);
721 if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_FAKE_ERROR))
722 GOTO(out, result = -ENOSPC);
724 iocb->ki_filp = pccf->pccf_file;
726 /* Since __pcc_file_write_iter makes write calls via
727 * the normal vfs interface to the local PCC file system,
728 * the inode lock is not needed.
730 result = __pcc_file_write_iter(iocb, iter);
731 iocb->ki_filp = file;
/*
 * Apply an attribute change to the PCC copy of @inode.  Only regular files
 * are handled.  A private copy of @attr is restricted to the size, time,
 * UID and GID bits, and the PCC inode's ->setattr() is called on the cached
 * dentry with that inode locked and the PCC super credential in effect.
 */
737 int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
741 const struct cred *old_cred;
742 struct iattr attr2 = *attr;
743 struct dentry *pcc_dentry;
744 struct pcc_inode *pcci;
748 if (!S_ISREG(inode->i_mode)) {
753 pcc_io_init(inode, cached);
757 attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME |
758 ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET |
759 ATTR_CTIME | ATTR_UID | ATTR_GID);
760 pcci = ll_i2pcci(inode);
761 pcc_dentry = pcci->pcci_path.dentry;
762 inode_lock(pcc_dentry->d_inode);
763 old_cred = override_creds(pcc_super_cred(inode->i_sb));
764 rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
765 revert_creds(old_cred);
766 inode_unlock(pcc_dentry->d_inode);
/*
 * Refresh @inode's size, block count and timestamps from its PCC copy.
 *
 * The cached copy is stat'ed with ll_vfs_getattr() under the PCC super
 * credential.  Then, under ll_inode_size_lock():
 *  - atime is first raised to lli_atime if it is older (or if
 *    lli_update_atime is set), while mtime and ctime are loaded from lli;
 *  - each of the three timestamps is then raised to the PCC copy's value
 *    when that one is newer;
 *  - i_size and i_blocks are taken from the PCC copy's stat.
 */
772 int pcc_inode_getattr(struct inode *inode, bool *cached)
774 struct ll_inode_info *lli = ll_i2info(inode);
775 const struct cred *old_cred;
784 if (!S_ISREG(inode->i_mode)) {
789 pcc_io_init(inode, cached);
793 old_cred = override_creds(pcc_super_cred(inode->i_sb));
794 rc = ll_vfs_getattr(&ll_i2pcci(inode)->pcci_path, &stat);
795 revert_creds(old_cred);
799 ll_inode_size_lock(inode);
800 if (inode->i_atime.tv_sec < lli->lli_atime ||
801 lli->lli_update_atime) {
802 inode->i_atime.tv_sec = lli->lli_atime;
803 lli->lli_update_atime = 0;
805 inode->i_mtime.tv_sec = lli->lli_mtime;
806 inode->i_ctime.tv_sec = lli->lli_ctime;
808 atime = inode->i_atime.tv_sec;
809 mtime = inode->i_mtime.tv_sec;
810 ctime = inode->i_ctime.tv_sec;
812 if (atime < stat.atime.tv_sec)
813 atime = stat.atime.tv_sec;
815 if (ctime < stat.ctime.tv_sec)
816 ctime = stat.ctime.tv_sec;
818 if (mtime < stat.mtime.tv_sec)
819 mtime = stat.mtime.tv_sec;
821 i_size_write(inode, stat.size);
822 inode->i_blocks = stat.blocks;
824 inode->i_atime.tv_sec = atime;
825 inode->i_mtime.tv_sec = mtime;
826 inode->i_ctime.tv_sec = ctime;
828 ll_inode_size_unlock(inode);
/*
 * splice_read through the PCC backing file of @in_file.  The backing
 * file's inode must provide a ->splice_read() method; the I/O is registered
 * with pcc_io_init() before that method is invoked on the backing file.
 */
834 ssize_t pcc_file_splice_read(struct file *in_file, loff_t *ppos,
835 struct pipe_inode_info *pipe,
836 size_t count, unsigned int flags,
839 struct inode *inode = file_inode(in_file);
840 struct ll_file_data *fd = LUSTRE_FPRIVATE(in_file);
841 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
850 if (!file_inode(pcc_file)->i_fop->splice_read)
853 pcc_io_init(inode, cached);
857 result = file_inode(pcc_file)->i_fop->splice_read(pcc_file,
/*
 * fsync the PCC backing file of @file.
 *
 * The I/O is registered with pcc_io_init() and the backing file's ->fsync()
 * is then called using whichever prototype the kernel provides: four
 * arguments (HAVE_FILE_FSYNC_4ARGS), two arguments (HAVE_FILE_FSYNC_2ARGS),
 * or the legacy form that also takes a dentry.
 *
 * In the legacy form the dentry is derived from @pcc_file.  The earlier
 * `file_dentry(dentry)` named an identifier that is not declared in this
 * function, and file_dentry() takes a struct file in any case.
 */
865 int pcc_fsync(struct file *file, loff_t start, loff_t end,
866 int datasync, bool *cached)
868 struct inode *inode = file_inode(file);
869 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
870 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
880 pcc_io_init(inode, cached);
884 #ifdef HAVE_FILE_FSYNC_4ARGS
885 rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
886 start, end, datasync);
887 #elif defined(HAVE_FILE_FSYNC_2ARGS)
888 rc = file_inode(pcc_file)->i_fop->fsync(pcc_file, datasync);
890 rc = file_inode(pcc_file)->i_fop->fsync(pcc_file,
891 file_dentry(pcc_file), datasync);
/*
 * mmap through the PCC backing file of @file.  A missing backing file or a
 * backing inode without ->mmap() is checked first.  Under the PCC inode
 * lock, if the PCC inode has a valid layout, vma->vm_file is pointed at the
 * backing file, its ->mmap() is called, and the vm_ops it installed are
 * saved in vma->vm_private_data for the pcc_vm_* / pcc_fault helpers.
 */
898 int pcc_file_mmap(struct file *file, struct vm_area_struct *vma,
901 struct inode *inode = file_inode(file);
902 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
903 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
904 struct pcc_inode *pcci;
909 if (!pcc_file || !file_inode(pcc_file)->i_fop->mmap) {
914 pcc_inode_lock(inode);
915 pcci = ll_i2pcci(inode);
916 if (pcci && pcc_inode_has_layout(pcci)) {
917 LASSERT(atomic_read(&pcci->pcci_refcount) > 1);
919 vma->vm_file = pcc_file;
920 rc = file_inode(pcc_file)->i_fop->mmap(pcc_file, vma);
922 /* Save the vm ops of backend PCC */
923 vma->vm_private_data = (void *)vma->vm_ops;
927 pcc_inode_unlock(inode);
/*
 * Forward a VMA open to the PCC backend's vm_ops (saved in
 * vma->vm_private_data).  Nothing is forwarded unless a backing file and a
 * backend ->open() exist; under the PCC inode lock the call is made, with
 * vma->vm_file pointed at the backing file, only while the PCC inode still
 * has a valid layout.
 */
932 void pcc_vm_open(struct vm_area_struct *vma)
934 struct pcc_inode *pcci;
935 struct file *file = vma->vm_file;
936 struct inode *inode = file_inode(file);
937 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
938 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
939 struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
943 if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->open)
946 pcc_inode_lock(inode);
947 pcci = ll_i2pcci(inode);
948 if (pcci && pcc_inode_has_layout(pcci)) {
949 vma->vm_file = pcc_file;
950 pcc_vm_ops->open(vma);
953 pcc_inode_unlock(inode);
/*
 * Forward a VMA close to the PCC backend's vm_ops (saved in
 * vma->vm_private_data).  Unlike pcc_vm_open(), the layout is not
 * re-checked here: the backend ->close() is called under the PCC inode
 * lock whenever a backing file and a ->close() method exist.
 */
957 void pcc_vm_close(struct vm_area_struct *vma)
959 struct file *file = vma->vm_file;
960 struct inode *inode = file_inode(file);
961 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
962 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
963 struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
967 if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->close)
970 pcc_inode_lock(inode);
971 /* Layout lock may be revoked here */
972 vma->vm_file = pcc_file;
973 pcc_vm_ops->close(vma);
975 pcc_inode_unlock(inode);
/*
 * ->page_mkwrite() for a VMA that was mapped through PCC.
 *
 * When a backing file and a backend ->page_mkwrite() exist, the I/O is
 * registered with pcc_io_init().  If the faulting page still belongs to
 * the PCC inode's address space in the case described by the long comment
 * below, mmap_sem is released and VM_FAULT_RETRY | VM_FAULT_NOPAGE is
 * returned so the fault is retried.  The OBD_FAIL_LLITE_PCC_DETACH_MKWRITE
 * fail point detaches the file and forces the same retry.  Otherwise the
 * backend handler runs with vma->vm_file temporarily pointed at the
 * backing file.
 */
979 int pcc_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
982 struct page *page = vmf->page;
983 struct mm_struct *mm = vma->vm_mm;
984 struct file *file = vma->vm_file;
985 struct inode *inode = file_inode(file);
986 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
987 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
988 struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
993 if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->page_mkwrite) {
998 /* Pause to allow for a race with concurrent detach */
999 OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_MKWRITE_PAUSE, cfs_fail_val);
1001 pcc_io_init(inode, cached);
1003 /* This happens when the file is detached from PCC after got
1004 * the fault page via ->fault() on the inode of the PCC copy.
1005 * Here it can not simply fall back to normal Lustre I/O path.
1006 * The reason is that the address space of fault page used by
1007 * ->page_mkwrite() is still the one of PCC inode. In the
1008 * normal Lustre ->page_mkwrite() I/O path, it will be wrongly
1009 * handled as the address space of the fault page is not
1010 * consistent with the one of the Lustre inode (though the
1011 * fault page was truncated).
1012 * As the file is detached from PCC, the fault page must
1013 * be released first, and retry the mmap write (->fault() and
1015 * We use an ugly and tricky method by returning
1016 * VM_FAULT_NOPAGE | VM_FAULT_RETRY to the caller
1017 * __do_page_fault and retry the memory fault handling.
1019 if (page->mapping == file_inode(pcc_file)->i_mapping) {
1021 up_read(&mm->mmap_sem);
1022 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1029 * This fault injection can also be used to simulate -ENOSPC and
1030 * -EDQUOT failure of underlying PCC backend fs.
1032 if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_PCC_DETACH_MKWRITE)) {
1034 pcc_ioctl_detach(inode);
1035 up_read(&mm->mmap_sem);
1036 RETURN(VM_FAULT_RETRY | VM_FAULT_NOPAGE);
1039 vma->vm_file = pcc_file;
1040 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
1041 rc = pcc_vm_ops->page_mkwrite(vmf);
1043 rc = pcc_vm_ops->page_mkwrite(vma, vmf);
1045 vma->vm_file = file;
/*
 * ->fault() for a VMA that was mapped through PCC.  When a backing file
 * and a backend ->fault() exist, the I/O is registered with pcc_io_init()
 * and the backend handler runs with vma->vm_file temporarily pointed at
 * the backing file; the original file pointer is restored afterwards.
 */
1051 int pcc_fault(struct vm_area_struct *vma, struct vm_fault *vmf,
1054 struct file *file = vma->vm_file;
1055 struct inode *inode = file_inode(file);
1056 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1057 struct file *pcc_file = fd->fd_pcc_file.pccf_file;
1058 struct vm_operations_struct *pcc_vm_ops = vma->vm_private_data;
1063 if (!pcc_file || !pcc_vm_ops || !pcc_vm_ops->fault) {
1068 pcc_io_init(inode, cached);
1072 vma->vm_file = pcc_file;
1073 #ifdef HAVE_VM_OPS_USE_VM_FAULT_ONLY
1074 rc = pcc_vm_ops->fault(vmf);
1076 rc = pcc_vm_ops->fault(vma, vmf);
1078 vma->vm_file = file;
/*
 * Block until no I/O is in flight on @pcci: loop on pcci_waitq until
 * pcci_active_ios reads zero.  pcc_io_fini() performs the wake-up.
 */
1084 static void pcc_layout_wait(struct pcc_inode *pcci)
1086 struct l_wait_info lwi = { 0 };
1088 while (atomic_read(&pcci->pcci_active_ios) > 0) {
1089 CDEBUG(D_CACHE, "Waiting for IO completion: %d\n",
1090 atomic_read(&pcci->pcci_active_ios));
1091 l_wait_event(pcci->pcci_waitq,
1092 atomic_read(&pcci->pcci_active_ios) == 0, &lwi);
/*
 * Mark @pcci as no longer cached (type LU_PCC_NONE, layout generation
 * CL_LAYOUT_GEN_NONE) and then wait for in-flight I/O to drain.  The state
 * is cleared first so that pcc_io_init() stops admitting new I/O.
 */
1096 static void __pcc_layout_invalidate(struct pcc_inode *pcci)
1098 pcci->pcci_type = LU_PCC_NONE;
1099 pcc_layout_gen_set(pcci, CL_LAYOUT_GEN_NONE);
1100 pcc_layout_wait(pcci);
/*
 * Invalidate the PCC layout of @inode.  Under the PCC inode lock, if the
 * inode has a PCC inode with a valid layout, the layout is invalidated via
 * __pcc_layout_invalidate() and one reference on the PCC inode is dropped.
 */
1103 void pcc_layout_invalidate(struct inode *inode)
1105 struct pcc_inode *pcci;
1107 pcc_inode_lock(inode);
1108 pcci = ll_i2pcci(inode);
1109 if (pcci && pcc_inode_has_layout(pcci)) {
1110 LASSERT(atomic_read(&pcci->pcci_refcount) > 0);
1111 __pcc_layout_invalidate(pcci);
1113 CDEBUG(D_CACHE, "Invalidate "DFID" layout gen %d\n",
1114 PFID(&ll_i2info(inode)->lli_fid), pcci->pcci_layout_gen);
1116 pcc_inode_put(pcci);
1118 pcc_inode_unlock(inode);
/*
 * Unlink the cached copy referenced by @pcci from its parent directory
 * with ll_vfs_unlink(); a warning carrying the error code is logged on
 * the failure path.
 */
1121 static int pcc_inode_remove(struct pcc_inode *pcci)
1123 struct dentry *dentry;
1126 dentry = pcci->pcci_path.dentry;
1127 rc = ll_vfs_unlink(dentry->d_parent->d_inode, dentry);
1129 CWARN("failed to unlink cached file, rc = %d\n", rc);
1134 /* Create directory under base if directory does not exist */
/*
 * @name is looked up under @base with lookup_one_len(); an existing
 * (positive) dentry is detected before vfs_mkdir() is attempted with
 * @mode.  A vfs_mkdir() error is handed back as an ERR_PTR().
 */
1135 static struct dentry *
1136 pcc_mkdir(struct dentry *base, const char *name, umode_t mode)
1139 struct dentry *dentry;
1140 struct inode *dir = base->d_inode;
1143 dentry = lookup_one_len(name, base, strlen(name));
1147 if (d_is_positive(dentry))
1150 rc = vfs_mkdir(dir, dentry, mode);
1153 dentry = ERR_PTR(rc);
/*
 * Create each '/'-separated component of @path below @root by calling
 * pcc_mkdir() once per component, starting from a reference on @root.
 * The result defaults to ERR_PTR(-EINVAL) if no component is processed.
 */
1161 static struct dentry *
1162 pcc_mkdir_p(struct dentry *root, char *path, umode_t mode)
1164 char *ptr, *entry_name;
1165 struct dentry *parent;
1166 struct dentry *child = ERR_PTR(-EINVAL);
1173 parent = dget(root);
1174 while ((ptr = strchr(ptr, '/')) != NULL) {
1176 child = pcc_mkdir(parent, entry_name, mode);
1190 /* Create file under base. If file already exist, return failure */
/*
 * @name is looked up under @base with lookup_one_len(); an existing
 * (positive) dentry is detected before vfs_create() is attempted with
 * @mode.  A vfs_create() error is handed back as an ERR_PTR().
 */
1191 static struct dentry *
1192 pcc_create(struct dentry *base, const char *name, umode_t mode)
1195 struct dentry *dentry;
1196 struct inode *dir = base->d_inode;
1199 dentry = lookup_one_len(name, base, strlen(name));
1203 if (d_is_positive(dentry))
1206 rc = vfs_create(dir, dentry, mode, LL_VFS_CREATE_FALSE);
1209 dentry = ERR_PTR(rc);
1217 /* Must be called with pcci->pcci_lock held */
/*
 * Bind @pcci to its cached copy: take a mount reference from the dataset,
 * store @dentry as the path's dentry, set the reference count from 0 to 1,
 * record @type and mark the cached attributes as not valid.
 */
1218 static void pcc_inode_attach_init(struct pcc_dataset *dataset,
1219 struct pcc_inode *pcci,
1220 struct dentry *dentry,
1221 enum lu_pcc_type type)
1223 pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
1224 pcci->pcci_path.dentry = dentry;
1225 LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
1226 atomic_set(&pcci->pcci_refcount, 1);
1227 pcci->pcci_type = type;
1228 pcci->pcci_attr_valid = false;
/*
 * Create the cached copy for @fid inside @dataset.  A temporary buffer of
 * MAX_PCC_DATABASE_PATH bytes first holds the directory path produced by
 * pcc_fid2dataset_path(), which is created below the dataset root with
 * pcc_mkdir_p(); the buffer is then reused for the FID-named file, which
 * is created with pcc_create().  Both use mode 0.  The buffer is freed
 * before returning.
 */
1231 static int __pcc_inode_create(struct pcc_dataset *dataset,
1233 struct dentry **dentry)
1236 struct dentry *base;
1237 struct dentry *child;
1240 OBD_ALLOC(path, MAX_PCC_DATABASE_PATH);
1244 pcc_fid2dataset_path(path, MAX_PCC_DATABASE_PATH, fid);
1246 base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0);
1252 snprintf(path, MAX_PCC_DATABASE_PATH, DFID_NOBRACE, PFID(fid));
1253 child = pcc_create(base, path, 0);
1254 if (IS_ERR(child)) {
1255 rc = PTR_ERR(child);
1263 OBD_FREE(path, MAX_PCC_DATABASE_PATH);
1267 /* TODO: Set the project ID for PCC copy */
/*
 * Set the owner and group of the cached copy behind @dentry through
 * notify_change() with ATTR_UID | ATTR_GID; the inode lock is released
 * after the call.
 */
1268 int pcc_inode_store_ugpid(struct dentry *dentry, kuid_t uid, kgid_t gid)
1270 struct inode *inode = dentry->d_inode;
1276 attr.ia_valid = ATTR_UID | ATTR_GID;
1281 rc = notify_change(dentry, &attr, NULL);
1282 inode_unlock(inode);
/*
 * Wrapper around __pcc_inode_create() that runs it with the PCC super
 * credential of @sb in effect and restores the caller's credential after.
 */
1287 int pcc_inode_create(struct super_block *sb, struct pcc_dataset *dataset,
1288 struct lu_fid *fid, struct dentry **pcc_dentry)
1290 const struct cred *old_cred;
1293 old_cred = override_creds(pcc_super_cred(sb));
1294 rc = __pcc_inode_create(dataset, fid, pcc_dentry);
1295 revert_creds(old_cred);
/*
 * Finish creating a file in PCC: attach the cached copy @pcc_dentry to
 * @inode.
 *
 * With the PCC super credential in effect and the PCC inode lock held, a
 * PCC inode is allocated (the inode must not have one yet), ownership of
 * the cached copy is set from the caller's saved credential, and the PCC
 * inode is initialised as LU_PCC_READWRITE with layout generation 0.  The
 * failure handling unlinks the cached copy and frees the PCC inode.
 */
1299 int pcc_inode_create_fini(struct pcc_dataset *dataset, struct inode *inode,
1300 struct dentry *pcc_dentry)
1302 const struct cred *old_cred;
1303 struct pcc_inode *pcci;
1308 old_cred = override_creds(pcc_super_cred(inode->i_sb));
1309 pcc_inode_lock(inode);
1310 LASSERT(ll_i2pcci(inode) == NULL);
1311 OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
1313 GOTO(out_unlock, rc = -ENOMEM);
1315 rc = pcc_inode_store_ugpid(pcc_dentry, old_cred->suid,
1318 GOTO(out_unlock, rc);
1320 pcc_inode_init(pcci, ll_i2info(inode));
1321 pcc_inode_attach_init(dataset, pcci, pcc_dentry, LU_PCC_READWRITE);
1322 /* Set the layout generation of newly created file with 0 */
1323 pcc_layout_gen_set(pcci, 0);
1329 rc2 = ll_vfs_unlink(pcc_dentry->d_parent->d_inode, pcc_dentry);
1331 CWARN("failed to unlink PCC file, rc = %d\n", rc2);
1336 pcc_inode_unlock(inode);
1337 revert_creds(old_cred);
1339 OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
/*
 * Write @count bytes from the kernel buffer @buf to @filp through
 * vfs_write(); the buffer is cast to a __user pointer for that call.
 */
1344 static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
1350 size = vfs_write(filp, (const void __user *)buf, count, offset);
/*
 * Copy the contents of @src into @dst through a 1 MiB bounce buffer
 * (buf_len = 1048576): data is read with vfs_read() and written with
 * pcc_filp_write().  The buffer is released before returning.
 */
1359 static int pcc_copy_data(struct file *src, struct file *dst)
1364 loff_t pos, offset = 0;
1365 size_t buf_len = 1048576;
1370 OBD_ALLOC_LARGE(buf, buf_len);
1378 rc2 = vfs_read(src, (void __user *)buf, buf_len, &pos);
1380 GOTO(out_fs, rc = rc2);
1385 rc = pcc_filp_write(dst, buf, rc2, &pos);
1393 OBD_FREE_LARGE(buf, buf_len);
/*
 * Decide whether @inode may be attached to PCC now.  Under the PCC inode
 * lock: -EBUSY if another attach is in progress (PCC_STATE_FL_ATTACHING),
 * -EEXIST if the inode already has a PCC inode with a valid layout.
 * Otherwise the ATTACHING flag is set on behalf of the caller; it is
 * cleared again in pcc_readwrite_attach_fini().
 */
1397 static int pcc_attach_allowed_check(struct inode *inode)
1399 struct ll_inode_info *lli = ll_i2info(inode);
1400 struct pcc_inode *pcci;
1405 pcc_inode_lock(inode);
1406 if (lli->lli_pcc_state & PCC_STATE_FL_ATTACHING)
1407 GOTO(out_unlock, rc = -EBUSY);
1409 pcci = ll_i2pcci(inode);
1410 if (pcci && pcc_inode_has_layout(pcci))
1411 GOTO(out_unlock, rc = -EEXIST);
1413 lli->lli_pcc_state |= PCC_STATE_FL_ATTACHING;
1415 pcc_inode_unlock(inode);
/*
 * Attach @inode to a read-write PCC dataset by copying its data into a
 * newly created cached file.
 *
 * Visible steps:
 *  1. pcc_attach_allowed_check() reserves the attach.
 *  2. A dataset is looked up with pcc_dataset_get() (projid 0).
 *  3. With the PCC super credential in effect, the cached file is created
 *     for the inode's FID and opened with O_TRUNC | O_WRONLY | O_LARGEFILE;
 *     its ownership is set from the caller's uid/gid.
 *  4. The file's data is copied with pcc_copy_data().
 *  5. Under the PCC inode lock a PCC inode is allocated and initialised as
 *     LU_PCC_READWRITE.
 * The failure handling unlinks the cached file under the PCC credential;
 * the dataset reference is dropped on the way out.
 */
1419 int pcc_readwrite_attach(struct file *file, struct inode *inode,
1422 struct pcc_dataset *dataset;
1423 struct ll_inode_info *lli = ll_i2info(inode);
1424 struct pcc_inode *pcci;
1425 const struct cred *old_cred;
1426 struct dentry *dentry;
1427 struct file *pcc_filp;
1433 rc = pcc_attach_allowed_check(inode);
1437 dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super, 0,
1439 if (dataset == NULL)
1442 old_cred = override_creds(pcc_super_cred(inode->i_sb));
1443 rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry);
1445 revert_creds(old_cred);
1446 GOTO(out_dataset_put, rc);
1449 path.mnt = dataset->pccd_path.mnt;
1450 path.dentry = dentry;
1451 #ifdef HAVE_DENTRY_OPEN_USE_PATH
1452 pcc_filp = dentry_open(&path, O_TRUNC | O_WRONLY | O_LARGEFILE,
1455 pcc_filp = dentry_open(path.dentry, path.mnt,
1456 O_TRUNC | O_WRONLY | O_LARGEFILE,
1459 if (IS_ERR_OR_NULL(pcc_filp)) {
1460 rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
1461 revert_creds(old_cred);
1462 GOTO(out_dentry, rc);
1465 rc = pcc_inode_store_ugpid(dentry, old_cred->uid, old_cred->gid);
1466 revert_creds(old_cred);
1470 rc = pcc_copy_data(file, pcc_filp);
1474 /* Pause to allow for a race with concurrent HSM remove */
1475 OBD_FAIL_TIMEOUT(OBD_FAIL_LLITE_PCC_ATTACH_PAUSE, cfs_fail_val);
1477 pcc_inode_lock(inode);
1478 pcci = ll_i2pcci(inode);
1480 OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
1482 GOTO(out_unlock, rc = -ENOMEM);
1484 pcc_inode_init(pcci, lli);
1485 pcc_inode_attach_init(dataset, pcci, dentry, LU_PCC_READWRITE);
1487 pcc_inode_unlock(inode);
1494 old_cred = override_creds(pcc_super_cred(inode->i_sb));
1495 rc2 = ll_vfs_unlink(dentry->d_parent->d_inode, dentry);
1496 revert_creds(old_cred);
1498 CWARN("failed to unlink PCC file, rc = %d\n", rc2);
1503 pcc_dataset_put(dataset);
/*
 * Complete (or roll back) a read-write attach started by
 * pcc_readwrite_attach().
 *
 * Under the PCC inode lock the ATTACHING flag is cleared.  If @rc is
 * non-zero or the lease was broken, the PCC inode reference is dropped
 * when the file had been attached.  Otherwise the layout is refreshed with
 * ll_layout_refresh() and the refreshed generation is compared with @gen:
 * the generation is stored with pcc_layout_gen_set(), whereas a layout
 * change yields -ESTALE.  The put path removes the cached copy under the
 * PCC super credential and drops the PCC inode reference.
 */
1507 int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
1508 __u32 gen, bool lease_broken, int rc,
1511 struct ll_inode_info *lli = ll_i2info(inode);
1512 const struct cred *old_cred;
1513 struct pcc_inode *pcci;
1518 pcc_inode_lock(inode);
1519 pcci = ll_i2pcci(inode);
1520 lli->lli_pcc_state &= ~PCC_STATE_FL_ATTACHING;
1521 if (rc || lease_broken) {
1522 if (attached && pcci)
1523 pcc_inode_put(pcci);
1525 GOTO(out_unlock, rc);
1528 /* PCC inode may be released due to layout lock revocation */
1530 GOTO(out_unlock, rc = -ESTALE);
1533 rc = ll_layout_refresh(inode, &gen2);
1536 pcc_layout_gen_set(pcci, gen);
1539 DFID" layout changed from %d to %d.\n",
1540 PFID(ll_inode2fid(inode)), gen, gen2);
1541 GOTO(out_put, rc = -ESTALE);
1547 old_cred = override_creds(pcc_super_cred(inode->i_sb));
1548 pcc_inode_remove(pcci);
1549 revert_creds(old_cred);
1550 pcc_inode_put(pcci);
1553 pcc_inode_unlock(inode);
/*
 * Detach @inode from PCC.  Under the PCC inode lock this is a no-op
 * (rc = 0) when there is no PCC inode, an attach is still in progress, or
 * the PCC inode has no valid layout.  Otherwise the layout is invalidated,
 * which waits for in-flight I/O, and one PCC inode reference is dropped.
 */
1557 int pcc_ioctl_detach(struct inode *inode)
1559 struct ll_inode_info *lli = ll_i2info(inode);
1560 struct pcc_inode *pcci;
1565 pcc_inode_lock(inode);
1566 pcci = lli->lli_pcc_inode;
1567 if (!pcci || lli->lli_pcc_state & PCC_STATE_FL_ATTACHING ||
1568 !pcc_inode_has_layout(pcci))
1569 GOTO(out_unlock, rc = 0);
1571 __pcc_layout_invalidate(pcci);
1572 pcc_inode_put(pcci);
1575 pcc_inode_unlock(inode);
/*
 * Report the PCC state of @inode into @state.
 *
 * A scratch buffer the size of state->pccs_path is allocated for path
 * construction.  Under the PCC inode lock, LU_PCC_NONE is reported on the
 * two early-exit paths (the second also reports an open count of 0).
 * Otherwise @state receives the PCC type, an open count derived from
 * pcci_refcount (the layout and this file's own backing file are examined
 * when computing it), the lli_pcc_state flags, and the path of the cached
 * copy.  -ENAMETOOLONG is returned when the path does not fit in
 * pccs_path.  The scratch buffer is freed before returning.
 */
1579 int pcc_ioctl_state(struct file *file, struct inode *inode,
1580 struct lu_pcc_state *state)
1586 int buf_len = sizeof(state->pccs_path);
1587 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
1588 struct pcc_file *pccf = &fd->fd_pcc_file;
1589 struct pcc_inode *pcci;
1596 OBD_ALLOC(buf, buf_len);
1600 pcc_inode_lock(inode);
1601 pcci = ll_i2pcci(inode);
1603 state->pccs_type = LU_PCC_NONE;
1604 GOTO(out_unlock, rc = 0);
1607 count = atomic_read(&pcci->pcci_refcount);
1609 state->pccs_type = LU_PCC_NONE;
1610 state->pccs_open_count = 0;
1611 GOTO(out_unlock, rc = 0);
1614 if (pcc_inode_has_layout(pcci))
1616 if (pccf->pccf_file != NULL)
1618 state->pccs_type = pcci->pcci_type;
1619 state->pccs_open_count = count;
1620 state->pccs_flags = ll_i2info(inode)->lli_pcc_state;
1621 #ifdef HAVE_DENTRY_PATH_RAW
1622 path = dentry_path_raw(pcci->pcci_path.dentry, buf, buf_len);
1624 GOTO(out_unlock, rc = PTR_ERR(path));
1629 if (strlcpy(state->pccs_path, path, buf_len) >= buf_len)
1630 GOTO(out_unlock, rc = -ENAMETOOLONG);
1633 pcc_inode_unlock(inode);
1634 OBD_FREE(buf, buf_len);