Whamcloud - gitweb
b=21454 Disable cl_page_in_io check for append write.
[fs/lustre-release.git] / lustre / llite / file.c
index 9850774..cb32570 100644 (file)
@@ -55,7 +55,7 @@ struct ll_file_data *ll_file_data_get(void)
 {
         struct ll_file_data *fd;
 
-        OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
+        OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, CFS_ALLOC_IO);
         return fd;
 }
 
@@ -77,10 +77,15 @@ void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
         op_data->op_attr_blocks = inode->i_blocks;
         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
-        memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
+        if (fh)
+                op_data->op_handle = *fh;
         op_data->op_capa1 = ll_mdscapa_get(inode);
 }
 
+/**
+ * Closes the IO epoch and packs all the attributes into @op_data for
+ * the CLOSE rpc.
+ */
 static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
                              struct obd_client_handle *och)
 {
@@ -92,14 +97,15 @@ static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
         if (!(och->och_flags & FMODE_WRITE))
                 goto out;
 
-        if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
-            !S_ISREG(inode->i_mode))
+        if (!exp_connect_som(ll_i2mdexp(inode)) || !S_ISREG(inode->i_mode))
                 op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
         else
-                ll_epoch_close(inode, op_data, &och, 0);
+                ll_ioepoch_close(inode, op_data, &och, 0);
 
 out:
         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
+        ll_prep_md_op_data(op_data, inode, NULL, NULL,
+                           0, 0, LUSTRE_OPC_ANY, NULL);
         EXIT;
 }
 
@@ -112,7 +118,7 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp,
         struct ptlrpc_request *req = NULL;
         struct obd_device *obd = class_exp2obd(exp);
         int epoch_close = 1;
-        int seq_end = 0, rc;
+        int rc;
         ENTRY;
 
         if (obd == NULL) {
@@ -125,14 +131,6 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp,
                 GOTO(out, rc = 0);
         }
 
-        /*
-         * here we check if this is forced umount. If so this is called on
-         * canceling "open lock" and we do not call md_close() in this case, as
-         * it will not be successful, as import is already deactivated.
-         */
-        if (obd->obd_force)
-                GOTO(out, rc = 0);
-
         OBD_ALLOC_PTR(op_data);
         if (op_data == NULL)
                 GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
@@ -140,17 +138,12 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp,
         ll_prepare_close(inode, op_data, och);
         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
         rc = md_close(md_exp, op_data, och->och_mod, &req);
-        if (rc != -EAGAIN)
-                seq_end = 1;
-
         if (rc == -EAGAIN) {
                 /* This close must have the epoch closed. */
-                LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
                 LASSERT(epoch_close);
                 /* MDS has instructed us to obtain Size-on-MDS attribute from
                  * OSTs and send setattr to back to MDS. */
-                rc = ll_sizeonmds_update(inode, och->och_mod,
-                                         &och->och_fh, op_data->op_ioepoch);
+                rc = ll_som_update(inode, op_data);
                 if (rc) {
                         CERROR("inode %lu mdc Size-on-MDS update failed: "
                                "rc = %d\n", inode->i_ino, rc);
@@ -172,12 +165,10 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp,
         EXIT;
 out:
 
-        if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
+        if (exp_connect_som(exp) && !epoch_close &&
             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
         } else {
-                if (seq_end)
-                        ptlrpc_close_replay_seq(req);
                 md_clear_open_replay_data(md_exp, och);
                 /* Free @och if it is not waiting for DONE_WRITING. */
                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
@@ -237,14 +228,8 @@ int ll_md_close(struct obd_export *md_exp, struct inode *inode,
         ENTRY;
 
         /* clear group lock, if present */
-        if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
-#if 0 /* XXX */
-                struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
-                fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
-                rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
-                                      &fd->fd_cwlockh);
-#endif
-        }
+        if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
+                ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
 
         /* Let's see if we have good enough OPEN lock on the file and if
            we can skip talking to MDS */
@@ -344,6 +329,10 @@ int ll_file_release(struct inode *inode, struct file *file)
         lli->lli_async_rc = 0;
 
         rc = ll_md_close(sbi->ll_md_exp, inode, file);
+
+        if (OBD_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, obd_fail_val))
+                libcfs_debug_dumplog();
+
         RETURN(rc);
 }
 
@@ -400,12 +389,12 @@ static int ll_intent_file_open(struct file *file, void *lmm,
                 GOTO(out, rc);
         }
 
-        if (itp->d.lustre.it_lock_mode)
+        rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
+        if (!rc && itp->d.lustre.it_lock_mode)
                 md_set_lock_data(sbi->ll_md_exp,
                                  &itp->d.lustre.it_lock_handle,
-                                 file->f_dentry->d_inode);
+                                 file->f_dentry->d_inode, NULL);
 
-        rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
 out:
         ptlrpc_req_finished(itp->d.lustre.it_data);
         it_clear_disposition(itp, DISP_ENQ_COMPLETE);
@@ -414,6 +403,20 @@ out:
         RETURN(rc);
 }
 
+/**
+ * Assign an obtained @ioepoch to the client's inode. No lock is needed: the
+ * MDS does not trust attributes while several ioepoch holders exist, and it
+ * also skips attributes for a previous ioepoch once a new one is opened.
+ */
+void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch)
+{
+        if (ioepoch && lli->lli_ioepoch != ioepoch) {
+                lli->lli_ioepoch = ioepoch;
+                CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
+                       ioepoch, PFID(&lli->lli_fid));
+        }
+}
+
 static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
                        struct lookup_intent *it, struct obd_client_handle *och)
 {
@@ -429,7 +432,7 @@ static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
         och->och_fid = lli->lli_fid;
         och->och_flags = it->it_flags;
-        lli->lli_ioepoch = body->ioepoch;
+        ll_ioepoch_open(lli, body->ioepoch);
 
         return md_set_open_replay_data(md_exp, och, req);
 }
