Whamcloud - gitweb
LU-10810 clio: SEEK_HOLE/SEEK_DATA on client side
[fs/lustre-release.git] / lustre / llite / file.c
index aac8685..675a5a1 100644 (file)
@@ -46,6 +46,7 @@
 #include <linux/falloc.h>
 
 #include <uapi/linux/lustre/lustre_ioctl.h>
+#include <uapi/linux/llcrypt.h>
 #include <lustre_swab.h>
 
 #include "cl_object.h"
@@ -422,6 +423,9 @@ static inline int ll_dom_readpage(void *data, struct page *page)
 {
        struct niobuf_local *lnb = data;
        void *kaddr;
+       int rc = 0;
+
+       struct inode *inode = page2inode(page);
 
        kaddr = kmap_atomic(page);
        memcpy(kaddr, lnb->lnb_data, lnb->lnb_len);
@@ -431,14 +435,40 @@ static inline int ll_dom_readpage(void *data, struct page *page)
        flush_dcache_page(page);
        SetPageUptodate(page);
        kunmap_atomic(kaddr);
+
+       if (inode && IS_ENCRYPTED(inode) && S_ISREG(inode->i_mode)) {
+               if (!llcrypt_has_encryption_key(inode))
+                       CDEBUG(D_SEC, "no enc key for "DFID"\n",
+                              PFID(ll_inode2fid(inode)));
+               else {
+                       unsigned int offs = 0;
+
+                       while (offs < PAGE_SIZE) {
+                               /* decrypt only if page is not empty */
+                               if (memcmp(page_address(page) + offs,
+                                          page_address(ZERO_PAGE(0)),
+                                          LUSTRE_ENCRYPTION_UNIT_SIZE) == 0)
+                                       break;
+
+                               rc = llcrypt_decrypt_pagecache_blocks(page,
+                                                   LUSTRE_ENCRYPTION_UNIT_SIZE,
+                                                                     offs);
+                               if (rc)
+                                       break;
+
+                               offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
+                       }
+               }
+       }
        unlock_page(page);
 
-       return 0;
+       return rc;
 }
 
