Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / llite / file.c
index 40c8749..0b7082c 100644 (file)
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
- *   Author: Peter Braam <braam@clusterfs.com>
- *   Author: Phil Schwan <phil@clusterfs.com>
- *   Author: Andreas Dilger <adilger@clusterfs.com>
+ * GPL HEADER START
  *
- *   This file is part of Lustre, http://www.lustre.org.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/llite/file.c
+ *
+ * Author: Peter Braam <braam@clusterfs.com>
+ * Author: Phil Schwan <phil@clusterfs.com>
+ * Author: Andreas Dilger <adilger@clusterfs.com>
  */
 
 #define DEBUG_SUBSYSTEM S_LLITE
 #include <lustre_dlm.h>
 #include <lustre_lite.h>
+#include <lustre_mdc.h>
 #include <linux/pagemap.h>
 #include <linux/file.h>
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-#include <linux/lustre_compat25.h>
-#endif
 #include "llite_internal.h"
+#include <lustre/ll_fiemap.h>
 
 /* also used by llite/special.c:ll_special_open() */
 struct ll_file_data *ll_file_data_get(void)
 {
         struct ll_file_data *fd;
 
-        OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
+        OBD_SLAB_ALLOC_PTR(fd, ll_file_data_slab);
         return fd;
 }
 
 static void ll_file_data_put(struct ll_file_data *fd)
 {
         if (fd != NULL)
-                OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof *fd);
+                OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
+}
+
+void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
+                          struct lustre_handle *fh)
+{
+        op_data->op_fid1 = ll_i2info(inode)->lli_fid;
+        op_data->op_attr.ia_mode = inode->i_mode;
+        op_data->op_attr.ia_atime = inode->i_atime;
+        op_data->op_attr.ia_mtime = inode->i_mtime;
+        op_data->op_attr.ia_ctime = inode->i_ctime;
+        op_data->op_attr.ia_size = i_size_read(inode);
+        op_data->op_attr_blocks = inode->i_blocks;
+        ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
+        op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
+        memcpy(&op_data->op_handle, fh, sizeof(op_data->op_handle));
+        op_data->op_capa1 = ll_mdscapa_get(inode);
+}
+
+static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
+                             struct obd_client_handle *och)
+{
+        ENTRY;
+
+        op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME_SET |
+                                 ATTR_MTIME_SET | ATTR_CTIME_SET;
+
+        if (!(och->och_flags & FMODE_WRITE))
+                goto out;
+
+        if (!(ll_i2mdexp(inode)->exp_connect_flags & OBD_CONNECT_SOM) ||
+            !S_ISREG(inode->i_mode))
+                op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
+        else
+                ll_epoch_close(inode, op_data, &och, 0);
+
+out:
+        ll_pack_inode2opdata(inode, op_data, &och->och_fh);
+        EXIT;
 }
 
-static int ll_close_inode_openhandle(struct inode *inode,
+static int ll_close_inode_openhandle(struct obd_export *md_exp,
+                                     struct inode *inode,
                                      struct obd_client_handle *och)
 {
+        struct obd_export *exp = ll_i2mdexp(inode);
+        struct md_op_data *op_data;
         struct ptlrpc_request *req = NULL;
-        struct obd_device *obd;
-        struct obdo *oa;
-        int rc;
+        struct obd_device *obd = class_exp2obd(exp);
+        int epoch_close = 1;
+        int seq_end = 0, rc;
         ENTRY;
 
-        obd = class_exp2obd(ll_i2mdcexp(inode));
         if (obd == NULL) {
+                /*
+                 * XXX: in case of LMV, is it correct to access
+                 * ->exp_handle?
+                 */
                 CERROR("Invalid MDC connection handle "LPX64"\n",
-                       ll_i2mdcexp(inode)->exp_handle.h_cookie);
+                       ll_i2mdexp(inode)->exp_handle.h_cookie);
                 GOTO(out, rc = 0);
         }
 
         /*
          * here we check if this is forced umount. If so this is called on
-         * canceling "open lock" and we do not call mdc_close() in this case, as
+         * canceling "open lock" and we do not call md_close() in this case, as
          * it will not be successful, as import is already deactivated.
          */
-        if (obd->obd_no_recov)
+        if (obd->obd_force)
                 GOTO(out, rc = 0);
 
-        oa = obdo_alloc();
-        if (!oa)
-                RETURN(-ENOMEM); // XXX We leak openhandle and request here.
-
-        oa->o_id = inode->i_ino;
-        oa->o_valid = OBD_MD_FLID;
-        obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
-                                   OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
-                                   OBD_MD_FLATIME | OBD_MD_FLMTIME |
-                                   OBD_MD_FLCTIME);
-        if (0 /* ll_is_inode_dirty(inode) */) {
-                oa->o_flags = MDS_BFLAG_UNCOMMITTED_WRITES;
-                oa->o_valid |= OBD_MD_FLFLAGS;
-        }
-
-        rc = mdc_close(ll_i2mdcexp(inode), oa, och, &req);
-        if (rc == EAGAIN) {
-                /* We are the last writer, so the MDS has instructed us to get
-                 * the file size and any write cookies, then close again. */
-                //ll_queue_done_writing(inode);
-                rc = 0;
+        OBD_ALLOC_PTR(op_data);
+        if (op_data == NULL)
+                GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.
+
+        ll_prepare_close(inode, op_data, och);
+        epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
+        rc = md_close(md_exp, op_data, och->och_mod, &req);
+        if (rc != -EAGAIN)
+                seq_end = 1;
+
+        if (rc == -EAGAIN) {
+                /* This close must have the epoch closed. */
+                LASSERT(exp->exp_connect_flags & OBD_CONNECT_SOM);
+                LASSERT(epoch_close);
+                /* MDS has instructed us to obtain Size-on-MDS attribute from
+                 * OSTs and send setattr back to MDS. */
+                rc = ll_sizeonmds_update(inode, och->och_mod,
+                                         &och->och_fh, op_data->op_ioepoch);
+                if (rc) {
+                        CERROR("inode %lu mdc Size-on-MDS update failed: "
+                               "rc = %d\n", inode->i_ino, rc);
+                        rc = 0;
+                }
         } else if (rc) {
                 CERROR("inode %lu mdc close failed: rc = %d\n",
                        inode->i_ino, rc);
         }
-
-        obdo_free(oa);
+        ll_finish_md_op_data(op_data);
 
         if (rc == 0) {
                 rc = ll_objects_destroy(req, inode);
@@ -106,22 +168,32 @@ static int ll_close_inode_openhandle(struct inode *inode,
                                inode->i_ino, rc);
         }
 
-        ptlrpc_req_finished(req); /* This is close request */
         EXIT;
 out:
-        mdc_clear_open_replay_data(och);
-
+      
+        if ((exp->exp_connect_flags & OBD_CONNECT_SOM) && !epoch_close &&
+            S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
+                ll_queue_done_writing(inode, LLIF_DONE_WRITING);
+        } else {
+                if (seq_end)
+                        ptlrpc_close_replay_seq(req);
+                md_clear_open_replay_data(md_exp, och);
+                /* Free @och if it is not waiting for DONE_WRITING. */
+                och->och_fh.cookie = DEAD_HANDLE_MAGIC;
+                OBD_FREE_PTR(och);
+        }
+        if (req) /* This is close request */
+                ptlrpc_req_finished(req);
         return rc;
 }
 
-int ll_mdc_real_close(struct inode *inode, int flags)
+int ll_md_real_close(struct inode *inode, int flags)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
-        int rc = 0;
         struct obd_client_handle **och_p;
         struct obd_client_handle *och;
         __u64 *och_usecount;
-
+        int rc = 0;
         ENTRY;
 
         if (flags & FMODE_WRITE) {
@@ -130,7 +202,7 @@ int ll_mdc_real_close(struct inode *inode, int flags)
         } else if (flags & FMODE_EXEC) {
                 och_p = &lli->lli_mds_exec_och;
                 och_usecount = &lli->lli_open_fd_exec_count;
-         } else {
+        } else {
                 LASSERT(flags & FMODE_READ);
                 och_p = &lli->lli_mds_read_och;
                 och_usecount = &lli->lli_open_fd_read_count;
@@ -148,16 +220,15 @@ int ll_mdc_real_close(struct inode *inode, int flags)
 
         if (och) { /* There might be a race and somebody have freed this och
                       already */
-                rc = ll_close_inode_openhandle(inode, och);
-                och->och_fh.cookie = DEAD_HANDLE_MAGIC;
-                OBD_FREE(och, sizeof *och);
+                rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
+                                               inode, och);
         }
 
         RETURN(rc);
 }
 
-int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
-                        struct file *file)
+int ll_md_close(struct obd_export *md_exp, struct inode *inode,
+                struct file *file)
 {
         struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
         struct ll_inode_info *lli = ll_i2info(inode);
@@ -179,8 +250,6 @@ int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
                 int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
                 struct lustre_handle lockh;
                 struct inode *inode = file->f_dentry->d_inode;
-                struct ldlm_res_id file_res_id = {.name={inode->i_ino,
-                                                         inode->i_generation}};
                 ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
 
                 down(&lli->lli_och_sem);
@@ -199,11 +268,11 @@ int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
                 }
                 up(&lli->lli_och_sem);
 
-                if (!ldlm_lock_match(mdc_exp->exp_obd->obd_namespace, flags,
-                                     &file_res_id, LDLM_IBITS, &policy,lockmode,
-                                     &lockh)) {
-                        rc = ll_mdc_real_close(file->f_dentry->d_inode,
-                                                fd->fd_omode);
+                if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
+                                   LDLM_IBITS, &policy, lockmode,
+                                   &lockh)) {
+                        rc = ll_md_real_close(file->f_dentry->d_inode,
+                                              fd->fd_omode);
                 }
         } else {
                 CERROR("Releasing a file %p with negative dentry %p. Name %s",
@@ -212,6 +281,7 @@ int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode,
 
         LUSTRE_FPRIVATE(file) = NULL;
         ll_file_data_put(fd);
+        ll_capa_close(inode);
 
         RETURN(rc);
 }
@@ -234,22 +304,43 @@ int ll_file_release(struct inode *inode, struct file *file)
         ENTRY;
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
                inode->i_generation, inode);
-        ll_vfs_ops_tally(sbi, VFS_OPS_RELEASE);
 
-        /* don't do anything for / */
-        if (inode->i_sb->s_root == file->f_dentry)
-                RETURN(0);
-
-        lprocfs_counter_incr(sbi->ll_stats, LPROC_LL_RELEASE);
+#ifdef CONFIG_FS_POSIX_ACL
+        if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
+            inode == inode->i_sb->s_root->d_inode) {
+                struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+
+                LASSERT(fd != NULL);
+                if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
+                        fd->fd_flags &= ~LL_FILE_RMTACL;
+                        rct_del(&sbi->ll_rct, cfs_curproc_pid());
+                        et_search_free(&sbi->ll_et, cfs_curproc_pid());
+                }
+        }
+#endif
 
+        if (inode->i_sb->s_root != file->f_dentry)
+                ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
         fd = LUSTRE_FPRIVATE(file);
         LASSERT(fd != NULL);
 
+        /* The last ref on @file, maybe not the owner pid of statahead.
+         * Different processes can open the same dir, "ll_opendir_key" means:
+         * it is me that should stop the statahead thread. */
+        if (lli->lli_opendir_key == fd)
+                ll_stop_statahead(inode, fd);
+
+        if (inode->i_sb->s_root == file->f_dentry) {
+                LUSTRE_FPRIVATE(file) = NULL;
+                ll_file_data_put(fd);
+                RETURN(0);
+        }
+        
         if (lsm)
                 lov_test_and_clear_async_rc(lsm);
         lli->lli_async_rc = 0;
 
-        rc = ll_mdc_close(sbi->ll_mdc_exp, inode, file);
+        rc = ll_md_close(sbi->ll_md_exp, inode, file);
         RETURN(rc);
 }
 
@@ -257,19 +348,17 @@ static int ll_intent_file_open(struct file *file, void *lmm,
                                int lmmsize, struct lookup_intent *itp)
 {
         struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
-        struct mdc_op_data data;
         struct dentry *parent = file->f_dentry->d_parent;
         const char *name = file->f_dentry->d_name.name;
         const int len = file->f_dentry->d_name.len;
-        struct inode *inode = file->f_dentry->d_inode;
+        struct md_op_data *op_data;
         struct ptlrpc_request *req;
         int rc;
+        ENTRY;
 
         if (!parent)
                 RETURN(-ENOENT);
 
-        ll_prepare_mdc_op_data(&data, parent->d_inode, inode, name, len, O_RDWR);
-
         /* Usually we come here only for NFSD, and we want open lock.
            But we can also get here with pre 2.6.15 patchless kernels, and in
            that case that lock is also ok */
@@ -282,29 +371,38 @@ static int ll_intent_file_open(struct file *file, void *lmm,
         if (!lmm && !lmmsize)
                 itp->it_flags |= MDS_OPEN_LOCK;
 
-        rc = mdc_intent_lock(sbi->ll_mdc_exp, &data, lmm, lmmsize, itp,
-                              0 /*unused */, &req, ll_mdc_blocking_ast, 0);
+        op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
+                                      file->f_dentry->d_inode, name, len,
+                                      O_RDWR, LUSTRE_OPC_ANY, NULL);
+        if (IS_ERR(op_data))
+                RETURN(PTR_ERR(op_data));
+
+        rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
+                            0 /*unused */, &req, ll_md_blocking_ast, 0);
+        ll_finish_md_op_data(op_data);
         if (rc == -ESTALE) {
                 /* reason for keep own exit path - don`t flood log
                 * with messages with -ESTALE errors.
                 */
-                if (!it_disposition(itp, DISP_OPEN_OPEN))
+                if (!it_disposition(itp, DISP_OPEN_OPEN) || 
+                     it_open_error(DISP_OPEN_OPEN, itp))
                         GOTO(out, rc);
                 ll_release_openhandle(file->f_dentry, itp);
                 GOTO(out_stale, rc);
         }
 
-        if (rc != 0) {
-               CERROR("lock enqueue: err: %d\n", rc);
-               GOTO(out, rc);
+        if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
+                rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
+                CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
+                GOTO(out, rc);
         }
 
         if (itp->d.lustre.it_lock_mode)
-                mdc_set_lock_data(&itp->d.lustre.it_lock_handle,
-                                  inode);
+                md_set_lock_data(sbi->ll_md_exp,
+                                 &itp->d.lustre.it_lock_handle, 
+                                 file->f_dentry->d_inode);
 
-        rc = ll_prep_inode(sbi->ll_osc_exp, &file->f_dentry->d_inode,
-                           req, DLM_REPLY_REC_OFF, NULL);
+        rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
 out:
         ptlrpc_req_finished(itp->d.lustre.it_data);
 