@@ -511,29 +514,12 @@ int ll_file_open(struct inode *inode, struct file *file)
 
         fd->fd_file = file;
         if (S_ISDIR(inode->i_mode)) {
-again:
                 spin_lock(&lli->lli_lock);
                 if (lli->lli_opendir_key == NULL && lli->lli_opendir_pid == 0) {
                         LASSERT(lli->lli_sai == NULL);
                         lli->lli_opendir_key = fd;
                         lli->lli_opendir_pid = cfs_curproc_pid();
                         opendir_set = 1;
-                } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid() &&
-                                    lli->lli_opendir_key != NULL)) {
-                        /* Two cases for this:
-                         * (1) The same process open such directory many times.
-                         * (2) The old process opened the directory, and exited
-                         *     before its children processes. Then new process
-                         *     with the same pid opens such directory before the
-                         *     old process's children processes exit.
-                         * reset stat ahead for such cases. */
-                        spin_unlock(&lli->lli_lock);
-                        CDEBUG(D_INFO, "Conflict statahead for %.*s "DFID
-                               " reset it.\n", file->f_dentry->d_name.len,
-                               file->f_dentry->d_name.name,
-                               PFID(&lli->lli_fid));
-                        ll_stop_statahead(inode, lli->lli_opendir_key);
-                        goto again;
                 }
                 spin_unlock(&lli->lli_lock);
         }
@@ -563,6 +549,12 @@ again:
                  * already? XXX - NFS implications? */
                 oit.it_flags &= ~O_EXCL;
 
+                /* bug20584: if "it_flags" contains O_CREAT, the file will be
+                 * created if necessary, so "IT_CREAT" must also be set to
+                 * stay consistent with it */
+                if (oit.it_flags & O_CREAT)
+                        oit.it_op |= IT_CREAT;
+
                 it = &oit;
         }
 
@@ -612,9 +604,9 @@ restart:
                            would attempt to grab och_sem as well, that would
                            result in a deadlock */
                         up(&lli->lli_och_sem);
-                        it->it_flags |= O_CHECK_STALE;
+                        it->it_create_mode |= M_CHECK_STALE;
                         rc = ll_intent_file_open(file, NULL, 0, it);
-                        it->it_flags &= ~O_CHECK_STALE;
+                        it->it_create_mode &= ~M_CHECK_STALE;
                         if (rc) {
                                 ll_file_data_put(fd);
                                 GOTO(out_openerr, rc);
@@ -625,9 +617,6 @@ restart:
                                 req = it->d.lustre.it_data;
                                 ptlrpc_req_finished(req);
                         }
-                        md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
-                                         &it->d.lustre.it_lock_handle,
-                                         file->f_dentry->d_inode);
                         goto restart;
                 }
                 OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
@@ -696,15 +685,15 @@ out_openerr:
         return rc;
 }
 
-/* Fills the obdo with the attributes for the inode defined by lsm */
-int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
+/* Fills the obdo with the attributes for the lsm */
+static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
+                          struct obd_capa *capa, struct obdo *obdo,
+                          __u64 ioepoch, int sync)
 {
         struct ptlrpc_request_set *set;
-        struct ll_inode_info *lli = ll_i2info(inode);
-        struct lov_stripe_md *lsm = lli->lli_smd;
+        struct obd_info            oinfo = { { { 0 } } };
+        int                        rc;
 
-        struct obd_info oinfo = { { { 0 } } };
-        int rc;
         ENTRY;
 
         LASSERT(lsm != NULL);
@@ -714,37 +703,59 @@ int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
         oinfo.oi_oa->o_id = lsm->lsm_object_id;
         oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
         oinfo.oi_oa->o_mode = S_IFREG;
+        oinfo.oi_oa->o_ioepoch = ioepoch;
         oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
                                OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
                                OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
                                OBD_MD_FLMTIME | OBD_MD_FLCTIME |
-                               OBD_MD_FLGROUP;
-        oinfo.oi_capa = ll_mdscapa_get(inode);
+                               OBD_MD_FLGROUP | OBD_MD_FLEPOCH;
+        oinfo.oi_capa = capa;
+        if (sync) {
+                oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS;
+                oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK;
+        }
 
         set = ptlrpc_prep_set();
         if (set == NULL) {
                 CERROR("can't allocate ptlrpc set\n");
                 rc = -ENOMEM;
         } else {
-                rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
+                rc = obd_getattr_async(exp, &oinfo, set);
                 if (rc == 0)
                         rc = ptlrpc_set_wait(set);
                 ptlrpc_set_destroy(set);
         }
-        capa_put(oinfo.oi_capa);
-        if (rc)
-                RETURN(rc);
+        if (rc == 0)
+                oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
+                                         OBD_MD_FLATIME | OBD_MD_FLMTIME |
+                                         OBD_MD_FLCTIME | OBD_MD_FLSIZE);
+        RETURN(rc);
+}
 
-        oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
-                                 OBD_MD_FLATIME | OBD_MD_FLMTIME |
-                                 OBD_MD_FLCTIME | OBD_MD_FLSIZE);
+/**
+ * Performs the getattr on the inode and, on success, updates its fields.
+ * If @sync != 0, perform the getattr under the server-side lock.
+ */
+int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
+                     __u64 ioepoch, int sync)
+{
+        struct ll_inode_info *lli  = ll_i2info(inode);
+        struct obd_capa      *capa = ll_mdscapa_get(inode);
+        int rc;
+        ENTRY;
 
-        obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
-        CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
-               lli->lli_smd->lsm_object_id, i_size_read(inode),
-               (unsigned long long)inode->i_blocks,
-               (unsigned long)ll_inode_blksize(inode));
-        RETURN(0);
+        rc = ll_lsm_getattr(lli->lli_smd, ll_i2dtexp(inode),
+                            capa, obdo, ioepoch, sync);
+        capa_put(capa);
+        if (rc == 0) {
+                obdo_refresh_inode(inode, obdo, obdo->o_valid);
+                CDEBUG(D_INODE,
+                       "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
+                       lli->lli_smd->lsm_object_id, i_size_read(inode),
+                       (unsigned long long)inode->i_blocks,
+                       (unsigned long)ll_inode_blksize(inode));
+        }
+        RETURN(rc);
 }
 
 int ll_merge_lvb(struct inode *inode)