-void ll_dom_finish_open(struct inode *inode, struct ptlrpc_request *req,
-                       struct lookup_intent *it)
+void ll_dom_finish_open(struct inode *inode, struct ptlrpc_request *req)
 {
+       struct lu_env *env;
+       struct cl_io *io;
        struct ll_inode_info *lli = ll_i2info(inode);
        struct cl_object *obj = lli->lli_clob;
        struct address_space *mapping = inode->i_mapping;
@@ -448,6 +478,8 @@ void ll_dom_finish_open(struct inode *inode, struct ptlrpc_request *req,
        char *data;
        unsigned long index, start;
        struct niobuf_local lnb;
+       __u16 refcheck;
+       int rc;
 
        ENTRY;
 
@@ -474,13 +506,24 @@ void ll_dom_finish_open(struct inode *inode, struct ptlrpc_request *req,
         * buffer, in both cases total size should be equal to the file size.
         */
        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
-       if (rnb->rnb_offset + rnb->rnb_len != body->mbo_dom_size) {
+       if (rnb->rnb_offset + rnb->rnb_len != body->mbo_dom_size &&
+           !(inode && IS_ENCRYPTED(inode))) {
                CERROR("%s: server returns off/len %llu/%u but size %llu\n",
                       ll_i2sbi(inode)->ll_fsname, rnb->rnb_offset,
                       rnb->rnb_len, body->mbo_dom_size);
                RETURN_EXIT;
        }
 
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               RETURN_EXIT;
+       io = vvp_env_thread_io(env);
+       io->ci_obj = obj;
+       io->ci_ignore_layout = 1;
+       rc = cl_io_init(env, io, CIT_MISC, obj);
+       if (rc)
+               GOTO(out_io, rc);
+
        CDEBUG(D_INFO, "Get data along with open at %llu len %i, size %llu\n",
               rnb->rnb_offset, rnb->rnb_len, body->mbo_dom_size);
 
@@ -492,6 +535,8 @@ void ll_dom_finish_open(struct inode *inode, struct ptlrpc_request *req,
        LASSERT((lnb.lnb_file_offset & ~PAGE_MASK) == 0);
        lnb.lnb_page_offset = 0;
        do {
+               struct cl_page *page;
+
                lnb.lnb_data = data + (index << PAGE_SHIFT);
                lnb.lnb_len = rnb->rnb_len - (index << PAGE_SHIFT);
                if (lnb.lnb_len > PAGE_SIZE)
@@ -507,9 +552,33 @@ void ll_dom_finish_open(struct inode *inode, struct ptlrpc_request *req,
                              PTR_ERR(vmpage));
                        break;
                }
+               lock_page(vmpage);
+               if (vmpage->mapping == NULL) {
+                       unlock_page(vmpage);
+                       put_page(vmpage);
+                       /* page was truncated */
+                       break;
+               }
+               /* attach VM page to CL page cache */
+               page = cl_page_find(env, obj, vmpage->index, vmpage,
+                                   CPT_CACHEABLE);
+               if (IS_ERR(page)) {
+                       ClearPageUptodate(vmpage);
+                       unlock_page(vmpage);
+                       put_page(vmpage);
+                       break;
+               }
+               cl_page_export(env, page, 1);
+               cl_page_put(env, page);
+               unlock_page(vmpage);
                put_page(vmpage);
                index++;
        } while (rnb->rnb_len > (index << PAGE_SHIFT));
+
+out_io:
+       cl_io_fini(env, io);
+       cl_env_put(env, &refcheck);
+
        EXIT;
 }
 
@@ -591,27 +660,21 @@ retry:
        rc = ll_prep_inode(&de->d_inode, req, NULL, itp);
 
        if (!rc && itp->it_lock_mode) {
-               struct lustre_handle handle = {.cookie = itp->it_lock_handle};
-               struct ldlm_lock *lock;
-               bool has_dom_bit = false;
+               __u64 bits = 0;
 
                /* If we got a lock back and it has a LOOKUP bit set,
                 * make sure the dentry is marked as valid so we can find it.
                 * We don't need to care about actual hashing since other bits
                 * of kernel will deal with that later.
                 */
-               lock = ldlm_handle2lock(&handle);
-               if (lock) {
-                       has_dom_bit = ldlm_has_dom(lock);
-                       if (lock->l_policy_data.l_inodebits.bits &
-                           MDS_INODELOCK_LOOKUP)
-                               d_lustre_revalidate(de);
-
-                       LDLM_LOCK_PUT(lock);
-               }
-               ll_set_lock_data(sbi->ll_md_exp, de->d_inode, itp, NULL);
-               if (has_dom_bit)
-                       ll_dom_finish_open(de->d_inode, req, itp);
+               ll_set_lock_data(sbi->ll_md_exp, de->d_inode, itp, &bits);
+               if (bits & MDS_INODELOCK_LOOKUP)
+                       d_lustre_revalidate(de);
+               /* if DoM bit returned along with LAYOUT bit then there
+                * can be read-on-open data returned.
+                */
+               if (bits & MDS_INODELOCK_DOM && bits & MDS_INODELOCK_LAYOUT)
+                       ll_dom_finish_open(de->d_inode, req);
        }
 
 out:
@@ -706,6 +769,12 @@ int ll_file_open(struct inode *inode, struct file *file)
        it = file->private_data; /* XXX: compat macro */
        file->private_data = NULL; /* prevent ll_local_open assertion */
 
+       if (S_ISREG(inode->i_mode)) {
+               rc = llcrypt_file_open(inode, file);
+               if (rc)
+                       GOTO(out_nofiledata, rc);
+       }
+
        fd = ll_file_data_get();
        if (fd == NULL)
                GOTO(out_nofiledata, rc = -ENOMEM);
@@ -1089,8 +1158,8 @@ ll_lease_open(struct inode *inode, struct file *file, fmode_t fmode,
 
        /* already get lease, handle lease lock */
        ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
-       if (it.it_lock_mode == 0 ||
-           it.it_lock_bits != MDS_INODELOCK_OPEN) {
+       if (!it.it_lock_mode ||
+           !(it.it_lock_bits & MDS_INODELOCK_OPEN)) {
                /* open lock must return for lease */
                CERROR(DFID "lease granted but no open lock, %d/%llu.\n",
                        PFID(ll_inode2fid(inode)), it.it_lock_mode,
@@ -1412,8 +1481,8 @@ void ll_io_init(struct cl_io *io, struct file *file, enum cl_io_type iot,
                                           IS_SYNC(inode));
 #ifdef HAVE_GENERIC_WRITE_SYNC_2ARGS
                io->u.ci_wr.wr_sync  |= !!(args &&
-                                          args->via_io_subtype == IO_NORMAL &&
-                                          args->u.normal.via_iocb->ki_flags & IOCB_DSYNC);
+                                          (args->u.normal.via_iocb->ki_flags &
+                                           IOCB_DSYNC));
 #endif
        }
 
@@ -1471,16 +1540,17 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
                   struct file *file, enum cl_io_type iot,
                   loff_t *ppos, size_t count)
 {
-       struct vvp_io           *vio = vvp_env_io(env);
-       struct inode            *inode = file_inode(file);
-       struct ll_inode_info    *lli = ll_i2info(inode);
-       struct ll_file_data     *fd  = file->private_data;
-       struct range_lock       range;
-       struct cl_io            *io;
-       ssize_t                 result = 0;
-       int                     rc = 0;
-       unsigned                retried = 0;
-       unsigned                ignore_lockless = 0;
+       struct vvp_io *vio = vvp_env_io(env);
+       struct inode *inode = file_inode(file);
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct ll_file_data *fd  = file->private_data;
+       struct range_lock range;
+       struct cl_io *io;
+       ssize_t result = 0;
+       int rc = 0;
+       unsigned int retried = 0, ignore_lockless = 0;
+       bool is_aio = false;
+       struct cl_dio_aio *ci_aio = NULL;
 
        ENTRY;
 
@@ -1488,9 +1558,19 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
                file_dentry(file)->d_name.name,
                iot == CIT_READ ? "read" : "write", *ppos, count);
 
+       io = vvp_env_thread_io(env);
+       if (file->f_flags & O_DIRECT) {
+               if (!is_sync_kiocb(args->u.normal.via_iocb))
+                       is_aio = true;
+               ci_aio = cl_aio_alloc(args->u.normal.via_iocb);
+               if (!ci_aio)
+                       GOTO(out, rc = -ENOMEM);
+       }
+
 restart:
        io = vvp_env_thread_io(env);
        ll_io_init(io, file, iot, args);
+       io->ci_aio = ci_aio;
        io->ci_ignore_lockless = ignore_lockless;
        io->ci_ndelay_tried = retried;
 
@@ -1503,34 +1583,22 @@ restart:
                        range_lock_init(&range, *ppos, *ppos + count - 1);
 
                vio->vui_fd  = file->private_data;
-               vio->vui_io_subtype = args->via_io_subtype;
-
-               switch (vio->vui_io_subtype) {
-               case IO_NORMAL:
-                       vio->vui_iter = args->u.normal.via_iter;
-                       vio->vui_iocb = args->u.normal.via_iocb;
-                       /* Direct IO reads must also take range lock,
-                        * or multiple reads will try to work on the same pages
-                        * See LU-6227 for details. */
-                       if (((iot == CIT_WRITE) ||
-                           (iot == CIT_READ && (file->f_flags & O_DIRECT))) &&
-                           !(vio->vui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
-                               CDEBUG(D_VFSTRACE, "Range lock "RL_FMT"\n",
-                                      RL_PARA(&range));
-                               rc = range_lock(&lli->lli_write_tree, &range);
-                               if (rc < 0)
-                                       GOTO(out, rc);
+               vio->vui_iter = args->u.normal.via_iter;
+               vio->vui_iocb = args->u.normal.via_iocb;
+               /* Direct IO reads must also take range lock,
+                * or multiple reads will try to work on the same pages
+                * See LU-6227 for details.
+                */
+               if (((iot == CIT_WRITE) ||
+                   (iot == CIT_READ && (file->f_flags & O_DIRECT))) &&
+                   !(vio->vui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
+                       CDEBUG(D_VFSTRACE, "Range lock "RL_FMT"\n",
+                              RL_PARA(&range));
+                       rc = range_lock(&lli->lli_write_tree, &range);
+                       if (rc < 0)
+                               GOTO(out, rc);
 
-                               range_locked = true;
-                       }
-                       break;
-               case IO_SPLICE:
-                       vio->u.splice.vui_pipe = args->u.splice.via_pipe;
-                       vio->u.splice.vui_flags = args->u.splice.via_flags;
-                       break;
-               default:
-                       CERROR("unknown IO subtype %u\n", vio->vui_io_subtype);
-                       LBUG();
+                       range_locked = true;
                }
 
                ll_cl_add(file, env, io, LCC_RW);
@@ -1547,13 +1615,22 @@ restart:
                rc = io->ci_result;
        }
 
+       /*
+        * In order to move forward AIO, ci_nob was increased,
+        * but that doesn't mean io have been finished, it just
+        * means io have been submited, we will always return
+        * EIOCBQUEUED to the caller, So we could only return
+        * number of bytes in non-AIO case.
+        */
        if (io->ci_nob > 0) {
-               result += io->ci_nob;
-               count  -= io->ci_nob;
-               *ppos = io->u.ci_wr.wr.crw_pos; /* for splice */
+               if (!is_aio) {
+                       result += io->ci_nob;
+                       *ppos = io->u.ci_wr.wr.crw_pos; /* for splice */
+               }
+               count -= io->ci_nob;
 
                /* prepare IO restart */
-               if (count > 0 && args->via_io_subtype == IO_NORMAL)
+               if (count > 0)
                        args->u.normal.via_iter = vio->vui_iter;
        }
 out:
@@ -1577,6 +1654,27 @@ out:
                goto restart;
        }
 
+       if (io->ci_aio) {
+               /*
+                * VFS will call aio_complete() if no -EIOCBQUEUED
+                * is returned for AIO, so we can not call aio_complete()
+                * in our end_io().
+                */
+               if (rc != -EIOCBQUEUED)
+                       io->ci_aio->cda_no_aio_complete = 1;
+               /**
+                * Drop one extra reference so that end_io() could be
+                * called for this IO context, we could call it after
+                * we make sure all AIO requests have been proceed.
+                */
+               cl_sync_io_note(env, &io->ci_aio->cda_sync,
+                               rc == -EIOCBQUEUED ? 0 : rc);
+               if (!is_aio) {
+                       cl_aio_free(io->ci_aio);
+                       io->ci_aio = NULL;
+               }
+       }
+
        if (iot == CIT_READ) {
                if (result > 0)
                        ll_stats_ops_tally(ll_i2sbi(inode),
@@ -1711,7 +1809,7 @@ static ssize_t ll_file_read_iter(struct kiocb *iocb, struct iov_iter *to)
        if (IS_ERR(env))
                return PTR_ERR(env);
 
-       args = ll_env_args(env, IO_NORMAL);
+       args = ll_env_args(env);
        args->u.normal.via_iter = to;
        args->u.normal.via_iocb = iocb;
 
@@ -1846,7 +1944,7 @@ static ssize_t ll_file_write_iter(struct kiocb *iocb, struct iov_iter *from)
        if (IS_ERR(env))
                return PTR_ERR(env);
 
-       args = ll_env_args(env, IO_NORMAL);
+       args = ll_env_args(env);
        args->u.normal.via_iter = from;
        args->u.normal.via_iocb = iocb;
 
@@ -2018,46 +2116,6 @@ static ssize_t ll_file_write(struct file *file, const char __user *buf,
 }
 #endif /* !HAVE_FILE_OPERATIONS_READ_WRITE_ITER */
 
-/*
- * Send file content (through pagecache) somewhere with helper
- */
-static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
-                                   struct pipe_inode_info *pipe, size_t count,
-                                   unsigned int flags)
-{
-       struct lu_env *env;
-       struct vvp_io_args *args;
-       ssize_t result;
-       __u16 refcheck;
-       bool cached;
-
-       ENTRY;
-
-       result = pcc_file_splice_read(in_file, ppos, pipe,
-                                     count, flags, &cached);
-       if (cached)
-               RETURN(result);
-
-       ll_ras_enter(in_file, *ppos, count);
-
-       env = cl_env_get(&refcheck);
-       if (IS_ERR(env))
-               RETURN(PTR_ERR(env));
-
-       args = ll_env_args(env, IO_SPLICE);
-       args->u.splice.via_pipe = pipe;
-       args->u.splice.via_flags = flags;
-
-       result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
-       cl_env_put(env, &refcheck);
-
-       if (result > 0)
-               ll_rw_stats_tally(ll_i2sbi(file_inode(in_file)), current->pid,
-                                 in_file->private_data, *ppos, result,
-                                 READ);
-       RETURN(result);
-}
-
 int ll_lov_setstripe_ea_info(struct inode *inode, struct dentry *dentry,
                             __u64 flags, struct lov_user_md *lum, int lum_size)
 {
@@ -2249,6 +2307,13 @@ static int ll_lov_setstripe(struct inode *inode, struct file *file,
                        GOTO(out, rc);
 
                rc = ll_file_getstripe(inode, arg, lum_size);
+               if (S_ISREG(inode->i_mode) && IS_ENCRYPTED(inode) &&
+                   ll_i2info(inode)->lli_clob) {
+                       struct iattr attr = { 0 };
+
+                       rc = cl_setattr_ost(ll_i2info(inode)->lli_clob, &attr,
+                                           OP_XVALID_FLAGS, LUSTRE_ENCRYPT_FL);
+               }
        }
        cl_lov_delay_create_clear(&file->f_flags);
 
@@ -3982,34 +4047,112 @@ out_state:
                OBD_FREE_PTR(state);
                RETURN(rc);
        }
+#ifdef HAVE_LUSTRE_CRYPTO
+       case LL_IOC_SET_ENCRYPTION_POLICY:
+               if (!ll_sbi_has_encrypt(ll_i2sbi(inode)))
+                       return -EOPNOTSUPP;
+               return llcrypt_ioctl_set_policy(file, (const void __user *)arg);
+       case LL_IOC_GET_ENCRYPTION_POLICY_EX:
+               if (!ll_sbi_has_encrypt(ll_i2sbi(inode)))
+                       return -EOPNOTSUPP;
+               return llcrypt_ioctl_get_policy_ex(file, (void __user *)arg);
+       case LL_IOC_ADD_ENCRYPTION_KEY:
+               if (!ll_sbi_has_encrypt(ll_i2sbi(inode)))
+                       return -EOPNOTSUPP;
+               return llcrypt_ioctl_add_key(file, (void __user *)arg);
+       case LL_IOC_REMOVE_ENCRYPTION_KEY:
+               if (!ll_sbi_has_encrypt(ll_i2sbi(inode)))
+                       return -EOPNOTSUPP;
+               return llcrypt_ioctl_remove_key(file, (void __user *)arg);
+       case LL_IOC_REMOVE_ENCRYPTION_KEY_ALL_USERS:
+               if (!ll_sbi_has_encrypt(ll_i2sbi(inode)))
+                       return -EOPNOTSUPP;
+               return llcrypt_ioctl_remove_key_all_users(file,
+                                                         (void __user *)arg);
+       case LL_IOC_GET_ENCRYPTION_KEY_STATUS:
+               if (!ll_sbi_has_encrypt(ll_i2sbi(inode)))
+                       return -EOPNOTSUPP;
+               return llcrypt_ioctl_get_key_status(file, (void __user *)arg);
+#endif
        default:
                RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
                                     (void __user *)arg));
        }
 }
 
+loff_t ll_lseek(struct inode *inode, loff_t offset, int whence)
+{
+       struct lu_env *env;
+       struct cl_io *io;
+       struct cl_lseek_io *lsio;
+       __u16 refcheck;
+       int rc;
+       loff_t retval;
+
+       ENTRY;
+
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               RETURN(PTR_ERR(env));
+
+       io = vvp_env_thread_io(env);
+       io->ci_obj = ll_i2info(inode)->lli_clob;
+
+       lsio = &io->u.ci_lseek;
+       lsio->ls_start = offset;
+       lsio->ls_whence = whence;
+       lsio->ls_result = -ENXIO;
+
+       do {
+               rc = cl_io_init(env, io, CIT_LSEEK, io->ci_obj);
+               if (!rc)
+                       rc = cl_io_loop(env, io);
+               else
+                       rc = io->ci_result;
+               retval = rc ? : lsio->ls_result;
+               cl_io_fini(env, io);
+       } while (unlikely(io->ci_need_restart));
+
+       cl_env_put(env, &refcheck);
+
+       RETURN(retval);
+}
+
 static loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
 {
        struct inode *inode = file_inode(file);
-       loff_t retval, eof = 0;
+       loff_t retval = offset, eof = 0;
        ktime_t kstart = ktime_get();
 
        ENTRY;
-       retval = offset + ((origin == SEEK_END) ? i_size_read(inode) :
-                          (origin == SEEK_CUR) ? file->f_pos : 0);
+
        CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p), to=%llu=%#llx(%d)\n",
               PFID(ll_inode2fid(inode)), inode, retval, retval,
               origin);
 
-       if (origin == SEEK_END || origin == SEEK_HOLE || origin == SEEK_DATA) {
+       if (origin == SEEK_END) {
                retval = ll_glimpse_size(inode);
                if (retval != 0)
                        RETURN(retval);
                eof = i_size_read(inode);
        }
 
-       retval = generic_file_llseek_size(file, offset, origin,
-                                         ll_file_maxbytes(inode), eof);
+       if (origin == SEEK_HOLE || origin == SEEK_DATA) {
+               if (offset < 0)
+                       return -ENXIO;
+
+               /* flush local cache first if any */
+               cl_sync_file_range(inode, offset, OBD_OBJECT_EOF,
+                                  CL_FSYNC_LOCAL, 0);
+
+               retval = ll_lseek(inode, offset, origin);
+               if (retval < 0)
+                       return retval;
+               retval = vfs_setpos(file, retval, ll_file_maxbytes(inode));
+       } else {
+               retval = generic_file_llseek_size(file, offset, origin,
+                                                 ll_file_maxbytes(inode), eof);
+       }
        if (retval >= 0)
                ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK,
                                   ktime_us_delta(ktime_get(), kstart));
@@ -4610,6 +4753,7 @@ static int ll_inode_revalidate_fini(struct inode *inode, int rc)
 
 static int ll_inode_revalidate(struct dentry *dentry, enum ldlm_intent_flags op)
 {
+       struct inode *parent;
        struct inode *inode = dentry->d_inode;
        struct obd_export *exp = ll_i2mdexp(inode);
        struct lookup_intent oit = {
@@ -4617,18 +4761,30 @@ static int ll_inode_revalidate(struct dentry *dentry, enum ldlm_intent_flags op)
        };
        struct ptlrpc_request *req = NULL;
        struct md_op_data *op_data;
+       const char *name = NULL;
+       size_t namelen = 0;
        int rc = 0;
        ENTRY;
 
        CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p),name=%s\n",
               PFID(ll_inode2fid(inode)), inode, dentry->d_name.name);
 
