Whamcloud - gitweb
LU-10092 llite: Add persistent cache on client
[fs/lustre-release.git] / lustre / llite / file.c
index 6f26724..e340c71 100644 (file)
@@ -58,6 +58,11 @@ struct split_param {
        __u16           sp_mirror_id;
 };
 
+struct pcc_param {
+       __u64   pa_data_version;
+       __u32   pa_archive_id;
+};
+
 static int
 ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg);
 
@@ -73,6 +78,7 @@ static struct ll_file_data *ll_file_data_get(void)
                return NULL;
 
        fd->fd_write_failed = false;
+       pcc_file_init(&fd->fd_pcc_file);
 
        return fd;
 }
@@ -190,6 +196,17 @@ static int ll_close_inode_openhandle(struct inode *inode,
                break;
        }
 
+       case MDS_PCC_ATTACH: {
+               struct pcc_param *param = data;
+
+               LASSERT(data != NULL);
+               op_data->op_bias |= MDS_HSM_RELEASE | MDS_PCC_ATTACH;
+               op_data->op_archive_id = param->pa_archive_id;
+               op_data->op_data_version = param->pa_data_version;
+               op_data->op_lease_handle = och->och_lease_handle;
+               break;
+       }
+
        case MDS_HSM_RELEASE:
                LASSERT(data != NULL);
                op_data->op_bias |= MDS_HSM_RELEASE;
@@ -372,6 +389,8 @@ int ll_file_release(struct inode *inode, struct file *file)
                RETURN(0);
        }
 
+       pcc_file_release(inode, file);
+
        if (!S_ISDIR(inode->i_mode)) {
                if (lli->lli_clob != NULL)
                        lov_read_and_clear_async_rc(lli->lli_clob);
@@ -813,6 +832,10 @@ restart:
                if (rc)
                        GOTO(out_och_free, rc);
        }
+       rc = pcc_file_open(inode, file);
+       if (rc)
+               GOTO(out_och_free, rc);
+
        mutex_unlock(&lli->lli_och_mutex);
         fd = NULL;
 
@@ -837,6 +860,7 @@ out_och_free:
 out_openerr:
                if (lli->lli_opendir_key == fd)
                        ll_deauthorize_statahead(inode, fd);