@@ -773,31 +784,40 @@ int ll_merge_lvb(struct inode *inode)
 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
                      lstat_t *st)
 {
-        /* XXX */
-        return -ENOSYS;
+        struct obdo obdo = { 0 };
+        int rc;
+
+        rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0, 0);
+        if (rc == 0) {
+                st->st_size   = obdo.o_size;
+                st->st_blocks = obdo.o_blocks;
+                st->st_mtime  = obdo.o_mtime;
+                st->st_atime  = obdo.o_atime;
+                st->st_ctime  = obdo.o_ctime;
+        }
+        return rc;
 }
 
 void ll_io_init(struct cl_io *io, const struct file *file, int write)
 {
-        struct inode *inode     = file->f_dentry->d_inode;
-        struct ll_sb_info *sbi  = ll_i2sbi(inode);
-        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+        struct inode *inode = file->f_dentry->d_inode;
 
-        LASSERT(fd != NULL);
         memset(io, 0, sizeof *io);
         io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK;
         if (write)
-                io->u.ci_wr.wr_append = file->f_flags & O_APPEND;
+                io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND);
         io->ci_obj     = ll_i2info(inode)->lli_clob;
         io->ci_lockreq = CILR_MAYBE;
-        if (fd->fd_flags & LL_FILE_IGNORE_LOCK || sbi->ll_flags & LL_SBI_NOLCK)
+        if (ll_file_nolock(file)) {
                 io->ci_lockreq = CILR_NEVER;
-        else if (file->f_flags & O_APPEND)
+                io->ci_no_srvlock = 1;
+        } else if (file->f_flags & O_APPEND) {
                 io->ci_lockreq = CILR_MANDATORY;
+        }
 }
 
 static ssize_t ll_file_io_generic(const struct lu_env *env,
-                struct ccc_io_args *args, struct file *file,
+                struct vvp_io_args *args, struct file *file,
                 enum cl_io_type iot, loff_t *ppos, size_t count)
 {
         struct cl_io       *io;
@@ -807,27 +827,48 @@ static ssize_t ll_file_io_generic(const struct lu_env *env,
         io = &ccc_env_info(env)->cti_io;
         ll_io_init(io, file, iot == CIT_WRITE);
 
-        if (iot == CIT_READ)
-                io->u.ci_rd.rd_is_sendfile = args->cia_is_sendfile;
-
         if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
                 struct vvp_io *vio = vvp_env_io(env);
                 struct ccc_io *cio = ccc_env_io(env);
-                if (cl_io_is_sendfile(io)) {
-                        vio->u.read.cui_actor = args->cia_actor;
-                        vio->u.read.cui_target = args->cia_target;
-                } else {
-                        cio->cui_iov = args->cia_iov;
-                        cio->cui_nrsegs = args->cia_nrsegs;
+                struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
+                int write_sem_locked = 0;
+
+                cio->cui_fd  = LUSTRE_FPRIVATE(file);
+                vio->cui_io_subtype = args->via_io_subtype;
+
+                switch (vio->cui_io_subtype) {
+                case IO_NORMAL:
+                        cio->cui_iov = args->u.normal.via_iov;
+                        cio->cui_nrsegs = args->u.normal.via_nrsegs;
 #ifndef HAVE_FILE_WRITEV
-                        cio->cui_iocb = args->cia_iocb;
+                        cio->cui_iocb = args->u.normal.via_iocb;
 #endif
+                        if ((iot == CIT_WRITE) &&
+                            !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
+                                down(&lli->lli_write_sem);
+                                write_sem_locked = 1;
+                        }
+                        break;
+                case IO_SENDFILE:
+                        vio->u.sendfile.cui_actor = args->u.sendfile.via_actor;
+                        vio->u.sendfile.cui_target = args->u.sendfile.via_target;
+                        break;
+                case IO_SPLICE:
+                        vio->u.splice.cui_pipe = args->u.splice.via_pipe;
+                        vio->u.splice.cui_flags = args->u.splice.via_flags;
+                        break;
+                default:
+                        CERROR("Unknow IO type - %u\n", vio->cui_io_subtype);
+                        LBUG();
                 }
-                cio->cui_fd  = LUSTRE_FPRIVATE(file);
                 result = cl_io_loop(env, io);
-        } else
+                if (write_sem_locked)
+                        up(&lli->lli_write_sem);
+        } else {
                 /* cl_io_rw_init() handled IO */
                 result = io->ci_result;
+        }
+
         if (io->ci_nob > 0) {
                 result = io->ci_nob;
                 *ppos = io->u.ci_wr.wr.crw_pos;
@@ -873,7 +914,7 @@ static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
                               unsigned long nr_segs, loff_t *ppos)
 {
         struct lu_env      *env;
-        struct ccc_io_args *args;
+        struct vvp_io_args *args;
         size_t              count;
         ssize_t             result;
         int                 refcheck;
@@ -887,10 +928,10 @@ static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
         if (IS_ERR(env))
                 RETURN(PTR_ERR(env));
 
-        args = &vvp_env_info(env)->vti_args;
-        args->cia_is_sendfile = 0;
-        args->cia_iov = (struct iovec *)iov;
-        args->cia_nrsegs = nr_segs;
+        args = vvp_env_args(env, IO_NORMAL);
+        args->u.normal.via_iov = (struct iovec *)iov;
+        args->u.normal.via_nrsegs = nr_segs;
+
         result = ll_file_io_generic(env, args, file, CIT_READ, ppos, count);
         cl_env_put(env, &refcheck);
         RETURN(result);
@@ -922,7 +963,7 @@ static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
                                 unsigned long nr_segs, loff_t pos)
 {
         struct lu_env      *env;
-        struct ccc_io_args *args;
+        struct vvp_io_args *args;
         size_t              count;
         ssize_t             result;
         int                 refcheck;
@@ -936,11 +977,11 @@ static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
         if (IS_ERR(env))
                 RETURN(PTR_ERR(env));
 
-        args = &vvp_env_info(env)->vti_args;
-        args->cia_is_sendfile = 0;
-        args->cia_iov = (struct iovec *)iov;
-        args->cia_nrsegs = nr_segs;
-        args->cia_iocb = iocb;
+        args = vvp_env_args(env, IO_NORMAL);
+        args->u.normal.via_iov = (struct iovec *)iov;
+        args->u.normal.via_nrsegs = nr_segs;
+        args->u.normal.via_iocb = iocb;
+
         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
                                     &iocb->ki_pos, count);
         cl_env_put(env, &refcheck);
@@ -985,7 +1026,7 @@ static ssize_t ll_file_writev(struct file *file, const struct iovec *iov,
                               unsigned long nr_segs, loff_t *ppos)
 {
         struct lu_env      *env;
-        struct ccc_io_args *args;
+        struct vvp_io_args *args;
         size_t              count;
         ssize_t             result;
         int                 refcheck;
@@ -999,9 +1040,10 @@ static ssize_t ll_file_writev(struct file *file, const struct iovec *iov,
         if (IS_ERR(env))
                 RETURN(PTR_ERR(env));
 
-        args = &vvp_env_info(env)->vti_args;
-        args->cia_iov = (struct iovec *)iov;
-        args->cia_nrsegs = nr_segs;
+        args = vvp_env_args(env, IO_NORMAL);
+        args->u.normal.via_iov = (struct iovec *)iov;
+        args->u.normal.via_nrsegs = nr_segs;
+
         result = ll_file_io_generic(env, args, file, CIT_WRITE, ppos, count);
         cl_env_put(env, &refcheck);
         RETURN(result);
@@ -1034,7 +1076,7 @@ static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
                                  unsigned long nr_segs, loff_t pos)
 {
         struct lu_env      *env;
-        struct ccc_io_args *args;
+        struct vvp_io_args *args;
         size_t              count;
         ssize_t             result;
         int                 refcheck;
@@ -1048,10 +1090,11 @@ static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
         if (IS_ERR(env))
                 RETURN(PTR_ERR(env));
 
-        args = &vvp_env_info(env)->vti_args;
-        args->cia_iov = (struct iovec *)iov;
-        args->cia_nrsegs = nr_segs;
-        args->cia_iocb = iocb;
+        args = vvp_env_args(env, IO_NORMAL);
+        args->u.normal.via_iov = (struct iovec *)iov;
+        args->u.normal.via_nrsegs = nr_segs;
+        args->u.normal.via_iocb = iocb;
+
         result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
                                   &iocb->ki_pos, count);
         cl_env_put(env, &refcheck);
@@ -1089,6 +1132,7 @@ static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
 #endif
 
 
+#ifdef HAVE_KERNEL_SENDFILE
 /*
  * Send file content (through pagecache) somewhere with helper
  */
@@ -1096,7 +1140,7 @@ static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
                                 read_actor_t actor, void *target)
 {
         struct lu_env      *env;
-        struct ccc_io_args *args;
+        struct vvp_io_args *args;
         ssize_t             result;
         int                 refcheck;
         ENTRY;
@@ -1105,14 +1149,43 @@ static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
         if (IS_ERR(env))
                 RETURN(PTR_ERR(env));
 
-        args = &vvp_env_info(env)->vti_args;
-        args->cia_is_sendfile = 1;
-        args->cia_target = target;
-        args->cia_actor = actor;
+        args = vvp_env_args(env, IO_SENDFILE);
+        args->u.sendfile.via_target = target;
+        args->u.sendfile.via_actor = actor;
+
         result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
         cl_env_put(env, &refcheck);
         RETURN(result);
 }
+#endif
+
+#ifdef HAVE_KERNEL_SPLICE_READ
+/*
+ * Read file content (through pagecache) into @pipe via an IO_SPLICE read
+ */
+static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
+                                   struct pipe_inode_info *pipe, size_t count,
+                                   unsigned int flags)
+{
+        struct lu_env      *env;
+        struct vvp_io_args *args;
+        ssize_t             result;
+        int                 refcheck;
+        ENTRY;
+
+        env = cl_env_get(&refcheck);
+        if (IS_ERR(env))
+                RETURN(PTR_ERR(env));
+
+        args = vvp_env_args(env, IO_SPLICE);
+        args->u.splice.via_pipe = pipe;
+        args->u.splice.via_flags = flags;
+
+        result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
+        cl_env_put(env, &refcheck);
+        RETURN(result);
+}
+#endif
 
 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
                                unsigned long arg)