@@ -315,41 +413,56 @@ out_stale:
         RETURN(rc);
 }
 
-
-static void ll_och_fill(struct ll_inode_info *lli, struct lookup_intent *it,
-                        struct obd_client_handle *och)
+static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
+                       struct lookup_intent *it, struct obd_client_handle *och)
 {
         struct ptlrpc_request *req = it->d.lustre.it_data;
-        struct mds_body *body;
+        struct mdt_body *body;
 
         LASSERT(och);
 
-        body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
-        LASSERT(body != NULL);                  /* reply already checked out */
-        LASSERT_REPSWABBED(req, DLM_REPLY_REC_OFF); /* and swabbed in mdc_enqueue */
+        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+        LASSERT(body != NULL);                      /* reply already checked out */
 
         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
-        lli->lli_io_epoch = body->io_epoch;
+        och->och_fid = lli->lli_fid;
+        och->och_flags = it->it_flags;
+        lli->lli_ioepoch = body->ioepoch;
 
-        mdc_set_open_replay_data(och, it->d.lustre.it_data);
+        return md_set_open_replay_data(md_exp, och, req);
 }
 
 int ll_local_open(struct file *file, struct lookup_intent *it,
                   struct ll_file_data *fd, struct obd_client_handle *och)
 {
+        struct inode *inode = file->f_dentry->d_inode;
+        struct ll_inode_info *lli = ll_i2info(inode);
         ENTRY;
 
         LASSERT(!LUSTRE_FPRIVATE(file));
 
         LASSERT(fd != NULL);
 
-        if (och)
-                ll_och_fill(ll_i2info(file->f_dentry->d_inode), it, och);
+        if (och) {
+                struct ptlrpc_request *req = it->d.lustre.it_data;
+                struct mdt_body *body;
+                int rc;
+
+                rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
+                if (rc)
+                        RETURN(rc);
+
+                body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+                if ((it->it_flags & FMODE_WRITE) &&
+                    (body->valid & OBD_MD_FLSIZE))
+                        CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
+                               lli->lli_ioepoch, PFID(&lli->lli_fid));
+        }
+
         LUSTRE_FPRIVATE(file) = fd;
-        ll_readahead_init(file->f_dentry->d_inode, &fd->fd_ras);
+        ll_readahead_init(inode, &fd->fd_ras);
         fd->fd_omode = it->it_flags;
-
         RETURN(0);
 }
 
@@ -360,7 +473,7 @@ int ll_local_open(struct file *file, struct lookup_intent *it,
  * stripe MD to the MDS, or try to destroy the objects if that fails.
  *
  * If we already have the stripe MD locally then we don't request it in
- * mdc_open(), by passing a lmm_size = 0.
+ * md_open(), by passing a lmm_size = 0.
  *
  * It is up to the application to ensure no other processes open this file
  * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
@@ -378,18 +491,13 @@ int ll_file_open(struct inode *inode, struct file *file)
         struct obd_client_handle **och_p;
         __u64 *och_usecount;
         struct ll_file_data *fd;
-        int rc = 0;
+        int rc = 0, opendir_set = 0;
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
                inode->i_generation, inode, file->f_flags);
-        ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_OPEN);
-
-        /* don't do anything for / */
-        if (inode->i_sb->s_root == file->f_dentry)
-                RETURN(0);
 
-#ifdef LUSTRE_KERNEL_VERSION
+#ifdef HAVE_VFS_INTENT_PATCHES
         it = file->f_it;
 #else
         it = file->private_data; /* XXX: compat macro */
@@ -400,9 +508,38 @@ int ll_file_open(struct inode *inode, struct file *file)
         if (fd == NULL)
                 RETURN(-ENOMEM);
 
+        if (S_ISDIR(inode->i_mode)) {
+                spin_lock(&lli->lli_lock);
+                /* "lli->lli_opendir_pid != 0" means someone has set it.
+                 * "lli->lli_sai != NULL" means the previous statahead has not
+                 *                        been cleaned up. */
+                if (lli->lli_opendir_pid == 0 && lli->lli_sai == NULL) {
+                        opendir_set = 1;
+                        lli->lli_opendir_pid = cfs_curproc_pid();
+                        lli->lli_opendir_key = fd;
+                } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid())) {
+                        /* Two cases for this:
+                         * (1) The same process opens the directory many times.
+                         * (2) The old process opened the directory and exited
+                         *     before its child processes. Then a new process
+                         *     with the same pid opens the directory before the
+                         *     old process's child processes exit.
+                         * Change the owner to the latest one. */
+                        opendir_set = 2;
+                        lli->lli_opendir_key = fd;
+                }
+                spin_unlock(&lli->lli_lock);
+        }
+
+        if (inode->i_sb->s_root == file->f_dentry) {
+                LUSTRE_FPRIVATE(file) = fd;
+                RETURN(0);
+        }
+
         if (!it || !it->d.lustre.it_disposition) {
                 /* Convert f_flags into access mode. We cannot use file->f_mode,
-                 * because everything but O_ACCMODE mask was stripped from it */
+                 * because everything but O_ACCMODE mask was stripped from
+                 * there */
                 if ((oit.it_flags + 1) & O_ACCMODE)
                         oit.it_flags++;
                 if (file->f_flags & O_TRUNC)
@@ -422,6 +559,7 @@ int ll_file_open(struct inode *inode, struct file *file)
                 it = &oit;
         }
 
+restart:
         /* Let's see if we have file open on MDS already. */
         if (it->it_flags & FMODE_WRITE) {
                 och_p = &lli->lli_mds_write_och;
@@ -433,35 +571,46 @@ int ll_file_open(struct inode *inode, struct file *file)
                 och_p = &lli->lli_mds_read_och;
                 och_usecount = &lli->lli_open_fd_read_count;
         }
-
-        LASSERTF(it->it_flags != 0, "it %p dist %d \n", it,
-                 it->d.lustre.it_disposition);
-
+        
         down(&lli->lli_och_sem);
         if (*och_p) { /* Open handle is present */
                 if (it_disposition(it, DISP_OPEN_OPEN)) {
                         /* Well, there's extra open request that we do not need,
                            let's close it somehow. This will decref request. */
+                        rc = it_open_error(DISP_OPEN_OPEN, it);
+                        if (rc) {
+                                up(&lli->lli_och_sem);
+                                ll_file_data_put(fd);
+                                GOTO(out_openerr, rc);
+                        }       
                         ll_release_openhandle(file->f_dentry, it);
+                        lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
+                                             LPROC_LL_OPEN);
                 }
                 (*och_usecount)++;
 
                 rc = ll_local_open(file, it, fd, NULL);
-
-                LASSERTF(rc == 0, "rc = %d\n", rc);
-        } else {
-                LASSERT(*och_usecount == 0);
-                OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
-                if (!*och_p) {
+                if (rc) {
+                        (*och_usecount)--;
+                        up(&lli->lli_och_sem);
                         ll_file_data_put(fd);
-                        GOTO(out_och_free, rc = -ENOMEM);
+                        GOTO(out_openerr, rc);
                 }
-                (*och_usecount)++;
+        } else {
+                LASSERT(*och_usecount == 0);
                 if (!it->d.lustre.it_disposition) {
+                        /* We cannot just request the lock handle now: the new
+                           ELC code means that one of the other OPEN locks for
+                           this file could be cancelled, and since the blocking
+                           AST handler would attempt to grab och_sem as well,
+                           that would result in a deadlock */
+                        up(&lli->lli_och_sem);
+                        it->it_flags |= O_CHECK_STALE;
                         rc = ll_intent_file_open(file, NULL, 0, it);
+                        it->it_flags &= ~O_CHECK_STALE;
                         if (rc) {
                                 ll_file_data_put(fd);
-                                GOTO(out_och_free, rc);
+                                GOTO(out_openerr, rc);
                         }
 
                         /* Got some error? Release the request */
@@ -469,12 +618,20 @@ int ll_file_open(struct inode *inode, struct file *file)
                                 req = it->d.lustre.it_data;
                                 ptlrpc_req_finished(req);
                         }
-                        mdc_set_lock_data(&it->d.lustre.it_lock_handle,
-                                          file->f_dentry->d_inode);
+                        md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
+                                         &it->d.lustre.it_lock_handle,
+                                         file->f_dentry->d_inode);
+                        goto restart;
                 }
+                OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
+                if (!*och_p) {
+                        ll_file_data_put(fd);
+                        GOTO(out_och_free, rc = -ENOMEM);
+                }
+                (*och_usecount)++;
                 req = it->d.lustre.it_data;
 
-                /* mdc_intent_lock() didn't get a request ref if there was an
+                /* md_intent_lock() didn't get a request ref if there was an
                  * open error, so don't do cleanup on the request here
                  * (bug 3430) */
                 /* XXX (green): Should not we bail out on any error here, not
@@ -485,9 +642,12 @@ int ll_file_open(struct inode *inode, struct file *file)
                         GOTO(out_och_free, rc);
                 }
 
-                lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
+                ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
                 rc = ll_local_open(file, it, fd, *och_p);
-                LASSERTF(rc == 0, "rc = %d\n", rc);
+                if (rc) {
+                        ll_file_data_put(fd);
+                        GOTO(out_och_free, rc);
+                }
         }
         up(&lli->lli_och_sem);
 
@@ -497,6 +657,8 @@ int ll_file_open(struct inode *inode, struct file *file)
         if (!S_ISREG(inode->i_mode))
                 GOTO(out, rc);
 
+        ll_capa_open(inode);
+
         lsm = lli->lli_smd;
         if (lsm == NULL) {
                 if (file->f_flags & O_LOV_DELAY_CREATE ||
@@ -507,87 +669,92 @@ int ll_file_open(struct inode *inode, struct file *file)
         }
         file->f_flags &= ~O_LOV_DELAY_CREATE;
         GOTO(out, rc);
- out:
+out:
         ptlrpc_req_finished(req);
         if (req)
                 it_clear_disposition(it, DISP_ENQ_OPEN_REF);
-        if (rc == 0) {
-                ll_open_complete(inode);
-        } else {
 out_och_free:
+        if (rc) {
                 if (*och_p) {
                         OBD_FREE(*och_p, sizeof (struct obd_client_handle));
                         *och_p = NULL; /* OBD_FREE writes some magic there */
                         (*och_usecount)--;
                 }
                 up(&lli->lli_och_sem);
+out_openerr:
+                if (opendir_set == 1) {
+                        lli->lli_opendir_key = NULL;
+                        lli->lli_opendir_pid = 0;
+                } else if (unlikely(opendir_set == 2)) {
+                        ll_stop_statahead(inode, fd);
+                }
         }
+
         return rc;
 }
 
 /* Fills the obdo with the attributes for the inode defined by lsm */
-int ll_lsm_getattr(struct obd_export *exp, struct lov_stripe_md *lsm,
-                   struct obdo *oa)
+int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
 {
         struct ptlrpc_request_set *set;
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct lov_stripe_md *lsm = lli->lli_smd;
+
         struct obd_info oinfo = { { { 0 } } };
         int rc;
         ENTRY;
 
         LASSERT(lsm != NULL);
 
-        memset(oa, 0, sizeof *oa);
         oinfo.oi_md = lsm;
-        oinfo.oi_oa = oa;
-        oa->o_id = lsm->lsm_object_id;
-        oa->o_mode = S_IFREG;
-        oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
-                OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
-                OBD_MD_FLCTIME;
+        oinfo.oi_oa = obdo;
+        oinfo.oi_oa->o_id = lsm->lsm_object_id;
+        oinfo.oi_oa->o_gr = lsm->lsm_object_gr;
+        oinfo.oi_oa->o_mode = S_IFREG;
+        oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
+                               OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
+                               OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
+                               OBD_MD_FLMTIME | OBD_MD_FLCTIME |
+                               OBD_MD_FLGROUP;
+        oinfo.oi_capa = ll_mdscapa_get(inode);
 
         set = ptlrpc_prep_set();
         if (set == NULL) {
+                CERROR("can't allocate ptlrpc set\n");
                 rc = -ENOMEM;
         } else {
-                rc = obd_getattr_async(exp, &oinfo, set);
+                rc = obd_getattr_async(ll_i2dtexp(inode), &oinfo, set);
                 if (rc == 0)
                         rc = ptlrpc_set_wait(set);
                 ptlrpc_set_destroy(set);
         }
+        capa_put(oinfo.oi_capa);
         if (rc)
                 RETURN(rc);
 
-        oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
-                        OBD_MD_FLCTIME | OBD_MD_FLSIZE);
-        RETURN(0);
-}
-
-static inline void ll_remove_suid(struct inode *inode)
-{
-        unsigned int mode;
-
-        /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
-        mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
+        oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
+                                 OBD_MD_FLATIME | OBD_MD_FLMTIME |
+                                 OBD_MD_FLCTIME | OBD_MD_FLSIZE);
 
-        /* was any of the uid bits set? */
-        mode &= inode->i_mode;
-        if (mode && !capable(CAP_FSETID)) {
-                inode->i_mode &= ~mode;
-                // XXX careful here - we cannot change the size
-        }
+        obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
+        CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
+               lli->lli_smd->lsm_object_id, i_size_read(inode),
+               (unsigned long long)inode->i_blocks,
+               (unsigned long)ll_inode_blksize(inode));
+        RETURN(0);
 }
 
 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
         struct lov_stripe_md *lsm = lli->lli_smd;
-        struct obd_export *exp = ll_i2obdexp(inode);
+        struct obd_export *exp = ll_i2dtexp(inode);
         struct {
                 char name[16];
                 struct ldlm_lock *lock;
-                struct lov_stripe_md *lsm;
-        } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
+        } key = { .name = KEY_LOCK_TO_STRIPE, .lock = lock };
         __u32 stripe, vallen = sizeof(stripe);
+        struct lov_oinfo *loinfo;
         int rc;
         ENTRY;
 
@@ -595,7 +762,7 @@ static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
                 GOTO(check, stripe = 0);
 
         /* get our offset in the lov */
-        rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
+        rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe, lsm);
         if (rc != 0) {
                 CERROR("obd_get_info: rc = %d\n", rc);
                 RETURN(rc);
@@ -603,163 +770,103 @@ static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
         LASSERT(stripe < lsm->lsm_stripe_count);
 
 check:
-        if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
-            lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[1]){
+        loinfo = lsm->lsm_oinfo[stripe];
+        if (!osc_res_name_eq(loinfo->loi_id, loinfo->loi_gr,
+                            &lock->l_resource->lr_name)){
                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
-                           lsm->lsm_oinfo[stripe]->loi_id,
-                           lsm->lsm_oinfo[stripe]->loi_gr);
+                           loinfo->loi_id, loinfo->loi_gr);
                 RETURN(-ELDLM_NO_LOCK_DATA);
         }
 
         RETURN(stripe);
 }
 
