Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / llite / file.c
index f088b60..0b7082c 100644 (file)
@@ -1,25 +1,43 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
- *   Author: Peter Braam <braam@clusterfs.com>
- *   Author: Phil Schwan <phil@clusterfs.com>
- *   Author: Andreas Dilger <adilger@clusterfs.com>
+ * GPL HEADER START
  *
- *   This file is part of Lustre, http://www.lustre.org.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/llite/file.c
+ *
+ * Author: Peter Braam <braam@clusterfs.com>
+ * Author: Phil Schwan <phil@clusterfs.com>
+ * Author: Andreas Dilger <adilger@clusterfs.com>
  */
 
 #define DEBUG_SUBSYSTEM S_LLITE
 #include <lustre_mdc.h>
 #include <linux/pagemap.h>
 #include <linux/file.h>
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-#include <linux/lustre_compat25.h>
-#endif
 #include "llite_internal.h"
+#include <lustre/ll_fiemap.h>
 
 /* also used by llite/special.c:ll_special_open() */
 struct ll_file_data *ll_file_data_get(void)
@@ -56,7 +72,7 @@ void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
         op_data->op_attr.ia_atime = inode->i_atime;
         op_data->op_attr.ia_mtime = inode->i_mtime;
         op_data->op_attr.ia_ctime = inode->i_ctime;
-        op_data->op_attr.ia_size = inode->i_size;
+        op_data->op_attr.ia_size = i_size_read(inode);
         op_data->op_attr_blocks = inode->i_blocks;
         ((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags = inode->i_flags;
         op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
@@ -95,7 +111,7 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp,
         struct ptlrpc_request *req = NULL;
         struct obd_device *obd = class_exp2obd(exp);
         int epoch_close = 1;
-        int rc;
+        int seq_end = 0, rc;
         ENTRY;
 
         if (obd == NULL) {
@@ -122,7 +138,9 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp,
 
         ll_prepare_close(inode, op_data, och);
         epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
-        rc = md_close(md_exp, op_data, och, &req);
+        rc = md_close(md_exp, op_data, och->och_mod, &req);
+        if (rc != -EAGAIN)
+                seq_end = 1;
 
         if (rc == -EAGAIN) {
                 /* This close must have the epoch closed. */
@@ -130,8 +148,8 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp,
                 LASSERT(epoch_close);
                 /* MDS has instructed us to obtain Size-on-MDS attribute from
                  * OSTs and send setattr to back to MDS. */
-                rc = ll_sizeonmds_update(inode, &och->och_fh,
-                                         op_data->op_ioepoch);
+                rc = ll_sizeonmds_update(inode, och->och_mod,
+                                         &och->och_fh, op_data->op_ioepoch);
                 if (rc) {
                         CERROR("inode %lu mdc Size-on-MDS update failed: "
                                "rc = %d\n", inode->i_ino, rc);
@@ -150,7 +168,6 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp,
                                inode->i_ino, rc);
         }
 
-        ptlrpc_req_finished(req); /* This is close request */
         EXIT;
 out:
       
@@ -158,12 +175,15 @@ out:
             S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
                 ll_queue_done_writing(inode, LLIF_DONE_WRITING);
         } else {
+                if (seq_end)
+                        ptlrpc_close_replay_seq(req);
                 md_clear_open_replay_data(md_exp, och);
                 /* Free @och if it is not waiting for DONE_WRITING. */
                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
                 OBD_FREE_PTR(och);
         }
-        
+        if (req) /* This is close request */
+                ptlrpc_req_finished(req);
         return rc;
 }
 
@@ -285,15 +305,31 @@ int ll_file_release(struct inode *inode, struct file *file)
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
                inode->i_generation, inode);
 
-        /* don't do anything for / */
-        if (inode->i_sb->s_root == file->f_dentry)
-                RETURN(0);
+#ifdef CONFIG_FS_POSIX_ACL
+        if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
+            inode == inode->i_sb->s_root->d_inode) {
+                struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+
+                LASSERT(fd != NULL);
+                if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
+                        fd->fd_flags &= ~LL_FILE_RMTACL;
+                        rct_del(&sbi->ll_rct, cfs_curproc_pid());
+                        et_search_free(&sbi->ll_et, cfs_curproc_pid());
+                }
+        }
+#endif
 
-        ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
+        if (inode->i_sb->s_root != file->f_dentry)
+                ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
         fd = LUSTRE_FPRIVATE(file);
         LASSERT(fd != NULL);
 
-        /* don't do anything for / */
+        /* The last ref on @file, maybe not the the owner pid of statahead.
+         * Different processes can open the same dir, "ll_opendir_key" means:
+         * it is me that should stop the statahead thread. */
+        if (lli->lli_opendir_key == fd)
+                ll_stop_statahead(inode, fd);
+
         if (inode->i_sb->s_root == file->f_dentry) {
                 LUSTRE_FPRIVATE(file) = NULL;
                 ll_file_data_put(fd);
@@ -318,6 +354,7 @@ static int ll_intent_file_open(struct file *file, void *lmm,
         struct md_op_data *op_data;
         struct ptlrpc_request *req;
         int rc;
+        ENTRY;
 
         if (!parent)
                 RETURN(-ENOENT);
@@ -356,7 +393,7 @@ static int ll_intent_file_open(struct file *file, void *lmm,
 
         if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
                 rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
-                CERROR("lock enqueue: err: %d\n", rc);
+                CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
                 GOTO(out, rc);
         }
 
@@ -365,8 +402,7 @@ static int ll_intent_file_open(struct file *file, void *lmm,
                                  &itp->d.lustre.it_lock_handle, 
                                  file->f_dentry->d_inode);
 
-        rc = ll_prep_inode(&file->f_dentry->d_inode, req, DLM_REPLY_REC_OFF,
-                           NULL);
+        rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
 out:
         ptlrpc_req_finished(itp->d.lustre.it_data);
 
@@ -385,9 +421,8 @@ static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
 
         LASSERT(och);
 
-        body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
+        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
         LASSERT(body != NULL);                      /* reply already checked out */
-        LASSERT_REPSWABBED(req, DLM_REPLY_REC_OFF); /* and swabbed in md_enqueue */
 
         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
@@ -418,15 +453,11 @@ int ll_local_open(struct file *file, struct lookup_intent *it,
                 if (rc)
                         RETURN(rc);
 
-                body = lustre_msg_buf(req->rq_repmsg,
-                                      DLM_REPLY_REC_OFF, sizeof(*body));
-
+                body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
                 if ((it->it_flags & FMODE_WRITE) &&
                     (body->valid & OBD_MD_FLSIZE))
-                {
                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
                                lli->lli_ioepoch, PFID(&lli->lli_fid));
-                }
         }
 
         LUSTRE_FPRIVATE(file) = fd;
@@ -460,17 +491,13 @@ int ll_file_open(struct inode *inode, struct file *file)
         struct obd_client_handle **och_p;
         __u64 *och_usecount;
         struct ll_file_data *fd;
-        int rc = 0;
+        int rc = 0, opendir_set = 0;
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
                inode->i_generation, inode, file->f_flags);
 
-        /* don't do anything for / */
-        if (inode->i_sb->s_root == file->f_dentry)
-                RETURN(0);
-
-#ifdef LUSTRE_KERNEL_VERSION
+#ifdef HAVE_VFS_INTENT_PATCHES
         it = file->f_it;
 #else
         it = file->private_data; /* XXX: compat macro */
@@ -481,7 +508,29 @@ int ll_file_open(struct inode *inode, struct file *file)
         if (fd == NULL)
                 RETURN(-ENOMEM);
 
-        /* don't do anything for / */
+        if (S_ISDIR(inode->i_mode)) {
+                spin_lock(&lli->lli_lock);
+                /* "lli->lli_opendir_pid != 0" means someone has set it.
+                 * "lli->lli_sai != NULL" means the previous statahead has not
+                 *                        been cleanup. */ 
+                if (lli->lli_opendir_pid == 0 && lli->lli_sai == NULL) {
+                        opendir_set = 1;
+                        lli->lli_opendir_pid = cfs_curproc_pid();
+                        lli->lli_opendir_key = fd;
+                } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid())) {
+                        /* Two cases for this:
+                         * (1) The same process open such directory many times.
+                         * (2) The old process opened the directory, and exited
+                         *     before its children processes. Then new process
+                         *     with the same pid opens such directory before the
+                         *     old process's children processes exit.
+                         * Change the owner to the latest one. */
+                        opendir_set = 2;
+                        lli->lli_opendir_key = fd;
+                }
+                spin_unlock(&lli->lli_lock);
+        }
+
         if (inode->i_sb->s_root == file->f_dentry) {
                 LUSTRE_FPRIVATE(file) = fd;
                 RETURN(0);
@@ -510,6 +559,7 @@ int ll_file_open(struct inode *inode, struct file *file)
                 it = &oit;
         }
 