+
                if (fd != NULL)
                        ll_file_data_put(fd);
         } else {
@@ -1617,6 +1641,22 @@ static ssize_t ll_file_read_iter(struct kiocb *iocb, struct iov_iter *to)
        ssize_t result;
        ssize_t rc2;
        __u16 refcheck;
+       bool cached = false;
+
+       /**
+        * Currently when PCC read failed, we do not fall back to the
+        * normal read path, just return the error.
+        * The resaon is that: for RW-PCC, the file data may be modified
+        * in the PCC and inconsistent with the data on OSTs (or file
+        * data has been removed from the Lustre file system), at this
+        * time, fallback to the normal read path may read the wrong
+        * data.
+        * TODO: for RO-PCC (readonly PCC), fall back to normal read
+        * path: read data from data copy on OSTs.
+        */
+       result = pcc_file_read_iter(iocb, to, &cached);
+       if (cached)
+               return result;
 
        ll_ras_enter(iocb->ki_filp);
 
@@ -1712,9 +1752,24 @@ static ssize_t ll_file_write_iter(struct kiocb *iocb, struct iov_iter *from)
        struct lu_env *env;
        ssize_t rc_tiny = 0, rc_normal;
        __u16 refcheck;
+       bool cached = false;
+       int result;
 
        ENTRY;
 
+       /**
+        * When PCC write failed, we do not fall back to the normal
+        * write path, just return the error. The reason is that:
+        * PCC is actually a HSM device, and HSM does not handle the
+        * failure especially -ENOSPC due to space used out; Moreover,
+        * the fallback to normal I/O path for ENOSPC failure, needs
+        * to restore the file data to OSTs first and redo the write
+        * again, making the logic of PCC very complex.
+        */
+       result = pcc_file_write_iter(iocb, from, &cached);
+       if (cached)
+               return result;
+
        /* NB: we can't do direct IO for tiny writes because they use the page
         * cache, we can't do sync writes because tiny writes can't flush
         * pages, and we can't do append writes because we can't guarantee the
@@ -1895,8 +1950,16 @@ static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
         struct vvp_io_args *args;
         ssize_t             result;
        __u16               refcheck;
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(in_file);
+       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
+
         ENTRY;
 
+       /* pcc cache path */
+       if (pcc_file && file_inode(pcc_file)->i_fop->splice_read)
+               return file_inode(pcc_file)->i_fop->splice_read(pcc_file,
+                                               ppos, pipe, count, flags);
+
        ll_ras_enter(in_file);
 
         env = cl_env_get(&refcheck);
@@ -3105,13 +3168,16 @@ static long ll_file_unlock_lease(struct file *file, struct ll_ioc_lease *ioc,
        struct ll_inode_info    *lli = ll_i2info(inode);
        struct obd_client_handle *och = NULL;
        struct split_param sp;
-       bool lease_broken;
+       struct pcc_param param;
+       bool lease_broken = false;
        fmode_t fmode = 0;
        enum mds_op_bias bias = 0;
        struct file *layout_file = NULL;
        void *data = NULL;
        size_t data_size = 0;
-       long rc;
+       bool attached = false;
+       long rc, rc2 = 0;
+
        ENTRY;
 
        mutex_lock(&lli->lli_och_mutex);
@@ -3122,22 +3188,22 @@ static long ll_file_unlock_lease(struct file *file, struct ll_ioc_lease *ioc,
        mutex_unlock(&lli->lli_och_mutex);
 
        if (och == NULL)
-               GOTO(out, rc = -ENOLCK);
+               RETURN(-ENOLCK);
 
        fmode = och->och_flags;
 
        switch (ioc->lil_flags) {
        case LL_LEASE_RESYNC_DONE:
                if (ioc->lil_count > IOC_IDS_MAX)
-                       GOTO(out, rc = -EINVAL);
+                       GOTO(out_lease_close, rc = -EINVAL);
 
                data_size = offsetof(typeof(*ioc), lil_ids[ioc->lil_count]);
                OBD_ALLOC(data, data_size);
                if (!data)
-                       GOTO(out, rc = -ENOMEM);
+                       GOTO(out_lease_close, rc = -ENOMEM);
 
                if (copy_from_user(data, (void __user *)arg, data_size))
-                       GOTO(out, rc = -EFAULT);
+                       GOTO(out_lease_close, rc = -EFAULT);
 
                bias = MDS_CLOSE_RESYNC_DONE;
                break;
@@ -3145,19 +3211,19 @@ static long ll_file_unlock_lease(struct file *file, struct ll_ioc_lease *ioc,
                int fd;
 
                if (ioc->lil_count != 1)
-                       GOTO(out, rc = -EINVAL);
+                       GOTO(out_lease_close, rc = -EINVAL);
 
                arg += sizeof(*ioc);
                if (copy_from_user(&fd, (void __user *)arg, sizeof(__u32)))
-                       GOTO(out, rc = -EFAULT);
+                       GOTO(out_lease_close, rc = -EFAULT);
 
                layout_file = fget(fd);
                if (!layout_file)
-                       GOTO(out, rc = -EBADF);
+                       GOTO(out_lease_close, rc = -EBADF);
 
                if ((file->f_flags & O_ACCMODE) == O_RDONLY ||
                                (layout_file->f_flags & O_ACCMODE) == O_RDONLY)
-                       GOTO(out, rc = -EPERM);
+                       GOTO(out_lease_close, rc = -EPERM);
 
                data = file_inode(layout_file);
                bias = MDS_CLOSE_LAYOUT_MERGE;
@@ -3168,20 +3234,20 @@ static long ll_file_unlock_lease(struct file *file, struct ll_ioc_lease *ioc,
                int mirror_id;
 
                if (ioc->lil_count != 2)
-                       GOTO(out, rc = -EINVAL);
+                       GOTO(out_lease_close, rc = -EINVAL);
 
                arg += sizeof(*ioc);
                if (copy_from_user(&fdv, (void __user *)arg, sizeof(__u32)))
-                       GOTO(out, rc = -EFAULT);
+                       GOTO(out_lease_close, rc = -EFAULT);
 
                arg += sizeof(__u32);
                if (copy_from_user(&mirror_id, (void __user *)arg,
                                   sizeof(__u32)))
-                       GOTO(out, rc = -EFAULT);
+                       GOTO(out_lease_close, rc = -EFAULT);
 
                layout_file = fget(fdv);
                if (!layout_file)
-                       GOTO(out, rc = -EBADF);
+                       GOTO(out_lease_close, rc = -EBADF);
 
                sp.sp_inode = file_inode(layout_file);
                sp.sp_mirror_id = (__u16)mirror_id;
@@ -3189,11 +3255,35 @@ static long ll_file_unlock_lease(struct file *file, struct ll_ioc_lease *ioc,
                bias = MDS_CLOSE_LAYOUT_SPLIT;
                break;
        }
+       case LL_LEASE_PCC_ATTACH:
+               if (ioc->lil_count != 1)
+                       RETURN(-EINVAL);
+
+               arg += sizeof(*ioc);
+               if (copy_from_user(&param.pa_archive_id, (void __user *)arg,
+                                  sizeof(__u32)))
+                       GOTO(out_lease_close, rc2 = -EFAULT);
+
+               rc2 = pcc_readwrite_attach(file, inode, param.pa_archive_id);
+               if (rc2)
+                       GOTO(out_lease_close, rc2);
+
+               attached = true;
+               /* Grab latest data version */
+               rc2 = ll_data_version(inode, &param.pa_data_version,
+                                    LL_DV_WR_FLUSH);
+               if (rc2)
+                       GOTO(out_lease_close, rc2);
+
+               data = &param;
+               bias = MDS_PCC_ATTACH;
+               break;
        default:
                /* without close intent */
                break;
        }
 
+out_lease_close:
        rc = ll_lease_close_intent(och, inode, &lease_broken, bias, data);
        if (rc < 0)
                GOTO(out, rc);
@@ -3217,6 +3307,12 @@ out:
                if (layout_file)
                        fput(layout_file);
                break;
+       case LL_LEASE_PCC_ATTACH:
+               if (!rc)
+                       rc = rc2;
+               rc = pcc_readwrite_attach_fini(file, inode, lease_broken,
+                                              rc, attached);
+               break;
        }
 
        if (!rc)
@@ -3740,6 +3836,29 @@ out_ladvise:
                rc = ll_heat_set(inode, flags);
                RETURN(rc);
        }
+       case LL_IOC_PCC_STATE: {
+               struct lu_pcc_state __user *ustate =
+                       (struct lu_pcc_state __user *)arg;
+               struct lu_pcc_state *state;
+
+               OBD_ALLOC_PTR(state);
+               if (state == NULL)
+                       RETURN(-ENOMEM);
+
+               if (copy_from_user(state, ustate, sizeof(*state)))
+                       GOTO(out_state, rc = -EFAULT);
+
+               rc = pcc_ioctl_state(inode, state);
+               if (rc)
+                       GOTO(out_state, rc);
+
+               if (copy_to_user(ustate, state, sizeof(*state)))
+                       GOTO(out_state, rc = -EFAULT);
+
+out_state:
+               OBD_FREE_PTR(state);
+               RETURN(rc);
+       }
        default:
                RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
                                     (void __user *)arg));
@@ -3936,7 +4055,9 @@ int ll_fsync(struct file *file, struct dentry *dentry, int datasync)
 #endif
        struct inode *inode = dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(inode);
+       struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
        struct ptlrpc_request *req;
+       struct file *pcc_file = fd->fd_pcc_file.pccf_file;
        int rc, err;
        ENTRY;
 
@@ -3944,6 +4065,19 @@ int ll_fsync(struct file *file, struct dentry *dentry, int datasync)
               PFID(ll_inode2fid(inode)), inode);
        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
 