-/* Flush the page cache for an extent as its canceled.  When we're on an LOV,
- * we get a lock cancellation for each stripe, so we have to map the obd's
- * region back onto the stripes in the file that it held.
+/**
+ * Take one additional reference on a page so that it does not go away.
+ *
+ * \param data the struct page to pin, handed over as an opaque pointer
+ */
+void ll_pin_extent_cb(void *data)
+{
+        struct page *pinned = data;
+
+        page_cache_get(pinned);
+}
+
+/* Flush the page from the page cache for an extent as it is canceled.
+ * Page to remove is delivered as @data.
  *
- * No one can dirty the extent until we've finished our work and they can
+ * No one can dirty the extent until we've finished our work and they cannot
  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
  * but other kernel actors could have pages locked.
  *
+ * If @discard is set, there is no need to write the page if it is dirty.
+ *
  * Called with the DLM lock held. */
-void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
-                              struct ldlm_lock *lock, __u32 stripe)
+int ll_page_removal_cb(void *data, int discard)
 {
-        ldlm_policy_data_t tmpex;
-        unsigned long start, end, count, skip, i, j;
-        struct page *page;
-        int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
-        struct lustre_handle lockh;
+        int rc;
+        struct page *page = data;
+        struct address_space *mapping;
         ENTRY;
 
-        memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
-        CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
-               inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
-               inode->i_size);
-
-        /* our locks are page granular thanks to osc_enqueue, we invalidate the
-         * whole page. */
-        if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
-            ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
-                LDLM_ERROR(lock, "lock not aligned on CFS_PAGE_SIZE %lu", CFS_PAGE_SIZE);
-        LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
-        LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
-
-        count = ~0;
-        skip = 0;
-        start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
-        end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
-        if (lsm->lsm_stripe_count > 1) {
-                count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
-                skip = (lsm->lsm_stripe_count - 1) * count;
-                start += start/count * skip + stripe * count;
-                if (end != ~0)
-                        end += end/count * skip + stripe * count;
-        }
-        if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
-                end = ~0;
-
-        i = inode->i_size ? (inode->i_size - 1) >> CFS_PAGE_SHIFT : 0;
-        if (i < end)
-                end = i;
-
-        CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
-               "count: %lu skip: %lu end: %lu%s\n", start, start % count,
-               count, skip, end, discard ? " (DISCARDING)" : "");
-
-        /* walk through the vmas on the inode and tear down mmaped pages that
-         * intersect with the lock.  this stops immediately if there are no
-         * mmap()ed regions of the file.  This is not efficient at all and
-         * should be short lived. We'll associate mmap()ed pages with the lock
-         * and will be able to find them directly */
-        for (i = start; i <= end; i += (j + skip)) {
-                j = min(count - (i % count), end - i + 1);
-                LASSERT(j > 0);
-                LASSERT(inode->i_mapping);
-                if (ll_teardown_mmaps(inode->i_mapping,
-                                      (__u64)i << CFS_PAGE_SHIFT,
-                                      ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
-                        break;
-        }
-
-        /* this is the simplistic implementation of page eviction at
-         * cancelation.  It is careful to get races with other page
-         * lockers handled correctly.  fixes from bug 20 will make it
-         * more efficient by associating locks with pages and with
-         * batching writeback under the lock explicitly. */
-        for (i = start, j = start % count; i <= end;
-             j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
-                if (j == count) {
-                        CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
-                        i += skip;
-                        j = 0;
-                        if (i > end)
-                                break;
-                }
-                LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
-                         LPU64" >= "LPU64" start %lu i %lu end %lu\n",
-                         tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
-                         start, i, end);
-
-                if (!mapping_has_pages(inode->i_mapping)) {
-                        CDEBUG(D_INODE|D_PAGE, "nothing left\n");
-                        break;
-                }
-
-                cond_resched();
+        /* We have page reference already from ll_pin_page */
+        lock_page(page);
 
-                page = find_get_page(inode->i_mapping, i);
-                if (page == NULL)
-                        continue;
-                LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
-                               i, tmpex.l_extent.start);
+        /* Already truncated by somebody */
+        if (!page->mapping)
+                GOTO(out, rc = 0);
+        /* remember the mapping: it is still needed for error reporting
+         * below, after the page lock has been dropped and retaken */
+        mapping = page->mapping;
+
+        /* tear down mmap()s of exactly the byte range of this one page */
+        ll_teardown_mmaps(mapping,
+                          (__u64)page->index << PAGE_CACHE_SHIFT,
+                          ((__u64)page->index<<PAGE_CACHE_SHIFT)|
+                                                              ~PAGE_CACHE_MASK);
+        LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
+
+        if (!discard && clear_page_dirty_for_io(page)) {
+                LASSERT(page->mapping);
+                rc = ll_call_writepage(page->mapping->host, page);
+                /* either waiting for io to complete or reacquiring
+                 * the lock that the failed writepage released */
                 lock_page(page);
-
-                /* page->mapping to check with racing against teardown */
-                if (!discard && clear_page_dirty_for_io(page)) {
-                        rc = ll_call_writepage(inode, page);
-                        if (rc != 0)
-                                CERROR("writepage of page %p failed: %d\n",
-                                       page, rc);
-                        /* either waiting for io to complete or reacquiring
-                         * the lock that the failed writepage released */
-                        lock_page(page);
-                }
-
-                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
-                /* check to see if another DLM lock covers this page  b=2765 */
-                rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
-                                      LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
-                                      LDLM_FL_TEST_LOCK,
-                                      &lock->l_resource->lr_name, LDLM_EXTENT,
-                                      &tmpex, LCK_PR | LCK_PW, &lockh);
-                if (rc2 == 0 && page->mapping != NULL) {
-                        struct ll_async_page *llap = llap_cast_private(page);
-                        // checking again to account for writeback's lock_page()
-                        LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
-                        if (llap)
-                                ll_ra_accounting(llap, inode->i_mapping);
-                        ll_truncate_complete_page(page);
+                wait_on_page_writeback(page);
+                /* only a failed writepage marks the mapping with an error
+                 * bit; a successful write-out must leave the flags alone */
+                if (rc != 0) {
+                        CERROR("writepage inode %lu(%p) of page %p "
+                               "failed: %d\n", mapping->host->i_ino,
+                               mapping->host, page, rc);
+                        if (rc == -ENOSPC)
+                                set_bit(AS_ENOSPC, &mapping->flags);
+                        else
+                                set_bit(AS_EIO, &mapping->flags);
                 }
-                unlock_page(page);
-                page_cache_release(page);
-        }
-        LASSERTF(tmpex.l_extent.start <=
-                 (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
-                  lock->l_policy_data.l_extent.end + 1),
-                 "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
-                 tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
-                 start, i, end);
+        }
+        if (page->mapping != NULL) {
+                struct ll_async_page *llap = llap_cast_private(page);
+                /* checking again to account for writeback's lock_page() */
+                LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
+                if (llap)
+                        ll_ra_accounting(llap, page->mapping);
+                ll_truncate_complete_page(page);
+        }
         EXIT;
+out:
+        LASSERT(!PageWriteback(page));
+        unlock_page(page);
+        /* drop the reference this callback was handed along with the page */
+        page_cache_release(page);
+
+        /* write-out failures are reported through the mapping flags only;
+         * the callback itself always returns 0 */
+        return 0;
 }
 
-static int ll_extent_lock_callback(struct ldlm_lock *lock,
-                                   struct ldlm_lock_desc *new, void *data,
-                                   int flag)
+int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
+                             void *data, int flag)
 {
-        struct lustre_handle lockh = { 0 };
-        int rc;
+        struct inode *inode;
+        struct ll_inode_info *lli;
+        struct lov_stripe_md *lsm;
+        int stripe;
+        __u64 kms;
+
         ENTRY;
 
         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
@@ -767,61 +874,37 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock,
                 LBUG();
         }
 
-        switch (flag) {
-        case LDLM_CB_BLOCKING:
-                ldlm_lock2handle(lock, &lockh);
-                rc = ldlm_cli_cancel(&lockh);
-                if (rc != ELDLM_OK)
-                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
-                break;
-        case LDLM_CB_CANCELING: {
-                struct inode *inode;
-                struct ll_inode_info *lli;
-                struct lov_stripe_md *lsm;
-                int stripe;
-                __u64 kms;
-
-                /* This lock wasn't granted, don't try to evict pages */
-                if (lock->l_req_mode != lock->l_granted_mode)
-                        RETURN(0);
-
-                inode = ll_inode_from_lock(lock);
-                if (inode == NULL)
-                        RETURN(0);
-                lli = ll_i2info(inode);
-                if (lli == NULL)
-                        goto iput;
-                if (lli->lli_smd == NULL)
-                        goto iput;
-                lsm = lli->lli_smd;
-
-                stripe = ll_lock_to_stripe_offset(inode, lock);
-                if (stripe < 0)
-                        goto iput;
-
-                ll_pgcache_remove_extent(inode, lsm, lock, stripe);
+        inode = ll_inode_from_lock(lock);
+        if (inode == NULL)
+                RETURN(0);
+        lli = ll_i2info(inode);
+        if (lli == NULL)
+                GOTO(iput, 0);
+        if (lli->lli_smd == NULL)
+                GOTO(iput, 0);
+        lsm = lli->lli_smd;
 
-                lov_stripe_lock(lsm);
-                lock_res_and_lock(lock);
-                kms = ldlm_extent_shift_kms(lock,
-                                            lsm->lsm_oinfo[stripe]->loi_kms);
+        stripe = ll_lock_to_stripe_offset(inode, lock);
+        if (stripe < 0)
+                GOTO(iput, 0);
 
-                if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
-                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
-                                   lsm->lsm_oinfo[stripe]->loi_kms, kms);
-                lsm->lsm_oinfo[stripe]->loi_kms = kms;
-                unlock_res_and_lock(lock);
-                lov_stripe_unlock(lsm);
-                //ll_try_done_writing(inode);
-        iput:
-                iput(inode);
-                break;
-        }
-        default:
-                LBUG();
-        }
+        lov_stripe_lock(lsm);
+        lock_res_and_lock(lock);
+        kms = ldlm_extent_shift_kms(lock,
+                                    lsm->lsm_oinfo[stripe]->loi_kms);
+
+        if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
+                LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
+                           lsm->lsm_oinfo[stripe]->loi_kms, kms);
+        lsm->lsm_oinfo[stripe]->loi_kms = kms;
+        unlock_res_and_lock(lock);
+        lov_stripe_unlock(lsm);
+        ll_queue_done_writing(inode, 0);
+        EXIT;
+iput:
+        iput(inode);
 
-        RETURN(0);
+        return 0;
 }
 
 #if 0
@@ -857,16 +940,16 @@ int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 lvb = lock->l_lvb_data;
                 lsm->lsm_oinfo[stripe].loi_rss = lvb->lvb_size;
 
-                LOCK_INODE_MUTEX(inode);
                 lock_res_and_lock(lock);
+                ll_inode_size_lock(inode, 1);
                 kms = MAX(lsm->lsm_oinfo[stripe].loi_kms, lvb->lvb_size);
                 kms = ldlm_extent_shift_kms(NULL, kms);
                 if (lsm->lsm_oinfo[stripe].loi_kms != kms)
                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
                 lsm->lsm_oinfo[stripe].loi_kms = kms;
+                ll_inode_size_unlock(inode, 1);
                 unlock_res_and_lock(lock);
-                UNLOCK_INODE_MUTEX(inode);
         }
 
 iput:
@@ -887,7 +970,6 @@ static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
         struct lov_stripe_md *lsm;
         struct ost_lvb *lvb;
         int rc, stripe;
-        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
         ENTRY;
 
         if (inode == NULL)
@@ -904,13 +986,16 @@ static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
         if (stripe < 0)
                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
 
-        rc = lustre_pack_reply(req, 2, size, NULL);
+        req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
+        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+                             sizeof(*lvb));
+        rc = req_capsule_server_pack(&req->rq_pill);
         if (rc) {
                 CERROR("lustre_pack_reply: %d\n", rc);
                 GOTO(iput, rc);
         }
 
-        lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
+        lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
         lvb->lvb_atime = LTIME_S(inode->i_atime);
@@ -918,7 +1003,7 @@ static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
 
         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
-                   inode->i_size, stripe, lvb->lvb_size, lvb->lvb_mtime,
+                   i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
                    lvb->lvb_atime, lvb->lvb_ctime);
  iput:
         iput(inode);
@@ -933,21 +1018,68 @@ static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
         return rc;
 }
 
+/**
+ * Rewrite size, block count and timestamps of \a inode from an LVB that
+ * is initialised with inode_init_lvb() and then passed through
+ * obd_merge_lvb() together with the inode's stripe metadata.
+ *
+ * The inode fields are updated under ll_inode_size_lock() regardless of
+ * what obd_merge_lvb() returns.
+ *
+ * \retval the return code of obd_merge_lvb()
+ */
+static int ll_merge_lvb(struct inode *inode)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct ost_lvb merged;
+        int rc;
+        ENTRY;
+
+        ll_inode_size_lock(inode, 1);
+
+        inode_init_lvb(inode, &merged);
+        rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &merged, 0);
+
+        i_size_write(inode, merged.lvb_size);
+        inode->i_blocks = merged.lvb_blocks;
+        LTIME_S(inode->i_mtime) = merged.lvb_mtime;
+        LTIME_S(inode->i_atime) = merged.lvb_atime;
+        LTIME_S(inode->i_ctime) = merged.lvb_ctime;
+
+        ll_inode_size_unlock(inode, 1);
+
+        RETURN(rc);
+}
+
+/**
+ * Refresh the attributes of \a inode via ll_merge_lvb() when obd_match()
+ * finds an LCK_PR extent lock for the range [0, OBD_OBJECT_EOF].
+ *
+ * \retval 0        the file has no stripes, or ll_merge_lvb() succeeded
+ * \retval -ENODATA obd_match() returned 0
+ * \retval other    negative obd_match() result, or ll_merge_lvb() result
+ */
+int ll_local_size(struct inode *inode)
+{
+        ldlm_policy_data_t whole_file = { .l_extent = { 0, OBD_OBJECT_EOF } };
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct lustre_handle matched = { 0 };
+        int flags = 0;
+        int rc;
+        ENTRY;
+
+        if (lli->lli_smd->lsm_stripe_count == 0)
+                RETURN(0);
+
+        rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
+                       &whole_file, LCK_PR, &flags, inode, &matched);
+        if (rc == 0)
+                RETURN(-ENODATA);
+        if (rc < 0)
+                RETURN(rc);
+
+        /* merge first, then cancel the handle filled in by obd_match() */
+        rc = ll_merge_lvb(inode);
+        obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &matched);
+        RETURN(rc);
+}
+
 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
                      lstat_t *st)
 {
         struct lustre_handle lockh = { 0 };
-        struct obd_enqueue_info einfo = { 0 };
+        struct ldlm_enqueue_info einfo = { 0 };
         struct obd_info oinfo = { { { 0 } } };
         struct ost_lvb lvb;
         int rc;
-        
+
         ENTRY;
-        
+
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = LCK_PR;
-        einfo.ei_flags = LDLM_FL_HAS_INTENT;
-        einfo.ei_cb_bl = ll_extent_lock_callback;
+        einfo.ei_cb_bl = osc_extent_blocking_cb;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
         einfo.ei_cbdata = NULL;
@@ -955,8 +1087,9 @@ int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
         oinfo.oi_lockh = &lockh;
         oinfo.oi_md = lsm;
+        oinfo.oi_flags = LDLM_FL_HAS_INTENT;
 
-        rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
+        rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
         if (rc == -ENOENT)
                 RETURN(rc);
         if (rc != 0) {
@@ -964,17 +1097,17 @@ int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
                        "returning -EIO\n", rc);
                 RETURN(rc > 0 ? -EIO : rc);
         }