+restart:
         /* Let's see if we have file open on MDS already. */
         if (it->it_flags & FMODE_WRITE) {
                 och_p = &lli->lli_mds_write_och;
@@ -529,8 +579,9 @@ int ll_file_open(struct inode *inode, struct file *file)
                            let's close it somehow. This will decref request. */
                         rc = it_open_error(DISP_OPEN_OPEN, it);
                         if (rc) {
+                                up(&lli->lli_och_sem);
                                 ll_file_data_put(fd);
-                                GOTO(out_och_free, rc);
+                                GOTO(out_openerr, rc);
                         }       
                         ll_release_openhandle(file->f_dentry, it);
                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
@@ -540,25 +591,26 @@ int ll_file_open(struct inode *inode, struct file *file)
 
                 rc = ll_local_open(file, it, fd, NULL);
                 if (rc) {
+                        (*och_usecount)--;
                         up(&lli->lli_och_sem);
                         ll_file_data_put(fd);
-                        RETURN(rc);
+                        GOTO(out_openerr, rc);
                 }
         } else {
                 LASSERT(*och_usecount == 0);
-                OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
-                if (!*och_p) {
-                        ll_file_data_put(fd);
-                        GOTO(out_och_free, rc = -ENOMEM);
-                }
-                (*och_usecount)++;
                 if (!it->d.lustre.it_disposition) {
+                        /* We cannot just request lock handle now, new ELC code
+                           means that one of other OPEN locks for this file
+                           could be cancelled, and since blocking ast handler
+                           would attempt to grab och_sem as well, that would
+                           result in a deadlock */
+                        up(&lli->lli_och_sem);
                         it->it_flags |= O_CHECK_STALE;
                         rc = ll_intent_file_open(file, NULL, 0, it);
                         it->it_flags &= ~O_CHECK_STALE;
                         if (rc) {
                                 ll_file_data_put(fd);
-                                GOTO(out_och_free, rc);
+                                GOTO(out_openerr, rc);
                         }
 
                         /* Got some error? Release the request */
@@ -569,7 +621,14 @@ int ll_file_open(struct inode *inode, struct file *file)
                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
                                          &it->d.lustre.it_lock_handle,
                                          file->f_dentry->d_inode);
+                        goto restart;
                 }
+                OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
+                if (!*och_p) {
+                        ll_file_data_put(fd);
+                        GOTO(out_och_free, rc = -ENOMEM);
+                }
+                (*och_usecount)++;
                 req = it->d.lustre.it_data;
 
                 /* md_intent_lock() didn't get a request ref if there was an
@@ -586,7 +645,6 @@ int ll_file_open(struct inode *inode, struct file *file)
                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
                 rc = ll_local_open(file, it, fd, *och_p);
                 if (rc) {
-                        up(&lli->lli_och_sem);
                         ll_file_data_put(fd);
                         GOTO(out_och_free, rc);
                 }
@@ -623,6 +681,13 @@ out_och_free:
                         (*och_usecount)--;
                 }
                 up(&lli->lli_och_sem);
+out_openerr:
+                if (opendir_set == 1) {
+                        lli->lli_opendir_key = NULL;
+                        lli->lli_opendir_pid = 0;
+                } else if (unlikely(opendir_set == 2)) {
+                        ll_stop_statahead(inode, fd);
+                }
         }
 
         return rc;
@@ -672,27 +737,13 @@ int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
 
         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
-        CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n",
-               lli->lli_smd->lsm_object_id, inode->i_size, inode->i_blocks,
-               inode->i_blksize);
+        CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
+               lli->lli_smd->lsm_object_id, i_size_read(inode),
+               (unsigned long long)inode->i_blocks,
+               (unsigned long)ll_inode_blksize(inode));
         RETURN(0);
 }
 
-static inline void ll_remove_suid(struct inode *inode)
-{
-        unsigned int mode;
-
-        /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
-        mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
-
-        /* was any of the uid bits set? */
-        mode &= inode->i_mode;
-        if (mode && !capable(CAP_FSETID)) {
-                inode->i_mode &= ~mode;
-                // XXX careful here - we cannot change the size
-        }
-}
-
 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
@@ -701,9 +752,9 @@ static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
         struct {
                 char name[16];
                 struct ldlm_lock *lock;
-                struct lov_stripe_md *lsm;
-        } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
+        } key = { .name = KEY_LOCK_TO_STRIPE, .lock = lock };
         __u32 stripe, vallen = sizeof(stripe);
+        struct lov_oinfo *loinfo;
         int rc;
         ENTRY;
 
@@ -711,7 +762,7 @@ static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
                 GOTO(check, stripe = 0);
 
         /* get our offset in the lov */
-        rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
+        rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe, lsm);
         if (rc != 0) {
                 CERROR("obd_get_info: rc = %d\n", rc);
                 RETURN(rc);
@@ -719,167 +770,103 @@ static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
         LASSERT(stripe < lsm->lsm_stripe_count);
 
 check:
-        if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
-            lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[2]){
+        loinfo = lsm->lsm_oinfo[stripe];
+        if (!osc_res_name_eq(loinfo->loi_id, loinfo->loi_gr,
+                            &lock->l_resource->lr_name)){
                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
-                           lsm->lsm_oinfo[stripe]->loi_id,
-                           lsm->lsm_oinfo[stripe]->loi_gr);
+                           loinfo->loi_id, loinfo->loi_gr);
                 RETURN(-ELDLM_NO_LOCK_DATA);
         }
 
         RETURN(stripe);
 }
 
-/* Flush the page cache for an extent as its canceled.  When we're on an LOV,
- * we get a lock cancellation for each stripe, so we have to map the obd's
- * region back onto the stripes in the file that it held.
+/* Get extra page reference to ensure it is not going away */
+void ll_pin_extent_cb(void *data)
+{
+        struct page *page = data;
+        
+        page_cache_get(page);
+
+        return;
+}
+
+/* Flush the page from page cache for an extent as its canceled.
+ * Page to remove is delivered as @data.
  *
- * No one can dirty the extent until we've finished our work and they can
+ * No one can dirty the extent until we've finished our work and they cannot
  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
  * but other kernel actors could have pages locked.
  *
+ * If @discard is set, there is no need to write the page if it is dirty.
+ *
  * Called with the DLM lock held. */