@@ -1245,8 +1318,7 @@ int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
         LASSERT(lmm != NULL);
 
         if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
-            (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3)) &&
-            (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_JOIN))) {
+            (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3))) {
                 GOTO(out, rc = -EPROTO);
         }
 
@@ -1270,62 +1342,9 @@ int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
                                 lustre_swab_lov_user_md_objects(
                                  ((struct lov_user_md_v3 *)lmm)->lmm_objects,
                                  ((struct lov_user_md_v3 *)lmm)->lmm_stripe_count);
-                } else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_JOIN)) {
-                        lustre_swab_lov_user_md_join((struct lov_user_md_join *)lmm);
                 }
         }
 
-        if (lmm->lmm_magic == LOV_MAGIC_JOIN) {
-                struct lov_stripe_md *lsm;
-                struct lov_user_md_join *lmj;
-                int lmj_size, i, aindex = 0;
-
-                rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
-                if (rc < 0)
-                        GOTO(out, rc = -ENOMEM);
-                rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
-                if (rc)
-                        GOTO(out_free_memmd, rc);
-
-                lmj_size = sizeof(struct lov_user_md_join) +
-                           lsm->lsm_stripe_count *
-                           sizeof(struct lov_user_ost_data_join);
-                OBD_ALLOC(lmj, lmj_size);
-                if (!lmj)
-                        GOTO(out_free_memmd, rc = -ENOMEM);
-
-                memcpy(lmj, lmm, sizeof(struct lov_user_md_join));
-                for (i = 0; i < lsm->lsm_stripe_count; i++) {
-                        struct lov_extent *lex =
-                                &lsm->lsm_array->lai_ext_array[aindex];
-
-                        if (lex->le_loi_idx + lex->le_stripe_count <= i)
-                                aindex ++;
-                        CDEBUG(D_INFO, "aindex %d i %d l_extent_start "
-                                        LPU64" len %d\n", aindex, i,
-                                        lex->le_start, (int)lex->le_len);
-                        lmj->lmm_objects[i].l_extent_start =
-                                lex->le_start;
-
-                        if ((int)lex->le_len == -1)
-                                lmj->lmm_objects[i].l_extent_end = -1;
-                        else
-                                lmj->lmm_objects[i].l_extent_end =
-                                        lex->le_start + lex->le_len;
-                        lmj->lmm_objects[i].l_object_id =
-                                lsm->lsm_oinfo[i]->loi_id;
-                        lmj->lmm_objects[i].l_object_gr =
-                                lsm->lsm_oinfo[i]->loi_gr;
-                        lmj->lmm_objects[i].l_ost_gen =
-                                lsm->lsm_oinfo[i]->loi_ost_gen;
-                        lmj->lmm_objects[i].l_ost_idx =
-                                lsm->lsm_oinfo[i]->loi_ost_idx;
-                }
-                lmm = (struct lov_mds_md *)lmj;
-                lmmsize = lmj_size;
-out_free_memmd:
-                obd_free_memmd(sbi->ll_dt_exp, &lsm);
-        }
 out:
         *lmmp = lmm;
         *lmm_size = lmmsize;