-        
+
         lov_stripe_lock(lsm);
         memset(&lvb, 0, sizeof(lvb));
-        obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 0);
+        obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
         st->st_size = lvb.lvb_size;
         st->st_blocks = lvb.lvb_blocks;
         st->st_mtime = lvb.lvb_mtime;
         st->st_atime = lvb.lvb_atime;
         st->st_ctime = lvb.lvb_ctime;
         lov_stripe_unlock(lsm);
-        
+
         RETURN(rc);
 }
 
@@ -985,12 +1118,14 @@ int ll_glimpse_size(struct inode *inode, int ast_flags)
         struct ll_inode_info *lli = ll_i2info(inode);
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct lustre_handle lockh = { 0 };
-        struct obd_enqueue_info einfo = { 0 };
+        struct ldlm_enqueue_info einfo = { 0 };
         struct obd_info oinfo = { { { 0 } } };
-        struct ost_lvb lvb;
         int rc;
         ENTRY;
 
+        if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
+                RETURN(0);
+
         CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
 
         if (!lli->lli_smd) {
@@ -1007,8 +1142,7 @@ int ll_glimpse_size(struct inode *inode, int ast_flags)
          *       acquired only if there were no conflicting locks. */
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = LCK_PR;
-        einfo.ei_flags = ast_flags | LDLM_FL_HAS_INTENT;
-        einfo.ei_cb_bl = ll_extent_lock_callback;
+        einfo.ei_cb_bl = osc_extent_blocking_cb;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
         einfo.ei_cbdata = inode;
@@ -1016,8 +1150,9 @@ int ll_glimpse_size(struct inode *inode, int ast_flags)
         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
         oinfo.oi_lockh = &lockh;
         oinfo.oi_md = lli->lli_smd;
+        oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
 
-        rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
+        rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
         if (rc == -ENOENT)
                 RETURN(rc);
         if (rc != 0) {
@@ -1025,18 +1160,10 @@ int ll_glimpse_size(struct inode *inode, int ast_flags)
                 RETURN(rc > 0 ? -EIO : rc);
         }
 
-        ll_inode_size_lock(inode, 1);
-        inode_init_lvb(inode, &lvb);
-        obd_merge_lvb(sbi->ll_osc_exp, lli->lli_smd, &lvb, 0);
-        inode->i_size = lvb.lvb_size;
-        inode->i_blocks = lvb.lvb_blocks;
-        LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
-        LTIME_S(inode->i_atime) = lvb.lvb_atime;
-        LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
-        ll_inode_size_unlock(inode, 1);
+        rc = ll_merge_lvb(inode);
 
-        CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
-               inode->i_size, inode->i_blocks);
+        CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
+               i_size_read(inode), (unsigned long long)inode->i_blocks);
 
         RETURN(rc);
 }
@@ -1048,7 +1175,7 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
 {
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ost_lvb lvb;
-        struct obd_enqueue_info einfo = { 0 };
+        struct ldlm_enqueue_info einfo = { 0 };
         struct obd_info oinfo = { { { 0 } } };
         int rc;
         ENTRY;
@@ -1070,70 +1197,396 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
 
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = mode;
-        einfo.ei_flags = ast_flags;
-        einfo.ei_cb_bl = ll_extent_lock_callback;
+        einfo.ei_cb_bl = osc_extent_blocking_cb;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
         einfo.ei_cbdata = inode;
 
-        oinfo.oi_policy = *policy;
-        oinfo.oi_lockh = lockh;
-        oinfo.oi_md = lsm;
+        oinfo.oi_policy = *policy;
+        oinfo.oi_lockh = lockh;
+        oinfo.oi_md = lsm;
+        oinfo.oi_flags = ast_flags;
+
+        rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
+        *policy = oinfo.oi_policy;
+        if (rc > 0)
+                rc = -EIO;
+
+        ll_inode_size_lock(inode, 1);
+        inode_init_lvb(inode, &lvb);
+        obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
+
+        if (policy->l_extent.start == 0 &&
+            policy->l_extent.end == OBD_OBJECT_EOF) {
+                /* vmtruncate()->ll_truncate() first sets the i_size and then
+                 * the kms under both a DLM lock and the
+                 * ll_inode_size_lock().  If we don't get the
+                 * ll_inode_size_lock() here we can match the DLM lock and
+                 * reset i_size from the kms before the truncating path has
+                 * updated the kms.  generic_file_write can then trust the
+                 * stale i_size when doing appending writes and effectively
+                 * cancel the result of the truncate.  Getting the
+                 * ll_inode_size_lock() after the enqueue maintains the DLM
+                 * -> ll_inode_size_lock() acquiring order. */
+                i_size_write(inode, lvb.lvb_size);
+                CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
+                       inode->i_ino, i_size_read(inode));
+        }
+
+        if (rc == 0) {
+                LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
+                LTIME_S(inode->i_atime) = lvb.lvb_atime;
+                LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
+        }
+        ll_inode_size_unlock(inode, 1);
+
+        RETURN(rc);
+}
+
+/**
+ * Cancel the extent lock \a lockh of mode \a mode through obd_cancel().
+ *
+ * Nothing is cancelled (and 0 is returned) when \a fd carries
+ * LL_FILE_IGNORE_LOCK or the superblock has LL_SBI_NOLCK set.
+ *
+ * \retval 0 or the return code of obd_cancel()
+ */
+int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
+                     struct lov_stripe_md *lsm, int mode,
+                     struct lustre_handle *lockh)
+{
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        ENTRY;
+
+        /* XXX phil: can we do this?  won't it screw the file size up? */
+        if (fd != NULL && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
+                RETURN(0);
+        if (sbi->ll_flags & LL_SBI_NOLCK)
+                RETURN(0);
+
+        RETURN(obd_cancel(sbi->ll_dt_exp, lsm, mode, lockh));
+}
+
+/**
+ * Flag \a inode as contended and record the current time as the moment
+ * that happened.  Both fields are written under lli_lock.
+ */
+static void ll_set_file_contended(struct inode *inode)
+{
+        struct ll_inode_info *info = ll_i2info(inode);
+        cfs_time_t stamp = cfs_time_current();
+
+        spin_lock(&info->lli_lock);
+        /* timestamp first, flag second */
+        info->lli_contention_time = stamp;
+        info->lli_flags |= LLIF_CONTENDED;
+        spin_unlock(&info->lli_lock);
+}
+
+/**
+ * Drop the LLIF_CONTENDED bit of \a inode, under lli_lock.
+ */
+void ll_clear_file_contended(struct inode *inode)
+{
+        struct ll_inode_info *info = ll_i2info(inode);
+
+        spin_lock(&info->lli_lock);
+        info->lli_flags &= ~LLIF_CONTENDED;
+        spin_unlock(&info->lli_lock);
+}
+
+/**
+ * Tell whether \a file is currently treated as contended.
+ *
+ * \retval 0 the OBD_CONNECT_SRVLOCK connect flag is missing, the inode is
+ *           not flagged LLIF_CONTENDED, or the flag has outlived
+ *           ll_contention_time seconds (it is cleared in that case)
+ * \retval 1 the file descriptor has LL_FILE_IGNORE_LOCK set, or the inode
+ *           is flagged and the flag has not expired yet
+ */
+static int ll_is_file_contended(struct file *file)
+{
+        struct inode *inode = file->f_dentry->d_inode;
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+        cfs_time_t now;
+        cfs_time_t expiry;
+        ENTRY;
+
+        if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
+                CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
+                       " osc connect flags = 0x"LPX64"\n",
+                       sbi->ll_lco.lco_flags);
+                RETURN(0);
+        }
+
+        if (fd != NULL && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
+                RETURN(1);
+
+        if (!(lli->lli_flags & LLIF_CONTENDED))
+                RETURN(0);
+
+        /* flagged: still contended until the retry time has passed */
+        now = cfs_time_current();
+        expiry = cfs_time_add(lli->lli_contention_time,
+                              cfs_time_seconds(sbi->ll_contention_time));
+        if (!cfs_time_after(now, expiry))
+                RETURN(1);
+
+        ll_clear_file_contended(inode);
+        RETURN(0);
+}
+
+/**
+ * Try to take a tree lock on [\a start, \a end] of \a file for an I/O of
+ * type \a rw.
+ *
+ * No lock is attempted when the I/O is not an O_APPEND write and
+ * ll_is_file_contended() reports the file as contended.  A ll_tree_lock()
+ * result of -EUSERS marks the file contended instead of failing.
+ *
+ * \retval 1   the tree lock was taken
+ * \retval 0   no tree lock was taken (contended file)
+ * \retval < 0 error from ll_node_from_inode() or ll_tree_lock()
+ */
+static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
+                                 const char *buf, size_t count,
+                                 loff_t start, loff_t end, int rw)
+{
+        struct inode *inode = file->f_dentry->d_inode;
+        struct ll_lock_tree_node *node;
+        int is_write = (rw == OBD_BRW_WRITE);
+        int append;
+        int ast_flags;
+        int rc;
+        ENTRY;
+
+        append = is_write && (file->f_flags & O_APPEND);
+
+        /* appends always lock; anything else skips a contended file */
+        if (!append && ll_is_file_contended(file))
+                RETURN(0);
+
+        ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
+        if (file->f_flags & O_NONBLOCK)
+                ast_flags |= LDLM_FL_BLOCK_NOWAIT;
+
+        node = ll_node_from_inode(inode, start, end,
+                                  is_write ? LCK_PW : LCK_PR);
+        if (IS_ERR(node)) {
+                rc = PTR_ERR(node);
+                GOTO(out, rc);
+        }
+
+        tree->lt_fd = LUSTRE_FPRIVATE(file);
+        rc = ll_tree_lock(tree, node, buf, count, ast_flags);
+        if (rc == 0)
+                RETURN(1);
+        if (rc != -EUSERS)
+                GOTO(out, rc);
+
+        ll_set_file_contended(inode);
+        RETURN(0);
+out:
+        return rc;
+}
+
+/**
+ * Checks if requested extent lock is compatible with a lock under a page.
+ *
+ * Checks if the lock under \a page is compatible with a read or write lock
+ * (specified by \a rw) for an extent [\a start , \a end].
+ *
+ * \param page the page under which lock is considered
+ * \param rw OBD_BRW_READ if requested for reading,
+ *           OBD_BRW_WRITE if requested for writing
+ * \param start start of the requested extent
+ * \param end end of the requested extent
+ * \param cookie transparent parameter for passing locking context
+ *
+ * \post result == 1, *cookie == context, appropriate lock is referenced or
+ * \post result == 0
+ *
+ * \retval 1 owned lock is reused for the request
+ * \retval 0 no lock reused for the request
+ *
+ * \see ll_release_short_lock
+ */
+static int ll_reget_short_lock(struct page *page, int rw,
+                               obd_off start, obd_off end,
+                               void **cookie)
+{
+        struct inode *inode = page->mapping->host;
+        struct obd_export *exp;
+        struct ll_async_page *llap;
+        int reused;
+        ENTRY;
+
+        /* without a data export or a private llap there is nothing to
+         * reuse */
+        exp = ll_i2dtexp(inode);
+        if (exp == NULL)
+                RETURN(0);
+
+        llap = llap_cast_private(page);
+        if (llap == NULL)
+                RETURN(0);
+
+        reused = obd_reget_short_lock(exp, ll_i2info(inode)->lli_smd,
+                                      &llap->llap_cookie, rw, start, end,
+                                      cookie);
+        RETURN(reused);
+}
+
+/**
+ * Releases a reference to a lock taken in a "fast" way.
+ *
+ * Releases a read or a write (specified by \a rw) lock
+ * referenced by \a cookie.
+ *
+ * \param inode inode to which data belong
+ * \param end end of the locked extent
+ * \param rw OBD_BRW_READ if requested for reading,
+ *           OBD_BRW_WRITE if requested for writing
+ * \param cookie transparent parameter for passing locking context
+ *
+ * \post appropriate lock is dereferenced
+ *
+ * \see ll_reget_short_lock
+ */
+static void ll_release_short_lock(struct inode *inode, obd_off end,
+                                  void *cookie, int rw)
+{
+        struct obd_export *exp = ll_i2dtexp(inode);
+        int rc;
+
+        /* no data export: nothing to hand the cookie back to */
+        if (exp == NULL)
+                return;
 
-        rc = obd_enqueue(sbi->ll_osc_exp, &oinfo, &einfo);
-        *policy = oinfo.oi_policy;
-        if (rc > 0)
-                rc = -EIO;
+        rc = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
+                                    cookie, rw);
+        /* a failed release is only logged; there is no caller to tell */
+        if (rc < 0)
+                CERROR("unlock failed (%d)\n", rc);
+}
 
-        ll_inode_size_lock(inode, 1);
-        inode_init_lvb(inode, &lvb);
-        obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 1);
+/**
+ * Checks if requested extent lock is compatible
+ * with a lock under a page in page cache.
+ *
+ * Looks up the page cache page that covers \a ppos and checks whether the
+ * lock under it is compatible with a read or write lock (specified by
+ * \a rw) for the extent [\a ppos , \a end].
+ *
+ * The fast path is not attempted when ll_region_mapped() reports a mapping
+ * over the user buffer.  \a end is inclusive, so the buffer region that is
+ * checked is \a end - \a ppos + 1 bytes long.
+ *
+ * \param file the file under which lock is considered
+ * \param ppos start of the requested extent
+ * \param end end of the requested extent (inclusive)
+ * \param buf userspace buffer for the data
+ * \param cookie transparent parameter for passing locking context
+ * \param rw OBD_BRW_READ if requested for reading,
+ *           OBD_BRW_WRITE if requested for writing
+ *
+ * \post result == 1, *cookie == context, appropriate lock is referenced
+ * \post result == 0
+ *
+ * \retval 1 owned lock is reused for the request
+ * \retval 0 no lock reused for the request
+ *
+ * \see ll_file_put_fast_lock
+ */
+static inline int ll_file_get_fast_lock(struct file *file,
+                                        obd_off ppos, obd_off end,
+                                        char *buf, void **cookie, int rw)
+{
+        int rc = 0;
+        struct page *page;
 
-        if (policy->l_extent.start == 0 &&
-            policy->l_extent.end == OBD_OBJECT_EOF) {
-                /* vmtruncate()->ll_truncate() first sets the i_size and then
-                 * the kms under both a DLM lock and the
-                 * ll_inode_size_lock().  If we don't get the
-                 * ll_inode_size_lock() here we can match the DLM lock and
-                 * reset i_size from the kms before the truncating path has
-                 * updated the kms.  generic_file_write can then trust the
-                 * stale i_size when doing appending writes and effectively
-                 * cancel the result of the truncate.  Getting the
-                 * ll_inode_size_lock() after the enqueue maintains the DLM
-                 * -> ll_inode_size_lock() acquiring order. */
-                inode->i_size = lvb.lvb_size;
-                CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
-                       inode->i_ino, inode->i_size);
-        }
+        ENTRY;
 