-void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
-                              struct ldlm_lock *lock, __u32 stripe)
+int ll_page_removal_cb(void *data, int discard)
 {
-        ldlm_policy_data_t tmpex;
-        unsigned long start, end, count, skip, i, j;
-        struct page *page;
-        int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
-        struct lustre_handle lockh;
+        int rc;
+        struct page *page = data;
+        struct address_space *mapping;
         ENTRY;
 
-        memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
-        CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
-               inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
-               inode->i_size);
-
-        /* our locks are page granular thanks to osc_enqueue, we invalidate the
-         * whole page. */
-        if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
-            ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
-                LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",
-                           CFS_PAGE_SIZE);
-        LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
-        LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
-
-        count = ~0;
-        skip = 0;
-        start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
-        end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
-        if (lsm->lsm_stripe_count > 1) {
-                count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
-                skip = (lsm->lsm_stripe_count - 1) * count;
-                start += start/count * skip + stripe * count;
-                if (end != ~0)
-                        end += end/count * skip + stripe * count;
-        }
-        if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
-                end = ~0;
-
-        i = inode->i_size ? (__u64)(inode->i_size - 1) >> CFS_PAGE_SHIFT : 0;
-        if (i < end)
-                end = i;
-
-        CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
-               "count: %lu skip: %lu end: %lu%s\n", start, start % count,
-               count, skip, end, discard ? " (DISCARDING)" : "");
-
-        /* walk through the vmas on the inode and tear down mmaped pages that
-         * intersect with the lock.  this stops immediately if there are no
-         * mmap()ed regions of the file.  This is not efficient at all and
-         * should be short lived. We'll associate mmap()ed pages with the lock
-         * and will be able to find them directly */
-        for (i = start; i <= end; i += (j + skip)) {
-                j = min(count - (i % count), end - i + 1);
-                LASSERT(j > 0);
-                LASSERT(inode->i_mapping);
-                if (ll_teardown_mmaps(inode->i_mapping,
-                                      (__u64)i << CFS_PAGE_SHIFT,
-                                      ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
-                        break;
-        }
-
-        /* this is the simplistic implementation of page eviction at
-         * cancelation.  It is careful to get races with other page
-         * lockers handled correctly.  fixes from bug 20 will make it
-         * more efficient by associating locks with pages and with
-         * batching writeback under the lock explicitly. */
-        for (i = start, j = start % count; i <= end;
-             j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
-                if (j == count) {
-                        CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
-                        i += skip;
-                        j = 0;
-                        if (i > end)
-                                break;
-                }
-                LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
-                         LPU64" >= "LPU64" start %lu i %lu end %lu\n",
-                         tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
-                         start, i, end);
-
-                if (!mapping_has_pages(inode->i_mapping)) {
-                        CDEBUG(D_INODE|D_PAGE, "nothing left\n");
-                        break;
-                }
-
-                cond_resched();
+        /* We have page reference already from ll_pin_page */
+        lock_page(page);
 
-                page = find_get_page(inode->i_mapping, i);
-                if (page == NULL)
-                        continue;
-                LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
-                               i, tmpex.l_extent.start);
+        /* Already truncated by somebody */
+        if (!page->mapping)
+                GOTO(out, rc = 0);
+        mapping = page->mapping;
+
+        ll_teardown_mmaps(mapping,
+                          (__u64)page->index << PAGE_CACHE_SHIFT,
+                          ((__u64)page->index<<PAGE_CACHE_SHIFT)|
+                                                              ~PAGE_CACHE_MASK);        
+        LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
+
+        if (!discard && clear_page_dirty_for_io(page)) {
+                LASSERT(page->mapping);
+                rc = ll_call_writepage(page->mapping->host, page);
+                /* either waiting for io to complete or reacquiring
+                 * the lock that the failed writepage released */
                 lock_page(page);
-
-                /* page->mapping to check with racing against teardown */
-                if (!discard && clear_page_dirty_for_io(page)) {
-                        rc = ll_call_writepage(inode, page);
-                        if (rc != 0)
-                                CERROR("writepage inode %lu(%p) of page %p "
-                                       "failed: %d\n", inode->i_ino, inode,
-                                       page, rc);
-                        /* either waiting for io to complete or reacquiring
-                         * the lock that the failed writepage released */
-                        lock_page(page);
-                }
-
-                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
-                /* check to see if another DLM lock covers this page b=2765 */
-                rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
-                                      LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
-                                      LDLM_FL_TEST_LOCK,
-                                      &lock->l_resource->lr_name, LDLM_EXTENT,
-                                      &tmpex, LCK_PR | LCK_PW, &lockh);
-
-                if (rc2 <= 0 && page->mapping != NULL) {
-                        struct ll_async_page *llap = llap_cast_private(page);
-                        /* checking again to account for writeback's
-                         * lock_page() */
-                        LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
-                        if (llap)
-                                ll_ra_accounting(llap, inode->i_mapping);
-                        ll_truncate_complete_page(page);
+                wait_on_page_writeback(page);
+                if (rc != 0) {
+                        CERROR("writepage inode %lu(%p) of page %p "
+                               "failed: %d\n", mapping->host->i_ino,
+                               mapping->host, page, rc);
+                        if (rc == -ENOSPC)
+                                set_bit(AS_ENOSPC, &mapping->flags);
+                        else
+                                set_bit(AS_EIO, &mapping->flags);
                 }
-                unlock_page(page);
-                page_cache_release(page);
-        }
-        LASSERTF(tmpex.l_extent.start <=
-                 (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
-                  lock->l_policy_data.l_extent.end + 1),
-                 "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
-                 tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
-                 start, i, end);
+                set_bit(AS_EIO, &mapping->flags);
+        }
+        if (page->mapping != NULL) {
+                struct ll_async_page *llap = llap_cast_private(page);
+                /* checking again to account for writeback's lock_page() */
+                LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
+                if (llap)
+                        ll_ra_accounting(llap, page->mapping);
+                ll_truncate_complete_page(page);
+        }
         EXIT;
+out:
+        LASSERT(!PageWriteback(page));
+        unlock_page(page);
+        page_cache_release(page);
+
+        return 0;
 }
 
-static int ll_extent_lock_callback(struct ldlm_lock *lock,
-                                   struct ldlm_lock_desc *new, void *data,
-                                   int flag)
+int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
+                             void *data, int flag)
 {
-        struct lustre_handle lockh = { 0 };
-        int rc;
+        struct inode *inode;
+        struct ll_inode_info *lli;
+        struct lov_stripe_md *lsm;
+        int stripe;
+        __u64 kms;
+
         ENTRY;
 
         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
@@ -887,60 +874,37 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock,
                 LBUG();
         }
 
-        switch (flag) {
-        case LDLM_CB_BLOCKING:
-                ldlm_lock2handle(lock, &lockh);
-                rc = ldlm_cli_cancel(&lockh);
-                if (rc != ELDLM_OK)
-                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
-                break;
-        case LDLM_CB_CANCELING: {
-                struct inode *inode;
-                struct ll_inode_info *lli;
-                struct lov_stripe_md *lsm;
-                int stripe;
-                __u64 kms;
-
-                /* This lock wasn't granted, don't try to evict pages */
-                if (lock->l_req_mode != lock->l_granted_mode)
-                        RETURN(0);
-
-                inode = ll_inode_from_lock(lock);
-                if (inode == NULL)
-                        RETURN(0);
-                lli = ll_i2info(inode);
-                if (lli == NULL)
-                        goto iput;
-                if (lli->lli_smd == NULL)
-                        goto iput;
-                lsm = lli->lli_smd;
-
-                stripe = ll_lock_to_stripe_offset(inode, lock);
-                if (stripe < 0)
-                        goto iput;
-
-                ll_pgcache_remove_extent(inode, lsm, lock, stripe);
+        inode = ll_inode_from_lock(lock);
+        if (inode == NULL)
+                RETURN(0);
+        lli = ll_i2info(inode);
+        if (lli == NULL)
+                GOTO(iput, 0);
+        if (lli->lli_smd == NULL)
+                GOTO(iput, 0);
+        lsm = lli->lli_smd;
 
-                lov_stripe_lock(lsm);
-                lock_res_and_lock(lock);
-                kms = ldlm_extent_shift_kms(lock,
-                                            lsm->lsm_oinfo[stripe]->loi_kms);
+        stripe = ll_lock_to_stripe_offset(inode, lock);
+        if (stripe < 0)
+                GOTO(iput, 0);
 
-                if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
-                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
-                                   lsm->lsm_oinfo[stripe]->loi_kms, kms);
-                lsm->lsm_oinfo[stripe]->loi_kms = kms;
-                unlock_res_and_lock(lock);
-                lov_stripe_unlock(lsm);
-        iput:
-                iput(inode);
-                break;
-        }
-        default:
-                LBUG();
-        }
+        lov_stripe_lock(lsm);
+        lock_res_and_lock(lock);
+        kms = ldlm_extent_shift_kms(lock,
+                                    lsm->lsm_oinfo[stripe]->loi_kms);
+
+        if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
+                LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
+                           lsm->lsm_oinfo[stripe]->loi_kms, kms);
+        lsm->lsm_oinfo[stripe]->loi_kms = kms;
+        unlock_res_and_lock(lock);
+        lov_stripe_unlock(lsm);
+        ll_queue_done_writing(inode, 0);
+        EXIT;
+iput:
+        iput(inode);
 
-        RETURN(0);
+        return 0;
 }
 
 #if 0
@@ -1006,7 +970,6 @@ static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
         struct lov_stripe_md *lsm;
         struct ost_lvb *lvb;
         int rc, stripe;
-        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
         ENTRY;
 
         if (inode == NULL)
@@ -1023,13 +986,16 @@ static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
         if (stripe < 0)
                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
 
-        rc = lustre_pack_reply(req, 2, size, NULL);
+        req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
+        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+                             sizeof(*lvb));
+        rc = req_capsule_server_pack(&req->rq_pill);
         if (rc) {
                 CERROR("lustre_pack_reply: %d\n", rc);
                 GOTO(iput, rc);
         }
 
-        lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
+        lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
         lvb->lvb_atime = LTIME_S(inode->i_atime);
@@ -1037,7 +1003,7 @@ static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
 
         LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64
                    " atime "LPU64", mtime "LPU64", ctime "LPU64,
-                   inode->i_size, stripe, lvb->lvb_size, lvb->lvb_mtime,
+                   i_size_read(inode), stripe, lvb->lvb_size, lvb->lvb_mtime,
                    lvb->lvb_atime, lvb->lvb_ctime);
  iput:
         iput(inode);
@@ -1052,23 +1018,27 @@ static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
         return rc;
 }
 
-static void ll_merge_lvb(struct inode *inode)
+static int ll_merge_lvb(struct inode *inode)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ost_lvb lvb;
+        int rc;
+
         ENTRY;
 
         ll_inode_size_lock(inode, 1);
         inode_init_lvb(inode, &lvb);
-        obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
-        inode->i_size = lvb.lvb_size;
+        rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
+        i_size_write(inode, lvb.lvb_size);
         inode->i_blocks = lvb.lvb_blocks;
+
         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
         LTIME_S(inode->i_atime) = lvb.lvb_atime;
         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
         ll_inode_size_unlock(inode, 1);
-        EXIT;
+
+        RETURN(rc);
 }
 
 int ll_local_size(struct inode *inode)