@@ -1405,192 +1424,79 @@ static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
                             (void *)arg);
 }
 
-static int ll_get_grouplock(struct inode *inode, struct file *file,
-                            unsigned long arg)
+int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
 {
-        /* XXX */
-        return -ENOSYS;
-}
+        struct ll_inode_info   *lli = ll_i2info(inode);
+        struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
+        struct ccc_grouplock    grouplock;
+        int                     rc;
+        ENTRY;
 
-static int ll_put_grouplock(struct inode *inode, struct file *file,
-                            unsigned long arg)
-{
-        /* XXX */
-        return -ENOSYS;
-}
+        if (ll_file_nolock(file))
+                RETURN(-EOPNOTSUPP);
 
-#if LUSTRE_FIX >= 50
-static int join_sanity_check(struct inode *head, struct inode *tail)
-{
-        ENTRY;
-        if ((ll_i2sbi(head)->ll_flags & LL_SBI_JOIN) == 0) {
-                CERROR("server do not support join \n");
-                RETURN(-EINVAL);
-        }
-        if (!S_ISREG(tail->i_mode) || !S_ISREG(head->i_mode)) {
-                CERROR("tail ino %lu and ino head %lu must be regular\n",
-                       head->i_ino, tail->i_ino);
-                RETURN(-EINVAL);
-        }
-        if (head->i_ino == tail->i_ino) {
-                CERROR("file %lu can not be joined to itself \n", head->i_ino);
+        spin_lock(&lli->lli_lock);
+        if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
+                CWARN("group lock already existed with gid %lu\n",
+                       fd->fd_grouplock.cg_gid);
+                spin_unlock(&lli->lli_lock);
                 RETURN(-EINVAL);
         }
-        if (i_size_read(head) % JOIN_FILE_ALIGN) {
-                CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
+        LASSERT(fd->fd_grouplock.cg_lock == NULL);
+        spin_unlock(&lli->lli_lock);
+
+        rc = cl_get_grouplock(cl_i2info(inode)->lli_clob,
+                              arg, (file->f_flags & O_NONBLOCK), &grouplock);
+        if (rc)
+                RETURN(rc);
+
+        spin_lock(&lli->lli_lock);
+        if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
+                spin_unlock(&lli->lli_lock);
+                CERROR("another thread just won the race\n");
+                cl_put_grouplock(&grouplock);
                 RETURN(-EINVAL);
         }
+
+        fd->fd_flags |= LL_FILE_GROUP_LOCKED;
+        fd->fd_grouplock = grouplock;
+        spin_unlock(&lli->lli_lock);
+
+        CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
         RETURN(0);
 }
 
-static int join_file(struct inode *head_inode, struct file *head_filp,
-                     struct file *tail_filp)
+int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg)
 {
-        struct dentry *tail_dentry = tail_filp->f_dentry;
-        struct lookup_intent oit = {.it_op = IT_OPEN,
-                                   .it_flags = head_filp->f_flags|O_JOIN_FILE};
-        struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
-                ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL };
-
-        struct lustre_handle lockh;
-        struct md_op_data *op_data;
-        int    rc;
-        loff_t data;
+        struct ll_inode_info   *lli = ll_i2info(inode);
+        struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
+        struct ccc_grouplock    grouplock;
         ENTRY;
 
-        tail_dentry = tail_filp->f_dentry;
-
-        data = i_size_read(head_inode);
-        op_data = ll_prep_md_op_data(NULL, head_inode,
-                                     tail_dentry->d_parent->d_inode,
-                                     tail_dentry->d_name.name,
-                                     tail_dentry->d_name.len, 0,
-                                     LUSTRE_OPC_ANY, &data);
-        if (IS_ERR(op_data))
-                RETURN(PTR_ERR(op_data));
-
-        rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit,
-                         op_data, &lockh, NULL, 0, NULL, 0);
-
-        ll_finish_md_op_data(op_data);
-        if (rc < 0)
-                GOTO(out, rc);
-
-        rc = oit.d.lustre.it_status;
-
-        if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
-                rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
-                ptlrpc_req_finished((struct ptlrpc_request *)
-                                    oit.d.lustre.it_data);
-                GOTO(out, rc);
+        spin_lock(&lli->lli_lock);
+        if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
+                spin_unlock(&lli->lli_lock);
+                CWARN("no group lock held\n");
+                RETURN(-EINVAL);
         }