-        if (rc == 0) {
-                LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
-                LTIME_S(inode->i_atime) = lvb.lvb_atime;
-                LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
+        /* [ppos, end] is inclusive: the user buffer spans end - ppos + 1
+         * bytes, so the last byte must take part in the mapping check */
+        if (!ll_region_mapped((unsigned long)buf, end - ppos + 1)) {
+                page = find_lock_page(file->f_dentry->d_inode->i_mapping,
+                                      ppos >> CFS_PAGE_SHIFT);
+                if (page) {
+                        if (ll_reget_short_lock(page, rw, ppos, end, cookie))
+                                rc = 1;
+
+                        /* the page was only needed to find the lock */
+                        unlock_page(page);
+                        page_cache_release(page);
+                }
         }
-        ll_inode_size_unlock(inode, 1);
 
         RETURN(rc);
 }
 
-int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
-                     struct lov_stripe_md *lsm, int mode,
-                     struct lustre_handle *lockh)
+/**
+ * Releases a reference to a lock taken in a "fast" way.
+ *
+ * Releases a read or a write (specified by \a rw) lock
+ * referenced by \a cookie.  This is a plain wrapper: all arguments are
+ * forwarded unchanged to ll_release_short_lock().
+ *
+ * \param inode inode to which data belong
+ * \param end end of the locked extent
+ * \param cookie transparent parameter for passing locking context
+ * \param rw OBD_BRW_READ if requested for reading,
+ *           OBD_BRW_WRITE if requested for writing
+ *
+ * \post appropriate lock is dereferenced
+ *
+ * \see ll_file_get_fast_lock
+ */
+static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
+                                         void *cookie, int rw)
+{
+        ll_release_short_lock(inode, end, cookie, rw);
+}
+
+/* Facility through which ll_file_get_lock() obtained the extent lock. */
+enum ll_lock_style {
+        LL_LOCK_STYLE_NOLOCK = 0,       /* got no lock */
+        LL_LOCK_STYLE_FASTLOCK,         /* 1: owned lock reused (fast lock) */
+        LL_LOCK_STYLE_TREELOCK          /* 2: lock taken through tree lock */
+};
+
+/**
+ * Takes a lock for a read or write of the extent [\a ppos , \a end].
+ *
+ * First tries to reuse a lock under a page cache page through
+ * ll_file_get_fast_lock().  If that does not succeed, falls back to
+ * ll_file_get_tree_lock() for the same extent; \a end is inclusive, so the
+ * byte count handed to the tree lock is \a end - \a ppos + 1.
+ *
+ * \param file file under which I/O is processed
+ * \param ppos start of the requested extent
+ * \param end end of the requested extent (inclusive)
+ * \param buf userspace buffer for the data
+ * \param cookie transparent parameter for passing locking context
+ *           (only used with LL_LOCK_STYLE_FASTLOCK)
+ * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
+ * \param rw OBD_BRW_READ if requested for reading,
+ *           OBD_BRW_WRITE if requested for writing
+ *
+ * \retval LL_LOCK_STYLE_FASTLOCK owned lock is reused through fast lock
+ * \retval LL_LOCK_STYLE_TREELOCK got a lock through tree lock
+ * \retval LL_LOCK_STYLE_NOLOCK got no lock
+ * \retval <0 -errno passed up from ll_file_get_tree_lock()
+ *
+ * \see ll_file_put_lock
+ */
+static inline int ll_file_get_lock(struct file *file, obd_off ppos,
+                                   obd_off end, char *buf, void **cookie,
+                                   struct ll_lock_tree *tree, int rw)
 {
-        struct ll_sb_info *sbi = ll_i2sbi(inode);
         int rc;
+
         ENTRY;
 
-        /* XXX phil: can we do this?  won't it screw the file size up? */
-        if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
-            (sbi->ll_flags & LL_SBI_NOLCK))
-                RETURN(0);
+        if (ll_file_get_fast_lock(file, ppos, end, buf, cookie, rw))
+                RETURN(LL_LOCK_STYLE_FASTLOCK);
 
-        rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
+        /* count is the length of the inclusive extent [ppos, end];
+         * "ppos - end" would be zero or negative and wrap around */
+        rc = ll_file_get_tree_lock(tree, file, buf, end - ppos + 1,
+                                   ppos, end, rw);
+        /* rc: 1 for tree lock, 0 for no lock, <0 for error */
+        switch (rc) {
+        case 1:
+                RETURN(LL_LOCK_STYLE_TREELOCK);
+        case 0:
+                RETURN(LL_LOCK_STYLE_NOLOCK);
+        }
 
+        /* an error happened if we reached this point, rc = -errno here */
         RETURN(rc);
 }
 
+/**
+ * Drops the lock taken by ll_file_get_lock.
+ *
+ * Releases a read or a write (specified by \a rw) lock
+ * referenced by \a tree or \a cookie, depending on \a lock_style.
+ * Any other \a lock_style value (LL_LOCK_STYLE_NOLOCK included) releases
+ * nothing and is reported through CERROR.
+ *
+ * \param inode inode to which data belong
+ * \param end end of the locked extent
+ * \param lock_style facility through which the lock was taken
+ * \param cookie transparent parameter for passing locking context
+ *           (only used with LL_LOCK_STYLE_FASTLOCK)
+ * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
+ * \param rw OBD_BRW_READ if requested for reading,
+ *           OBD_BRW_WRITE if requested for writing
+ *
+ * \post appropriate lock is dereferenced
+ *
+ * \see ll_file_get_lock
+ */
+static inline void ll_file_put_lock(struct inode *inode, obd_off end,
+                                    enum ll_lock_style lock_style,
+                                    void *cookie, struct ll_lock_tree *tree,
+                                    int rw)
+{
+        if (lock_style == LL_LOCK_STYLE_TREELOCK) {
+                ll_tree_unlock(tree);
+                return;
+        }
+
+        if (lock_style == LL_LOCK_STYLE_FASTLOCK) {
+                ll_file_put_fast_lock(inode, end, cookie, rw);
+                return;
+        }
+
+        /* neither a tree lock nor a fast lock: nothing we can release */
+        CERROR("invalid locking style (%d)\n", lock_style);
+}
+
 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
                             loff_t *ppos)
 {
@@ -1142,25 +1595,24 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
         struct lov_stripe_md *lsm = lli->lli_smd;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ll_lock_tree tree;
-        struct ll_lock_tree_node *node;
         struct ost_lvb lvb;
         struct ll_ra_read bead;
-        int rc, ra = 0;
-        loff_t end;
+        int ra = 0;
+        obd_off end;
         ssize_t retval, chunk, sum = 0;
+        int lock_style;
+        void *cookie;
 
         __u64 kms;
         ENTRY;
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
                inode->i_ino, inode->i_generation, inode, count, *ppos);
-        ll_vfs_ops_tally(sbi, VFS_OPS_READ);
-
         /* "If nbyte is 0, read() will return 0 and have no other results."
          *                      -- Single Unix Spec */
         if (count == 0)
                 RETURN(0);
 
-        lprocfs_counter_add(sbi->ll_stats, LPROC_LL_READ_BYTES, count);
+        ll_stats_ops_tally(sbi, LPROC_LL_READ_BYTES, count);
 
         if (!lsm) {
                 /* Read on file with no objects should return zero-filled
@@ -1173,11 +1625,11 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
                  * unguarded */
 
                 /* Read beyond end of file */
-                if (*ppos >= inode->i_size)
+                if (*ppos >= i_size_read(inode))
                         RETURN(0);
 
-                if (count > inode->i_size - *ppos)
-                        count = inode->i_size - *ppos;
+                if (count > i_size_read(inode) - *ppos)
+                        count = i_size_read(inode) - *ppos;
                 /* Make sure to correctly adjust the file pos pointer for
                  * EFAULT case */
                 notzeroed = clear_user(buf, count);
@@ -1187,13 +1639,11 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
                         RETURN(-EFAULT);
                 RETURN(count);
         }
-
 repeat:
         if (sbi->ll_max_rw_chunk != 0) {
                 /* first, let's know the end of the current stripe */
                 end = *ppos;
-                obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END,
-                                (obd_off *)&end);
+                obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, &end);
 
                 /* correct, the end is beyond the request */
                 if (end > *ppos + count - 1)
@@ -1205,13 +1655,11 @@ repeat:
         } else {
                 end = *ppos + count - 1;
         }
-       
-        node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
-        tree.lt_fd = LUSTRE_FPRIVATE(file);
-        rc = ll_tree_lock(&tree, node, buf, count,
-                          file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
-        if (rc != 0)
-                GOTO(out, retval = rc);
+
+        lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
+                                      buf, &cookie, &tree, OBD_BRW_READ);
+        if (lock_style < 0)
+                GOTO(out, retval = lock_style);
 
         ll_inode_size_lock(inode, 1);
         /*
@@ -1234,7 +1682,7 @@ repeat:
          * correctly in the face of concurrent writes and truncates.
          */
         inode_init_lvb(inode, &lvb);
-        obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
+        obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 1);
         kms = lvb.lvb_size;
         if (*ppos + count - 1 > kms) {
                 /* A glimpse is necessary to determine whether we return a
@@ -1242,7 +1690,9 @@ repeat:
                 ll_inode_size_unlock(inode, 1);
                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
                 if (retval) {
-                        ll_tree_unlock(&tree);
+                        if (lock_style != LL_LOCK_STYLE_NOLOCK)
+                                ll_file_put_lock(inode, end, lock_style,
+                                                 cookie, &tree, OBD_BRW_READ);
                         goto out;
                 }
         } else {
@@ -1252,35 +1702,37 @@ repeat:
                  * the kms size is _correct_, it is only the _minimum_ size.
                  * If someone does a stat they will get the correct size which
                  * will always be >= the kms value here.  b=11081 */
-                if (inode->i_size < kms)
-                        inode->i_size = kms;
+                if (i_size_read(inode) < kms)
+                        i_size_write(inode, kms);
                 ll_inode_size_unlock(inode, 1);
         }
 
         chunk = end - *ppos + 1;
-        CDEBUG(D_INODE,"Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
-               inode->i_ino, chunk, *ppos, inode->i_size);
+        CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
+               inode->i_ino, chunk, *ppos, i_size_read(inode));
+
+        if (lock_style != LL_LOCK_STYLE_NOLOCK) {
+                /* turn off the kernel's read-ahead */
+                file->f_ra.ra_pages = 0;
+
+                /* initialize read-ahead window once per syscall */
+                if (ra == 0) {
+                        ra = 1;
+                        bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
+                        bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
+                        ll_ra_read_in(file, &bead);
+                }
 
-        /* turn off the kernel's read-ahead */
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        file->f_ramax = 0;
-#else
-        file->f_ra.ra_pages = 0;
-#endif
-        /* initialize read-ahead window once per syscall */
-        if (ra == 0) {
-                ra = 1;
-                bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
-                bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
-                ll_ra_read_in(file, &bead);
+                /* BUG: 5972 */
+                file_accessed(file);
+                retval = generic_file_read(file, buf, chunk, ppos);
+                ll_file_put_lock(inode, end, lock_style, cookie, &tree, 
+                                 OBD_BRW_READ);
+        } else {
+                retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
         }
 
-        /* BUG: 5972 */
-        file_accessed(file);
-        retval = generic_file_read(file, buf, chunk, ppos);
-        ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 0);
-
-        ll_tree_unlock(&tree);
+        ll_rw_stats_tally(sbi, current->pid, file, chunk, 0);
 
         if (retval > 0) {
                 buf += retval;
@@ -1307,17 +1759,15 @@ static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
         struct ll_lock_tree tree;
-        struct ll_lock_tree_node *node;
         loff_t maxbytes = ll_file_maxbytes(inode);
         loff_t lock_start, lock_end, end;
         ssize_t retval, chunk, sum = 0;
-        int rc;
+        int tree_locked;
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
                inode->i_ino, inode->i_generation, inode, count, *ppos);
-        ll_vfs_ops_tally(sbi, VFS_OPS_WRITE);
-        
+
         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
 
         /* POSIX, but surprised the VFS doesn't check this already */
@@ -1344,7 +1794,7 @@ repeat:
         } else if (sbi->ll_max_rw_chunk != 0) {
                 /* first, let's know the end of the current stripe */
                 end = *ppos;
-                obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END, 
+                obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END,
                                 (obd_off *)&end);
 
                 /* correct, the end is beyond the request */
@@ -1360,43 +1810,44 @@ repeat:
                 lock_start = *ppos;
                 lock_end = *ppos + count - 1;
         }
-        node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
-
-        if (IS_ERR(node))
-                GOTO(out, retval = PTR_ERR(node));
 
-        tree.lt_fd = LUSTRE_FPRIVATE(file);
-        rc = ll_tree_lock(&tree, node, buf, count,
-                          file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
-        if (rc != 0)
-                GOTO(out, retval = rc);
+        tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
+                                            lock_start, lock_end, OBD_BRW_WRITE);
+        if (tree_locked < 0)
+                GOTO(out, retval = tree_locked);
 
         /* This is ok, g_f_w will overwrite this under i_sem if it races
          * with a local truncate, it just makes our maxbyte checking easier.
          * The i_size value gets updated in ll_extent_lock() as a consequence
          * of the [0,EOF] extent lock we requested above. */
         if (file->f_flags & O_APPEND) {
-                *ppos = inode->i_size;
+                *ppos = i_size_read(inode);
                 end = *ppos + count - 1;
         }
 
         if (*ppos >= maxbytes) {
                 send_sig(SIGXFSZ, current, 0);
-                GOTO(out, retval = -EFBIG);
+                GOTO(out_unlock, retval = -EFBIG);
         }
-        if (*ppos + count > maxbytes)
-                count = maxbytes - *ppos;
+        if (end > maxbytes - 1)
+                end = maxbytes - 1;
 
         /* generic_file_write handles O_APPEND after getting i_mutex */
         chunk = end - *ppos + 1;
         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
                inode->i_ino, chunk, *ppos);