@@ -1085,22 +1055,22 @@ int ll_local_size(struct inode *inode)
                 RETURN(0);
 
         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
-                       &policy, LCK_PR | LCK_PW, &flags, inode, &lockh);
+                       &policy, LCK_PR, &flags, inode, &lockh);
         if (rc < 0)
                 RETURN(rc);
         else if (rc == 0)
                 RETURN(-ENODATA);
 
-        ll_merge_lvb(inode);
-        obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR | LCK_PW, &lockh);
-        RETURN(0);
+        rc = ll_merge_lvb(inode);
+        obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
+        RETURN(rc);
 }
 
 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
                      lstat_t *st)
 {
         struct lustre_handle lockh = { 0 };
-        struct obd_enqueue_info einfo = { 0 };
+        struct ldlm_enqueue_info einfo = { 0 };
         struct obd_info oinfo = { { { 0 } } };
         struct ost_lvb lvb;
         int rc;
@@ -1109,8 +1079,7 @@ int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
 
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = LCK_PR;
-        einfo.ei_flags = LDLM_FL_HAS_INTENT;
-        einfo.ei_cb_bl = ll_extent_lock_callback;
+        einfo.ei_cb_bl = osc_extent_blocking_cb;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
         einfo.ei_cbdata = NULL;
@@ -1118,6 +1087,7 @@ int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
         oinfo.oi_lockh = &lockh;
         oinfo.oi_md = lsm;
+        oinfo.oi_flags = LDLM_FL_HAS_INTENT;
 
         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
         if (rc == -ENOENT)
@@ -1148,7 +1118,7 @@ int ll_glimpse_size(struct inode *inode, int ast_flags)
         struct ll_inode_info *lli = ll_i2info(inode);
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct lustre_handle lockh = { 0 };
-        struct obd_enqueue_info einfo = { 0 };
+        struct ldlm_enqueue_info einfo = { 0 };
         struct obd_info oinfo = { { { 0 } } };
         int rc;
         ENTRY;
@@ -1172,8 +1142,7 @@ int ll_glimpse_size(struct inode *inode, int ast_flags)
          *       acquired only if there were no conflicting locks. */
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = LCK_PR;
-        einfo.ei_flags = ast_flags | LDLM_FL_HAS_INTENT;
-        einfo.ei_cb_bl = ll_extent_lock_callback;
+        einfo.ei_cb_bl = osc_extent_blocking_cb;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
         einfo.ei_cbdata = inode;
@@ -1181,6 +1150,7 @@ int ll_glimpse_size(struct inode *inode, int ast_flags)
         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
         oinfo.oi_lockh = &lockh;
         oinfo.oi_md = lli->lli_smd;
+        oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
 
         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
         if (rc == -ENOENT)
@@ -1190,10 +1160,10 @@ int ll_glimpse_size(struct inode *inode, int ast_flags)
                 RETURN(rc > 0 ? -EIO : rc);
         }
 
-        ll_merge_lvb(inode);
+        rc = ll_merge_lvb(inode);
 
-        CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
-               inode->i_size, inode->i_blocks);
+        CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
+               i_size_read(inode), (unsigned long long)inode->i_blocks);
 
         RETURN(rc);
 }
@@ -1205,7 +1175,7 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
 {
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ost_lvb lvb;
-        struct obd_enqueue_info einfo = { 0 };
+        struct ldlm_enqueue_info einfo = { 0 };
         struct obd_info oinfo = { { { 0 } } };
         int rc;
         ENTRY;
@@ -1227,8 +1197,7 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
 
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = mode;
-        einfo.ei_flags = ast_flags;
-        einfo.ei_cb_bl = ll_extent_lock_callback;
+        einfo.ei_cb_bl = osc_extent_blocking_cb;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
         einfo.ei_cbdata = inode;
@@ -1236,8 +1205,9 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
         oinfo.oi_policy = *policy;
         oinfo.oi_lockh = lockh;
         oinfo.oi_md = lsm;
+        oinfo.oi_flags = ast_flags;
 
-        rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo);
+        rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
         *policy = oinfo.oi_policy;
         if (rc > 0)
                 rc = -EIO;
@@ -1258,9 +1228,9 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
                  * cancel the result of the truncate.  Getting the
                  * ll_inode_size_lock() after the enqueue maintains the DLM
                  * -> ll_inode_size_lock() acquiring order. */
-                inode->i_size = lvb.lvb_size;
-                CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n", 
-                       inode->i_ino, inode->i_size);
+                i_size_write(inode, lvb.lvb_size);
+                CDEBUG(D_INODE, "inode=%lu, updating i_size %llu\n",
+                       inode->i_ino, i_size_read(inode));
         }
 
         if (rc == 0) {
@@ -1291,6 +1261,332 @@ int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
         RETURN(rc);
 }
 