+        LASSERT(fd->fd_grouplock.cg_lock != NULL);
 
-        if (oit.d.lustre.it_lock_mode) { /* If we got lock - release it right
-                                           * away */
-                ldlm_lock_decref(&lockh, oit.d.lustre.it_lock_mode);
-                oit.d.lustre.it_lock_mode = 0;
+        if (fd->fd_grouplock.cg_gid != arg) {
+                CWARN("group lock %lu doesn't match current id %lu\n",
+                       arg, fd->fd_grouplock.cg_gid);
+                spin_unlock(&lli->lli_lock);
+                RETURN(-EINVAL);
         }
-        ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
-        it_clear_disposition(&oit, DISP_ENQ_COMPLETE);
-        ll_release_openhandle(head_filp->f_dentry, &oit);
-out:
-        ll_intent_release(&oit);
-        RETURN(rc);
-}
 
-static int ll_file_join(struct inode *head, struct file *filp,
-                        char *filename_tail)
-{
-        struct inode *tail = NULL, *first = NULL, *second = NULL;
-        struct dentry *tail_dentry;
-        struct file *tail_filp, *first_filp, *second_filp;
-        struct ll_lock_tree first_tree, second_tree;
-        struct ll_lock_tree_node *first_node, *second_node;
-        struct ll_inode_info *hlli = ll_i2info(head), *tlli;
-        int rc = 0, cleanup_phase = 0;
-        ENTRY;
-
-        CDEBUG(D_VFSTRACE, "VFS Op:head=%lu/%u(%p) tail %s\n",
-               head->i_ino, head->i_generation, head, filename_tail);
-
-        tail_filp = filp_open(filename_tail, O_WRONLY, 0644);
-        if (IS_ERR(tail_filp)) {
-                CERROR("Can not open tail file %s", filename_tail);
-                rc = PTR_ERR(tail_filp);
-                GOTO(cleanup, rc);
-        }
-        tail = igrab(tail_filp->f_dentry->d_inode);
-
-        tlli = ll_i2info(tail);
-        tail_dentry = tail_filp->f_dentry;
-        LASSERT(tail_dentry);
-        cleanup_phase = 1;
-
-        /*reorder the inode for lock sequence*/
-        first = head->i_ino > tail->i_ino ? head : tail;
-        second = head->i_ino > tail->i_ino ? tail : head;
-        first_filp = head->i_ino > tail->i_ino ? filp : tail_filp;
-        second_filp = head->i_ino > tail->i_ino ? tail_filp : filp;
-
-        CDEBUG(D_INFO, "reorder object from %lu:%lu to %lu:%lu \n",
-               head->i_ino, tail->i_ino, first->i_ino, second->i_ino);
-        first_node = ll_node_from_inode(first, 0, OBD_OBJECT_EOF, LCK_EX);
-        if (IS_ERR(first_node)){
-                rc = PTR_ERR(first_node);
-                GOTO(cleanup, rc);
-        }
-        first_tree.lt_fd = first_filp->private_data;
-        rc = ll_tree_lock(&first_tree, first_node, NULL, 0, 0);
-        if (rc != 0)
-                GOTO(cleanup, rc);
-        cleanup_phase = 2;
-
-        second_node = ll_node_from_inode(second, 0, OBD_OBJECT_EOF, LCK_EX);
-        if (IS_ERR(second_node)){
-                rc = PTR_ERR(second_node);
-                GOTO(cleanup, rc);
-        }
-        second_tree.lt_fd = second_filp->private_data;
-        rc = ll_tree_lock(&second_tree, second_node, NULL, 0, 0);
-        if (rc != 0)
-                GOTO(cleanup, rc);
-        cleanup_phase = 3;
-
-        rc = join_sanity_check(head, tail);
-        if (rc)
-                GOTO(cleanup, rc);
+        grouplock = fd->fd_grouplock;
+        memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
+        fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
+        spin_unlock(&lli->lli_lock);
 
-        rc = join_file(head, filp, tail_filp);
-        if (rc)
-                GOTO(cleanup, rc);
-cleanup:
-        switch (cleanup_phase) {
-        case 3:
-                ll_tree_unlock(&second_tree);
-                obd_cancel_unused(ll_i2dtexp(second),
-                                  ll_i2info(second)->lli_smd, 0, NULL);
-        case 2:
-                ll_tree_unlock(&first_tree);
-                obd_cancel_unused(ll_i2dtexp(first),
-                                  ll_i2info(first)->lli_smd, 0, NULL);
-        case 1:
-                filp_close(tail_filp, 0);
-                if (tail)
-                        iput(tail);
-                if (head && rc == 0) {
-                        obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
-                                       &hlli->lli_smd);
-                        hlli->lli_smd = NULL;
-                }
-        case 0:
-                break;
-        default:
-                CERROR("invalid cleanup_phase %d\n", cleanup_phase);
-                LBUG();
-        }
-        RETURN(rc);
+        cl_put_grouplock(&grouplock);
+        CDEBUG(D_INFO, "group lock %lu released\n", arg);
+        RETURN(0);
 }
-#endif /* LUSTRE_FIX >= 50 */
 
 /**
  * Close inode open handle
@@ -1641,7 +1547,7 @@ int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
  * Get size for inode for which FIEMAP mapping is requested.
  * Make the FIEMAP get_info call and returns the result.
  */
-int ll_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
+int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
               int num_bytes)
 {
         struct obd_export *exp = ll_i2dtexp(inode);
@@ -1680,6 +1586,42 @@ int ll_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
         RETURN(rc);
 }
 