-        retval = generic_file_write(file, buf, chunk, ppos);
-        ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
+        if (tree_locked)
+                retval = generic_file_write(file, buf, chunk, ppos);
+        else
+                retval = ll_file_lockless_io(file, (char*)buf, chunk,
+                                             ppos, WRITE);
+        ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
 
-out:
-        ll_tree_unlock(&tree);
+out_unlock:
+        if (tree_locked)
+                ll_tree_unlock(&tree);
 
+out:
         if (retval > 0) {
                 buf += retval;
                 count -= retval;
@@ -1408,23 +1859,14 @@ out:
         up(&ll_i2info(inode)->lli_write_sem);
 
         retval = (sum > 0) ? sum : retval;
-        lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_WRITE_BYTES,
-                            retval > 0 ? retval : 0);
-
-        if (retval > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
-                rc = ll_sync_page_range(inode, inode->i_mapping, *ppos - retval,
-                                        count);
-                if (rc < 0)
-                        retval = rc;
-        }
-
+        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
+                           retval > 0 ? retval : 0);
         RETURN(retval);
 }
 
 /*
  * Send file content (through pagecache) somewhere with helper
  */
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
                                 read_actor_t actor, void *target)
 {
@@ -1447,9 +1889,7 @@ static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
         if (count == 0)
                 RETURN(0);
 
-        lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_READ_BYTES,
-                            count);
-
+        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_READ_BYTES, count);
         /* turn off the kernel's read-ahead */
         in_file->f_ra.ra_pages = 0;
 
@@ -1458,12 +1898,16 @@ static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
                 RETURN(generic_file_sendfile(in_file, ppos, count, actor, target));
 
         node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
+        if (IS_ERR(node))
+                RETURN(PTR_ERR(node));
+
         tree.lt_fd = LUSTRE_FPRIVATE(in_file);
         rc = ll_tree_lock(&tree, node, NULL, count,
                           in_file->f_flags & O_NONBLOCK?LDLM_FL_BLOCK_NOWAIT:0);
         if (rc != 0)
                 RETURN(rc);
 
+        ll_clear_file_contended(inode);
         ll_inode_size_lock(inode, 1);
         /*
          * Consistency guarantees: following possibilities exist for the
@@ -1485,7 +1929,7 @@ static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
          * correctly in the face of concurrent writes and truncates.
          */
         inode_init_lvb(inode, &lvb);
-        obd_merge_lvb(ll_i2sbi(inode)->ll_osc_exp, lsm, &lvb, 1);
+        obd_merge_lvb(ll_i2sbi(inode)->ll_dt_exp, lsm, &lvb, 1);
         kms = lvb.lvb_size;
         if (*ppos + count - 1 > kms) {
                 /* A glimpse is necessary to determine whether we return a
@@ -1496,12 +1940,12 @@ static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
                         goto out;
         } else {
                 /* region is within kms and, hence, within real file size (A) */
-                inode->i_size = kms;
+                i_size_write(inode, kms);
                 ll_inode_size_unlock(inode, 1);
         }
 
         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
-               inode->i_ino, count, *ppos, inode->i_size);
+               inode->i_ino, count, *ppos, i_size_read(inode));
 
         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
@@ -1515,13 +1959,12 @@ static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
         ll_tree_unlock(&tree);
         RETURN(retval);
 }
-#endif
 
 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
                                unsigned long arg)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
-        struct obd_export *exp = ll_i2obdexp(inode);
+        struct obd_export *exp = ll_i2dtexp(inode);
         struct ll_recreate_obj ucreatp;
         struct obd_trans_info oti = { 0 };
         struct obdo *oa = NULL;
@@ -1538,11 +1981,11 @@ static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
         if (rc) {
                 RETURN(-EFAULT);
         }
-        oa = obdo_alloc();
+        OBDO_ALLOC(oa);
         if (oa == NULL)
                 RETURN(-ENOMEM);
 
-        down(&lli->lli_open_sem);
+        down(&lli->lli_size_sem);
         lsm = lli->lli_smd;
         if (lsm == NULL)
                 GOTO(out, rc = -ENOENT);
@@ -1554,27 +1997,26 @@ static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
                 GOTO(out, rc = -ENOMEM);
 
         oa->o_id = ucreatp.lrc_id;
+        oa->o_gr = ucreatp.lrc_group;
         oa->o_nlink = ucreatp.lrc_ost_idx;
         oa->o_flags |= OBD_FL_RECREATE_OBJS;
-        oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS;
+        oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
 
-        oti.oti_objid = NULL;
         memcpy(lsm2, lsm, lsm_size);
         rc = obd_create(exp, oa, &lsm2, &oti);
 
         OBD_FREE(lsm2, lsm_size);
         GOTO(out, rc);
 out:
-        up(&lli->lli_open_sem);
-        obdo_free(oa);
+        up(&lli->lli_size_sem);
+        OBDO_FREE(oa);
         return rc;
 }
 
 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
-                                    int flags, struct lov_user_md *lum,
-                                    int lum_size)
+                             int flags, struct lov_user_md *lum, int lum_size)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
         struct lov_stripe_md *lsm;
@@ -1582,10 +2024,10 @@ int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
         int rc = 0;
         ENTRY;
 
-        down(&lli->lli_open_sem);
+        down(&lli->lli_size_sem);
         lsm = lli->lli_smd;
         if (lsm) {
-                up(&lli->lli_open_sem);
+                up(&lli->lli_size_sem);
                 CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
                        inode->i_ino);
                 RETURN(-EEXIST);
@@ -1603,7 +2045,7 @@ int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
         ll_release_openhandle(file->f_dentry, &oit);
 
  out:
-        up(&lli->lli_open_sem);
+        up(&lli->lli_size_sem);
         ll_intent_release(&oit);
         RETURN(rc);
 out_req_free:
@@ -1616,33 +2058,30 @@ int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
                              struct ptlrpc_request **request)
 {
         struct ll_sb_info *sbi = ll_i2sbi(inode);
-        struct ll_fid  fid;
-        struct mds_body  *body;
+        struct mdt_body  *body;
         struct lov_mds_md *lmm = NULL;
         struct ptlrpc_request *req = NULL;
+        struct obd_capa *oc;
         int rc, lmmsize;
 
-        ll_inode2fid(&fid, inode);
-
         rc = ll_get_max_mdsize(sbi, &lmmsize);
         if (rc)
                 RETURN(rc);
 
-        rc = mdc_getattr_name(sbi->ll_mdc_exp, &fid,
-                        filename, strlen(filename) + 1,
-                        OBD_MD_FLEASIZE | OBD_MD_FLDIREA,
-                        lmmsize, &req);
+        oc = ll_mdscapa_get(inode);
+        rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
+                             oc, filename, strlen(filename) + 1,
+                             OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize,
+                             ll_i2suppgid(inode), &req);
+        capa_put(oc);
         if (rc < 0) {
-                CDEBUG(D_INFO, "mdc_getattr_name failed "
-                                "on %s: rc %d\n", filename, rc);
+                CDEBUG(D_INFO, "md_getattr_name failed "
+                       "on %s: rc %d\n", filename, rc);
                 GOTO(out, rc);
         }
 
-        body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                        sizeof(*body));
+        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
         LASSERT(body != NULL); /* checked by mdc_getattr_name */
-        /* swabbed by mdc_getattr_name */
-        LASSERT_REPSWABBED(req, REPLY_REC_OFF);
 
         lmmsize = body->eadatasize;
 
@@ -1651,10 +2090,8 @@ int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
                 GOTO(out, rc = -ENODATA);
         }
 
-        lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
-                        lmmsize);
+        lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
         LASSERT(lmm != NULL);
-        LASSERT_REPSWABBED(req, REPLY_REC_OFF + 1);
 
         /*
          * This is coming from the MDS, so is probably in
@@ -1673,16 +2110,16 @@ int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
                 struct lov_user_md_join *lmj;
                 int lmj_size, i, aindex = 0;
 
-                rc = obd_unpackmd(sbi->ll_osc_exp, &lsm, lmm, lmmsize);
+                rc = obd_unpackmd(sbi->ll_dt_exp, &lsm, lmm, lmmsize);
                 if (rc < 0)
                         GOTO(out, rc = -ENOMEM);
-                rc = obd_checkmd(sbi->ll_osc_exp, sbi->ll_mdc_exp, lsm);
+                rc = obd_checkmd(sbi->ll_dt_exp, sbi->ll_md_exp, lsm);
                 if (rc)
                         GOTO(out_free_memmd, rc);
 
                 lmj_size = sizeof(struct lov_user_md_join) +
-                        lsm->lsm_stripe_count *
-                        sizeof(struct lov_user_ost_data_join);
+                           lsm->lsm_stripe_count *
+                           sizeof(struct lov_user_ost_data_join);
                 OBD_ALLOC(lmj, lmj_size);
                 if (!lmj)
                         GOTO(out_free_memmd, rc = -ENOMEM);
@@ -1717,7 +2154,7 @@ int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
                 lmm = (struct lov_mds_md *)lmj;
                 lmmsize = lmj_size;
 out_free_memmd:
-                obd_free_memmd(sbi->ll_osc_exp, &lsm);
+                obd_free_memmd(sbi->ll_dt_exp, &lsm);
         }
 out:
         *lmmp = lmm;
@@ -1725,6 +2162,7 @@ out:
         *request = req;
         return rc;
 }
+
 static int ll_lov_setea(struct inode *inode, struct file *file,
                             unsigned long arg)
 {
@@ -1772,7 +2210,7 @@ static int ll_lov_setstripe(struct inode *inode, struct file *file,
         rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum));
         if (rc == 0) {
                  put_user(0, &lump->lmm_stripe_count);
-                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode),
+                 rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
                                     0, ll_i2info(inode)->lli_smd, lump);
         }
         RETURN(rc);
@@ -1785,7 +2223,7 @@ static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
         if (!lsm)
                 RETURN(-ENODATA);
 
-        return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2obdexp(inode), 0, lsm,
+        return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
                             (void *)arg);
 }
 
@@ -1865,8 +2303,8 @@ static int join_sanity_check(struct inode *head, struct inode *tail)
                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
                 RETURN(-EINVAL);
         }
-        if (head->i_size % JOIN_FILE_ALIGN) {
-                CERROR("hsize %llu must be times of 64K\n", head->i_size);
+        if (i_size_read(head) % JOIN_FILE_ALIGN) {
+                CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
                 RETURN(-EINVAL);
         }
         RETURN(0);
@@ -1875,41 +2313,42 @@ static int join_sanity_check(struct inode *head, struct inode *tail)
 static int join_file(struct inode *head_inode, struct file *head_filp,
                      struct file *tail_filp)
 {
-        struct inode *tail_inode, *tail_parent;
         struct dentry *tail_dentry = tail_filp->f_dentry;
         struct lookup_intent oit = {.it_op = IT_OPEN,
                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
+        struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
+                ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
+
         struct lustre_handle lockh;
-        struct mdc_op_data *op_data;
-        __u32  hsize = head_inode->i_size >> 32;
-        __u32  tsize = head_inode->i_size;
+        struct md_op_data *op_data;
         int    rc;
+        loff_t data;
         ENTRY;
 
         tail_dentry = tail_filp->f_dentry;
-        tail_inode = tail_dentry->d_inode;
-        tail_parent = tail_dentry->d_parent->d_inode;
 
-        OBD_ALLOC_PTR(op_data);
-        if (op_data == NULL) {
-                RETURN(-ENOMEM);
-        }
+        data = i_size_read(head_inode);
+        op_data = ll_prep_md_op_data(NULL, head_inode,
+                                     tail_dentry->d_parent->d_inode,
+                                     tail_dentry->d_name.name,
+                                     tail_dentry->d_name.len, 0,
+                                     LUSTRE_OPC_ANY, &data);
+        if (IS_ERR(op_data))
+                RETURN(PTR_ERR(op_data));
 
-        ll_prepare_mdc_op_data(op_data, head_inode, tail_parent,
-                               tail_dentry->d_name.name,
-                               tail_dentry->d_name.len, 0);
-        rc = mdc_enqueue(ll_i2mdcexp(head_inode), LDLM_IBITS, &oit, LCK_PW,
-                         op_data, &lockh, &tsize, 0, ldlm_completion_ast,
-                         ll_mdc_blocking_ast, &hsize, 0);
+        rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
+                         op_data, &lockh, NULL, 0, 0);
 
+        ll_finish_md_op_data(op_data);
         if (rc < 0)
                 GOTO(out, rc);
 
         rc = oit.d.lustre.it_status;
 
-        if (rc < 0) {
+        if (rc < 0 || it_open_error(DISP_OPEN_OPEN, &oit)) {
+                rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, &oit);
                 ptlrpc_req_finished((struct ptlrpc_request *)
-                                                          oit.d.lustre.it_data);
+                                    oit.d.lustre.it_data);
                 GOTO(out, rc);
         }
 
@@ -1920,8 +2359,6 @@ static int join_file(struct inode *head_inode, struct file *head_filp,
         }
         ll_release_openhandle(head_filp->f_dentry, &oit);
 out:
-        if (op_data)
-                OBD_FREE_PTR(op_data);
         ll_intent_release(&oit);
         RETURN(rc);
 }
@@ -1995,18 +2432,18 @@ cleanup:
         switch (cleanup_phase) {
         case 3:
                 ll_tree_unlock(&second_tree);
-                obd_cancel_unused(ll_i2obdexp(second),
+                obd_cancel_unused(ll_i2dtexp(second),
                                   ll_i2info(second)->lli_smd, 0, NULL);
         case 2:
                 ll_tree_unlock(&first_tree);
-                obd_cancel_unused(ll_i2obdexp(first),
+                obd_cancel_unused(ll_i2dtexp(first),
                                   ll_i2info(first)->lli_smd, 0, NULL);
         case 1:
                 filp_close(tail_filp, 0);
                 if (tail)
                         iput(tail);
                 if (head && rc == 0) {
-                        obd_free_memmd(ll_i2sbi(head)->ll_osc_exp,
+                        obd_free_memmd(ll_i2sbi(head)->ll_dt_exp,
                                        &hlli->lli_smd);
                         hlli->lli_smd = NULL;
                 }
@@ -2036,15 +2473,17 @@ int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
         if (!it_disposition(it, DISP_OPEN_OPEN))
                 RETURN(0);
 
+        LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
+
         OBD_ALLOC(och, sizeof(*och));
         if (!och)
                 GOTO(out, rc = -ENOMEM);
 
-        ll_och_fill(ll_i2info(inode), it, och);
-
-        rc = ll_close_inode_openhandle(inode, och);
+        ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
+                    ll_i2info(inode), it, och);
 
-        OBD_FREE(och, sizeof(*och));
+        rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
+                                       inode, och);
  out:
         /* this one is in place of ll_file_open */
         ptlrpc_req_finished(it->d.lustre.it_data);