+static void ll_set_file_contended(struct inode *inode)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+        cfs_time_t now = cfs_time_current();
+
+        spin_lock(&lli->lli_lock);
+        lli->lli_contention_time = now;
+        lli->lli_flags |= LLIF_CONTENDED;
+        spin_unlock(&lli->lli_lock);
+}
+
+void ll_clear_file_contended(struct inode *inode)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+
+        spin_lock(&lli->lli_lock);
+        lli->lli_flags &= ~LLIF_CONTENDED;
+        spin_unlock(&lli->lli_lock);
+}
+
+static int ll_is_file_contended(struct file *file)
+{
+        struct inode *inode = file->f_dentry->d_inode;
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+        ENTRY;
+
+        if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
+                CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
+                       " osc connect flags = 0x"LPX64"\n",
+                       sbi->ll_lco.lco_flags);
+                RETURN(0);
+        }
+        if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
+                RETURN(1);
+        if (lli->lli_flags & LLIF_CONTENDED) {
+                cfs_time_t cur_time = cfs_time_current();
+                cfs_time_t retry_time;
+
+                retry_time = cfs_time_add(
+                        lli->lli_contention_time,
+                        cfs_time_seconds(sbi->ll_contention_time));
+                if (cfs_time_after(cur_time, retry_time)) {
+                        ll_clear_file_contended(inode);
+                        RETURN(0);
+                }
+                RETURN(1);
+        }
+        RETURN(0);
+}
+
+static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
+                                 const char *buf, size_t count,
+                                 loff_t start, loff_t end, int rw)
+{
+        int append;
+        int tree_locked = 0;
+        int rc;
+        struct inode * inode = file->f_dentry->d_inode;
+        ENTRY;
+
+        append = (rw == OBD_BRW_WRITE) && (file->f_flags & O_APPEND);
+
+        if (append || !ll_is_file_contended(file)) {
+                struct ll_lock_tree_node *node;
+                int ast_flags;
+
+                ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
+                if (file->f_flags & O_NONBLOCK)
+                        ast_flags |= LDLM_FL_BLOCK_NOWAIT;
+                node = ll_node_from_inode(inode, start, end,
+                                          (rw == OBD_BRW_WRITE) ? LCK_PW : LCK_PR);
+                if (IS_ERR(node)) {
+                        rc = PTR_ERR(node);
+                        GOTO(out, rc);
+                }
+                tree->lt_fd = LUSTRE_FPRIVATE(file);
+                rc = ll_tree_lock(tree, node, buf, count, ast_flags);
+                if (rc == 0)
+                        tree_locked = 1;
+                else if (rc == -EUSERS)
+                        ll_set_file_contended(inode);
+                else
+                        GOTO(out, rc);
+        }
+        RETURN(tree_locked);
+out:
+        return rc;
+}
+
+/**
+ * Checks if requested extent lock is compatible with a lock under a page.
+ *
+ * Checks if the lock under \a page is compatible with a read or write lock
+ * (specified by \a rw) for an extent [\a start , \a end].
+ *
+ * \param page the page under which lock is considered
+ * \param rw OBD_BRW_READ if requested for reading,
+ *           OBD_BRW_WRITE if requested for writing
+ * \param start start of the requested extent
+ * \param end end of the requested extent
+ * \param cookie transparent parameter for passing locking context
+ *
+ * \post result == 1, *cookie == context, appropriate lock is referenced or
+ * \post result == 0
+ *
+ * \retval 1 owned lock is reused for the request
+ * \retval 0 no lock reused for the request
+ *
+ * \see ll_release_short_lock
+ */
+static int ll_reget_short_lock(struct page *page, int rw,
+                               obd_off start, obd_off end,
+                               void **cookie)
+{
+        struct ll_async_page *llap;
+        struct obd_export *exp;
+        struct inode *inode = page->mapping->host;
+
+        ENTRY;
+
+        exp = ll_i2dtexp(inode);
+        if (exp == NULL)
+                RETURN(0);
+
+        llap = llap_cast_private(page);
+        if (llap == NULL)
+                RETURN(0);
+
+        RETURN(obd_reget_short_lock(exp, ll_i2info(inode)->lli_smd,
+                                    &llap->llap_cookie, rw, start, end,
+                                    cookie));
+}
+
+/**
+ * Releases a reference to a lock taken in a "fast" way.
+ *
+ * Releases a read or a write (specified by \a rw) lock
+ * referenced by \a cookie.
+ *
+ * \param inode inode to which data belong
+ * \param end end of the locked extent
+ * \param rw OBD_BRW_READ if requested for reading,
+ *           OBD_BRW_WRITE if requested for writing
+ * \param cookie transparent parameter for passing locking context
+ *
+ * \post appropriate lock is dereferenced
+ *
+ * \see ll_reget_short_lock
+ */
+static void ll_release_short_lock(struct inode *inode, obd_off end,
+                                  void *cookie, int rw)
+{
+        struct obd_export *exp;
+        int rc;
+
+        exp = ll_i2dtexp(inode);
+        if (exp == NULL)
+                return;
+
+        rc = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
+                                    cookie, rw);
+        if (rc < 0)
+                CERROR("unlock failed (%d)\n", rc);
+}
+
+/**
+ * Checks if requested extent lock is compatible
+ * with a lock under a page in page cache.
+ *
+ * Checks if a lock under some \a page is compatible with a read or write lock
+ * (specified by \a rw) for an extent [\a start , \a end].
+ *
+ * \param file the file under which lock is considered
+ * \param rw OBD_BRW_READ if requested for reading,
+ *           OBD_BRW_WRITE if requested for writing
+ * \param ppos start of the requested extent
+ * \param end end of the requested extent
+ * \param cookie transparent parameter for passing locking context
+ * \param buf userspace buffer for the data
+ *
+ * \post result == 1, *cookie == context, appropriate lock is referenced
+ * \post retuls == 0
+ *
+ * \retval 1 owned lock is reused for the request
+ * \retval 0 no lock reused for the request
+ *
+ * \see ll_file_put_fast_lock
+ */
+static inline int ll_file_get_fast_lock(struct file *file,
+                                        obd_off ppos, obd_off end,
+                                        char *buf, void **cookie, int rw)
+{
+        int rc = 0;
+        struct page *page;
+
+        ENTRY;
+
+        if (!ll_region_mapped((unsigned long)buf, end - ppos)) {
+                page = find_lock_page(file->f_dentry->d_inode->i_mapping,
+                                      ppos >> CFS_PAGE_SHIFT);
+                if (page) {
+                        if (ll_reget_short_lock(page, rw, ppos, end, cookie))
+                                rc = 1;
+
+                        unlock_page(page);
+                        page_cache_release(page);
+                }
+        }
+
+        RETURN(rc);
+}
+
+/**
+ * Releases a reference to a lock taken in a "fast" way.
+ *
+ * Releases a read or a write (specified by \a rw) lock
+ * referenced by \a cookie.
+ *
+ * \param inode inode to which data belong
+ * \param end end of the locked extent
+ * \param rw OBD_BRW_READ if requested for reading,
+ *           OBD_BRW_WRITE if requested for writing
+ * \param cookie transparent parameter for passing locking context
+ *
+ * \post appropriate lock is dereferenced
+ *
+ * \see ll_file_get_fast_lock
+ */
+static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
+                                         void *cookie, int rw)
+{
+        ll_release_short_lock(inode, end, cookie, rw);
+}
+
+enum ll_lock_style {
+        LL_LOCK_STYLE_NOLOCK   = 0,
+        LL_LOCK_STYLE_FASTLOCK = 1,
+        LL_LOCK_STYLE_TREELOCK = 2
+};
+
+/**
+ * Checks if requested extent lock is compatible with a lock 
+ * under a page cache page.
+ *
+ * Checks if the lock under \a page is compatible with a read or write lock
+ * (specified by \a rw) for an extent [\a start , \a end].
+ *
+ * \param file file under which I/O is processed
+ * \param rw OBD_BRW_READ if requested for reading,
+ *           OBD_BRW_WRITE if requested for writing
+ * \param ppos start of the requested extent
+ * \param end end of the requested extent
+ * \param cookie transparent parameter for passing locking context
+ *           (only used with LL_LOCK_STYLE_FASTLOCK)
+ * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
+ * \param buf userspace buffer for the data
+ *
+ * \retval LL_LOCK_STYLE_FASTLOCK owned lock is reused through fast lock
+ * \retval LL_LOCK_STYLE_TREELOCK got a lock through tree lock
+ * \retval LL_LOCK_STYLE_NOLOCK got no lock
+ *
+ * \see ll_file_put_lock
+ */
+static inline int ll_file_get_lock(struct file *file, obd_off ppos,
+                                   obd_off end, char *buf, void **cookie,
+                                   struct ll_lock_tree *tree, int rw)
+{
+        int rc;
+
+        ENTRY;
+
+        if (ll_file_get_fast_lock(file, ppos, end, buf, cookie, rw))
+                RETURN(LL_LOCK_STYLE_FASTLOCK);
+
+        rc = ll_file_get_tree_lock(tree, file, buf, ppos - end, ppos, end, rw);
+        /* rc: 1 for tree lock, 0 for no lock, <0 for error */
+        switch (rc) {
+        case 1:
+                RETURN(LL_LOCK_STYLE_TREELOCK);
+        case 0:
+                RETURN(LL_LOCK_STYLE_NOLOCK);
+        }
+
+        /* an error happened if we reached this point, rc = -errno here */
+        RETURN(rc);
+}
+
+/**
+ * Drops the lock taken by ll_file_get_lock.
+ *
+ * Releases a read or a write (specified by \a rw) lock
+ * referenced by \a tree or \a cookie.
+ *
+ * \param inode inode to which data belong
+ * \param end end of the locked extent
+ * \param lockstyle facility through which the lock was taken
+ * \param rw OBD_BRW_READ if requested for reading,
+ *           OBD_BRW_WRITE if requested for writing
+ * \param cookie transparent parameter for passing locking context
+ *           (only used with LL_LOCK_STYLE_FASTLOCK)
+ * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
+ *
+ * \post appropriate lock is dereferenced
+ *
+ * \see ll_file_get_lock
+ */
+static inline void ll_file_put_lock(struct inode *inode, obd_off end,
+                                    enum ll_lock_style lock_style,
+                                    void *cookie, struct ll_lock_tree *tree,
+                                    int rw)
+
+{
+        switch (lock_style) {
+        case LL_LOCK_STYLE_TREELOCK:
+                ll_tree_unlock(tree);
+                break;
+        case LL_LOCK_STYLE_FASTLOCK:
+                ll_file_put_fast_lock(inode, end, cookie, rw);
+                break;
+        default:
+                CERROR("invalid locking style (%d)\n", lock_style);
+        }
+}
+
 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
                             loff_t *ppos)
 {
@@ -1299,12 +1595,13 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
         struct lov_stripe_md *lsm = lli->lli_smd;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ll_lock_tree tree;
-        struct ll_lock_tree_node *node;
         struct ost_lvb lvb;
         struct ll_ra_read bead;
-        int rc, ra = 0;
-        loff_t end;
+        int ra = 0;
+        obd_off end;
         ssize_t retval, chunk, sum = 0;
+        int lock_style;
+        void *cookie;
 
         __u64 kms;
         ENTRY;
@@ -1328,11 +1625,11 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
                  * unguarded */
 
                 /* Read beyond end of file */
-                if (*ppos >= inode->i_size)
+                if (*ppos >= i_size_read(inode))
                         RETURN(0);
 
-                if (count > inode->i_size - *ppos)
-                        count = inode->i_size - *ppos;
+                if (count > i_size_read(inode) - *ppos)
+                        count = i_size_read(inode) - *ppos;
                 /* Make sure to correctly adjust the file pos pointer for
                  * EFAULT case */
                 notzeroed = clear_user(buf, count);
@@ -1342,13 +1639,11 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
                         RETURN(-EFAULT);
                 RETURN(count);
         }
-
 repeat:
         if (sbi->ll_max_rw_chunk != 0) {
                 /* first, let's know the end of the current stripe */
                 end = *ppos;
-                obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, 
-                                (obd_off *)&end);
+                obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, &end);
 
                 /* correct, the end is beyond the request */
                 if (end > *ppos + count - 1)
@@ -1361,16 +1656,10 @@ repeat:
                 end = *ppos + count - 1;
         }
 