+       /* pcc cache path */
+       if (pcc_file)
+#ifdef HAVE_FILE_FSYNC_4ARGS
+               return file_inode(pcc_file)->i_fop->fsync(pcc_file,
+                                       start, end, datasync);
+#elif defined(HAVE_FILE_FSYNC_2ARGS)
+               return file_inode(pcc_file)->i_fop->fsync(pcc_file,
+                                       datasync);
+#else
+               return file_inode(pcc_file)->i_fop->fsync(pcc_file,
+                                       dentry, datasync);
+#endif
+
 #ifdef HAVE_FILE_FSYNC_4ARGS
        rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
        inode_lock(inode);
@@ -4503,27 +4637,8 @@ static int ll_merge_md_attr(struct inode *inode)
        RETURN(0);
 }
 
-static inline dev_t ll_compat_encode_dev(dev_t dev)
-{
-       /* The compat_sys_*stat*() syscalls will fail unless the
-        * device majors and minors are both less than 256. Note that
-        * the value returned here will be passed through
-        * old_encode_dev() in cp_compat_stat(). And so we are not
-        * trying to return a valid compat (u16) device number, just
-        * one that will pass the old_valid_dev() check. */
-
-       return MKDEV(MAJOR(dev) & 0xff, MINOR(dev) & 0xff);
-}
-
-#ifdef HAVE_INODEOPS_ENHANCED_GETATTR
-int ll_getattr(const struct path *path, struct kstat *stat,
-              u32 request_mask, unsigned int flags)
-{
-       struct dentry *de = path->dentry;
-#else
-int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
+int ll_getattr_dentry(struct dentry *de, struct kstat *stat)
 {
-#endif
        struct inode *inode = de->d_inode;
        struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct ll_inode_info *lli = ll_i2info(inode);
@@ -4536,6 +4651,11 @@ int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
                RETURN(rc);
 
        if (S_ISREG(inode->i_mode)) {
+               bool cached = false;
+
+               rc = pcc_inode_getattr(inode, &cached);
+               if (cached && rc < 0)
+                       RETURN(rc);
                /* In case of restore, the MDT has the right size and has
                 * already send it back without granting the layout lock,
                 * inode is up-to-date so glimpse is useless.
@@ -4543,7 +4663,7 @@ int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
                 * restore the MDT holds the layout lock so the glimpse will
                 * block up to the end of restore (getattr will block)
                 */
-               if (!ll_file_test_flag(lli, LLIF_FILE_RESTORING)) {
+               if (!cached && !ll_file_test_flag(lli, LLIF_FILE_RESTORING)) {
                        rc = ll_glimpse_size(inode);
                        if (rc < 0)
                                RETURN(rc);
@@ -4589,6 +4709,18 @@ int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
         return 0;
 }
 
+#ifdef HAVE_INODEOPS_ENHANCED_GETATTR
+int ll_getattr(const struct path *path, struct kstat *stat,
+              u32 request_mask, unsigned int flags)
+{
+       struct dentry *de = path->dentry;
+#else
+int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
+{
+#endif
+       return ll_getattr_dentry(de, stat);
+}
+
 static int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
                     __u64 start, __u64 len)
 {