-       /* Call getattr by fid, so do not provide name at all. */
-       op_data = ll_prep_md_op_data(NULL, inode, inode, NULL, 0, 0,
+       if (exp_connect_flags2(exp) & OBD_CONNECT2_GETATTR_PFID) {
+               parent = dentry->d_parent->d_inode;
+               name = dentry->d_name.name;
+               namelen = dentry->d_name.len;
+       } else {
+               parent = inode;
+       }
+
+       op_data = ll_prep_md_op_data(NULL, parent, inode, name, namelen, 0,
                                     LUSTRE_OPC_ANY, NULL);
        if (IS_ERR(op_data))
                RETURN(PTR_ERR(op_data));
 
+       /* Call getattr by fid */
+       if (exp_connect_flags2(exp) & OBD_CONNECT2_GETATTR_PFID)
+               op_data->op_flags = MF_GETATTR_BY_FID;
        rc = md_intent_lock(exp, op_data, &oit, &req, &ll_md_blocking_ast, 0);
        ll_finish_md_op_data(op_data);
        if (rc < 0) {
@@ -4646,11 +4802,8 @@ static int ll_inode_revalidate(struct dentry *dentry, enum ldlm_intent_flags op)
         * do_lookup() -> ll_revalidate_it(). We cannot use d_drop
         * here to preserve get_cwd functionality on 2.6.
         * Bug 10503 */
-       if (!dentry->d_inode->i_nlink) {
-               spin_lock(&inode->i_lock);
-               d_lustre_invalidate(dentry, 0);
-               spin_unlock(&inode->i_lock);
-       }
+       if (!dentry->d_inode->i_nlink)
+               d_lustre_invalidate(dentry);
 
        ll_lookup_finish_locks(&oit, dentry);
 out:
@@ -4671,7 +4824,7 @@ static int ll_merge_md_attr(struct inode *inode)
                RETURN(0);
 
        down_read(&lli->lli_lsm_sem);
-       rc = md_merge_attr(ll_i2mdexp(inode), ll_i2info(inode)->lli_lsm_md,
+       rc = md_merge_attr(ll_i2mdexp(inode), &lli->lli_fid, lli->lli_lsm_md,
                           &attr, ll_md_blocking_ast);
        up_read(&lli->lli_lsm_sem);
        if (rc != 0)
@@ -4913,6 +5066,17 @@ long ll_fallocate(struct file *filp, int mode, loff_t offset, loff_t len)
        int rc;
 
        /*
+        * Encrypted inodes can't handle collapse range or zero range or insert
+        * range since we would need to re-encrypt blocks with a different IV or
+        * XTS tweak (which are based on the logical block number).
+        * Similar to what ext4 does.
+        */
+       if (IS_ENCRYPTED(inode) &&
+           (mode & (FALLOC_FL_COLLAPSE_RANGE | FALLOC_FL_INSERT_RANGE |
+                    FALLOC_FL_ZERO_RANGE)))
+               RETURN(-EOPNOTSUPP);
+
+       /*
         * Only mode == 0 (which is standard prealloc) is supported now.
         * Punch is not supported yet.
         */
@@ -5130,7 +5294,11 @@ struct file_operations ll_file_operations = {
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
-       .splice_read    = ll_file_splice_read,
+#ifndef HAVE_DEFAULT_FILE_SPLICE_READ_EXPORT
+       .splice_read    = generic_file_splice_read,
+#else
+       .splice_read    = pcc_file_splice_read,
+#endif
        .fsync          = ll_fsync,
        .flush          = ll_flush,
        .fallocate      = ll_fallocate,
@@ -5155,7 +5323,11 @@ struct file_operations ll_file_operations_flock = {
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
-       .splice_read    = ll_file_splice_read,
+#ifndef HAVE_DEFAULT_FILE_SPLICE_READ_EXPORT
+       .splice_read    = generic_file_splice_read,
+#else
+       .splice_read    = pcc_file_splice_read,
+#endif
        .fsync          = ll_fsync,
        .flush          = ll_flush,
        .flock          = ll_file_flock,
@@ -5183,7 +5355,11 @@ struct file_operations ll_file_operations_noflock = {
        .release        = ll_file_release,
        .mmap           = ll_file_mmap,
        .llseek         = ll_file_seek,
-       .splice_read    = ll_file_splice_read,
+#ifndef HAVE_DEFAULT_FILE_SPLICE_READ_EXPORT
+       .splice_read    = generic_file_splice_read,
+#else
+       .splice_read    = pcc_file_splice_read,
+#endif
        .fsync          = ll_fsync,
        .flush          = ll_flush,
        .flock          = ll_file_noflock,