-        node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
-        if (IS_ERR(node)){
-                GOTO(out, retval = PTR_ERR(node));
-        }
-
-        tree.lt_fd = LUSTRE_FPRIVATE(file);
-        rc = ll_tree_lock(&tree, node, buf, count,
-                          file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
-        if (rc != 0)
-                GOTO(out, retval = rc);
+        lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
+                                      buf, &cookie, &tree, OBD_BRW_READ);
+        if (lock_style < 0)
+                GOTO(out, retval = lock_style);
 
         ll_inode_size_lock(inode, 1);
         /*
@@ -1401,7 +1690,9 @@ repeat:
                 ll_inode_size_unlock(inode, 1);
                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
                 if (retval) {
-                        ll_tree_unlock(&tree);
+                        if (lock_style != LL_LOCK_STYLE_NOLOCK)
+                                ll_file_put_lock(inode, end, lock_style,
+                                                 cookie, &tree, OBD_BRW_READ);
                         goto out;
                 }
         } else {
@@ -1411,35 +1702,37 @@ repeat:
                  * the kms size is _correct_, it is only the _minimum_ size.
                  * If someone does a stat they will get the correct size which
                  * will always be >= the kms value here.  b=11081 */
-                if (inode->i_size < kms)
-                        inode->i_size = kms;
+                if (i_size_read(inode) < kms)
+                        i_size_write(inode, kms);
                 ll_inode_size_unlock(inode, 1);
         }
 
         chunk = end - *ppos + 1;
         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
-                        inode->i_ino, chunk, *ppos, inode->i_size);
+               inode->i_ino, chunk, *ppos, i_size_read(inode));
+
+        if (lock_style != LL_LOCK_STYLE_NOLOCK) {
+                /* turn off the kernel's read-ahead */
+                file->f_ra.ra_pages = 0;
+
+                /* initialize read-ahead window once per syscall */
+                if (ra == 0) {
+                        ra = 1;
+                        bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
+                        bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
+                        ll_ra_read_in(file, &bead);
+                }
 
-        /* turn off the kernel's read-ahead */
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        file->f_ramax = 0;
-#else
-        file->f_ra.ra_pages = 0;
-#endif
-        /* initialize read-ahead window once per syscall */
-        if (ra == 0) {
-                ra = 1;
-                bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
-                bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
-                ll_ra_read_in(file, &bead);
+                /* BUG: 5972 */
+                file_accessed(file);
+                retval = generic_file_read(file, buf, chunk, ppos);
+                ll_file_put_lock(inode, end, lock_style, cookie, &tree, 
+                                 OBD_BRW_READ);
+        } else {
+                retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
         }
 
-        /* BUG: 5972 */
-        file_accessed(file);
-        retval = generic_file_read(file, buf, chunk, ppos);
-        ll_rw_stats_tally(sbi, current->pid, file, count, 0);
-
-        ll_tree_unlock(&tree);
+        ll_rw_stats_tally(sbi, current->pid, file, chunk, 0);
 
         if (retval > 0) {
                 buf += retval;
@@ -1466,11 +1759,10 @@ static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
         struct ll_lock_tree tree;
-        struct ll_lock_tree_node *node;
         loff_t maxbytes = ll_file_maxbytes(inode);
         loff_t lock_start, lock_end, end;
         ssize_t retval, chunk, sum = 0;
-        int rc;
+        int tree_locked;
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
@@ -1518,23 +1810,18 @@ repeat:
                 lock_start = *ppos;
                 lock_end = *ppos + count - 1;
         }
-        node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
 
-        if (IS_ERR(node))
-                GOTO(out, retval = PTR_ERR(node));
-
-        tree.lt_fd = LUSTRE_FPRIVATE(file);
-        rc = ll_tree_lock(&tree, node, buf, count,
-                          file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
-        if (rc != 0)
-                GOTO(out, retval = rc);
+        tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
+                                            lock_start, lock_end, OBD_BRW_WRITE);
+        if (tree_locked < 0)
+                GOTO(out, retval = tree_locked);
 
         /* This is ok, g_f_w will overwrite this under i_sem if it races
          * with a local truncate, it just makes our maxbyte checking easier.
          * The i_size value gets updated in ll_extent_lock() as a consequence
          * of the [0,EOF] extent lock we requested above. */
         if (file->f_flags & O_APPEND) {
-                *ppos = inode->i_size;
+                *ppos = i_size_read(inode);
                 end = *ppos + count - 1;
         }
 
@@ -1542,18 +1829,23 @@ repeat:
                 send_sig(SIGXFSZ, current, 0);
                 GOTO(out_unlock, retval = -EFBIG);
         }
-        if (*ppos + count > maxbytes)
-                count = maxbytes - *ppos;
+        if (end > maxbytes - 1)
+                end = maxbytes - 1;
 
         /* generic_file_write handles O_APPEND after getting i_mutex */
         chunk = end - *ppos + 1;
         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
                inode->i_ino, chunk, *ppos);
-        retval = generic_file_write(file, buf, chunk, ppos);
-        ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
+        if (tree_locked)
+                retval = generic_file_write(file, buf, chunk, ppos);
+        else
+                retval = ll_file_lockless_io(file, (char*)buf, chunk,
+                                             ppos, WRITE);
+        ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
 
 out_unlock:
-        ll_tree_unlock(&tree);
+        if (tree_locked)
+                ll_tree_unlock(&tree);
 
 out:
         if (retval > 0) {
@@ -1575,7 +1867,6 @@ out:
 /*
  * Send file content (through pagecache) somewhere with helper
  */
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
 static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
                                 read_actor_t actor, void *target)
 {
@@ -1616,6 +1907,7 @@ static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
         if (rc != 0)
                 RETURN(rc);
 
+        ll_clear_file_contended(inode);
         ll_inode_size_lock(inode, 1);
         /*
          * Consistency guarantees: following possibilities exist for the
@@ -1648,12 +1940,12 @@ static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
                         goto out;
         } else {
                 /* region is within kms and, hence, within real file size (A) */
-                inode->i_size = kms;
+                i_size_write(inode, kms);
                 ll_inode_size_unlock(inode, 1);
         }
 
         CDEBUG(D_INFO, "Send ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
-               inode->i_ino, count, *ppos, inode->i_size);
+               inode->i_ino, count, *ppos, i_size_read(inode));
 
         bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
         bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
@@ -1667,7 +1959,6 @@ static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
         ll_tree_unlock(&tree);
         RETURN(retval);
 }
-#endif
 
 static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
                                unsigned long arg)
@@ -1713,7 +2004,6 @@ static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
 
-        oti.oti_objid = NULL;
         memcpy(lsm2, lsm, lsm_size);
         rc = obd_create(exp, oa, &lsm2, &oti);
 
@@ -1781,7 +2071,8 @@ int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
         oc = ll_mdscapa_get(inode);
         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
                              oc, filename, strlen(filename) + 1,
-                             OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize, &req);
+                             OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize,
+                             ll_i2suppgid(inode), &req);
         capa_put(oc);
         if (rc < 0) {
                 CDEBUG(D_INFO, "md_getattr_name failed "
@@ -1789,10 +2080,8 @@ int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
                 GOTO(out, rc);
         }
 
-        body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
+        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
         LASSERT(body != NULL); /* checked by mdc_getattr_name */
-        /* swabbed by mdc_getattr_name */
-        LASSERT_REPSWABBED(req, REPLY_REC_OFF);
 
         lmmsize = body->eadatasize;
 
@@ -1801,9 +2090,8 @@ int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
                 GOTO(out, rc = -ENODATA);
         }
 
-        lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, lmmsize);
+        lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
         LASSERT(lmm != NULL);