+/**
+ * Handle OBD_IOC_FID2PATH: copy the request header in from @arg, run the
+ * ioctl on @exp, then copy the header plus gf_pathlen result bytes back.
+ * gf_pathlen comes from userspace, so it is bounded by PATH_MAX before it
+ * sizes the kernel buffer.  Returns 0 or a negative errno.
+ */
+int ll_fid2path(struct obd_export *exp, void *arg)
+{
+        struct getinfo_fid2path *gfout = NULL, *gfin;
+        int outsize = 0, rc;
+        ENTRY;
+
+        OBD_ALLOC_PTR(gfin);
+        if (gfin == NULL)
+                RETURN(-ENOMEM);
+        if (copy_from_user(gfin, arg, sizeof(*gfin)))
+                GOTO(out, rc = -EFAULT);
+        if (gfin->gf_pathlen > PATH_MAX)
+                GOTO(out, rc = -EINVAL);
+
+        outsize = sizeof(*gfout) + gfin->gf_pathlen;
+        OBD_ALLOC(gfout, outsize);
+        if (gfout == NULL)
+                GOTO(out, rc = -ENOMEM);
+        memcpy(gfout, gfin, sizeof(*gfout));
+
+        rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
+        if (rc == 0 && copy_to_user(arg, gfout, outsize))
+                rc = -EFAULT;
+out:
+        if (gfout != NULL)
+                OBD_FREE(gfout, outsize);
+        OBD_FREE_PTR(gfin);
+        RETURN(rc);
+}
+
 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
                   unsigned long arg)
 {
@@ -1729,7 +1671,7 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
                 RETURN(ll_lov_getstripe(inode, arg));
         case LL_IOC_RECREATE_OBJ:
                 RETURN(ll_lov_recreate_obj(inode, file, arg));
-        case EXT3_IOC_FIEMAP: {
+        case FSFILT_IOC_FIEMAP: {
                 struct ll_user_fiemap *fiemap_s;
                 size_t num_bytes, ret_bytes;
                 unsigned int extent_count;
@@ -1778,7 +1720,7 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
                                 GOTO(error, rc);
                 }
 
-                rc = ll_fiemap(inode, fiemap_s, num_bytes);
+                rc = ll_do_fiemap(inode, fiemap_s, num_bytes);
                 if (rc)
                         GOTO(error, rc);
 
@@ -1795,29 +1737,12 @@ error:
                 OBD_VFREE(fiemap_s, num_bytes);
                 RETURN(rc);
         }
-        case EXT3_IOC_GETFLAGS:
-        case EXT3_IOC_SETFLAGS:
+        case FSFILT_IOC_GETFLAGS:
+        case FSFILT_IOC_SETFLAGS:
                 RETURN(ll_iocontrol(inode, file, cmd, arg));
-        case EXT3_IOC_GETVERSION_OLD:
-        case EXT3_IOC_GETVERSION:
+        case FSFILT_IOC_GETVERSION_OLD:
+        case FSFILT_IOC_GETVERSION:
                 RETURN(put_user(inode->i_generation, (int *)arg));
-        case LL_IOC_JOIN: {
-#if LUSTRE_FIX >= 50
-                /* Allow file join in beta builds to allow debuggging */
-                char *ftail;
-                int rc;
-
-                ftail = getname((const char *)arg);
-                if (IS_ERR(ftail))
-                        RETURN(PTR_ERR(ftail));
-                rc = ll_file_join(inode, file, ftail);
-                putname(ftail);
-                RETURN(rc);
-#else
-                CWARN("file join is not supported in this version of Lustre\n");
-                RETURN(-ENOTTY);
-#endif
-        }
         case LL_IOC_GROUP_LOCK:
                 RETURN(ll_get_grouplock(inode, file, arg));
         case LL_IOC_GROUP_UNLOCK:
@@ -1828,18 +1753,21 @@ error:
         /* We need to special case any other ioctls we want to handle,
          * to send them to the MDS/OST as appropriate and to properly
          * network encode the arg field.
-        case EXT3_IOC_SETVERSION_OLD:
-        case EXT3_IOC_SETVERSION:
+        case FSFILT_IOC_SETVERSION_OLD:
+        case FSFILT_IOC_SETVERSION:
         */
         case LL_IOC_FLUSHCTX:
                 RETURN(ll_flush_ctx(inode));
         case LL_IOC_PATH2FID: {
-                if (copy_to_user((void *)arg, &ll_i2info(inode)->lli_fid,
+                if (copy_to_user((void *)arg, ll_inode2fid(inode),
                                  sizeof(struct lu_fid)))
                         RETURN(-EFAULT);
 
                 RETURN(0);
         }
+        case OBD_IOC_FID2PATH:
+                RETURN(ll_fid2path(ll_i2mdexp(inode), (void *)arg));
+
         default: {
                 int err;
 
@@ -2131,13 +2059,14 @@ static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
         return 0;
 }
 
-int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
+int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
+                             __u64 ibits)
 {
         struct inode *inode = dentry->d_inode;
         struct ptlrpc_request *req = NULL;
         struct ll_sb_info *sbi;
         struct obd_export *exp;
-        int rc;
+        int rc = 0;
         ENTRY;
 
         if (!inode) {
@@ -2162,14 +2091,14 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
                 if (IS_ERR(op_data))
                         RETURN(PTR_ERR(op_data));
 
-                oit.it_flags |= O_CHECK_STALE;
+                oit.it_create_mode |= M_CHECK_STALE;
                 rc = md_intent_lock(exp, op_data, NULL, 0,
                                     /* we are not interested in name
                                        based lookup */
                                     &oit, 0, &req,
                                     ll_md_blocking_ast, 0);
                 ll_finish_md_op_data(op_data);
-                oit.it_flags &= ~O_CHECK_STALE;
+                oit.it_create_mode &= ~M_CHECK_STALE;
                 if (rc < 0) {
                         rc = ll_inode_revalidate_fini(inode, rc);
                         GOTO (out, rc);
@@ -2186,16 +2115,14 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
                    here to preserve get_cwd functionality on 2.6.
                    Bug 10503 */
                 if (!dentry->d_inode->i_nlink) {
-                        spin_lock(&ll_lookup_lock);
                         spin_lock(&dcache_lock);
                         ll_drop_dentry(dentry);
                         spin_unlock(&dcache_lock);
-                        spin_unlock(&ll_lookup_lock);
                 }
 
-                ll_lookup_finish_locks(&oit, dentry);
-        } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE |
-                                                     MDS_INODELOCK_LOOKUP)) {
+                ll_finish_locks(&oit, dentry);
+        } else if (!ll_have_md_lock(dentry->d_inode, ibits)) {
+
                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
                 obd_valid valid = OBD_MD_FLGETATTR;
                 struct obd_capa *oc;
@@ -2220,21 +2147,31 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
                 }
 
                 rc = ll_prep_inode(&inode, req, NULL);
-                if (rc)
-                        GOTO(out, rc);
         }
+out:
+        ptlrpc_req_finished(req);
+        return rc;
+}
+
+int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
+{
+        int rc;
+        ENTRY;
+
+        rc = __ll_inode_revalidate_it(dentry, it, MDS_INODELOCK_UPDATE |
+                                                  MDS_INODELOCK_LOOKUP);
 
         /* if object not yet allocated, don't validate size */
-        if (ll_i2info(inode)->lli_smd == NULL)
-                GOTO(out, rc = 0);
+        if (rc == 0 && ll_i2info(dentry->d_inode)->lli_smd == NULL)
+                RETURN(0);
 
         /* cl_glimpse_size will prefer locally cached writes if they extend
          * the file */
-        rc = cl_glimpse_size(inode);
-        EXIT;
-out:
-        ptlrpc_req_finished(req);
-        return rc;
+
+        if (rc == 0)
+                rc = cl_glimpse_size(dentry->d_inode);
+
+        RETURN(rc);
 }
 
 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
