/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 only, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License version 2 for more details (a copy is included * in the LICENSE file that accompanied this code). * * You should have received a copy of the GNU General Public License * version 2 along with this program; If not, see * http://www.gnu.org/licenses/gpl-2.0.html * * GPL HEADER END */ /* * Copyright (c) 2017, DDN Storage Corporation. */ /* * Persistent Client Cache * * PCC is a new framework which provides a group of local cache on Lustre * client side. It works in two modes: RW-PCC enables a read-write cache on the * local SSDs of a single client; RO-PCC provides a read-only cache on the * local SSDs of multiple clients. Less overhead is visible to the applications * and network latencies and lock conflicts can be significantly reduced. * * For RW-PCC, no global namespace will be provided. Each client uses its own * local storage as a cache for itself. Local file system is used to manage * the data on local caches. Cached I/O is directed to local file system while * normal I/O is directed to OSTs. RW-PCC uses HSM for data synchronization. * It uses HSM copytool to restore file from local caches to Lustre OSTs. Each * PCC has a copytool instance running with unique archive number. Any remote * access from another Lustre client would trigger the data synchronization. If * a client with RW-PCC goes offline, the cached data becomes inaccessible for * other client temporarily. And after the RW-PCC client reboots and the * copytool restarts, the data will be accessible again. * * Following is what will happen in different conditions for RW-PCC: * * > When file is being created on RW-PCC * * A normal HSM released file is created on MDT; * An empty mirror file is created on local cache; * The HSM status of the Lustre file will be set to archived and released; * The archive number will be set to the proper value. * * > When file is being prefetched to RW-PCC * * An file is copied to the local cache; * The HSM status of the Lustre file will be set to archived and released; * The archive number will be set to the proper value. * * > When file is being accessed from PCC * * Data will be read directly from local cache; * Metadata will be read from MDT, except file size; * File size will be got from local cache. * * > When PCC cached file is being accessed on another client * * RW-PCC cached files are automatically restored when a process on another * client tries to read or modify them. The corresponding I/O will block * waiting for the released file to be restored. This is transparent to the * process. * * For RW-PCC, when a file is being created, a rule-based policy is used to * determine whether it will be cached. Rule-based caching of newly created * files can determine which file can use a cache on PCC directly without any * admission control. * * RW-PCC design can accelerate I/O intensive applications with one-to-one * mappings between files and accessing clients. However, in several use cases, * files will never be updated, but need to be read simultaneously from many * clients. RO-PCC implements a read-only caching on Lustre clients using * SSDs. RO-PCC is based on the same framework as RW-PCC, expect * that no HSM mechanism is used. * * The main advantages to use this SSD cache on the Lustre clients via PCC * is that: * - The I/O stack becomes much simpler for the cached data, as there is no * interference with I/Os from other clients, which enables easier * performance optimizations; * - The requirements on the HW inside the client nodes are small, any kind of * SSDs or even HDDs can be used as cache devices; * - Caching reduces the pressure on the object storage targets (OSTs), as * small or random I/Os can be regularized to big sequential I/Os and * temporary files do not even need to be flushed to OSTs. * * PCC can accelerate applications with certain I/O patterns: * - small-sized random writes (< 1MB) from a single client * - repeated read of data that is larger than RAM * - clients with high network latency * * Author: Li Xi * Author: Qian Yingjin */ #define DEBUG_SUBSYSTEM S_LLITE #include "pcc.h" #include #include #include #include "llite_internal.h" struct kmem_cache *pcc_inode_slab; void pcc_super_init(struct pcc_super *super) { spin_lock_init(&super->pccs_lock); INIT_LIST_HEAD(&super->pccs_datasets); } /** * pcc_dataset_add - Add a Cache policy to control which files need be * cached and where it will be cached. * * @super: superblock of pcc * @pathname: root path of pcc * @id: HSM archive ID * @projid: files with specified project ID will be cached. */ static int pcc_dataset_add(struct pcc_super *super, const char *pathname, __u32 archive_id, __u32 projid) { int rc; struct pcc_dataset *dataset; struct pcc_dataset *tmp; bool found = false; OBD_ALLOC_PTR(dataset); if (dataset == NULL) return -ENOMEM; rc = kern_path(pathname, LOOKUP_DIRECTORY, &dataset->pccd_path); if (unlikely(rc)) { OBD_FREE_PTR(dataset); return rc; } strncpy(dataset->pccd_pathname, pathname, PATH_MAX); dataset->pccd_id = archive_id; dataset->pccd_projid = projid; atomic_set(&dataset->pccd_refcount, 1); spin_lock(&super->pccs_lock); list_for_each_entry(tmp, &super->pccs_datasets, pccd_linkage) { if (tmp->pccd_id == archive_id) { found = true; break; } } if (!found) list_add(&dataset->pccd_linkage, &super->pccs_datasets); spin_unlock(&super->pccs_lock); if (found) { pcc_dataset_put(dataset); rc = -EEXIST; } return rc; } struct pcc_dataset * pcc_dataset_get(struct pcc_super *super, __u32 projid, __u32 archive_id) { struct pcc_dataset *dataset; struct pcc_dataset *selected = NULL; if (projid == 0 && archive_id == 0) return NULL; /* * archive ID is unique in the list, projid might be duplicate, * we just return last added one as first priority. */ spin_lock(&super->pccs_lock); list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) { if (projid && dataset->pccd_projid != projid) continue; if (archive_id && dataset->pccd_id != archive_id) continue; atomic_inc(&dataset->pccd_refcount); selected = dataset; break; } spin_unlock(&super->pccs_lock); if (selected) CDEBUG(D_CACHE, "matched projid %u, PCC create\n", selected->pccd_projid); return selected; } void pcc_dataset_put(struct pcc_dataset *dataset) { if (atomic_dec_and_test(&dataset->pccd_refcount)) { path_put(&dataset->pccd_path); OBD_FREE_PTR(dataset); } } static int pcc_dataset_del(struct pcc_super *super, char *pathname) { struct list_head *l, *tmp; struct pcc_dataset *dataset; int rc = -ENOENT; spin_lock(&super->pccs_lock); list_for_each_safe(l, tmp, &super->pccs_datasets) { dataset = list_entry(l, struct pcc_dataset, pccd_linkage); if (strcmp(dataset->pccd_pathname, pathname) == 0) { list_del(&dataset->pccd_linkage); pcc_dataset_put(dataset); rc = 0; break; } } spin_unlock(&super->pccs_lock); return rc; } static void pcc_dataset_dump(struct pcc_dataset *dataset, struct seq_file *m) { seq_printf(m, "%s:\n", dataset->pccd_pathname); seq_printf(m, " rwid: %u\n", dataset->pccd_id); seq_printf(m, " autocache: projid=%u\n", dataset->pccd_projid); } int pcc_super_dump(struct pcc_super *super, struct seq_file *m) { struct pcc_dataset *dataset; spin_lock(&super->pccs_lock); list_for_each_entry(dataset, &super->pccs_datasets, pccd_linkage) { pcc_dataset_dump(dataset, m); } spin_unlock(&super->pccs_lock); return 0; } void pcc_super_fini(struct pcc_super *super) { struct pcc_dataset *dataset, *tmp; list_for_each_entry_safe(dataset, tmp, &super->pccs_datasets, pccd_linkage) { list_del(&dataset->pccd_linkage); pcc_dataset_put(dataset); } } static bool pathname_is_valid(const char *pathname) { /* Needs to be absolute path */ if (pathname == NULL || strlen(pathname) == 0 || strlen(pathname) >= PATH_MAX || pathname[0] != '/') return false; return true; } static struct pcc_cmd * pcc_cmd_parse(char *buffer, unsigned long count) { static struct pcc_cmd *cmd; char *token; char *val; unsigned long tmp; int rc = 0; OBD_ALLOC_PTR(cmd); if (cmd == NULL) GOTO(out, rc = -ENOMEM); /* clear all setting */ if (strncmp(buffer, "clear", 5) == 0) { cmd->pccc_cmd = PCC_CLEAR_ALL; GOTO(out, rc = 0); } val = buffer; token = strsep(&val, " "); if (val == NULL || strlen(val) == 0) GOTO(out_free_cmd, rc = -EINVAL); /* Type of the command */ if (strcmp(token, "add") == 0) cmd->pccc_cmd = PCC_ADD_DATASET; else if (strcmp(token, "del") == 0) cmd->pccc_cmd = PCC_DEL_DATASET; else GOTO(out_free_cmd, rc = -EINVAL); /* Pathname of the dataset */ token = strsep(&val, " "); if ((val == NULL && cmd->pccc_cmd != PCC_DEL_DATASET) || !pathname_is_valid(token)) GOTO(out_free_cmd, rc = -EINVAL); cmd->pccc_pathname = token; if (cmd->pccc_cmd == PCC_ADD_DATASET) { /* archive ID */ token = strsep(&val, " "); if (val == NULL) GOTO(out_free_cmd, rc = -EINVAL); rc = kstrtoul(token, 10, &tmp); if (rc != 0) GOTO(out_free_cmd, rc = -EINVAL); if (tmp == 0) GOTO(out_free_cmd, rc = -EINVAL); cmd->u.pccc_add.pccc_id = tmp; token = val; rc = kstrtoul(token, 10, &tmp); if (rc != 0) GOTO(out_free_cmd, rc = -EINVAL); if (tmp == 0) GOTO(out_free_cmd, rc = -EINVAL); cmd->u.pccc_add.pccc_projid = tmp; } goto out; out_free_cmd: OBD_FREE_PTR(cmd); out: if (rc) cmd = ERR_PTR(rc); return cmd; } int pcc_cmd_handle(char *buffer, unsigned long count, struct pcc_super *super) { int rc = 0; struct pcc_cmd *cmd; cmd = pcc_cmd_parse(buffer, count); if (IS_ERR(cmd)) return PTR_ERR(cmd); switch (cmd->pccc_cmd) { case PCC_ADD_DATASET: rc = pcc_dataset_add(super, cmd->pccc_pathname, cmd->u.pccc_add.pccc_id, cmd->u.pccc_add.pccc_projid); break; case PCC_DEL_DATASET: rc = pcc_dataset_del(super, cmd->pccc_pathname); break; case PCC_CLEAR_ALL: pcc_super_fini(super); break; default: rc = -EINVAL; break; } OBD_FREE_PTR(cmd); return rc; } static inline void pcc_inode_lock(struct inode *inode) { mutex_lock(&ll_i2info(inode)->lli_pcc_lock); } static inline void pcc_inode_unlock(struct inode *inode) { mutex_unlock(&ll_i2info(inode)->lli_pcc_lock); } static void pcc_inode_init(struct pcc_inode *pcci) { atomic_set(&pcci->pcci_refcount, 0); pcci->pcci_type = LU_PCC_NONE; } static void pcc_inode_fini(struct pcc_inode *pcci) { path_put(&pcci->pcci_path); pcci->pcci_type = LU_PCC_NONE; OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab); } static void pcc_inode_get(struct pcc_inode *pcci) { atomic_inc(&pcci->pcci_refcount); } static void pcc_inode_put(struct pcc_inode *pcci) { if (atomic_dec_and_test(&pcci->pcci_refcount)) pcc_inode_fini(pcci); } void pcc_inode_free(struct inode *inode) { struct ll_inode_info *lli = ll_i2info(inode); struct pcc_inode *pcci = lli->lli_pcc_inode; if (pcci) { WARN_ON(atomic_read(&pcci->pcci_refcount) > 1); pcc_inode_put(pcci); lli->lli_pcc_inode = NULL; } } /* * TODO: * As Andreas suggested, we'd better use new layout to * reduce overhead: * (fid->f_oid >> 16 & oxFFFF)/FID */ #define MAX_PCC_DATABASE_PATH (6 * 5 + FID_NOBRACE_LEN + 1) static int pcc_fid2dataset_path(char *buf, int sz, struct lu_fid *fid) { return snprintf(buf, sz, "%04x/%04x/%04x/%04x/%04x/%04x/" DFID_NOBRACE, (fid)->f_oid & 0xFFFF, (fid)->f_oid >> 16 & 0xFFFF, (unsigned int)((fid)->f_seq & 0xFFFF), (unsigned int)((fid)->f_seq >> 16 & 0xFFFF), (unsigned int)((fid)->f_seq >> 32 & 0xFFFF), (unsigned int)((fid)->f_seq >> 48 & 0xFFFF), PFID(fid)); } void pcc_file_init(struct pcc_file *pccf) { pccf->pccf_file = NULL; pccf->pccf_type = LU_PCC_NONE; } int pcc_file_open(struct inode *inode, struct file *file) { struct pcc_inode *pcci; struct ll_file_data *fd = LUSTRE_FPRIVATE(file); struct pcc_file *pccf = &fd->fd_pcc_file; struct file *pcc_file; struct path *path; struct qstr *dname; int rc = 0; ENTRY; if (!S_ISREG(inode->i_mode)) RETURN(0); pcc_inode_lock(inode); pcci = ll_i2pcci(inode); if (!pcci) GOTO(out_unlock, rc = 0); if (atomic_read(&pcci->pcci_refcount) == 0) GOTO(out_unlock, rc = 0); pcc_inode_get(pcci); WARN_ON(pccf->pccf_file); path = &pcci->pcci_path; dname = &path->dentry->d_name; CDEBUG(D_CACHE, "opening pcc file '%.*s'\n", dname->len, dname->name); #ifdef HAVE_DENTRY_OPEN_USE_PATH pcc_file = dentry_open(path, file->f_flags, current_cred()); #else pcc_file = dentry_open(path->dentry, path->mnt, file->f_flags, current_cred()); #endif if (IS_ERR_OR_NULL(pcc_file)) { rc = pcc_file == NULL ? -EINVAL : PTR_ERR(pcc_file); pcc_inode_put(pcci); } else { pccf->pccf_file = pcc_file; pccf->pccf_type = pcci->pcci_type; } out_unlock: pcc_inode_unlock(inode); RETURN(rc); } void pcc_file_release(struct inode *inode, struct file *file) { struct pcc_inode *pcci; struct ll_file_data *fd = LUSTRE_FPRIVATE(file); struct pcc_file *pccf; struct path *path; struct qstr *dname; ENTRY; if (!S_ISREG(inode->i_mode) || fd == NULL) RETURN_EXIT; pccf = &fd->fd_pcc_file; pcc_inode_lock(inode); if (pccf->pccf_file == NULL) goto out; pcci = ll_i2pcci(inode); LASSERT(pcci); path = &pcci->pcci_path; dname = &path->dentry->d_name; CDEBUG(D_CACHE, "releasing pcc file \"%.*s\"\n", dname->len, dname->name); pcc_inode_put(pcci); fput(pccf->pccf_file); pccf->pccf_file = NULL; out: pcc_inode_unlock(inode); } ssize_t pcc_file_read_iter(struct kiocb *iocb, struct iov_iter *iter, bool *cached) { struct file *file = iocb->ki_filp; struct ll_file_data *fd = LUSTRE_FPRIVATE(file); struct pcc_file *pccf = &fd->fd_pcc_file; ssize_t result; ENTRY; if (pccf->pccf_file == NULL) { *cached = false; RETURN(0); } *cached = true; iocb->ki_filp = pccf->pccf_file; result = generic_file_read_iter(iocb, iter); iocb->ki_filp = file; RETURN(result); } static ssize_t __pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter) { struct file *file = iocb->ki_filp; #ifdef HAVE_FILE_OPERATIONS_READ_WRITE_ITER return file->f_op->write_iter(iocb, iter); #else struct iovec iov; struct iov_iter i; ssize_t bytes = 0; iov_for_each(iov, i, *iter) { ssize_t res; res = file->f_op->aio_write(iocb, &iov, 1, iocb->ki_pos); if (-EIOCBQUEUED == res) res = wait_on_sync_kiocb(iocb); if (res <= 0) { if (bytes == 0) bytes = res; break; } bytes += res; if (res < iov.iov_len) break; } if (bytes > 0) iov_iter_advance(iter, bytes); return bytes; #endif } ssize_t pcc_file_write_iter(struct kiocb *iocb, struct iov_iter *iter, bool *cached) { struct file *file = iocb->ki_filp; struct ll_file_data *fd = LUSTRE_FPRIVATE(file); struct pcc_file *pccf = &fd->fd_pcc_file; ssize_t result; ENTRY; if (pccf->pccf_file == NULL) { *cached = false; RETURN(0); } *cached = true; if (pccf->pccf_type != LU_PCC_READWRITE) RETURN(-EWOULDBLOCK); iocb->ki_filp = pccf->pccf_file; /* Since __pcc_file_write_iter makes write calls via * the normal vfs interface to the local PCC file system, * the inode lock is not needed. */ result = __pcc_file_write_iter(iocb, iter); iocb->ki_filp = file; RETURN(result); } int pcc_inode_setattr(struct inode *inode, struct iattr *attr, bool *cached) { int rc; struct pcc_inode *pcci; struct iattr attr2 = *attr; struct dentry *pcc_dentry; ENTRY; if (!S_ISREG(inode->i_mode)) { *cached = false; RETURN(0); } pcc_inode_lock(inode); pcci = ll_i2pcci(inode); if (pcci == NULL || atomic_read(&pcci->pcci_refcount) == 0) GOTO(out_unlock, rc = 0); *cached = true; attr2.ia_valid = attr->ia_valid & (ATTR_SIZE | ATTR_ATIME | ATTR_ATIME_SET | ATTR_MTIME | ATTR_MTIME_SET | ATTR_CTIME); pcc_dentry = pcci->pcci_path.dentry; inode_lock(pcc_dentry->d_inode); rc = pcc_dentry->d_inode->i_op->setattr(pcc_dentry, &attr2); inode_unlock(pcc_dentry->d_inode); out_unlock: pcc_inode_unlock(inode); RETURN(rc); } int pcc_inode_getattr(struct inode *inode, bool *cached) { struct ll_inode_info *lli = ll_i2info(inode); struct pcc_inode *pcci; struct kstat stat; s64 atime; s64 mtime; s64 ctime; int rc; ENTRY; if (!S_ISREG(inode->i_mode)) { *cached = false; RETURN(0); } pcc_inode_lock(inode); pcci = ll_i2pcci(inode); if (pcci == NULL || atomic_read(&pcci->pcci_refcount) == 0) GOTO(out_unlock, rc = 0); *cached = true; rc = ll_vfs_getattr(&pcci->pcci_path, &stat); if (rc) GOTO(out_unlock, rc); ll_inode_size_lock(inode); if (inode->i_atime.tv_sec < lli->lli_atime || lli->lli_update_atime) { inode->i_atime.tv_sec = lli->lli_atime; lli->lli_update_atime = 0; } inode->i_mtime.tv_sec = lli->lli_mtime; inode->i_ctime.tv_sec = lli->lli_ctime; atime = inode->i_atime.tv_sec; mtime = inode->i_mtime.tv_sec; ctime = inode->i_ctime.tv_sec; if (atime < stat.atime.tv_sec) atime = stat.atime.tv_sec; if (ctime < stat.ctime.tv_sec) ctime = stat.ctime.tv_sec; if (mtime < stat.mtime.tv_sec) mtime = stat.mtime.tv_sec; i_size_write(inode, stat.size); inode->i_blocks = stat.blocks; inode->i_atime.tv_sec = atime; inode->i_mtime.tv_sec = mtime; inode->i_ctime.tv_sec = ctime; ll_inode_size_unlock(inode); out_unlock: pcc_inode_unlock(inode); RETURN(rc); } /* Create directory under base if directory does not exist */ static struct dentry * pcc_mkdir(struct dentry *base, const char *name, umode_t mode) { int rc; struct dentry *dentry; struct inode *dir = base->d_inode; inode_lock(dir); dentry = lookup_one_len(name, base, strlen(name)); if (IS_ERR(dentry)) goto out; if (d_is_positive(dentry)) goto out; rc = vfs_mkdir(dir, dentry, mode); if (rc) { dput(dentry); dentry = ERR_PTR(rc); goto out; } out: inode_unlock(dir); return dentry; } static struct dentry * pcc_mkdir_p(struct dentry *root, char *path, umode_t mode) { char *ptr, *entry_name; struct dentry *parent; struct dentry *child = ERR_PTR(-EINVAL); ptr = path; while (*ptr == '/') ptr++; entry_name = ptr; parent = dget(root); while ((ptr = strchr(ptr, '/')) != NULL) { *ptr = '\0'; child = pcc_mkdir(parent, entry_name, mode); *ptr = '/'; if (IS_ERR(child)) break; dput(parent); parent = child; ptr++; entry_name = ptr; } return child; } /* Create file under base. If file already exist, return failure */ static struct dentry * pcc_create(struct dentry *base, const char *name, umode_t mode) { int rc; struct dentry *dentry; struct inode *dir = base->d_inode; inode_lock(dir); dentry = lookup_one_len(name, base, strlen(name)); if (IS_ERR(dentry)) goto out; if (d_is_positive(dentry)) goto out; rc = vfs_create(dir, dentry, mode, LL_VFS_CREATE_FALSE); if (rc) { dput(dentry); dentry = ERR_PTR(rc); goto out; } out: inode_unlock(dir); return dentry; } /* Must be called with pcci->pcci_lock held */ static void pcc_inode_attach_init(struct pcc_dataset *dataset, struct pcc_inode *pcci, struct dentry *dentry, enum lu_pcc_type type) { pcci->pcci_path.mnt = mntget(dataset->pccd_path.mnt); pcci->pcci_path.dentry = dentry; LASSERT(atomic_read(&pcci->pcci_refcount) == 0); atomic_set(&pcci->pcci_refcount, 1); pcci->pcci_type = type; pcci->pcci_attr_valid = false; } static int __pcc_inode_create(struct pcc_dataset *dataset, struct lu_fid *fid, struct dentry **dentry) { char *path; struct dentry *base; struct dentry *child; int rc = 0; OBD_ALLOC(path, MAX_PCC_DATABASE_PATH); if (path == NULL) return -ENOMEM; pcc_fid2dataset_path(path, MAX_PCC_DATABASE_PATH, fid); base = pcc_mkdir_p(dataset->pccd_path.dentry, path, 0700); if (IS_ERR(base)) { rc = PTR_ERR(base); GOTO(out, rc); } snprintf(path, MAX_PCC_DATABASE_PATH, DFID_NOBRACE, PFID(fid)); child = pcc_create(base, path, 0600); if (IS_ERR(child)) { rc = PTR_ERR(child); GOTO(out_base, rc); } *dentry = child; out_base: dput(base); out: OBD_FREE(path, MAX_PCC_DATABASE_PATH); return rc; } int pcc_inode_create(struct pcc_dataset *dataset, struct lu_fid *fid, struct dentry **pcc_dentry) { return __pcc_inode_create(dataset, fid, pcc_dentry); } int pcc_inode_create_fini(struct pcc_dataset *dataset, struct inode *inode, struct dentry *pcc_dentry) { struct ll_inode_info *lli = ll_i2info(inode); struct pcc_inode *pcci; ENTRY; LASSERT(ll_i2pcci(inode) == NULL); OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS); if (pcci == NULL) RETURN(-ENOMEM); pcc_inode_init(pcci); pcc_inode_lock(inode); pcc_inode_attach_init(dataset, pcci, pcc_dentry, LU_PCC_READWRITE); lli->lli_pcc_inode = pcci; pcc_inode_unlock(inode); RETURN(0); } static int pcc_filp_write(struct file *filp, const void *buf, ssize_t count, loff_t *offset) { while (count > 0) { ssize_t size; size = vfs_write(filp, (const void __user *)buf, count, offset); if (size < 0) return size; count -= size; buf += size; } return 0; } static int pcc_copy_data(struct file *src, struct file *dst) { int rc = 0; ssize_t rc2; mm_segment_t oldfs; loff_t pos, offset = 0; size_t buf_len = 1048576; void *buf; ENTRY; OBD_ALLOC_LARGE(buf, buf_len); if (buf == NULL) RETURN(-ENOMEM); oldfs = get_fs(); set_fs(KERNEL_DS); while (1) { pos = offset; rc2 = vfs_read(src, (void __user *)buf, buf_len, &pos); if (rc2 < 0) GOTO(out_fs, rc = rc2); else if (rc2 == 0) break; pos = offset; rc = pcc_filp_write(dst, buf, rc2, &pos); if (rc < 0) GOTO(out_fs, rc); offset += rc2; } out_fs: set_fs(oldfs); OBD_FREE_LARGE(buf, buf_len); RETURN(rc); } int pcc_readwrite_attach(struct file *file, struct inode *inode, __u32 archive_id) { struct pcc_dataset *dataset; struct ll_inode_info *lli = ll_i2info(inode); struct pcc_inode *pcci; struct dentry *dentry; struct file *pcc_filp; struct path path; int rc; ENTRY; pcc_inode_lock(inode); pcci = ll_i2pcci(inode); if (pcci == NULL) { OBD_SLAB_ALLOC_PTR_GFP(pcci, pcc_inode_slab, GFP_NOFS); if (pcci == NULL) { pcc_inode_unlock(inode); RETURN(-ENOMEM); } pcc_inode_init(pcci); } else if (atomic_read(&pcci->pcci_refcount) > 0) { pcc_inode_unlock(inode); RETURN(-EEXIST); } pcc_inode_unlock(inode); dataset = pcc_dataset_get(&ll_i2sbi(inode)->ll_pcc_super, 0, archive_id); if (dataset == NULL) GOTO(out_free_pcci, rc = -ENOENT); rc = __pcc_inode_create(dataset, &lli->lli_fid, &dentry); if (rc) GOTO(out_dataset_put, rc); path.mnt = dataset->pccd_path.mnt; path.dentry = dentry; #ifdef HAVE_DENTRY_OPEN_USE_PATH pcc_filp = dentry_open(&path, O_TRUNC | O_WRONLY | O_LARGEFILE, current_cred()); #else pcc_filp = dentry_open(path.dentry, path.mnt, O_TRUNC | O_WRONLY | O_LARGEFILE, current_cred()); #endif if (IS_ERR_OR_NULL(pcc_filp)) { rc = pcc_filp == NULL ? -EINVAL : PTR_ERR(pcc_filp); GOTO(out_dentry, rc); } rc = pcc_copy_data(file, pcc_filp); if (rc) GOTO(out_fput, rc); pcc_inode_lock(inode); if (lli->lli_pcc_inode) GOTO(out_unlock, rc = -EEXIST); pcc_inode_attach_init(dataset, pcci, dentry, LU_PCC_READWRITE); lli->lli_pcc_inode = pcci; out_unlock: pcc_inode_unlock(inode); out_fput: fput(pcc_filp); out_dentry: if (rc) dput(dentry); out_dataset_put: pcc_dataset_put(dataset); out_free_pcci: if (rc) OBD_SLAB_FREE_PTR(pcci, pcc_inode_slab); RETURN(rc); } int pcc_readwrite_attach_fini(struct file *file, struct inode *inode, bool lease_broken, int rc, bool attached) { struct pcc_inode *pcci = ll_i2pcci(inode); ENTRY; if ((rc || lease_broken) && attached && pcci) pcc_inode_put(pcci); RETURN(rc); } int pcc_ioctl_detach(struct inode *inode) { struct ll_inode_info *lli = ll_i2info(inode); struct pcc_inode *pcci = lli->lli_pcc_inode; int rc = 0; int count; ENTRY; pcc_inode_lock(inode); if (pcci == NULL) GOTO(out_unlock, rc = 0); count = atomic_read(&pcci->pcci_refcount); if (count > 1) GOTO(out_unlock, rc = -EBUSY); else if (count == 0) GOTO(out_unlock, rc = 0); pcc_inode_put(pcci); lli->lli_pcc_inode = NULL; out_unlock: pcc_inode_unlock(inode); RETURN(rc); } int pcc_ioctl_state(struct inode *inode, struct lu_pcc_state *state) { int rc = 0; int count; char *buf; char *path; int buf_len = sizeof(state->pccs_path); struct pcc_inode *pcci; ENTRY; if (buf_len <= 0) RETURN(-EINVAL); OBD_ALLOC(buf, buf_len); if (buf == NULL) RETURN(-ENOMEM); pcc_inode_lock(inode); pcci = ll_i2pcci(inode); if (pcci == NULL) { state->pccs_type = LU_PCC_NONE; GOTO(out_unlock, rc = 0); } count = atomic_read(&pcci->pcci_refcount); if (count == 0) { state->pccs_type = LU_PCC_NONE; GOTO(out_unlock, rc = 0); } state->pccs_type = pcci->pcci_type; state->pccs_open_count = count - 1; state->pccs_flags = pcci->pcci_attr_valid ? PCC_STATE_FLAG_ATTR_VALID : 0; #ifdef HAVE_DENTRY_PATH_RAW path = dentry_path_raw(pcci->pcci_path.dentry, buf, buf_len); if (IS_ERR(path)) GOTO(out_unlock, rc = PTR_ERR(path)); #else path = "UNKNOWN"; #endif if (strlcpy(state->pccs_path, path, buf_len) >= buf_len) GOTO(out_unlock, rc = -ENAMETOOLONG); out_unlock: pcc_inode_unlock(inode); OBD_FREE(buf, buf_len); RETURN(rc); }