-        LASSERT_REPSWABBED(req, REPLY_REC_OFF + 1);
 
         /*
          * This is coming from the MDS, so is probably in
@@ -2015,8 +2303,8 @@ static int join_sanity_check(struct inode *head, struct inode *tail)
                 CERROR("file %lu can not be joined to itself \n", head->i_ino);
                 RETURN(-EINVAL);
         }
-        if (head->i_size % JOIN_FILE_ALIGN) {
-                CERROR("hsize %llu must be times of 64K\n", head->i_size);
+        if (i_size_read(head) % JOIN_FILE_ALIGN) {
+                CERROR("hsize %llu must be times of 64K\n", i_size_read(head));
                 RETURN(-EINVAL);
         }
         RETURN(0);
@@ -2025,29 +2313,31 @@ static int join_sanity_check(struct inode *head, struct inode *tail)
 static int join_file(struct inode *head_inode, struct file *head_filp,
                      struct file *tail_filp)
 {
-        struct inode *tail_inode, *tail_parent;
         struct dentry *tail_dentry = tail_filp->f_dentry;
         struct lookup_intent oit = {.it_op = IT_OPEN,
                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
+        struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
+                ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
+
         struct lustre_handle lockh;
         struct md_op_data *op_data;
         int    rc;
+        loff_t data;
         ENTRY;
 
         tail_dentry = tail_filp->f_dentry;
-        tail_inode = tail_dentry->d_inode;
-        tail_parent = tail_dentry->d_parent->d_inode;
 
-        op_data = ll_prep_md_op_data(NULL, head_inode, tail_parent,
+        data = i_size_read(head_inode);
+        op_data = ll_prep_md_op_data(NULL, head_inode,
+                                     tail_dentry->d_parent->d_inode,
                                      tail_dentry->d_name.name,
                                      tail_dentry->d_name.len, 0,
-                                     LUSTRE_OPC_ANY, &head_inode->i_size);
+                                     LUSTRE_OPC_ANY, &data);
         if (IS_ERR(op_data))
                 RETURN(PTR_ERR(op_data));
 
-        rc = md_enqueue(ll_i2mdexp(head_inode), LDLM_IBITS, &oit, LCK_CW,
-                        op_data, &lockh, NULL, 0, ldlm_completion_ast,
-                        ll_md_blocking_ast, NULL, 0);
+        rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
+                         op_data, &lockh, NULL, 0, 0);
 
         ll_finish_md_op_data(op_data);
         if (rc < 0)
@@ -2201,6 +2491,49 @@ int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
         RETURN(rc);
 }
 
+/**
+ * Get size for inode for which FIEMAP mapping is requested.
+ * Make the FIEMAP get_info call and returns the result.
+ */
+int ll_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
+              int num_bytes)
+{
+        struct obd_export *exp = ll_i2dtexp(inode);
+        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
+        struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
+        int vallen = num_bytes;
+        int rc;
+        ENTRY;
+
+        /* If the stripe_count > 1 and the application does not understand
+         * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
+         */
+        if (lsm->lsm_stripe_count > 1 &&
+            !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
+                return -EOPNOTSUPP;
+
+        fm_key.oa.o_id = lsm->lsm_object_id;
+        fm_key.oa.o_gr = lsm->lsm_object_gr;
+        fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
+        obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLFID | OBD_MD_FLGROUP |
+                        OBD_MD_FLSIZE);
+
+        /* If filesize is 0, then there would be no objects for mapping */
+        if (fm_key.oa.o_size == 0) {
+                fiemap->fm_mapped_extents = 0;
+                RETURN(0);
+        }
+
+        memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
+
+        rc = obd_get_info(exp, sizeof(fm_key), &fm_key, &vallen, fiemap, lsm);
+        if (rc)
+                CERROR("obd_get_info failed: rc = %d\n", rc);
+
+        RETURN(rc);
+}
+
 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
                   unsigned long arg)
 {
@@ -2250,6 +2583,72 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
                 RETURN(ll_lov_getstripe(inode, arg));
         case LL_IOC_RECREATE_OBJ:
                 RETURN(ll_lov_recreate_obj(inode, file, arg));
+        case EXT3_IOC_FIEMAP: {
+                struct ll_user_fiemap *fiemap_s;
+                size_t num_bytes, ret_bytes;
+                unsigned int extent_count;
+                int rc = 0;
+
+                /* Get the extent count so we can calculate the size of
+                 * required fiemap buffer */
+                if (get_user(extent_count,
+                    &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
+                        RETURN(-EFAULT);
+                num_bytes = sizeof(*fiemap_s) + (extent_count *
+                                                 sizeof(struct ll_fiemap_extent));
+                OBD_VMALLOC(fiemap_s, num_bytes);
+                if (fiemap_s == NULL)
+                        RETURN(-ENOMEM);
+
+                if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
+                                   sizeof(*fiemap_s)))
+                        GOTO(error, rc = -EFAULT);
+
+                if (fiemap_s->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
+                        fiemap_s->fm_flags = fiemap_s->fm_flags &
+                                                    ~LUSTRE_FIEMAP_FLAGS_COMPAT;
+                        if (copy_to_user((char *)arg, fiemap_s,
+                                         sizeof(*fiemap_s)))
+                                GOTO(error, rc = -EFAULT);
+
+                        GOTO(error, rc = -EBADR);
+                }
+
+                /* If fm_extent_count is non-zero, read the first extent since
+                 * it is used to calculate end_offset and device from previous
+                 * fiemap call. */
+                if (extent_count) {
+                        if (copy_from_user(&fiemap_s->fm_extents[0],
+                            (char __user *)arg + sizeof(*fiemap_s),
+                            sizeof(struct ll_fiemap_extent)))
+                                GOTO(error, rc = -EFAULT);
+                }
+
+                if (fiemap_s->fm_flags & FIEMAP_FLAG_SYNC) {
+                        int rc;
+
+                        rc = filemap_fdatawrite(inode->i_mapping);
+                        if (rc)
+                                GOTO(error, rc);
+                }
+
+                rc = ll_fiemap(inode, fiemap_s, num_bytes);
+                if (rc)
+                        GOTO(error, rc);
+
+                ret_bytes = sizeof(struct ll_user_fiemap);
+
+                if (extent_count != 0)
+                        ret_bytes += (fiemap_s->fm_mapped_extents *
+                                         sizeof(struct ll_fiemap_extent));
+
+                if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
+                        rc = -EFAULT;
+
+error:
+                OBD_VFREE(fiemap_s, num_bytes);
+                RETURN(rc);
+        }
         case EXT3_IOC_GETFLAGS:
         case EXT3_IOC_SETFLAGS:
                 RETURN(ll_iocontrol(inode, file, cmd, arg));
@@ -2282,26 +2681,17 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
         */
         case LL_IOC_FLUSHCTX:
                 RETURN(ll_flush_ctx(inode));
-        case LL_IOC_GETFACL: {
-                struct rmtacl_ioctl_data ioc;
+        default: {
+                int err;
 
-                if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
-                        RETURN(-EFAULT);
-
-                RETURN(ll_ioctl_getfacl(inode, &ioc));
-        }
-        case LL_IOC_SETFACL: {
-                struct rmtacl_ioctl_data ioc;
+                if (LLIOC_STOP == 
+                    ll_iocontrol_call(inode, file, cmd, arg, &err))
+                        RETURN(err);
 
-                if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
-                        RETURN(-EFAULT);
-
-                RETURN(ll_ioctl_setfacl(inode, &ioc));
-        }
-        default:
                 RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
                                      (void *)arg));
         }
+        }
 }
 
 loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
@@ -2311,13 +2701,13 @@ loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
         struct lov_stripe_md *lsm = lli->lli_smd;
         loff_t retval;
         ENTRY;
-        retval = offset + ((origin == 2) ? inode->i_size :
+        retval = offset + ((origin == 2) ? i_size_read(inode) :
                            (origin == 1) ? file->f_pos : 0);
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%Lu=%#Lx(%s)\n",
                inode->i_ino, inode->i_generation, inode, retval, retval,
                origin == 2 ? "SEEK_END": origin == 1 ? "SEEK_CUR" : "SEEK_SET");
         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
-        
+
         if (origin == 2) { /* SEEK_END */
                 int nonblock = 0, rc;
 
@@ -2331,7 +2721,7 @@ loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
                 }
 
                 ll_inode_size_lock(inode, 0);
-                offset += inode->i_size;
+                offset += i_size_read(inode);
                 ll_inode_size_unlock(inode, 0);
         } else if (origin == 1) { /* SEEK_CUR */
                 offset += file->f_pos;
@@ -2404,7 +2794,7 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data)
                                            OBD_MD_FLMTIME | OBD_MD_FLCTIME |
                                            OBD_MD_FLGROUP);
 
-                oc = ll_osscapa_get(inode, 0, CAPA_OPC_OSS_WRITE);
+                oc = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
                 err = obd_sync(ll_i2sbi(inode)->ll_dt_exp, oa, lsm,
                                0, OBD_OBJECT_EOF, oc);
                 capa_put(oc);
@@ -2425,9 +2815,10 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
                             fid_oid(ll_inode2fid(inode)),
                             fid_ver(ll_inode2fid(inode)),
                             LDLM_FLOCK} };
+        struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
+                ldlm_flock_completion_ast, NULL, file_lock };
         struct lustre_handle lockh = {0};
         ldlm_policy_data_t flock;
-        ldlm_mode_t mode = 0;
         int flags = 0;
         int rc;
         ENTRY;
@@ -2449,7 +2840,7 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
 
         switch (file_lock->fl_type) {
         case F_RDLCK:
-                mode = LCK_PR;
+                einfo.ei_mode = LCK_PR;
                 break;
         case F_UNLCK:
                 /* An unlock request may or may not have any relation to
@@ -2460,10 +2851,10 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
                  * information that is given with a normal read or write record
                  * lock request. To avoid creating another ldlm unlock (cancel)
                  * message we'll treat a LCK_NL flock request as an unlock. */
-                mode = LCK_NL;
+                einfo.ei_mode = LCK_NL;
                 break;
         case F_WRLCK:
-                mode = LCK_PW;
+                einfo.ei_mode = LCK_PW;
                 break;
         default:
                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
@@ -2490,7 +2881,7 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
                 flags = LDLM_FL_TEST_LOCK;
                 /* Save the old mode so that if the mode in the lock changes we
                  * can decrement the appropriate reader or writer refcount. */
-                file_lock->fl_type = mode;
+                file_lock->fl_type = einfo.ei_mode;
                 break;
         default:
                 CERROR("unknown fcntl lock command: %d\n", cmd);
@@ -2499,16 +2890,16 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
 
         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
-               flags, mode, flock.l_flock.start, flock.l_flock.end);
+               flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
 