@@ -2052,6 +2491,49 @@ int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
         RETURN(rc);
 }
 
+/**
+ * Run a FIEMAP request for \a inode through obd_get_info() on the data export.
+ *
+ * Builds an ll_fiemap_info_key from the inode's stripe metadata and the
+ * caller's request, then issues the KEY_FIEMAP get_info call with \a fiemap
+ * as the value buffer.
+ *
+ * \param inode     inode for which the mapping is requested
+ * \param fiemap    request on entry; also handed to obd_get_info() as the
+ *                  value buffer
+ * \param num_bytes size in bytes of the buffer at \a fiemap, passed to
+ *                  obd_get_info() as the value length
+ *
+ * \retval 0           success; also returned with fm_mapped_extents set to 0
+ *                     when the file has no objects or its size is 0
+ * \retval -EOPNOTSUPP the file has more than one stripe and the caller did
+ *                     not set FIEMAP_FLAG_DEVICE_ORDER
+ * \retval other       whatever obd_get_info() returned
+ */
+int ll_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
+              int num_bytes)
+{
+        struct obd_export *exp = ll_i2dtexp(inode);
+        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
+        struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
+        int vallen = num_bytes;
+        int rc;
+        ENTRY;
+
+        /* lli_smd stays NULL until objects have been allocated for the
+         * file; without objects there is nothing to map, so report zero
+         * extents instead of dereferencing a NULL pointer below. */
+        if (lsm == NULL) {
+                fiemap->fm_mapped_extents = 0;
+                RETURN(0);
+        }
+
+        /* If the stripe_count > 1 and the application does not understand
+         * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
+         */
+        if (lsm->lsm_stripe_count > 1 &&
+            !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
+                RETURN(-EOPNOTSUPP);
+
+        fm_key.oa.o_id = lsm->lsm_object_id;
+        fm_key.oa.o_gr = lsm->lsm_object_gr;
+        fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
+        obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLFID | OBD_MD_FLGROUP |
+                        OBD_MD_FLSIZE);
+
+        /* If filesize is 0, then there would be no objects for mapping */
+        if (fm_key.oa.o_size == 0) {
+                fiemap->fm_mapped_extents = 0;
+                RETURN(0);
+        }
+
+        /* The request header travels inside the key; the reply comes back
+         * in the caller's buffer. */
+        memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
+
+        rc = obd_get_info(exp, sizeof(fm_key), &fm_key, &vallen, fiemap, lsm);
+        if (rc)
+                CERROR("obd_get_info failed: rc = %d\n", rc);
+
+        RETURN(rc);
+}
+
 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
                   unsigned long arg)
 {
@@ -2061,13 +2543,12 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
                inode->i_generation, inode, cmd);
-        ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_IOCTL);
+        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
 
         /* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
         if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
                 RETURN(-ENOTTY);
 
-        lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_IOCTL);
         switch(cmd) {
         case LL_IOC_GETFLAGS:
                 /* Get the current value of the file flags */
@@ -2102,6 +2583,72 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
                 RETURN(ll_lov_getstripe(inode, arg));
         case LL_IOC_RECREATE_OBJ:
                 RETURN(ll_lov_recreate_obj(inode, file, arg));
+        /* FIEMAP: copy the request header (and the first extent, if any) in
+         * from userspace, run ll_fiemap() and copy the header plus the
+         * mapped extents back out. */
+        case EXT3_IOC_FIEMAP: {
+                /* largest buffer size that still fits the int length
+                 * argument of ll_fiemap() */
+                const size_t max_bytes = ~0U >> 1;
+                struct ll_user_fiemap *fiemap_s;
+                size_t num_bytes, ret_bytes;
+                unsigned int extent_count;
+                int rc = 0;
+
+                /* Get the extent count so we can calculate the size of
+                 * required fiemap buffer */
+                if (get_user(extent_count,
+                    &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
+                        RETURN(-EFAULT);
+
+                /* extent_count comes straight from userspace: refuse counts
+                 * whose buffer size would wrap or be truncated when it is
+                 * passed on as an int. */
+                if (extent_count > (max_bytes - sizeof(*fiemap_s)) /
+                                   sizeof(struct ll_fiemap_extent))
+                        RETURN(-EINVAL);
+
+                num_bytes = sizeof(*fiemap_s) + (extent_count *
+                                                 sizeof(struct ll_fiemap_extent));
+                OBD_VMALLOC(fiemap_s, num_bytes);
+                if (fiemap_s == NULL)
+                        RETURN(-ENOMEM);
+
+                if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
+                                   sizeof(*fiemap_s)))
+                        GOTO(error, rc = -EFAULT);
+
+                /* The header has now been fetched a second time; pin the
+                 * count to the value the buffer was sized for so a
+                 * concurrent userspace change cannot make it larger. */
+                fiemap_s->fm_extent_count = extent_count;
+
+                /* Unsupported flags: hand the offending bits back to the
+                 * caller and fail with -EBADR. */
+                if (fiemap_s->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
+                        fiemap_s->fm_flags = fiemap_s->fm_flags &
+                                                    ~LUSTRE_FIEMAP_FLAGS_COMPAT;
+                        if (copy_to_user((char *)arg, fiemap_s,
+                                         sizeof(*fiemap_s)))
+                                GOTO(error, rc = -EFAULT);
+
+                        GOTO(error, rc = -EBADR);
+                }
+
+                /* If fm_extent_count is non-zero, read the first extent since
+                 * it is used to calculate end_offset and device from previous
+                 * fiemap call. */
+                if (extent_count) {
+                        if (copy_from_user(&fiemap_s->fm_extents[0],
+                            (char __user *)arg + sizeof(*fiemap_s),
+                            sizeof(struct ll_fiemap_extent)))
+                                GOTO(error, rc = -EFAULT);
+                }
+
+                if (fiemap_s->fm_flags & FIEMAP_FLAG_SYNC) {
+                        /* Store the result in the case-level rc: a second
+                         * rc declared here would go out of scope before the
+                         * error label and the failure would be returned as
+                         * 0. */
+                        rc = filemap_fdatawrite(inode->i_mapping);
+                        if (rc)
+                                GOTO(error, rc);
+                }
+
+                rc = ll_fiemap(inode, fiemap_s, num_bytes);
+                if (rc)
+                        GOTO(error, rc);
+
+                ret_bytes = sizeof(struct ll_user_fiemap);
+
+                if (extent_count != 0)
+                        ret_bytes += (fiemap_s->fm_mapped_extents *
+                                         sizeof(struct ll_fiemap_extent));
+
+                /* Never copy out more than was allocated, whatever extent
+                 * count the reply claims. */
+                if (ret_bytes > num_bytes)
+                        ret_bytes = num_bytes;
+
+                if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
+                        rc = -EFAULT;
+
+error:
+                OBD_VFREE(fiemap_s, num_bytes);
+                RETURN(rc);
+        }
         case EXT3_IOC_GETFLAGS:
         case EXT3_IOC_SETFLAGS:
                 RETURN(ll_iocontrol(inode, file, cmd, arg));
@@ -2132,10 +2679,19 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
         case EXT3_IOC_SETVERSION_OLD:
         case EXT3_IOC_SETVERSION:
         */
-        default:
-                RETURN(obd_iocontrol(cmd, ll_i2obdexp(inode), 0, NULL,
+        case LL_IOC_FLUSHCTX:
+                RETURN(ll_flush_ctx(inode));
+        default: {
+                int err;
+
+                if (LLIOC_STOP == 
+                    ll_iocontrol_call(inode, file, cmd, arg, &err))
+                        RETURN(err);
+
+                RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
                                      (void *)arg));
         }
+        }
 }
 
 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
@@ -2145,14 +2701,13 @@ loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
         struct lov_stripe_md *lsm = lli->lli_smd;
         loff_t retval;
         ENTRY;
-        retval = offset + ((origin == 2) ? inode->i_size :
+        retval = offset + ((origin == 2) ? i_size_read(inode) :
                            (origin == 1) ? file->f_pos : 0);
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
                inode->i_ino, inode->i_generation, inode, retval, retval,
                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
-        ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_SEEK);
-        lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK);
-        
+        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
+
         if (origin == 2) { /* SEEK_END */
                 int nonblock = 0, rc;
 
@@ -2166,7 +2721,7 @@ loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
                 }
 
                 ll_inode_size_lock(inode, 0);
-                offset += inode->i_size;
+                offset += i_size_read(inode);
                 ll_inode_size_unlock(inode, 0);
         } else if (origin == 1) { /* SEEK_CUR */
                 offset += file->f_pos;
@@ -2183,7 +2738,7 @@ loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
                 }
                 retval = offset;
         }
-
+        
         RETURN(retval);
 }
 
@@ -2192,14 +2747,13 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data)
         struct inode *inode = dentry->d_inode;
         struct ll_inode_info *lli = ll_i2info(inode);
         struct lov_stripe_md *lsm = lli->lli_smd;
-        struct ll_fid fid;
         struct ptlrpc_request *req;
+        struct obd_capa *oc;
         int rc, err;
         ENTRY;
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
                inode->i_generation, inode);
-        ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_FSYNC);
-        lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_FSYNC);
+        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
 
         /* fsync's caller has already called _fdata{sync,write}, we want
          * that IO to finish before calling the osc and mdc sync methods */
@@ -2217,29 +2771,36 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data)
                         rc = err;
         }
 
-        ll_inode2fid(&fid, inode);
-        err = mdc_sync(ll_i2sbi(inode)->ll_mdc_exp, &fid, &req);
+        oc = ll_mdscapa_get(inode);
+        err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
+                      &req);
+        capa_put(oc);
         if (!rc)
                 rc = err;
         if (!err)
                 ptlrpc_req_finished(req);
 
         if (data && lsm) {
-                struct obdo *oa = obdo_alloc();
-
+                struct obdo *oa;
+                
+                OBDO_ALLOC(oa);
                 if (!oa)
                         RETURN(rc ? rc : -ENOMEM);
 
                 oa->o_id = lsm->lsm_object_id;
-                oa->o_valid = OBD_MD_FLID;
+                oa->o_gr = lsm->lsm_object_gr;
+                oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
-                                           OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+                                           OBD_MD_FLMTIME | OBD_MD_FLCTIME |
+                                           OBD_MD_FLGROUP);
 
-                err = obd_sync(ll_i2sbi(inode)->ll_osc_exp, oa, lsm,
-                               0, OBD_OBJECT_EOF);
+                oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
+                err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
+                               0, OBD_OBJECT_EOF, oc);
+                capa_put(oc);
                 if (!rc)
                         rc = err;
-                obdo_free(oa);
+                OBDO_FREE(oa);
         }
 
         RETURN(rc);
@@ -2250,18 +2811,23 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
         struct inode *inode = file->f_dentry->d_inode;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ldlm_res_id res_id =
-                    { .name = {inode->i_ino, inode->i_generation, LDLM_FLOCK} };
+                { .name = { fid_seq(ll_inode2fid(inode)),
+                            fid_oid(ll_inode2fid(inode)),
+                            fid_ver(ll_inode2fid(inode)),
+                            LDLM_FLOCK} };
+        struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
+                ldlm_flock_completion_ast, NULL, file_lock };
         struct lustre_handle lockh = {0};
         ldlm_policy_data_t flock;
-        ldlm_mode_t mode = 0;
         int flags = 0;
         int rc;
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
                inode->i_ino, file_lock);
-        ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_FLOCK);
 
+        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
         if (file_lock->fl_flags & FL_FLOCK) {
                 LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
                 /* set missing params for flock() calls */
@@ -2274,7 +2840,7 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
 
         switch (file_lock->fl_type) {
         case F_RDLCK:
-                mode = LCK_PR;
+                einfo.ei_mode = LCK_PR;
                 break;
         case F_UNLCK:
                 /* An unlock request may or may not have any relation to
@@ -2285,10 +2851,10 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
                  * information that is given with a normal read or write record
                  * lock request. To avoid creating another ldlm unlock (cancel)
                  * message we'll treat a LCK_NL flock request as an unlock. */
-                mode = LCK_NL;
+                einfo.ei_mode = LCK_NL;
                 break;
         case F_WRLCK:
-                mode = LCK_PW;
+                einfo.ei_mode = LCK_PW;
                 break;
         default:
                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
@@ -2315,7 +2881,7 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
                 flags = LDLM_FL_TEST_LOCK;
                 /* Save the old mode so that if the mode in the lock changes we
                  * can decrement the appropriate reader or writer refcount. */
-                file_lock->fl_type = mode;
+                file_lock->fl_type = einfo.ei_mode;
                 break;
         default:
                 CERROR("unknown fcntl lock command: %d\n", cmd);
@@ -2324,16 +2890,17 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
 
         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
-               flags, mode, flock.l_flock.start, flock.l_flock.end);
+               flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
 
-        rc = ldlm_cli_enqueue(sbi->ll_mdc_exp, NULL, res_id,
-                              LDLM_FLOCK, &flock, mode, &flags, NULL,
-                              ldlm_flock_completion_ast, NULL, file_lock,
-                              NULL, 0, NULL, &lockh, 0);
-        if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
+        rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
+                              &flock, &flags, NULL, 0, NULL, &lockh, 0);
+        if ((file_lock->fl_flags & FL_FLOCK) &&
+            (rc == 0 || file_lock->fl_type == F_UNLCK))
                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
 #ifdef HAVE_F_OP_FLOCK
-        if ((file_lock->fl_flags & FL_POSIX) &&(rc == 0))
+        if ((file_lock->fl_flags & FL_POSIX) &&
+            (rc == 0 || file_lock->fl_type == F_UNLCK) &&
+            !(flags & LDLM_FL_TEST_LOCK))
                 posix_lock_file_wait(file, file_lock);
 #endif
 
@@ -2350,30 +2917,43 @@ int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
+/**
+ * Test whether a metadata lock covering \a bits exists for \a inode.
+ *
+ * Calls md_lock_match() on the inode's FID with an inodebits policy built
+ * from \a bits, accepting CR, CW, PR or PW modes. LDLM_FL_TEST_LOCK is set
+ * and the local lock handle is discarded (presumably so that no reference
+ * is kept on a matched lock -- confirm against md_lock_match()).
+ *
+ * \retval 1 a matching lock was found
+ * \retval 0 \a inode is NULL or no lock matched
+ */
 int ll_have_md_lock(struct inode *inode, __u64 bits)
 {
         struct lustre_handle lockh;
-        struct ldlm_res_id res_id = { .name = {0} };
-        struct obd_device *obddev;
         ldlm_policy_data_t policy = { .l_inodebits = {bits}};
+        struct lu_fid *fid;
         int flags;
         ENTRY;
 
         if (!inode)
                RETURN(0);
 
-        obddev = ll_i2mdcexp(inode)->exp_obd;
-        res_id.name[0] = inode->i_ino;
-        res_id.name[1] = inode->i_generation;
-
-        CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]);
+        fid = &ll_i2info(inode)->lli_fid;
+        CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
 
         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