@@ -2279,6 +2216,26 @@ int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
         return ll_getattr_it(mnt, de, &it, stat);
 }
 
+#ifdef HAVE_LINUX_FIEMAP_H
+/* ->fiemap inode operation: forwards the request to ll_do_fiemap(). */
+int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
+                __u64 start, __u64 len)
+{
+        int rc;
+        struct ll_user_fiemap *fiemap = (struct ll_user_fiemap*)(
+                fieinfo->fi_extents_start - sizeof(ll_user_fiemap));
+        /* NOTE(review): start/len are never read.  If fi_extents_start is a
+         * typed pointer the subtraction above scales by its pointee; confirm. */
+        rc = ll_do_fiemap(inode, fiemap, sizeof(*fiemap) +
+                          fiemap->fm_extent_count *
+                          sizeof(struct ll_fiemap_extent));
+        fieinfo->fi_flags = fiemap->fm_flags;
+        fieinfo->fi_extents_mapped = fiemap->fm_mapped_extents;
+
+        return rc;
+}
+#endif
+
 static
 int lustre_check_acl(struct inode *inode, int mask)
 {
@@ -2305,15 +2262,37 @@ int lustre_check_acl(struct inode *inode, int mask)
 }
 
 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,10))
+#ifndef HAVE_INODE_PERMISION_2ARGS
 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
+#else
+int ll_inode_permission(struct inode *inode, int mask)
+#endif
 {
-        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
-               inode->i_ino, inode->i_generation, inode, mask);
+        int rc = 0;
+        ENTRY;
+
+        /* The root inode is not revalidated by lookup, so revalidate it
+         * here before checking permissions. */
+
+        if (inode == inode->i_sb->s_root->d_inode) {
+                struct lookup_intent it = { .it_op = IT_LOOKUP };
+
+                rc = __ll_inode_revalidate_it(inode->i_sb->s_root, &it,
+                                              MDS_INODELOCK_LOOKUP);
+                if (rc)
+                        RETURN(rc);
+        }
+
+        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n",
+               inode->i_ino, inode->i_generation, inode, inode->i_mode, mask);
+
         if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
                 return lustre_check_remote_perm(inode, mask);
 
         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
-        return generic_permission(inode, mask, lustre_check_acl);
+        rc = generic_permission(inode, mask, lustre_check_acl);
+
+        RETURN(rc);
 }
 #else
 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
@@ -2390,7 +2369,12 @@ struct file_operations ll_file_operations = {
         .release        = ll_file_release,
         .mmap           = ll_file_mmap,
         .llseek         = ll_file_seek,
+#ifdef HAVE_KERNEL_SENDFILE
         .sendfile       = ll_file_sendfile,
+#endif
+#ifdef HAVE_KERNEL_SPLICE_READ
+        .splice_read    = ll_file_splice_read,
+#endif
         .fsync          = ll_fsync,
 };
 
@@ -2404,7 +2388,12 @@ struct file_operations ll_file_operations_flock = {
         .release        = ll_file_release,
         .mmap           = ll_file_mmap,
         .llseek         = ll_file_seek,
+#ifdef HAVE_KERNEL_SENDFILE
         .sendfile       = ll_file_sendfile,
+#endif
+#ifdef HAVE_KERNEL_SPLICE_READ
+        .splice_read    = ll_file_splice_read,
+#endif
         .fsync          = ll_fsync,
 #ifdef HAVE_F_OP_FLOCK
         .flock          = ll_file_flock,
@@ -2423,7 +2412,12 @@ struct file_operations ll_file_operations_noflock = {
         .release        = ll_file_release,
         .mmap           = ll_file_mmap,
         .llseek         = ll_file_seek,
+#ifdef HAVE_KERNEL_SENDFILE
         .sendfile       = ll_file_sendfile,
+#endif
+#ifdef HAVE_KERNEL_SPLICE_READ
+        .splice_read    = ll_file_splice_read,
+#endif
         .fsync          = ll_fsync,
 #ifdef HAVE_F_OP_FLOCK
         .flock          = ll_file_noflock,
@@ -2443,6 +2437,9 @@ struct inode_operations ll_file_inode_operations = {
         .getxattr       = ll_getxattr,
         .listxattr      = ll_listxattr,
         .removexattr    = ll_removexattr,
+#ifdef  HAVE_LINUX_FIEMAP_H
+        .fiemap         = ll_fiemap,
+#endif
 };
 
 /* dynamic ioctl number support routins */