-        rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &res_id,
-                              LDLM_FLOCK, &flock, mode, &flags, NULL,
-                              ldlm_flock_completion_ast, NULL, file_lock,
-                              NULL, 0, NULL, &lockh, 0);
-        if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
+        rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
+                              &flock, &flags, NULL, 0, NULL, &lockh, 0);
+        if ((file_lock->fl_flags & FL_FLOCK) &&
+            (rc == 0 || file_lock->fl_type == F_UNLCK))
                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
 #ifdef HAVE_F_OP_FLOCK
-        if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
+        if ((file_lock->fl_flags & FL_POSIX) &&
+            (rc == 0 || file_lock->fl_type == F_UNLCK) &&
             !(flags & LDLM_FL_TEST_LOCK))
                 posix_lock_file_wait(file, file_lock);
 #endif
@@ -2539,13 +2930,30 @@ int ll_have_md_lock(struct inode *inode, __u64 bits)
 
         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
-                          LCK_CR|LCK_CW|LCK_PR, &lockh)) {
+                          LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
                 RETURN(1);
         }
-
         RETURN(0);
 }
 
+ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
+                            struct lustre_handle *lockh)
+{
+        ldlm_policy_data_t policy = { .l_inodebits = {bits}};
+        struct lu_fid *fid;
+        ldlm_mode_t rc;
+        int flags;
+        ENTRY;
+
+        fid = &ll_i2info(inode)->lli_fid;
+        CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
+
+        flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
+        rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
+                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
+        RETURN(rc);
+}
+
 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
                               * and return success */
@@ -2584,9 +2992,6 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
                inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
-#if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
-        ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_REVALIDATE, 1);
-#endif
 
         exp = ll_i2mdexp(inode);
 
@@ -2614,7 +3019,7 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
                         GOTO (out, rc);
                 }
 
-                rc = ll_revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
+                rc = ll_revalidate_it_finish(req, &oit, dentry);
                 if (rc != 0) {
                         ll_intent_release(&oit);
                         GOTO(out, rc);
@@ -2631,8 +3036,8 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
                 }
 
                 ll_lookup_finish_locks(&oit, dentry);
-        } else if (!ll_have_md_lock(dentry->d_inode,
-                                    MDS_INODELOCK_UPDATE)) {
+        } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE |
+                                                     MDS_INODELOCK_LOOKUP)) {
                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
                 obd_valid valid = OBD_MD_FLGETATTR;
                 struct obd_capa *oc;
@@ -2656,8 +3061,7 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
                         RETURN(rc);
                 }
 
-                rc = ll_prep_inode(&inode, req, REPLY_REC_OFF,
-                                   NULL);
+                rc = ll_prep_inode(&inode, req, NULL);
                 if (rc)
                         GOTO(out, rc);
         }
@@ -2675,7 +3079,6 @@ out:
         return rc;
 }
 
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
 int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
                   struct lookup_intent *it, struct kstat *stat)
 {
@@ -2705,7 +3108,7 @@ int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
 #endif
 
         ll_inode_size_lock(inode, 0);
-        stat->size = inode->i_size;
+        stat->size = i_size_read(inode);
         stat->blocks = inode->i_blocks;
         ll_inode_size_unlock(inode, 0);
 
@@ -2717,7 +3120,6 @@ int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
 
         return ll_getattr_it(mnt, de, &it, stat);
 }
-#endif
 
 static
 int lustre_check_acl(struct inode *inode, int mask)
@@ -2756,11 +3158,7 @@ int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
         return generic_permission(inode, mask, lustre_check_acl);
 }
 #else
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0))
 int ll_inode_permission(struct inode *inode, int mask, struct nameidata *nd)
-#else
-int ll_inode_permission(struct inode *inode, int mask)
-#endif
 {
         int mode = inode->i_mode;
         int rc;
@@ -2820,9 +3218,7 @@ struct file_operations ll_file_operations = {
         .release        = ll_file_release,
         .mmap           = ll_file_mmap,
         .llseek         = ll_file_seek,
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
         .sendfile       = ll_file_sendfile,
-#endif
         .fsync          = ll_fsync,
 };
 
@@ -2834,9 +3230,7 @@ struct file_operations ll_file_operations_flock = {
         .release        = ll_file_release,
         .mmap           = ll_file_mmap,
         .llseek         = ll_file_seek,
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
         .sendfile       = ll_file_sendfile,
-#endif
         .fsync          = ll_fsync,
 #ifdef HAVE_F_OP_FLOCK
         .flock          = ll_file_flock,
@@ -2853,9 +3247,7 @@ struct file_operations ll_file_operations_noflock = {
         .release        = ll_file_release,
         .mmap           = ll_file_mmap,
         .llseek         = ll_file_seek,
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
         .sendfile       = ll_file_sendfile,
-#endif
         .fsync          = ll_fsync,
 #ifdef HAVE_F_OP_FLOCK
         .flock          = ll_file_noflock,
@@ -2864,16 +3256,12 @@ struct file_operations ll_file_operations_noflock = {
 };
 
 struct inode_operations ll_file_inode_operations = {
-#ifdef LUSTRE_KERNEL_VERSION
+#ifdef HAVE_VFS_INTENT_PATCHES
         .setattr_raw    = ll_setattr_raw,
 #endif
         .setattr        = ll_setattr,
         .truncate       = ll_truncate,
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
         .getattr        = ll_getattr,
-#else
-        .revalidate_it  = ll_inode_revalidate_it,
-#endif
         .permission     = ll_inode_permission,
         .setxattr       = ll_setxattr,
         .getxattr       = ll_getxattr,
@@ -2881,3 +3269,102 @@ struct inode_operations ll_file_inode_operations = {
         .removexattr    = ll_removexattr,
 };
 
+/* dynamic ioctl number support routins */
+static struct llioc_ctl_data {
+        struct rw_semaphore ioc_sem;
+        struct list_head    ioc_head;
+} llioc = { 
+        __RWSEM_INITIALIZER(llioc.ioc_sem), 
+        CFS_LIST_HEAD_INIT(llioc.ioc_head)
+};
+
+
+struct llioc_data {
+        struct list_head        iocd_list;
+        unsigned int            iocd_size;
+        llioc_callback_t        iocd_cb;
+        unsigned int            iocd_count;
+        unsigned int            iocd_cmd[0];
+};
+
+void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
+{
+        unsigned int size;
+        struct llioc_data *in_data = NULL;
+        ENTRY;
+
+        if (cb == NULL || cmd == NULL ||
+            count > LLIOC_MAX_CMD || count < 0)
+                RETURN(NULL);
+
+        size = sizeof(*in_data) + count * sizeof(unsigned int);
+        OBD_ALLOC(in_data, size);
+        if (in_data == NULL)
+                RETURN(NULL);
+
+        memset(in_data, 0, sizeof(*in_data));
+        in_data->iocd_size = size;
+        in_data->iocd_cb = cb;
+        in_data->iocd_count = count;
+        memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
+
+        down_write(&llioc.ioc_sem);
+        list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
+        up_write(&llioc.ioc_sem);
+
+        RETURN(in_data);
+}
+
+void ll_iocontrol_unregister(void *magic)
+{
+        struct llioc_data *tmp;
+
+        if (magic == NULL)
+                return;
+
+        down_write(&llioc.ioc_sem);
+        list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
+                if (tmp == magic) {
+                        unsigned int size = tmp->iocd_size;
+
+                        list_del(&tmp->iocd_list);
+                        up_write(&llioc.ioc_sem);
+
+                        OBD_FREE(tmp, size);
+                        return;
+                }
+        }
+        up_write(&llioc.ioc_sem);
+
+        CWARN("didn't find iocontrol register block with magic: %p\n", magic);
+}
+
+EXPORT_SYMBOL(ll_iocontrol_register);
+EXPORT_SYMBOL(ll_iocontrol_unregister);
+
+enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file, 
+                        unsigned int cmd, unsigned long arg, int *rcp)
+{
+        enum llioc_iter ret = LLIOC_CONT;
+        struct llioc_data *data;
+        int rc = -EINVAL, i;
+
+        down_read(&llioc.ioc_sem);
+        list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
+                for (i = 0; i < data->iocd_count; i++) {
+                        if (cmd != data->iocd_cmd[i]) 
+                                continue;
+
+                        ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
+                        break;
+                }
+
+                if (ret == LLIOC_STOP)
+                        break;
+        }
+        up_read(&llioc.ioc_sem);
+
+        if (rcp)
+                *rcp = rc;
+        return ret;
+}