-        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
-                            &policy, LCK_CR|LCK_CW|LCK_PR, &lockh)) {
+        if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
+                          LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
                 RETURN(1);
         }
-
         RETURN(0);
 }
 
+/**
+ * Match a metadata lock covering \a bits on \a inode.
+ *
+ * Issues md_lock_match() on the inode's FID with an inodebits policy built
+ * from \a bits, accepting CR, CW, PR or PW modes. Unlike ll_have_md_lock()
+ * this does not set LDLM_FL_TEST_LOCK and passes the caller's handle on.
+ *
+ * \param inode inode whose FID names the resource; not checked for NULL
+ * \param bits  inodebits to match
+ * \param lockh handle passed through to md_lock_match()
+ *
+ * \return the value returned by md_lock_match()
+ */
+ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
+                            struct lustre_handle *lockh)
+{
+        struct lu_fid *res_fid = &ll_i2info(inode)->lli_fid;
+        ldlm_policy_data_t ibits_policy = { .l_inodebits = {bits}};
+        int match_flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
+        ldlm_mode_t matched;
+        ENTRY;
+
+        CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(res_fid));
+
+        matched = md_lock_match(ll_i2mdexp(inode), match_flags, res_fid,
+                                LDLM_IBITS, &ibits_policy,
+                                LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
+        RETURN(matched);
+}
+
 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
                               * and return success */
@@ -2399,6 +2979,7 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
 {
         struct inode *inode = dentry->d_inode;
         struct ptlrpc_request *req = NULL;
+        struct ll_sb_info *sbi;
         struct obd_export *exp;
         int rc;
         ENTRY;
@@ -2407,32 +2988,38 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
                 CERROR("REPORT THIS LINE TO PETER\n");
                 RETURN(0);
         }
+        sbi = ll_i2sbi(inode);
+
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
-#if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
-        lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_REVALIDATE);
-#endif
 
-        exp = ll_i2mdcexp(inode);
+        exp = ll_i2mdexp(inode);
 
         if (exp->exp_connect_flags & OBD_CONNECT_ATTRFID) {
                 struct lookup_intent oit = { .it_op = IT_GETATTR };
-                struct mdc_op_data op_data;
+                struct md_op_data *op_data;
 
                 /* Call getattr by fid, so do not provide name at all. */
-                ll_prepare_mdc_op_data(&op_data, dentry->d_parent->d_inode,
-                                       dentry->d_inode, NULL, 0, 0);
-                rc = mdc_intent_lock(exp, &op_data, NULL, 0,
-                                     /* we are not interested in name
-                                        based lookup */
-                                     &oit, 0, &req,
-                                     ll_mdc_blocking_ast, 0);
+                op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
+                                             dentry->d_inode, NULL, 0, 0,
+                                             LUSTRE_OPC_ANY, NULL);
+                if (IS_ERR(op_data))
+                        RETURN(PTR_ERR(op_data));
+
+                oit.it_flags |= O_CHECK_STALE;
+                rc = md_intent_lock(exp, op_data, NULL, 0,
+                                    /* we are not interested in name
+                                       based lookup */
+                                    &oit, 0, &req,
+                                    ll_md_blocking_ast, 0);
+                ll_finish_md_op_data(op_data);
+                oit.it_flags &= ~O_CHECK_STALE;
                 if (rc < 0) {
                         rc = ll_inode_revalidate_fini(inode, rc);
                         GOTO (out, rc);
                 }
-                
-                rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
+
+                rc = ll_revalidate_it_finish(req, &oit, dentry);
                 if (rc != 0) {
                         ll_intent_release(&oit);
                         GOTO(out, rc);
@@ -2449,46 +3036,49 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
                 }
 
                 ll_lookup_finish_locks(&oit, dentry);
-        } else if (!ll_have_md_lock(dentry->d_inode,
-                                  MDS_INODELOCK_UPDATE|MDS_INODELOCK_LOOKUP)) {
+        } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE |
+                                                     MDS_INODELOCK_LOOKUP)) {
                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
-                struct ll_fid fid;
                 obd_valid valid = OBD_MD_FLGETATTR;
+                struct obd_capa *oc;
                 int ealen = 0;
 
                 if (S_ISREG(inode->i_mode)) {
                         rc = ll_get_max_mdsize(sbi, &ealen);
-                        if (rc) 
-                                RETURN(rc); 
+                        if (rc)
+                                RETURN(rc);
                         valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
                 }
-                ll_inode2fid(&fid, inode);
-                rc = mdc_getattr(sbi->ll_mdc_exp, &fid, valid, ealen, &req);
+                /* When OBD_CONNECT_ATTRFID is not supported we cannot find a
+                 * capa for this inode, because only the capas of directories
+                 * are kept fresh. */
+                oc = ll_mdscapa_get(inode);
+                rc = md_getattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, valid,
+                                ealen, &req);
+                capa_put(oc);
                 if (rc) {
                         rc = ll_inode_revalidate_fini(inode, rc);
                         RETURN(rc);
                 }
 
-                rc = ll_prep_inode(sbi->ll_osc_exp, &inode, req, REPLY_REC_OFF,
-                                   NULL);
+                rc = ll_prep_inode(&inode, req, NULL);
                 if (rc)
                         GOTO(out, rc);
         }
 
         /* if object not yet allocated, don't validate size */
-        if (ll_i2info(inode)->lli_smd == NULL) 
+        if (ll_i2info(inode)->lli_smd == NULL)
                 GOTO(out, rc = 0);
 
         /* ll_glimpse_size will prefer locally cached writes if they extend
          * the file */
         rc = ll_glimpse_size(inode, 0);
-
+        EXIT;
 out:
         ptlrpc_req_finished(req);
-        RETURN(rc);
+        return rc;
 }
 
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
                   struct lookup_intent *it, struct kstat *stat)
 {
@@ -2496,7 +3086,7 @@ int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
         int res = 0;
 
         res = ll_inode_revalidate_it(de, it);
-        lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETATTR);
+        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETATTR, 1);
 
         if (res)
                 return res;
@@ -2514,11 +3104,11 @@ int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
 #ifdef HAVE_INODE_BLKSIZE
         stat->blksize = inode->i_blksize;
 #else
-        stat->blksize = 1<<inode->i_blkbits;
+        stat->blksize = 1 << inode->i_blkbits;
 #endif
 
         ll_inode_size_lock(inode, 0);
-        stat->size = inode->i_size;
+        stat->size = i_size_read(inode);
         stat->blocks = inode->i_blocks;
         ll_inode_size_unlock(inode, 0);
 
@@ -2528,10 +3118,8 @@ int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
 {
         struct lookup_intent it = { .it_op = IT_GETATTR };
 
-        ll_vfs_ops_tally(ll_i2sbi(de->d_inode), VFS_OPS_GETATTR);
         return ll_getattr_it(mnt, de, &it, stat);
 }
-#endif
 
 static
 int lustre_check_acl(struct inode *inode, int mask)
@@ -2563,23 +3151,25 @@ int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
 {
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
                inode->i_ino, inode->i_generation, inode, mask);
-
-        ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_INODE_PERMISSION);
+        if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
+                return lustre_check_remote_perm(inode, mask);
+        
+        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
         return generic_permission(inode, mask, lustre_check_acl);
 }
 #else
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0))
 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
-#else
-int ll_inode_permission(struct inode *inode, int mask)
-#endif
 {
         int mode = inode->i_mode;
         int rc;
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), mask %o\n",
                inode->i_ino, inode->i_generation, inode, mask);
-        ll_vfs_ops_tally(ll_i2sbi(inode), VFS_OPS_INODE_PERMISSION);
+
+        if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
+                return lustre_check_remote_perm(inode, mask);
+
+        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
 
         if ((mask & MAY_WRITE) && IS_RDONLY(inode) &&
             (S_ISREG(mode) || S_ISDIR(mode) || S_ISLNK(mode)))
@@ -2614,11 +3204,12 @@ check_capabilities:
         if (capable(CAP_DAC_READ_SEARCH) && ((mask == MAY_READ) ||
             (S_ISDIR(inode->i_mode) && !(mask & MAY_WRITE))))
                 return 0;
-
+        
         return -EACCES;
 }
 #endif
 
+/* -o localflock - only provides locally consistent flock locks */
 struct file_operations ll_file_operations = {
         .read           = ll_file_read,
         .write          = ll_file_write,
@@ -2627,14 +3218,8 @@ struct file_operations ll_file_operations = {
         .release        = ll_file_release,
         .mmap           = ll_file_mmap,
         .llseek         = ll_file_seek,
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
         .sendfile       = ll_file_sendfile,
-#endif
         .fsync          = ll_fsync,
-#ifdef HAVE_F_OP_FLOCK
-        .flock          = ll_file_noflock,
-#endif
-        .lock           = ll_file_noflock
 };
 
 struct file_operations ll_file_operations_flock = {
@@ -2645,9 +3230,7 @@ struct file_operations ll_file_operations_flock = {
         .release        = ll_file_release,
         .mmap           = ll_file_mmap,
         .llseek         = ll_file_seek,
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
         .sendfile       = ll_file_sendfile,
-#endif
         .fsync          = ll_fsync,
 #ifdef HAVE_F_OP_FLOCK
         .flock          = ll_file_flock,
@@ -2655,18 +3238,30 @@ struct file_operations ll_file_operations_flock = {
         .lock           = ll_file_flock
 };
 
+/* File operations table for "-o noflock": .lock, and .flock where the
+ * kernel has that method (HAVE_F_OP_FLOCK), both point at ll_file_noflock
+ * so that flock calls return ENOSYS. */
+struct file_operations ll_file_operations_noflock = {
+        .read           = ll_file_read,
+        .write          = ll_file_write,
+        .ioctl          = ll_file_ioctl,
+        .open           = ll_file_open,
+        .release        = ll_file_release,
+        .mmap           = ll_file_mmap,
+        .llseek         = ll_file_seek,
+        .sendfile       = ll_file_sendfile,
+        .fsync          = ll_fsync,
+#ifdef HAVE_F_OP_FLOCK
+        .flock          = ll_file_noflock,
+#endif
+        .lock           = ll_file_noflock
+};
 
 struct inode_operations ll_file_inode_operations = {
-#ifdef LUSTRE_KERNEL_VERSION
+#ifdef HAVE_VFS_INTENT_PATCHES
         .setattr_raw    = ll_setattr_raw,
 #endif
         .setattr        = ll_setattr,
         .truncate       = ll_truncate,
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
         .getattr        = ll_getattr,
-#else
-        .revalidate_it  = ll_inode_revalidate_it,
-#endif
         .permission     = ll_inode_permission,
         .setxattr       = ll_setxattr,
         .getxattr       = ll_getxattr,
@@ -2674,3 +3269,102 @@ struct inode_operations ll_file_inode_operations = {
         .removexattr    = ll_removexattr,
 };
 
+/* dynamic ioctl number support routines */
+static struct llioc_ctl_data {
+        struct rw_semaphore ioc_sem;    /* write: (un)register; read: call */
+        struct list_head    ioc_head;   /* struct llioc_data entries, via iocd_list */
+} llioc = { 
+        __RWSEM_INITIALIZER(llioc.ioc_sem), 
+        CFS_LIST_HEAD_INIT(llioc.ioc_head)
+};
+
+/* One registration: a callback plus the ioctl command numbers it handles */
+struct llioc_data {
+        struct list_head        iocd_list;      /* link in llioc.ioc_head */
+        unsigned int            iocd_size;      /* allocation size, given to OBD_FREE */
+        llioc_callback_t        iocd_cb;        /* invoked when a command matches */
+        unsigned int            iocd_count;     /* number of entries in iocd_cmd[] */
+        unsigned int            iocd_cmd[0];    /* trailing command table */
+};
+/* Register cb for count ioctl commands; returns an opaque handle, or NULL */
+void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
+{
+        unsigned int size;
+        struct llioc_data *in_data = NULL;
+        ENTRY;
+
+        if (cb == NULL || cmd == NULL ||
+            count > LLIOC_MAX_CMD || count < 0)
+                RETURN(NULL);   /* bad arguments */
+        /* one allocation: the header followed by count command slots */
+        size = sizeof(*in_data) + count * sizeof(unsigned int);
+        OBD_ALLOC(in_data, size);
+        if (in_data == NULL)
+                RETURN(NULL);   /* allocation failed */
+
+        memset(in_data, 0, sizeof(*in_data));   /* header only */
+        in_data->iocd_size = size;      /* kept so unregister can free it */
+        in_data->iocd_cb = cb;
+        in_data->iocd_count = count;
+        memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
+        /* append to the global list under the write lock */
+        down_write(&llioc.ioc_sem);
+        list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
+        up_write(&llioc.ioc_sem);
+
+        RETURN(in_data);        /* the entry's address doubles as the handle */
+}
+/* Unlink and free the registration whose handle is magic; warn if not found */
+void ll_iocontrol_unregister(void *magic)
+{
+        struct llioc_data *tmp;
+
+        if (magic == NULL)
+                return;
+
+        down_write(&llioc.ioc_sem);
+        list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
+                if (tmp == magic) {     /* handles are compared by address */
+                        unsigned int size = tmp->iocd_size;
+
+                        list_del(&tmp->iocd_list);
+                        up_write(&llioc.ioc_sem);
+                        /* freed after the semaphore has been released */
+                        OBD_FREE(tmp, size);
+                        return;
+                }
+        }
+        up_write(&llioc.ioc_sem);
+        /* reached only when no list entry matched magic */
+        CWARN("didn't find iocontrol register block with magic: %p\n", magic);
+}
+/* export the register/unregister pair */
+EXPORT_SYMBOL(ll_iocontrol_register);
+EXPORT_SYMBOL(ll_iocontrol_unregister);
+/* Offer cmd to registered callbacks in list order, until one gives LLIOC_STOP */
+enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file, 
+                        unsigned int cmd, unsigned long arg, int *rcp)
+{
+        enum llioc_iter ret = LLIOC_CONT;
+        struct llioc_data *data;
+        int rc = -EINVAL, i;    /* rc stays -EINVAL unless a callback changes it */
+
+        down_read(&llioc.ioc_sem);
+        list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
+                for (i = 0; i < data->iocd_count; i++) {
+                        if (cmd != data->iocd_cmd[i]) 
+                                continue;
+                        /* first match only: at most one call per registration */
+                        ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
+                        break;
+                }
+
+                if (ret == LLIOC_STOP)  /* a callback ended the iteration */
+                        break;
+        }
+        up_read(&llioc.ioc_sem);
+
+        if (rcp)        /* rcp is optional */
+                *rcp = rc;
+        return ret;
+}