4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2017, DDN Storage Corporation.
26 * Persistent Client Cache
28 * PCC is a new framework which provides a group of local cache on Lustre
29 * client side. It works in two modes: RW-PCC enables a read-write cache on the
30 * local SSDs of a single client; RO-PCC provides a read-only cache on the
31 * local SSDs of multiple clients. Less overhead is visible to the applications
32 * and network latencies and lock conflicts can be significantly reduced.
34 * For RW-PCC, no global namespace will be provided. Each client uses its own
35 * local storage as a cache for itself. Local file system is used to manage
36 * the data on local caches. Cached I/O is directed to local file system while
37 * normal I/O is directed to OSTs. RW-PCC uses HSM for data synchronization.
38 * It uses HSM copytool to restore file from local caches to Lustre OSTs. Each
39 * PCC has a copytool instance running with unique archive number. Any remote
40 * access from another Lustre client would trigger the data synchronization. If
41 * a client with RW-PCC goes offline, the cached data becomes inaccessible for
42 * other client temporarily. And after the RW-PCC client reboots and the
43 * copytool restarts, the data will be accessible again.
45 * Following is what will happen in different conditions for RW-PCC:
47 * > When file is being created on RW-PCC
49 * A normal HSM released file is created on MDT;
50 * An empty mirror file is created on local cache;
51 * The HSM status of the Lustre file will be set to archived and released;
52 * The archive number will be set to the proper value.
54 * > When file is being prefetched to RW-PCC
56 * A file is copied to the local cache;
57 * The HSM status of the Lustre file will be set to archived and released;
58 * The archive number will be set to the proper value.
60 * > When file is being accessed from PCC
62 * Data will be read directly from local cache;
63 * Metadata will be read from MDT, except file size;
64 * File size will be got from local cache.
66 * > When PCC cached file is being accessed on another client
68 * RW-PCC cached files are automatically restored when a process on another
69 * client tries to read or modify them. The corresponding I/O will block
70 * waiting for the released file to be restored. This is transparent to the
71 * process.
73 * For RW-PCC, when a file is being created, a rule-based policy is used to
74 * determine whether it will be cached. Rule-based caching of newly created
75 * files can determine which file can use a cache on PCC directly without any
76 * admission control.
78 * RW-PCC design can accelerate I/O intensive applications with one-to-one
79 * mappings between files and accessing clients. However, in several use cases,
80 * files will never be updated, but need to be read simultaneously from many
81 * clients. RO-PCC implements a read-only caching on Lustre clients using
82 * SSDs. RO-PCC is based on the same framework as RW-PCC, except
83 * that no HSM mechanism is used.
85 * The main advantages of using this SSD cache on the Lustre clients via PCC
86 * are:
87 * - The I/O stack becomes much simpler for the cached data, as there is no
88 * interference with I/Os from other clients, which enables easier
89 * performance optimizations;
90 * - The requirements on the HW inside the client nodes are small, any kind of
91 * SSDs or even HDDs can be used as cache devices;
92 * - Caching reduces the pressure on the object storage targets (OSTs), as
93 * small or random I/Os can be regularized to big sequential I/Os and
94 * temporary files do not even need to be flushed to OSTs.
96 * PCC can accelerate applications with certain I/O patterns:
97 * - small-sized random writes (< 1MB) from a single client
98 * - repeated read of data that is larger than RAM
99 * - clients with high network latency
101 * Author: Li Xi <lixi@ddn.com>
102 * Author: Qian Yingjin <qian@ddn.com>
105 #define DEBUG_SUBSYSTEM S_LLITE
108 #include <linux/namei.h>
109 #include <linux/file.h>
110 #include <lustre_compat.h>
111 #include "llite_internal.h"
113 struct kmem_cache *pcc_inode_slab;
115 void pcc_super_init(struct pcc_super *super)
117 spin_lock_init(&super->pccs_lock);
118 INIT_LIST_HEAD(&super->pccs_datasets);
122 * pcc_dataset_add - Add a Cache policy to control which files need be
123 * cached and where it will be cached.
125 * @super: superblock of pcc
126 * @pathname: root path of pcc
127 * @id: HSM archive ID
128 * @projid: files with specified project ID will be cached.
131 pcc_dataset_add(struct pcc_super *super, const char *pathname,
132 __u32 archive_id, __u32 projid)
135 struct pcc_dataset *dataset;
136 struct pcc_dataset *tmp;
139 OBD_ALLOC_PTR(dataset);
143 rc = kern_path(pathname, LOOKUP_DIRECTORY, &dataset->pccd_path);
145 OBD_FREE_PTR(dataset);
148 strncpy(dataset->pccd_pathname, pathname, PATH_MAX);
149 dataset->pccd_id = archive_id;
150 dataset->pccd_projid = projid;
151 atomic_set(&dataset->pccd_refcount, 1);
153 spin_lock(&super->pccs_lock);
154 list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) {
155 if (tmp->pccd_id == archive_id) {
161 list_add(&dataset->pccd_linkage, &super->pccs_datasets);
162 spin_unlock(&super->pccs_lock);
165 pcc_dataset_put(dataset);
173 pcc_dataset_get(struct pcc_super *super, __u32 projid, __u32 archive_id)
175 struct pcc_dataset *dataset;
176 struct pcc_dataset *selected = NULL;
178 if (projid == 0 && archive_id == 0)
182 * archive ID is unique in the list, projid might be duplicate,
183 * we just return last added one as first priority.
185 spin_lock(&super->pccs_lock);
186 list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
187 if (projid && dataset->pccd_projid != projid)
189 if (archive_id && dataset->pccd_id != archive_id)
191 atomic_inc(&dataset->pccd_refcount);
195 spin_unlock(&super->pccs_lock);
197 CDEBUG(D_CACHE, "matched projid %u, PCC create\n",
198 selected->pccd_projid);
203 pcc_dataset_put(struct pcc_dataset *dataset)
205 if (atomic_dec_and_test(&dataset->pccd_refcount)) {
206 path_put(&dataset->pccd_path);
207 OBD_FREE_PTR(dataset);
212 pcc_dataset_del(struct pcc_super *super, char *pathname)
214 struct list_head *l, *tmp;
215 struct pcc_dataset *dataset;
218 spin_lock(&super->pccs_lock);
219 list_for_each_safe(l, tmp, &super->pccs_datasets) {
220 dataset = list_entry(l, struct pcc_dataset, pccd_linkage);
221 if (strcmp(dataset->pccd_pathname, pathname) == 0) {
222 list_del(&dataset->pccd_linkage);
223 pcc_dataset_put(dataset);
228 spin_unlock(&super->pccs_lock);
233 pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m)
235 seq_printf(m, "%s:\n", dataset->pccd_pathname);
236 seq_printf(m, " rwid: %u\n", dataset->pccd_id);
237 seq_printf(m, " autocache: projid=%u\n", dataset->pccd_projid);
241 pcc_super_dump(struct pcc_super *super, struct seq_file *m)
243 struct pcc_dataset *dataset;
245 spin_lock(&super->pccs_lock);
246 list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) {
247 pcc_dataset_dump(dataset, m);
249 spin_unlock(&super->pccs_lock);
253 void pcc_super_fini(struct pcc_super *super)
255 struct pcc_dataset *dataset, *tmp;
257 list_for_each_entry_safe(dataset, tmp,
258 &super->pccs_datasets, pccd_linkage) {
259 list_del(&dataset->pccd_linkage);
260 pcc_dataset_put(dataset);
/*
 * Check that @pathname can be used as a dataset root: it must be non-NULL,
 * absolute (start with '/') and shorter than PATH_MAX bytes.
 *
 * Testing the first byte rejects both the empty string and relative paths,
 * so the string is scanned only once for its length.
 */
static bool pathname_is_valid(const char *pathname)
{
	/* Needs to be absolute path */
	if (pathname == NULL || pathname[0] != '/')
		return false;

	return strlen(pathname) < PATH_MAX;
}
274 static struct pcc_cmd *
275 pcc_cmd_parse(char *buffer, unsigned long count)
277 static struct pcc_cmd *cmd;
285 GOTO(out, rc = -ENOMEM);
287 /* clear all setting */
288 if (strncmp(buffer, "clear", 5) == 0) {
289 cmd->pccc_cmd = PCC_CLEAR_ALL;
294 token = strsep(&val, " ");
295 if (val == NULL || strlen(val) == 0)
296 GOTO(out_free_cmd, rc = -EINVAL);
298 /* Type of the command */
299 if (strcmp(token, "add") == 0)
300 cmd->pccc_cmd = PCC_ADD_DATASET;
301 else if (strcmp(token, "del") == 0)
302 cmd->pccc_cmd = PCC_DEL_DATASET;
304 GOTO(out_free_cmd, rc = -EINVAL);
306 /* Pathname of the dataset */
307 token = strsep(&val, " ");
308 if ((val == NULL && cmd->pccc_cmd != PCC_DEL_DATASET) ||
309 !pathname_is_valid(token))
310 GOTO(out_free_cmd, rc = -EINVAL);
311 cmd->pccc_pathname = token;
313 if (cmd->pccc_cmd == PCC_ADD_DATASET) {
315 token = strsep(&val, " ");
317 GOTO(out_free_cmd, rc = -EINVAL);
319 rc = kstrtoul(token, 10, &tmp);
321 GOTO(out_free_cmd, rc = -EINVAL);
323 GOTO(out_free_cmd, rc = -EINVAL);
324 cmd->u.pccc_add.pccc_id = tmp;
327 rc = kstrtoul(token, 10, &tmp);
329 GOTO(out_free_cmd, rc = -EINVAL);
331 GOTO(out_free_cmd, rc = -EINVAL);
332 cmd->u.pccc_add.pccc_projid = tmp;
/*
 * Parse the command text in @buffer with pcc_cmd_parse() and apply it to
 * @super: an "add" command registers a dataset through pcc_dataset_add(),
 * a "del" command removes one through pcc_dataset_del(), and
 * pcc_super_fini() drops every dataset.
 *
 * The pathname handed to the add/del helpers is cmd->pccc_pathname, which
 * the parser points into @buffer.
 */
344 int pcc_cmd_handle(char *buffer, unsigned long count,
345 struct pcc_super *super)
350 cmd = pcc_cmd_parse(buffer, count);
354 switch (cmd->pccc_cmd) {
355 case PCC_ADD_DATASET:
356 rc = pcc_dataset_add(super, cmd->pccc_pathname,
357 cmd->u.pccc_add.pccc_id,
358 cmd->u.pccc_add.pccc_projid);
360 case PCC_DEL_DATASET:
361 rc = pcc_dataset_del(super, cmd->pccc_pathname);
/* NOTE(review): presumably the PCC_CLEAR_ALL case - confirm */
364 pcc_super_fini(super);
375 static inline void pcc_inode_lock(struct inode *inode)
377 mutex_lock(&ll_i2info(inode)->lli_pcc_lock);
380 static inline void pcc_inode_unlock(struct inode *inode)
382 mutex_unlock(&ll_i2info(inode)->lli_pcc_lock);
385 static void pcc_inode_init(struct pcc_inode *pcci)
387 atomic_set(&pcci->pcci_refcount, 0);
388 pcci->pcci_type = LU_PCC_NONE;
391 static void pcc_inode_fini(struct pcc_inode *pcci)
393 path_put(&pcci->pcci_path);
394 pcci->pcci_type = LU_PCC_NONE;
395 OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
398 static void pcc_inode_get(struct pcc_inode *pcci)
400 atomic_inc(&pcci->pcci_refcount);
403 static void pcc_inode_put(struct pcc_inode *pcci)
405 if (atomic_dec_and_test(&pcci->pcci_refcount))
406 pcc_inode_fini(pcci);
409 void pcc_inode_free(struct inode *inode)
411 struct ll_inode_info *lli = ll_i2info(inode);
412 struct pcc_inode *pcci = lli->lli_pcc_inode;
415 WARN_ON(atomic_read(&pcci->pcci_refcount) > 1);
417 lli->lli_pcc_inode = NULL;
423 * As Andreas suggested, we'd better use new layout to
425 * (fid->f_oid >> 16 & 0xFFFF)/FID
427 #define MAX_PCC_DATABASE_PATH (6 * 5 + FID_NOBRACE_LEN + 1)
428 static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid)
430 return snprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/"
432 (fid)->f_oid & 0xFFFF,
433 (fid)->f_oid >> 16 & 0xFFFF,
434 (unsigned int)((fid)->f_seq & 0xFFFF),
435 (unsigned int)((fid)->f_seq >> 16 & 0xFFFF),
436 (unsigned int)((fid)->f_seq >> 32 & 0xFFFF),
437 (unsigned int)((fid)->f_seq >> 48 & 0xFFFF),
441 void pcc_file_init(struct pcc_file *pccf)
443 pccf->pccf_file = NULL;
444 pccf->pccf_type = LU_PCC_NONE;
/*
 * Open the PCC copy backing @inode, if any, on behalf of the Lustre open
 * @file, and record it in the per-open PCC state (fd->fd_pcc_file).
 *
 * Only regular files are considered. Under pcc_inode_lock(), an inode with
 * no PCC inode, or whose PCC refcount is zero, is left alone with rc = 0.
 * Otherwise the cached file is opened with dentry_open() using the flags
 * of @file and the current credentials; on success the opened file and the
 * PCC type are stored in the per-open state.
 */
447 int pcc_file_open(struct inode *inode, struct file *file)
449 struct pcc_inode *pcci;
450 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
451 struct pcc_file *pccf = &fd->fd_pcc_file;
452 struct file *pcc_file;
459 if (!S_ISREG(inode->i_mode))
462 pcc_inode_lock(inode);
463 pcci = ll_i2pcci(inode);
465 GOTO(out_unlock, rc = 0);
/* refcount 0 means the PCC inode is not (or no longer) attached */
467 if (atomic_read(&pcci->pcci_refcount) == 0)
468 GOTO(out_unlock, rc = 0);
/* a PCC file must not already be recorded for this open */
471 WARN_ON(pccf->pccf_file);
473 path = &pcci->pcci_path;
474 dname = &path->dentry->d_name;
475 CDEBUG(D_CACHE, "opening pcc file '%.*s'\n", dname->len,
/* dentry_open() takes a struct path or a dentry/mnt pair, per kernel */
477 #ifdef HAVE_DENTRY_OPEN_USE_PATH
478 pcc_file = dentry_open(path, file->f_flags, current_cred());
480 pcc_file = dentry_open(path->dentry, path->mnt,
481 file->f_flags, current_cred());
/* a NULL result is reported as -EINVAL, an ERR_PTR as its error code */
483 if (IS_ERR_OR_NULL(pcc_file)) {
484 rc = pcc_file == NULL ? -EINVAL : PTR_ERR(pcc_file);
487 pccf->pccf_file = pcc_file;
488 pccf->pccf_type = pcci->pcci_type;
492 pcc_inode_unlock(inode);
/*
 * Release the PCC file that pcc_file_open() recorded for @file, if any.
 *
 * Nothing is done for non-regular files or when @file has no private data.
 * Under pcc_inode_lock(), the recorded PCC file (when present) is released
 * with fput() and the per-open pointer is cleared.
 */
496 void pcc_file_release(struct inode *inode, struct file *file)
498 struct pcc_inode *pcci;
499 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
500 struct pcc_file *pccf;
506 if (!S_ISREG(inode->i_mode) || fd == NULL)
509 pccf = &fd->fd_pcc_file;
510 pcc_inode_lock(inode);
/* no PCC file was opened for this Lustre file */
511 if (pccf->pccf_file == NULL)
514 pcci = ll_i2pcci(inode);
516 path = &pcci->pcci_path;
517 dname = &path->dentry->d_name;
518 CDEBUG(D_CACHE, "releasing pcc file \"%.*s\"\n", dname->len,
521 fput(pccf->pccf_file);
522 pccf->pccf_file = NULL;
524 pcc_inode_unlock(inode);
527 ssize_t pcc_file_read_iter(struct kiocb *iocb,
528 struct iov_iter *iter, bool *cached)
530 struct file *file = iocb->ki_filp;
531 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
532 struct pcc_file *pccf = &fd->fd_pcc_file;
537 if (pccf->pccf_file == NULL) {
542 iocb->ki_filp = pccf->pccf_file;
544 result = generic_file_read_iter(iocb, iter);
545 iocb->ki_filp = file;
/*
 * Write @iter to the file currently installed in iocb->ki_filp.
 *
 * With HAVE_FILE_OPERATIONS_READ_WRITE_ITER the write is handed straight
 * to the file's ->write_iter() method. The other variant submits the
 * segments of @iter one at a time through ->aio_write(), waits for
 * requests that were queued asynchronously, and advances @iter.
 */
551 __pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter)
553 struct file *file = iocb->ki_filp;
555 #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER
556 return file->f_op->write_iter(iocb, iter);
562 iov_for_each(iov, i, *iter) {
565 res = file->f_op->aio_write(iocb, &iov, 1, iocb->ki_pos);
/* queued asynchronously: wait for the request to complete */
566 if (-EIOCBQUEUED == res)
567 res = wait_on_sync_kiocb(iocb);
/* fewer bytes written than this segment holds (short write) */
575 if (res < iov.iov_len)
580 iov_iter_advance(iter, bytes);
585 ssize_t pcc_file_write_iter(struct kiocb *iocb,
586 struct iov_iter *iter, bool *cached)
588 struct file *file = iocb->ki_filp;
589 struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
590 struct pcc_file *pccf = &fd->fd_pcc_file;
595 if (pccf->pccf_file == NULL) {
601 if (pccf->pccf_type != LU_PCC_READWRITE)
602 RETURN(-EWOULDBLOCK);
604 iocb->ki_filp = pccf->pccf_file;
606 /* Since __pcc_file_write_iter makes write calls via
607 * the normal vfs interface to the local PCC file system,
608 * the inode lock is not needed.
610 result = __pcc_file_write_iter(iocb, iter);
611 iocb->ki_filp = file;
/*
 * Apply an attribute change to the PCC copy of @inode, if one is attached.
 *
 * Works on a private copy of @attr whose ia_valid is narrowed to a fixed
 * set of size/time flags, and calls the ->setattr() method of the PCC
 * file's inode with that inode locked. Nothing is applied when @inode has
 * no PCC inode or its PCC refcount is zero; rc is 0 in that case.
 *
 * NOTE(review): on the early out_unlock path no visible line writes the
 * caller-supplied "cached" flag, so callers presumably pre-initialize it -
 * confirm.
 */
615 int pcc_inode_setattr(struct inode *inode, struct iattr *attr,
619 struct pcc_inode *pcci;
620 struct iattr attr2 = *attr;
621 struct dentry *pcc_dentry;
625 if (!S_ISREG(inode->i_mode)) {
630 pcc_inode_lock(inode);
631 pcci = ll_i2pcci(inode);
632 if (pcci == NULL || atomic_read(&pcci->pcci_refcount) == 0)
633 GOTO(out_unlock, rc = 0);
/* forward only the attributes selected by this mask to the PCC file */
636 attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME |
637 ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET |
639 pcc_dentry = pcci->pcci_path.dentry;
/* ->setattr() is invoked with the PCC inode's own lock held */
640 inode_lock(pcc_dentry->d_inode);
641 rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2);
642 inode_unlock(pcc_dentry->d_inode);
644 pcc_inode_unlock(inode);
/*
 * Refresh the size, block count and timestamps of @inode from its PCC
 * copy, if one is attached.
 *
 * Under pcc_inode_lock(), the PCC file is stat'ed via ll_vfs_getattr().
 * Then, under ll_inode_size_lock(), i_size and i_blocks are taken from the
 * PCC file, and each of atime/mtime/ctime becomes the later of the
 * Lustre-side value (lli_atime/lli_mtime/lli_ctime) and the PCC file's
 * value. Nothing is updated when @inode has no PCC inode or its PCC
 * refcount is zero.
 *
 * NOTE(review): on the early out_unlock path no visible line writes
 * *cached, so callers presumably pre-initialize it - confirm.
 */
648 int pcc_inode_getattr(struct inode *inode, bool *cached)
650 struct ll_inode_info *lli = ll_i2info(inode);
651 struct pcc_inode *pcci;
660 if (!S_ISREG(inode->i_mode)) {
665 pcc_inode_lock(inode);
666 pcci = ll_i2pcci(inode);
667 if (pcci == NULL || atomic_read(&pcci->pcci_refcount) == 0)
668 GOTO(out_unlock, rc = 0);
671 rc = ll_vfs_getattr(&pcci->pcci_path, &stat);
673 GOTO(out_unlock, rc);
675 ll_inode_size_lock(inode);
/* start from the Lustre-side timestamps kept in the ll_inode_info */
676 if (inode->i_atime.tv_sec < lli->lli_atime ||
677 lli->lli_update_atime) {
678 inode->i_atime.tv_sec = lli->lli_atime;
679 lli->lli_update_atime = 0;
681 inode->i_mtime.tv_sec = lli->lli_mtime;
682 inode->i_ctime.tv_sec = lli->lli_ctime;
684 atime = inode->i_atime.tv_sec;
685 mtime = inode->i_mtime.tv_sec;
686 ctime = inode->i_ctime.tv_sec;
/* keep whichever timestamp is later: Lustre's or the PCC file's */
688 if (atime < stat.atime.tv_sec)
689 atime = stat.atime.tv_sec;
691 if (ctime < stat.ctime.tv_sec)
692 ctime = stat.ctime.tv_sec;
694 if (mtime < stat.mtime.tv_sec)
695 mtime = stat.mtime.tv_sec;
/* size and block count always come from the PCC copy */
697 i_size_write(inode, stat.size);
698 inode->i_blocks = stat.blocks;
700 inode->i_atime.tv_sec = atime;
701 inode->i_mtime.tv_sec = mtime;
702 inode->i_ctime.tv_sec = ctime;
704 ll_inode_size_unlock(inode);
707 pcc_inode_unlock(inode);
/*
 * Create directory under base if directory does not exist.
 *
 * @name is looked up under @base with lookup_one_len(); the existence test
 * is d_is_positive() on the result. vfs_mkdir() is called with @mode, and
 * its error code is turned into an ERR_PTR() result.
 */
711 /* Create directory under base if directory does not exist */
712 static struct dentry *
713 pcc_mkdir(struct dentry *base, const char *name, umode_t mode)
716 struct dentry *dentry;
717 struct inode *dir = base->d_inode;
720 dentry = lookup_one_len(name, base, strlen(name));
/* the entry already exists */
724 if (d_is_positive(dentry))
727 rc = vfs_mkdir(dir, dentry, mode);
730 dentry = ERR_PTR(rc);
/*
 * Create the directory chain named by @path below @root, one '/'-separated
 * component at a time, using pcc_mkdir() with @mode for each component.
 *
 * Only components that are followed by a '/' are handled by the loop.
 * If the loop body never runs, the result is the initial ERR_PTR(-EINVAL).
 */
738 static struct dentry *
739 pcc_mkdir_p(struct dentry *root, char *path, umode_t mode)
741 char *ptr, *entry_name;
742 struct dentry *parent;
743 struct dentry *child = ERR_PTR(-EINVAL);
/* advance to the next '/' in @path; stop when none is left */
751 while ((ptr = strchr(ptr, '/')) != NULL) {
753 child = pcc_mkdir(parent, entry_name, mode);
/*
 * Create the regular file @name under @base with @mode.
 *
 * @name is looked up under @base with lookup_one_len(); d_is_positive()
 * detects an entry that already exists. vfs_create() does the creation,
 * and its error code is turned into an ERR_PTR() result.
 *
 * NOTE(review): the original one-line comment below promises a failure
 * when the file already exists; how the positive-dentry case is reported
 * is not visible here - confirm the two agree.
 */
766 /* Create file under base. If file already exist, return failure */
767 static struct dentry *
768 pcc_create(struct dentry *base, const char *name, umode_t mode)
771 struct dentry *dentry;
772 struct inode *dir = base->d_inode;
775 dentry = lookup_one_len(name, base, strlen(name));
/* the entry already exists */
779 if (d_is_positive(dentry))
782 rc = vfs_create(dir, dentry, mode, LL_VFS_CREATE_FALSE);
785 dentry = ERR_PTR(rc);
793 /* Must be called with pcci->pcci_lock held */
794 static void pcc_inode_attach_init(struct pcc_dataset *dataset,
795 struct pcc_inode *pcci,
796 struct dentry *dentry,
797 enum lu_pcc_type type)
799 pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt);
800 pcci->pcci_path.dentry = dentry;
801 LASSERT(atomic_read(&pcci->pcci_refcount) == 0);
802 atomic_set(&pcci->pcci_refcount, 1);
803 pcci->pcci_type = type;
804 pcci->pcci_attr_valid = false;
/*
 * Create the backend file for a FID inside @dataset.
 *
 * A scratch buffer of MAX_PCC_DATABASE_PATH bytes first receives the
 * FID-derived directory path (pcc_fid2dataset_path()), which is created
 * below the dataset root with mode 0700. The same buffer is then reused
 * for the file name, the FID printed with DFID_NOBRACE, and the file is
 * created in that directory with mode 0600.
 */
807 static int __pcc_inode_create(struct pcc_dataset *dataset,
809 struct dentry **dentry)
813 struct dentry *child;
816 OBD_ALLOC(path, MAX_PCC_DATABASE_PATH);
820 pcc_fid2dataset_path(path, MAX_PCC_DATABASE_PATH, fid);
/* create the per-FID directory chain below the dataset root */
822 base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0700);
/* reuse the buffer for the file name */
828 snprintf(path, MAX_PCC_DATABASE_PATH, DFID_NOBRACE, PFID(fid));
829 child = pcc_create(base, path, 0600);
839 OBD_FREE(path, MAX_PCC_DATABASE_PATH);
/*
 * Public entry point for creating the backend file of @fid in @dataset;
 * a thin wrapper that forwards to __pcc_inode_create() and returns its
 * result.
 */
int pcc_inode_create(struct pcc_dataset *dataset, struct lu_fid *fid,
		     struct dentry **pcc_dentry)
{
	int rc = __pcc_inode_create(dataset, fid, pcc_dentry);

	return rc;
}
849 int pcc_inode_create_fini(struct pcc_dataset *dataset, struct inode *inode,
850 struct dentry *pcc_dentry)
852 struct ll_inode_info *lli = ll_i2info(inode);
853 struct pcc_inode *pcci;
857 LASSERT(ll_i2pcci(inode) == NULL);
858 OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
862 pcc_inode_init(pcci);
863 pcc_inode_lock(inode);
864 pcc_inode_attach_init(dataset, pcci, pcc_dentry, LU_PCC_READWRITE);
865 lli->lli_pcc_inode = pcci;
866 pcc_inode_unlock(inode);
/*
 * Write @count bytes from the kernel buffer @buf to @filp through
 * vfs_write().
 *
 * NOTE(review): @buf is cast to a __user pointer, so the caller presumably
 * lifts the user-address check around this call - confirm.
 */
871 static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count,
877 size = vfs_write(filp, (const void __user *)buf, count, offset);
/*
 * Copy the contents of @src to @dst through a temporary 1 MiB (1048576
 * byte) buffer, reading with vfs_read() and writing with pcc_filp_write().
 *
 * A negative vfs_read() result ends the copy and becomes the return code.
 * The buffer is released before returning.
 *
 * NOTE(review): the kernel buffer is cast to a __user pointer and the
 * error label is named out_fs, so the copy presumably runs with the
 * address limit lifted - confirm.
 */
886 static int pcc_copy_data(struct file *src, struct file *dst)
891 loff_t pos, offset = 0;
892 size_t buf_len = 1048576;
897 OBD_ALLOC_LARGE(buf, buf_len);
905 rc2 = vfs_read(src, (void __user *)buf, buf_len, &pos);
907 GOTO(out_fs, rc = rc2);
/* write out exactly the rc2 bytes that were just read */
912 rc = pcc_filp_write(dst, buf, rc2, &pos);
920 OBD_FREE_LARGE(buf, buf_len);
/*
 * Attach @inode to a read-write PCC: create its backend file in a dataset
 * obtained from pcc_dataset_get(), copy the current contents of @file into
 * it and publish a PCC inode for @inode.
 *
 * Steps visible here:
 *  - under pcc_inode_lock(), reuse the existing PCC inode or allocate a
 *    new one; an existing PCC inode with a positive refcount takes a
 *    separate branch that drops the lock again;
 *  - look up the dataset (-ENOENT on failure) and create the backend file
 *    for the inode's FID;
 *  - open the backend file O_TRUNC | O_WRONLY | O_LARGEFILE and copy the
 *    data with pcc_copy_data();
 *  - retake the lock, fail with -EEXIST if lli_pcc_inode is set, otherwise
 *    initialize the PCC inode as LU_PCC_READWRITE and publish it.
 *
 * The PCC inode lock is not held while the data is copied, hence the
 * second check before publishing.
 *
 * NOTE(review): when an existing PCC inode with refcount 0 is reused,
 * lli_pcc_inode still points at it during the second check, and the
 * cleanup code frees pcci with OBD_SLAB_FREE_PTR() - verify that this
 * cannot free an object the inode still references.
 */
924 int pcc_readwrite_attach(struct file *file, struct inode *inode,
927 struct pcc_dataset *dataset;
928 struct ll_inode_info *lli = ll_i2info(inode);
929 struct pcc_inode *pcci;
930 struct dentry *dentry;
931 struct file *pcc_filp;
937 pcc_inode_lock(inode);
938 pcci = ll_i2pcci(inode);
940 OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS);
942 pcc_inode_unlock(inode);
946 pcc_inode_init(pcci);
/* existing PCC inode with a positive refcount */
947 } else if (atomic_read(&pcci->pcci_refcount) > 0) {
948 pcc_inode_unlock(inode);
951 pcc_inode_unlock(inode);
953 dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super, 0,
956 GOTO(out_free_pcci, rc = -ENOENT);
958 rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry);
960 GOTO(out_dataset_put, rc);
/* temporary path used only for opening the new backend file */
962 path.mnt = dataset->pccd_path.mnt;
963 path.dentry = dentry;
964 #ifdef HAVE_DENTRY_OPEN_USE_PATH
965 pcc_filp = dentry_open(&path, O_TRUNC | O_WRONLY | O_LARGEFILE,
968 pcc_filp = dentry_open(path.dentry, path.mnt,
969 O_TRUNC | O_WRONLY | O_LARGEFILE,
/* a NULL result is reported as -EINVAL, an ERR_PTR as its error code */
972 if (IS_ERR_OR_NULL(pcc_filp)) {
973 rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp);
974 GOTO(out_dentry, rc);
977 rc = pcc_copy_data(file, pcc_filp);
981 pcc_inode_lock(inode);
/* re-check: the lock was dropped while the data was being copied */
982 if (lli->lli_pcc_inode)
983 GOTO(out_unlock, rc = -EEXIST);
984 pcc_inode_attach_init(dataset, pcci, dentry, LU_PCC_READWRITE);
985 lli->lli_pcc_inode = pcci;
987 pcc_inode_unlock(inode);
994 pcc_dataset_put(dataset);
997 OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab);
1002 int pcc_readwrite_attach_fini(struct file *file, struct inode *inode,
1003 bool lease_broken, int rc, bool attached)
1005 struct pcc_inode *pcci = ll_i2pcci(inode);
1009 if ((rc || lease_broken) && attached && pcci)
1010 pcc_inode_put(pcci);
1015 int pcc_ioctl_detach(struct inode *inode)
1017 struct ll_inode_info *lli = ll_i2info(inode);
1018 struct pcc_inode *pcci = lli->lli_pcc_inode;
1024 pcc_inode_lock(inode);
1026 GOTO(out_unlock, rc = 0);
1028 count = atomic_read(&pcci->pcci_refcount);
1030 GOTO(out_unlock, rc = -EBUSY);
1031 else if (count == 0)
1032 GOTO(out_unlock, rc = 0);
1034 pcc_inode_put(pcci);
1035 lli->lli_pcc_inode = NULL;
1037 pcc_inode_unlock(inode);
/*
 * Report the PCC state of @inode in @state.
 *
 * A scratch buffer as large as state->pccs_path is allocated for building
 * the path. Under pcc_inode_lock():
 *  - with no PCC inode, or a PCC refcount of 0, the type is reported as
 *    LU_PCC_NONE and rc is 0;
 *  - otherwise the PCC type, the open count (refcount minus the
 *    attachment's own reference) and the attr-valid flag are filled in,
 *    and the path of the cached file is copied into state->pccs_path;
 *    -ENAMETOOLONG is used when that copy would be truncated.
 */
1042 int pcc_ioctl_state(struct inode *inode, struct lu_pcc_state *state)
1048 int buf_len = sizeof(state->pccs_path);
1049 struct pcc_inode *pcci;
1056 OBD_ALLOC(buf, buf_len);
1060 pcc_inode_lock(inode);
1061 pcci = ll_i2pcci(inode);
1063 state->pccs_type = LU_PCC_NONE;
1064 GOTO(out_unlock, rc = 0);
1067 count = atomic_read(&pcci->pcci_refcount);
1069 state->pccs_type = LU_PCC_NONE;
1070 GOTO(out_unlock, rc = 0);
1072 state->pccs_type = pcci->pcci_type;
/* one reference belongs to the attachment, the rest to open files */
1073 state->pccs_open_count = count - 1;
1074 state->pccs_flags = pcci->pcci_attr_valid ?
1075 PCC_STATE_FLAG_ATTR_VALID : 0;
1076 #ifdef HAVE_DENTRY_PATH_RAW
/* build the cached file's path into the scratch buffer */
1077 path = dentry_path_raw(pcci->pcci_path.dentry, buf, buf_len);
1079 GOTO(out_unlock, rc = PTR_ERR(path));
/* strlcpy() returns the source length; >= buf_len means truncation */
1084 if (strlcpy(state->pccs_path, path, buf_len) >= buf_len)
1085 GOTO(out_unlock, rc = -ENAMETOOLONG);
1088 pcc_inode_unlock(inode);
1089 OBD_FREE(buf, buf_len);