__u16 sp_mirror_id;
};
+struct pcc_param {
+ __u64 pa_data_version;
+ __u32 pa_archive_id;
+ __u32 pa_layout_gen;
+};
+
static int
ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg);
return NULL;
fd->fd_write_failed = false;
+ pcc_file_init(&fd->fd_pcc_file);
return fd;
}
break;
}
+ case MDS_PCC_ATTACH: {
+ struct pcc_param *param = data;
+
+ LASSERT(data != NULL);
+ op_data->op_bias |= MDS_HSM_RELEASE | MDS_PCC_ATTACH;
+ op_data->op_archive_id = param->pa_archive_id;
+ op_data->op_data_version = param->pa_data_version;
+ op_data->op_lease_handle = och->och_lease_handle;
+ break;
+ }
+
case MDS_HSM_RELEASE:
LASSERT(data != NULL);
op_data->op_bias |= MDS_HSM_RELEASE;
body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
if (!(body->mbo_valid & OBD_MD_CLOSE_INTENT_EXECED))
rc = -EBUSY;
+
+ if (bias & MDS_PCC_ATTACH) {
+ struct pcc_param *param = data;
+
+ param->pa_layout_gen = body->mbo_layout_gen;
+ }
}
ll_finish_md_op_data(op_data);
RETURN(0);
}
+ pcc_file_release(inode, file);
+
if (!S_ISDIR(inode->i_mode)) {
if (lli->lli_clob != NULL)
lov_read_and_clear_async_rc(lli->lli_clob);
struct address_space *mapping = inode->i_mapping;
struct page *vmpage;
struct niobuf_remote *rnb;
+ struct mdt_body *body;
char *data;
- struct lustre_handle lockh;
- struct ldlm_lock *lock;
unsigned long index, start;
struct niobuf_local lnb;
- bool dom_lock = false;
ENTRY;
if (obj == NULL)
RETURN_EXIT;
- if (it->it_lock_mode != 0) {
- lockh.cookie = it->it_lock_handle;
- lock = ldlm_handle2lock(&lockh);
- if (lock != NULL)
- dom_lock = ldlm_has_dom(lock);
- LDLM_LOCK_PUT(lock);
- }
- if (!dom_lock)
- RETURN_EXIT;
-
if (!req_capsule_has_field(&req->rq_pill, &RMF_NIOBUF_INLINE,
RCL_SERVER))
RETURN_EXIT;
if (rnb->rnb_offset % PAGE_SIZE)
RETURN_EXIT;
- /* Server returns whole file or just file tail if it fills in
- * reply buffer, in both cases total size should be inode size.
+ /* Server returns whole file or just file tail if it fills in reply
+ * buffer, in both cases total size should be equal to the file size.
*/
- if (rnb->rnb_offset + rnb->rnb_len < i_size_read(inode)) {
- CERROR("%s: server returns off/len %llu/%u < i_size %llu\n",
+ body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+ if (rnb->rnb_offset + rnb->rnb_len != body->mbo_dom_size) {
+ CERROR("%s: server returns off/len %llu/%u but size %llu\n",
ll_i2sbi(inode)->ll_fsname, rnb->rnb_offset,
- rnb->rnb_len, i_size_read(inode));
+ rnb->rnb_len, body->mbo_dom_size);
RETURN_EXIT;
}
- CDEBUG(D_INFO, "Get data along with open at %llu len %i, i_size %llu\n",
- rnb->rnb_offset, rnb->rnb_len, i_size_read(inode));
+ CDEBUG(D_INFO, "Get data along with open at %llu len %i, size %llu\n",
+ rnb->rnb_offset, rnb->rnb_len, body->mbo_dom_size);
data = (char *)rnb + sizeof(*rnb);
rc = ll_prep_inode(&de->d_inode, req, NULL, itp);
if (!rc && itp->it_lock_mode) {
- ll_dom_finish_open(de->d_inode, req, itp);
+ struct lustre_handle handle = {.cookie = itp->it_lock_handle};
+ struct ldlm_lock *lock;
+ bool has_dom_bit = false;
+
+ /* If we got a lock back and it has a LOOKUP bit set,
+ * make sure the dentry is marked as valid so we can find it.
+ * We don't need to care about actual hashing since other bits
+ * of kernel will deal with that later.
+ */
+ lock = ldlm_handle2lock(&handle);
+ if (lock) {
+ has_dom_bit = ldlm_has_dom(lock);
+ if (lock->l_policy_data.l_inodebits.bits &
+ MDS_INODELOCK_LOOKUP)
+ d_lustre_revalidate(de);
+
+ LDLM_LOCK_PUT(lock);
+ }
ll_set_lock_data(sbi->ll_md_exp, de->d_inode, itp, NULL);
+ if (has_dom_bit)
+ ll_dom_finish_open(de->d_inode, req, itp);
}
out:
if (rc)
GOTO(out_och_free, rc);
}
+
+ rc = pcc_file_open(inode, file);
+ if (rc)
+ GOTO(out_och_free, rc);
+
mutex_unlock(&lli->lli_och_mutex);
fd = NULL;
out_openerr:
if (lli->lli_opendir_key == fd)
ll_deauthorize_statahead(inode, fd);
+
if (fd != NULL)
ll_file_data_put(fd);
} else {
return false;
}
-static void ll_io_init(struct cl_io *io, struct file *file, enum cl_io_type iot)
+void ll_io_init(struct cl_io *io, struct file *file, enum cl_io_type iot)
{
struct inode *inode = file_inode(file);
struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
io->ci_lockreq = CILR_MANDATORY;
}
io->ci_noatime = file_is_noatime(file);
+ io->ci_async_readahead = false;
/* FLR: only use non-delay I/O for read as there is only one
* avaliable mirror for write. */
ssize_t result;
ssize_t rc2;
__u16 refcheck;
+ bool cached;
+
+ if (!iov_iter_count(to))
+ return 0;
+
+ /**
+ * Currently when PCC read failed, we do not fall back to the
+ * normal read path, just return the error.
+ * The resaon is that: for RW-PCC, the file data may be modified
+ * in the PCC and inconsistent with the data on OSTs (or file
+ * data has been removed from the Lustre file system), at this
+ * time, fallback to the normal read path may read the wrong
+ * data.
+ * TODO: for RO-PCC (readonly PCC), fall back to normal read
+ * path: read data from data copy on OSTs.
+ */
+ result = pcc_file_read_iter(iocb, to, &cached);
+ if (cached)
+ return result;
ll_ras_enter(iocb->ki_filp);
struct lu_env *env;
ssize_t rc_tiny = 0, rc_normal;
__u16 refcheck;
+ bool cached;
+ int result;
ENTRY;
+ if (!iov_iter_count(from))
+ GOTO(out, rc_normal = 0);
+
+ /**
+ * When PCC write failed, we usually do not fall back to the normal
+ * write path, just return the error. But there is a special case when
+ * returned error code is -ENOSPC due to running out of space on PCC HSM
+ * bakcend. At this time, it will fall back to normal I/O path and
+ * retry the I/O. As the file is in HSM released state, it will restore
+ * the file data to OSTs first and redo the write again. And the
+ * restore process will revoke the layout lock and detach the file
+ * from PCC cache automatically.
+ */
+ result = pcc_file_write_iter(iocb, from, &cached);
+ if (cached && result != -ENOSPC && result != -EDQUOT)
+ return result;
+
/* NB: we can't do direct IO for tiny writes because they use the page
* cache, we can't do sync writes because tiny writes can't flush
* pages, and we can't do append writes because we can't guarantee the
if (result)
RETURN(result);
+ if (!iov_count)
+ RETURN(0);
+
# ifdef HAVE_IOV_ITER_INIT_DIRECTION
iov_iter_init(&to, READ, iov, nr_segs, iov_count);
# else /* !HAVE_IOV_ITER_INIT_DIRECTION */
struct iovec iov = { .iov_base = buf, .iov_len = count };
struct kiocb kiocb;
ssize_t result;
+
ENTRY;
+ if (!count)
+ RETURN(0);
+
init_sync_kiocb(&kiocb, file);
kiocb.ki_pos = *ppos;
#ifdef HAVE_KIOCB_KI_LEFT
if (result)
RETURN(result);
+ if (!iov_count)
+ RETURN(0);
+
# ifdef HAVE_IOV_ITER_INIT_DIRECTION
iov_iter_init(&from, WRITE, iov, nr_segs, iov_count);
# else /* !HAVE_IOV_ITER_INIT_DIRECTION */
ENTRY;
+ if (!count)
+ RETURN(0);
+
init_sync_kiocb(&kiocb, file);
kiocb.ki_pos = *ppos;
#ifdef HAVE_KIOCB_KI_LEFT
struct pipe_inode_info *pipe, size_t count,
unsigned int flags)
{
- struct lu_env *env;
- struct vvp_io_args *args;
- ssize_t result;
- __u16 refcheck;
+ struct lu_env *env;
+ struct vvp_io_args *args;
+ ssize_t result;
+ __u16 refcheck;
+ bool cached;
+
ENTRY;
+ result = pcc_file_splice_read(in_file, ppos, pipe,
+ count, flags, &cached);
+ if (cached)
+ RETURN(result);
+
ll_ras_enter(in_file);
- env = cl_env_get(&refcheck);
+ env = cl_env_get(&refcheck);
if (IS_ERR(env))
RETURN(PTR_ERR(env));
struct ll_inode_info *lli = ll_i2info(inode);
struct obd_client_handle *och = NULL;
struct split_param sp;
- bool lease_broken;
+ struct pcc_param param;
+ bool lease_broken = false;
fmode_t fmode = 0;
enum mds_op_bias bias = 0;
struct file *layout_file = NULL;
void *data = NULL;
size_t data_size = 0;
- long rc;
+ bool attached = false;
+ long rc, rc2 = 0;
+
ENTRY;
mutex_lock(&lli->lli_och_mutex);
mutex_unlock(&lli->lli_och_mutex);
if (och == NULL)
- GOTO(out, rc = -ENOLCK);
+ RETURN(-ENOLCK);
fmode = och->och_flags;
switch (ioc->lil_flags) {
case LL_LEASE_RESYNC_DONE:
if (ioc->lil_count > IOC_IDS_MAX)
- GOTO(out, rc = -EINVAL);
+ GOTO(out_lease_close, rc = -EINVAL);
data_size = offsetof(typeof(*ioc), lil_ids[ioc->lil_count]);
OBD_ALLOC(data, data_size);
if (!data)
- GOTO(out, rc = -ENOMEM);
+ GOTO(out_lease_close, rc = -ENOMEM);
if (copy_from_user(data, (void __user *)arg, data_size))
- GOTO(out, rc = -EFAULT);
+ GOTO(out_lease_close, rc = -EFAULT);
bias = MDS_CLOSE_RESYNC_DONE;
break;
int fd;
if (ioc->lil_count != 1)
- GOTO(out, rc = -EINVAL);
+ GOTO(out_lease_close, rc = -EINVAL);
arg += sizeof(*ioc);
if (copy_from_user(&fd, (void __user *)arg, sizeof(__u32)))
- GOTO(out, rc = -EFAULT);
+ GOTO(out_lease_close, rc = -EFAULT);
layout_file = fget(fd);
if (!layout_file)
- GOTO(out, rc = -EBADF);
+ GOTO(out_lease_close, rc = -EBADF);
if ((file->f_flags & O_ACCMODE) == O_RDONLY ||
(layout_file->f_flags & O_ACCMODE) == O_RDONLY)
- GOTO(out, rc = -EPERM);
+ GOTO(out_lease_close, rc = -EPERM);
data = file_inode(layout_file);
bias = MDS_CLOSE_LAYOUT_MERGE;
int mirror_id;
if (ioc->lil_count != 2)
- GOTO(out, rc = -EINVAL);
+ GOTO(out_lease_close, rc = -EINVAL);
arg += sizeof(*ioc);
if (copy_from_user(&fdv, (void __user *)arg, sizeof(__u32)))
- GOTO(out, rc = -EFAULT);
+ GOTO(out_lease_close, rc = -EFAULT);
arg += sizeof(__u32);
if (copy_from_user(&mirror_id, (void __user *)arg,
sizeof(__u32)))
- GOTO(out, rc = -EFAULT);
+ GOTO(out_lease_close, rc = -EFAULT);
layout_file = fget(fdv);
if (!layout_file)
- GOTO(out, rc = -EBADF);
+ GOTO(out_lease_close, rc = -EBADF);
sp.sp_inode = file_inode(layout_file);
sp.sp_mirror_id = (__u16)mirror_id;
bias = MDS_CLOSE_LAYOUT_SPLIT;
break;
}
+ case LL_LEASE_PCC_ATTACH:
+ if (ioc->lil_count != 1)
+ RETURN(-EINVAL);
+
+ arg += sizeof(*ioc);
+ if (copy_from_user(¶m.pa_archive_id, (void __user *)arg,
+ sizeof(__u32)))
+ GOTO(out_lease_close, rc2 = -EFAULT);
+
+ rc2 = pcc_readwrite_attach(file, inode, param.pa_archive_id);
+ if (rc2)
+ GOTO(out_lease_close, rc2);
+
+ attached = true;
+ /* Grab latest data version */
+ rc2 = ll_data_version(inode, ¶m.pa_data_version,
+ LL_DV_WR_FLUSH);
+ if (rc2)
+ GOTO(out_lease_close, rc2);
+
+ data = ¶m;
+ bias = MDS_PCC_ATTACH;
+ break;
default:
/* without close intent */
break;
}
+out_lease_close:
rc = ll_lease_close_intent(och, inode, &lease_broken, bias, data);
if (rc < 0)
GOTO(out, rc);
if (layout_file)
fput(layout_file);
break;
+ case LL_LEASE_PCC_ATTACH:
+ if (!rc)
+ rc = rc2;
+ rc = pcc_readwrite_attach_fini(file, inode,
+ param.pa_layout_gen,
+ lease_broken, rc,
+ attached);
+ break;
}
if (!rc)
rc = ll_heat_set(inode, flags);
RETURN(rc);
}
+ case LL_IOC_PCC_DETACH: {
+ struct lu_pcc_detach *detach;
+
+ OBD_ALLOC_PTR(detach);
+ if (detach == NULL)
+ RETURN(-ENOMEM);
+
+ if (copy_from_user(detach,
+ (const struct lu_pcc_detach __user *)arg,
+ sizeof(*detach)))
+ GOTO(out_detach_free, rc = -EFAULT);
+
+ if (!S_ISREG(inode->i_mode))
+ GOTO(out_detach_free, rc = -EINVAL);
+
+ if (!inode_owner_or_capable(inode))
+ GOTO(out_detach_free, rc = -EPERM);
+
+ rc = pcc_ioctl_detach(inode, detach->pccd_opt);
+out_detach_free:
+ OBD_FREE_PTR(detach);
+ RETURN(rc);
+ }
+ case LL_IOC_PCC_STATE: {
+ struct lu_pcc_state __user *ustate =
+ (struct lu_pcc_state __user *)arg;
+ struct lu_pcc_state *state;
+
+ OBD_ALLOC_PTR(state);
+ if (state == NULL)
+ RETURN(-ENOMEM);
+
+ if (copy_from_user(state, ustate, sizeof(*state)))
+ GOTO(out_state, rc = -EFAULT);
+
+ rc = pcc_ioctl_state(file, inode, state);
+ if (rc)
+ GOTO(out_state, rc);
+
+ if (copy_to_user(ustate, state, sizeof(*state)))
+ GOTO(out_state, rc = -EFAULT);
+
+out_state:
+ OBD_FREE_PTR(state);
+ RETURN(rc);
+ }
default:
RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
(void __user *)arg));
struct ll_inode_info *lli = ll_i2info(inode);
struct ptlrpc_request *req;
int rc, err;
+
ENTRY;
CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p)\n",
if (S_ISREG(inode->i_mode)) {
struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ bool cached;
- err = cl_sync_file_range(inode, start, end, CL_FSYNC_ALL, 0);
+ /* Sync metadata on MDT first, and then sync the cached data
+ * on PCC.
+ */
+ err = pcc_fsync(file, start, end, datasync, &cached);
+ if (!cached)
+ err = cl_sync_file_range(inode, start, end,
+ CL_FSYNC_ALL, 0);
if (rc == 0 && err < 0)
rc = err;
if (rc < 0)
if (!(exp_connect_flags2(ll_i2sbi(parent)->ll_md_exp) &
OBD_CONNECT2_DIR_MIGRATE)) {
if (le32_to_cpu(lum->lum_stripe_count) > 1 ||
- ll_i2info(child_inode)->lli_lsm_md) {
+ ll_dir_striped(child_inode)) {
CERROR("%s: MDT doesn't support stripe directory "
"migration!\n", ll_i2sbi(parent)->ll_fsname);
GOTO(out_iput, rc = -EOPNOTSUPP);
static int
ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
{
- ENTRY;
+ struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ ENTRY;
- RETURN(-ENOSYS);
+ /*
+ * In order to avoid flood of warning messages, only print one message
+ * for one file. And the entire message rate on the client is limited
+ * by CDEBUG_LIMIT too.
+ */
+ if (!(fd->fd_flags & LL_FILE_FLOCK_WARNING)) {
+ fd->fd_flags |= LL_FILE_FLOCK_WARNING;
+ CDEBUG_LIMIT(D_TTY | D_CONSOLE,
+ "flock disabled, mount with '-o [local]flock' to enable\r\n");
+ }
+ RETURN(-ENOSYS);
}
/**
/* If it is striped directory, and there is bad stripe
* Let's revalidate the dentry again, instead of returning
* error */
- if (S_ISDIR(inode->i_mode) &&
- ll_i2info(inode)->lli_lsm_md != NULL)
+ if (ll_dir_striped(inode))
return 0;
/* This path cannot be hit for regular files unless in
LASSERT(lli->lli_lsm_md != NULL);
- /* foreign dir is not striped dir */
- if (lli->lli_lsm_md->lsm_md_magic == LMV_MAGIC_FOREIGN)
+ if (!lmv_dir_striped(lli->lli_lsm_md))
RETURN(0);
down_read(&lli->lli_lsm_sem);
RETURN(0);
}
-static inline dev_t ll_compat_encode_dev(dev_t dev)
+int ll_getattr_dentry(struct dentry *de, struct kstat *stat)
{
- /* The compat_sys_*stat*() syscalls will fail unless the
- * device majors and minors are both less than 256. Note that
- * the value returned here will be passed through
- * old_encode_dev() in cp_compat_stat(). And so we are not
- * trying to return a valid compat (u16) device number, just
- * one that will pass the old_valid_dev() check. */
-
- return MKDEV(MAJOR(dev) & 0xff, MINOR(dev) & 0xff);
-}
-
-#ifdef HAVE_INODEOPS_ENHANCED_GETATTR
-int ll_getattr(const struct path *path, struct kstat *stat,
- u32 request_mask, unsigned int flags)
-{
- struct dentry *de = path->dentry;
-#else
-int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
-{
-#endif
struct inode *inode = de->d_inode;
struct ll_sb_info *sbi = ll_i2sbi(inode);
struct ll_inode_info *lli = ll_i2info(inode);
RETURN(rc);
if (S_ISREG(inode->i_mode)) {
+ bool cached;
+
+ rc = pcc_inode_getattr(inode, &cached);
+ if (cached && rc < 0)
+ RETURN(rc);
+
/* In case of restore, the MDT has the right size and has
* already send it back without granting the layout lock,
* inode is up-to-date so glimpse is useless.
* restore the MDT holds the layout lock so the glimpse will
* block up to the end of restore (getattr will block)
*/
- if (!ll_file_test_flag(lli, LLIF_FILE_RESTORING)) {
+ if (!cached && !ll_file_test_flag(lli, LLIF_FILE_RESTORING)) {
rc = ll_glimpse_size(inode);
if (rc < 0)
RETURN(rc);
}
} else {
/* If object isn't regular a file then don't validate size. */
- if (S_ISDIR(inode->i_mode) &&
- lli->lli_lsm_md != NULL) {
+ if (ll_dir_striped(inode)) {
rc = ll_merge_md_attr(inode);
if (rc < 0)
RETURN(rc);
return 0;
}
+#ifdef HAVE_INODEOPS_ENHANCED_GETATTR
+int ll_getattr(const struct path *path, struct kstat *stat,
+ u32 request_mask, unsigned int flags)
+{
+ struct dentry *de = path->dentry;
+#else
+int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
+{
+#endif
+ return ll_getattr_dentry(de, stat);
+}
+
static int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
__u64 start, __u64 len)
{
/* mostly layout lock is caching on the local side, so try to
* match it before grabbing layout lock mutex. */
mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0,
- LCK_CR | LCK_CW | LCK_PR | LCK_PW);
+ LCK_CR | LCK_CW | LCK_PR |
+ LCK_PW | LCK_EX);
if (mode != 0) { /* hit cached lock */
rc = ll_layout_lock_set(&lockh, mode, inode);
if (rc == -EAGAIN)