Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / llite / file.c
index 05afdd0..0b7082c 100644 (file)
@@ -1,25 +1,43 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
- *   Author: Peter Braam <braam@clusterfs.com>
- *   Author: Phil Schwan <phil@clusterfs.com>
- *   Author: Andreas Dilger <adilger@clusterfs.com>
+ * GPL HEADER START
  *
- *   This file is part of Lustre, http://www.lustre.org.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/llite/file.c
+ *
+ * Author: Peter Braam <braam@clusterfs.com>
+ * Author: Phil Schwan <phil@clusterfs.com>
+ * Author: Andreas Dilger <adilger@clusterfs.com>
  */
 
 #define DEBUG_SUBSYSTEM S_LLITE
@@ -29,6 +47,7 @@
 #include <linux/pagemap.h>
 #include <linux/file.h>
 #include "llite_internal.h"
+#include <lustre/ll_fiemap.h>
 
 /* also used by llite/special.c:ll_special_open() */
 struct ll_file_data *ll_file_data_get(void)
@@ -286,15 +305,31 @@ int ll_file_release(struct inode *inode, struct file *file)
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
                inode->i_generation, inode);
 
-        /* don't do anything for / */
-        if (inode->i_sb->s_root == file->f_dentry)
-                RETURN(0);
+#ifdef CONFIG_FS_POSIX_ACL
+        if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
+            inode == inode->i_sb->s_root->d_inode) {
+                struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+
+                LASSERT(fd != NULL);
+                if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
+                        fd->fd_flags &= ~LL_FILE_RMTACL;
+                        rct_del(&sbi->ll_rct, cfs_curproc_pid());
+                        et_search_free(&sbi->ll_et, cfs_curproc_pid());
+                }
+        }
+#endif
 
-        ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
+        if (inode->i_sb->s_root != file->f_dentry)
+                ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
         fd = LUSTRE_FPRIVATE(file);
         LASSERT(fd != NULL);
 
-        /* don't do anything for / */
+        /* The last ref on @file, maybe not the the owner pid of statahead.
+         * Different processes can open the same dir, "ll_opendir_key" means:
+         * it is me that should stop the statahead thread. */
+        if (lli->lli_opendir_key == fd)
+                ll_stop_statahead(inode, fd);
+
         if (inode->i_sb->s_root == file->f_dentry) {
                 LUSTRE_FPRIVATE(file) = NULL;
                 ll_file_data_put(fd);
@@ -319,6 +354,7 @@ static int ll_intent_file_open(struct file *file, void *lmm,
         struct md_op_data *op_data;
         struct ptlrpc_request *req;
         int rc;
+        ENTRY;
 
         if (!parent)
                 RETURN(-ENOENT);
@@ -366,8 +402,7 @@ static int ll_intent_file_open(struct file *file, void *lmm,
                                  &itp->d.lustre.it_lock_handle, 
                                  file->f_dentry->d_inode);
 
-        rc = ll_prep_inode(&file->f_dentry->d_inode, req, DLM_REPLY_REC_OFF,
-                           NULL);
+        rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
 out:
         ptlrpc_req_finished(itp->d.lustre.it_data);
 
@@ -386,9 +421,8 @@ static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
 
         LASSERT(och);
 
-        body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body));
+        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
         LASSERT(body != NULL);                      /* reply already checked out */
-        LASSERT_REPSWABBED(req, DLM_REPLY_REC_OFF); /* and swabbed in md_enqueue */
 
         memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
@@ -419,15 +453,11 @@ int ll_local_open(struct file *file, struct lookup_intent *it,
                 if (rc)
                         RETURN(rc);
 
-                body = lustre_msg_buf(req->rq_repmsg,
-                                      DLM_REPLY_REC_OFF, sizeof(*body));
-
+                body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
                 if ((it->it_flags & FMODE_WRITE) &&
                     (body->valid & OBD_MD_FLSIZE))
-                {
                         CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
                                lli->lli_ioepoch, PFID(&lli->lli_fid));
-                }
         }
 
         LUSTRE_FPRIVATE(file) = fd;
@@ -461,16 +491,12 @@ int ll_file_open(struct inode *inode, struct file *file)
         struct obd_client_handle **och_p;
         __u64 *och_usecount;
         struct ll_file_data *fd;
-        int rc = 0;
+        int rc = 0, opendir_set = 0;
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
                inode->i_generation, inode, file->f_flags);
 
-        /* don't do anything for / */
-        if (inode->i_sb->s_root == file->f_dentry)
-                RETURN(0);
-
 #ifdef HAVE_VFS_INTENT_PATCHES
         it = file->f_it;
 #else
@@ -482,7 +508,29 @@ int ll_file_open(struct inode *inode, struct file *file)
         if (fd == NULL)
                 RETURN(-ENOMEM);
 
-        /* don't do anything for / */
+        if (S_ISDIR(inode->i_mode)) {
+                spin_lock(&lli->lli_lock);
+                /* "lli->lli_opendir_pid != 0" means someone has set it.
+                 * "lli->lli_sai != NULL" means the previous statahead has not
+                 *                        been cleanup. */ 
+                if (lli->lli_opendir_pid == 0 && lli->lli_sai == NULL) {
+                        opendir_set = 1;
+                        lli->lli_opendir_pid = cfs_curproc_pid();
+                        lli->lli_opendir_key = fd;
+                } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid())) {
+                        /* Two cases for this:
+                         * (1) The same process open such directory many times.
+                         * (2) The old process opened the directory, and exited
+                         *     before its children processes. Then new process
+                         *     with the same pid opens such directory before the
+                         *     old process's children processes exit.
+                         * Change the owner to the latest one. */
+                        opendir_set = 2;
+                        lli->lli_opendir_key = fd;
+                }
+                spin_unlock(&lli->lli_lock);
+        }
+
         if (inode->i_sb->s_root == file->f_dentry) {
                 LUSTRE_FPRIVATE(file) = fd;
                 RETURN(0);
@@ -511,6 +559,7 @@ int ll_file_open(struct inode *inode, struct file *file)
                 it = &oit;
         }
 
+restart:
         /* Let's see if we have file open on MDS already. */
         if (it->it_flags & FMODE_WRITE) {
                 och_p = &lli->lli_mds_write_och;
@@ -530,8 +579,9 @@ int ll_file_open(struct inode *inode, struct file *file)
                            let's close it somehow. This will decref request. */
                         rc = it_open_error(DISP_OPEN_OPEN, it);
                         if (rc) {
+                                up(&lli->lli_och_sem);
                                 ll_file_data_put(fd);
-                                GOTO(out_och_free, rc);
+                                GOTO(out_openerr, rc);
                         }       
                         ll_release_openhandle(file->f_dentry, it);
                         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, 
@@ -541,25 +591,26 @@ int ll_file_open(struct inode *inode, struct file *file)
 
                 rc = ll_local_open(file, it, fd, NULL);
                 if (rc) {
+                        (*och_usecount)--;
                         up(&lli->lli_och_sem);
                         ll_file_data_put(fd);
-                        RETURN(rc);
+                        GOTO(out_openerr, rc);
                 }
         } else {
                 LASSERT(*och_usecount == 0);
-                OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
-                if (!*och_p) {
-                        ll_file_data_put(fd);
-                        GOTO(out_och_free, rc = -ENOMEM);
-                }
-                (*och_usecount)++;
                 if (!it->d.lustre.it_disposition) {
+                        /* We cannot just request lock handle now, new ELC code
+                           means that one of other OPEN locks for this file
+                           could be cancelled, and since blocking ast handler
+                           would attempt to grab och_sem as well, that would
+                           result in a deadlock */
+                        up(&lli->lli_och_sem);
                         it->it_flags |= O_CHECK_STALE;
                         rc = ll_intent_file_open(file, NULL, 0, it);
                         it->it_flags &= ~O_CHECK_STALE;
                         if (rc) {
                                 ll_file_data_put(fd);
-                                GOTO(out_och_free, rc);
+                                GOTO(out_openerr, rc);
                         }
 
                         /* Got some error? Release the request */
@@ -570,7 +621,14 @@ int ll_file_open(struct inode *inode, struct file *file)
                         md_set_lock_data(ll_i2sbi(inode)->ll_md_exp,
                                          &it->d.lustre.it_lock_handle,
                                          file->f_dentry->d_inode);
+                        goto restart;
                 }
+                OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
+                if (!*och_p) {
+                        ll_file_data_put(fd);
+                        GOTO(out_och_free, rc = -ENOMEM);
+                }
+                (*och_usecount)++;
                 req = it->d.lustre.it_data;
 
                 /* md_intent_lock() didn't get a request ref if there was an
@@ -587,7 +645,6 @@ int ll_file_open(struct inode *inode, struct file *file)
                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
                 rc = ll_local_open(file, it, fd, *och_p);
                 if (rc) {
-                        up(&lli->lli_och_sem);
                         ll_file_data_put(fd);
                         GOTO(out_och_free, rc);
                 }
@@ -624,6 +681,13 @@ out_och_free:
                         (*och_usecount)--;
                 }
                 up(&lli->lli_och_sem);
+out_openerr:
+                if (opendir_set == 1) {
+                        lli->lli_opendir_key = NULL;
+                        lli->lli_opendir_pid = 0;
+                } else if (unlikely(opendir_set == 2)) {
+                        ll_stop_statahead(inode, fd);
+                }
         }
 
         return rc;
@@ -673,27 +737,13 @@ int ll_inode_getattr(struct inode *inode, struct obdo *obdo)
                                  OBD_MD_FLCTIME | OBD_MD_FLSIZE);
 
         obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
-        CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n",
+        CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n",
                lli->lli_smd->lsm_object_id, i_size_read(inode),
-               inode->i_blocks, inode->i_blksize);
+               (unsigned long long)inode->i_blocks,
+               (unsigned long)ll_inode_blksize(inode));
         RETURN(0);
 }
 
-static inline void ll_remove_suid(struct inode *inode)
-{
-        unsigned int mode;
-
-        /* set S_IGID if S_IXGRP is set, and always set S_ISUID */
-        mode = (inode->i_mode & S_IXGRP)*(S_ISGID/S_IXGRP) | S_ISUID;
-
-        /* was any of the uid bits set? */
-        mode &= inode->i_mode;
-        if (mode && !capable(CAP_FSETID)) {
-                inode->i_mode &= ~mode;
-                // XXX careful here - we cannot change the size
-        }
-}
-
 static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
@@ -702,9 +752,9 @@ static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
         struct {
                 char name[16];
                 struct ldlm_lock *lock;
-                struct lov_stripe_md *lsm;
-        } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
+        } key = { .name = KEY_LOCK_TO_STRIPE, .lock = lock };
         __u32 stripe, vallen = sizeof(stripe);
+        struct lov_oinfo *loinfo;
         int rc;
         ENTRY;
 
@@ -712,7 +762,7 @@ static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
                 GOTO(check, stripe = 0);
 
         /* get our offset in the lov */
-        rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
+        rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe, lsm);
         if (rc != 0) {
                 CERROR("obd_get_info: rc = %d\n", rc);
                 RETURN(rc);
@@ -720,175 +770,103 @@ static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
         LASSERT(stripe < lsm->lsm_stripe_count);
 
 check:
-        if (lsm->lsm_oinfo[stripe]->loi_id != lock->l_resource->lr_name.name[0]||
-            lsm->lsm_oinfo[stripe]->loi_gr != lock->l_resource->lr_name.name[2]){
+        loinfo = lsm->lsm_oinfo[stripe];
+        if (!osc_res_name_eq(loinfo->loi_id, loinfo->loi_gr,
+                            &lock->l_resource->lr_name)){
                 LDLM_ERROR(lock, "resource doesn't match object "LPU64"/"LPU64,
-                           lsm->lsm_oinfo[stripe]->loi_id,
-                           lsm->lsm_oinfo[stripe]->loi_gr);
+                           loinfo->loi_id, loinfo->loi_gr);
                 RETURN(-ELDLM_NO_LOCK_DATA);
         }
 
         RETURN(stripe);
 }
 
-/* Flush the page cache for an extent as its canceled.  When we're on an LOV,
- * we get a lock cancellation for each stripe, so we have to map the obd's
- * region back onto the stripes in the file that it held.
+/* Get extra page reference to ensure it is not going away */
+void ll_pin_extent_cb(void *data)
+{
+        struct page *page = data;
+        
+        page_cache_get(page);
+
+        return;
+}
+
+/* Flush the page from page cache for an extent as its canceled.
+ * Page to remove is delivered as @data.
  *
- * No one can dirty the extent until we've finished our work and they can
+ * No one can dirty the extent until we've finished our work and they cannot
  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
  * but other kernel actors could have pages locked.
  *
+ * If @discard is set, there is no need to write the page if it is dirty.
+ *
  * Called with the DLM lock held. */
-void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
-                              struct ldlm_lock *lock, __u32 stripe)
+int ll_page_removal_cb(void *data, int discard)
 {
-        ldlm_policy_data_t tmpex;
-        unsigned long start, end, count, skip, i, j;
-        struct page *page;
-        int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
-        struct lustre_handle lockh;
-        struct address_space *mapping = inode->i_mapping;
-
+        int rc;
+        struct page *page = data;
+        struct address_space *mapping;
         ENTRY;
-        tmpex = lock->l_policy_data;
-        CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
-               inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
-               i_size_read(inode));
-
-        /* our locks are page granular thanks to osc_enqueue, we invalidate the
-         * whole page. */
-        if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
-            ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
-                LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu",
-                           CFS_PAGE_SIZE);
-        LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
-        LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
-
-        count = ~0;
-        skip = 0;
-        start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
-        end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
-        if (lsm->lsm_stripe_count > 1) {
-                count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
-                skip = (lsm->lsm_stripe_count - 1) * count;
-                start += start/count * skip + stripe * count;
-                if (end != ~0)
-                        end += end/count * skip + stripe * count;
-        }
-        if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
-                end = ~0;
-
-        i = i_size_read(inode) ? (__u64)(i_size_read(inode) - 1) >>
-            CFS_PAGE_SHIFT : 0;
-        if (i < end)
-                end = i;
-
-        CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
-               "count: %lu skip: %lu end: %lu%s\n", start, start % count,
-               count, skip, end, discard ? " (DISCARDING)" : "");
-
-        /* walk through the vmas on the inode and tear down mmaped pages that
-         * intersect with the lock.  this stops immediately if there are no
-         * mmap()ed regions of the file.  This is not efficient at all and
-         * should be short lived. We'll associate mmap()ed pages with the lock
-         * and will be able to find them directly */
-        for (i = start; i <= end; i += (j + skip)) {
-                j = min(count - (i % count), end - i + 1);
-                LASSERT(j > 0);
-                LASSERT(mapping);
-                if (ll_teardown_mmaps(mapping,
-                                      (__u64)i << CFS_PAGE_SHIFT,
-                                      ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
-                        break;
-        }
-
-        /* this is the simplistic implementation of page eviction at
-         * cancelation.  It is careful to get races with other page
-         * lockers handled correctly.  fixes from bug 20 will make it
-         * more efficient by associating locks with pages and with
-         * batching writeback under the lock explicitly. */
-        for (i = start, j = start % count; i <= end;
-             j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
-                if (j == count) {
-                        CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
-                        i += skip;
-                        j = 0;
-                        if (i > end)
-                                break;
-                }
-                LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
-                         LPU64" >= "LPU64" start %lu i %lu end %lu\n",
-                         tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
-                         start, i, end);
-
-                if (!mapping_has_pages(mapping)) {
-                        CDEBUG(D_INODE|D_PAGE, "nothing left\n");
-                        break;
-                }
 
-                cond_resched();
+        /* We have page reference already from ll_pin_page */
+        lock_page(page);
 
-                page = find_get_page(mapping, i);
-                if (page == NULL)
-                        continue;
-                LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
-                               i, tmpex.l_extent.start);
+        /* Already truncated by somebody */
+        if (!page->mapping)
+                GOTO(out, rc = 0);
+        mapping = page->mapping;
+
+        ll_teardown_mmaps(mapping,
+                          (__u64)page->index << PAGE_CACHE_SHIFT,
+                          ((__u64)page->index<<PAGE_CACHE_SHIFT)|
+                                                              ~PAGE_CACHE_MASK);        
+        LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
+
+        if (!discard && clear_page_dirty_for_io(page)) {
+                LASSERT(page->mapping);
+                rc = ll_call_writepage(page->mapping->host, page);
+                /* either waiting for io to complete or reacquiring
+                 * the lock that the failed writepage released */
                 lock_page(page);
-
-                /* page->mapping to check with racing against teardown */
-                if (!discard && clear_page_dirty_for_io(page)) {
-                        rc = ll_call_writepage(inode, page);
-                        /* either waiting for io to complete or reacquiring
-                         * the lock that the failed writepage released */
-                        lock_page(page);
-                        wait_on_page_writeback(page);
-                        if (rc != 0) {
-                                CERROR("writepage inode %lu(%p) of page %p "
-                                       "failed: %d\n", inode->i_ino, inode,
-                                       page, rc);
-                                if (rc == -ENOSPC)
-                                        set_bit(AS_ENOSPC, &mapping->flags);
-                                else
-                                        set_bit(AS_EIO, &mapping->flags);
-                        }
-                }
-
-                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
-                /* check to see if another DLM lock covers this page b=2765 */
-                rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
-                                      LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
-                                      LDLM_FL_TEST_LOCK,
-                                      &lock->l_resource->lr_name, LDLM_EXTENT,
-                                      &tmpex, LCK_PR | LCK_PW, &lockh);
-
-                if (rc2 <= 0 && page->mapping != NULL) {
-                        struct ll_async_page *llap = llap_cast_private(page);
-                        /* checking again to account for writeback's
-                         * lock_page() */
-                        LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
-                        if (llap)
-                                ll_ra_accounting(llap, mapping);
-                        ll_truncate_complete_page(page);
+                wait_on_page_writeback(page);
+                if (rc != 0) {
+                        CERROR("writepage inode %lu(%p) of page %p "
+                               "failed: %d\n", mapping->host->i_ino,
+                               mapping->host, page, rc);
+                        if (rc == -ENOSPC)
+                                set_bit(AS_ENOSPC, &mapping->flags);
+                        else
+                                set_bit(AS_EIO, &mapping->flags);
                 }
-                unlock_page(page);
-                page_cache_release(page);
-        }
-        LASSERTF(tmpex.l_extent.start <=
-                 (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
-                  lock->l_policy_data.l_extent.end + 1),
-                 "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
-                 tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
-                 start, i, end);
+                set_bit(AS_EIO, &mapping->flags);
+        }
+        if (page->mapping != NULL) {
+                struct ll_async_page *llap = llap_cast_private(page);
+                /* checking again to account for writeback's lock_page() */
+                LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
+                if (llap)
+                        ll_ra_accounting(llap, page->mapping);
+                ll_truncate_complete_page(page);
+        }
         EXIT;
+out:
+        LASSERT(!PageWriteback(page));
+        unlock_page(page);
+        page_cache_release(page);
+
+        return 0;
 }
 
-static int ll_extent_lock_callback(struct ldlm_lock *lock,
-                                   struct ldlm_lock_desc *new, void *data,
-                                   int flag)
+int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
+                             void *data, int flag)
 {
-        struct lustre_handle lockh = { 0 };
-        int rc;
+        struct inode *inode;
+        struct ll_inode_info *lli;
+        struct lov_stripe_md *lsm;
+        int stripe;
+        __u64 kms;
+
         ENTRY;
 
         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
@@ -896,60 +874,37 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock,
                 LBUG();
         }
 
-        switch (flag) {
-        case LDLM_CB_BLOCKING:
-                ldlm_lock2handle(lock, &lockh);
-                rc = ldlm_cli_cancel(&lockh);
-                if (rc != ELDLM_OK)
-                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
-                break;
-        case LDLM_CB_CANCELING: {
-                struct inode *inode;
-                struct ll_inode_info *lli;
-                struct lov_stripe_md *lsm;
-                int stripe;
-                __u64 kms;
-
-                /* This lock wasn't granted, don't try to evict pages */
-                if (lock->l_req_mode != lock->l_granted_mode)
-                        RETURN(0);
-
-                inode = ll_inode_from_lock(lock);
-                if (inode == NULL)
-                        RETURN(0);
-                lli = ll_i2info(inode);
-                if (lli == NULL)
-                        goto iput;
-                if (lli->lli_smd == NULL)
-                        goto iput;
-                lsm = lli->lli_smd;
-
-                stripe = ll_lock_to_stripe_offset(inode, lock);
-                if (stripe < 0)
-                        goto iput;
-
-                ll_pgcache_remove_extent(inode, lsm, lock, stripe);
+        inode = ll_inode_from_lock(lock);
+        if (inode == NULL)
+                RETURN(0);
+        lli = ll_i2info(inode);
+        if (lli == NULL)
+                GOTO(iput, 0);
+        if (lli->lli_smd == NULL)
+                GOTO(iput, 0);
+        lsm = lli->lli_smd;
 
-                lov_stripe_lock(lsm);
-                lock_res_and_lock(lock);
-                kms = ldlm_extent_shift_kms(lock,
-                                            lsm->lsm_oinfo[stripe]->loi_kms);
+        stripe = ll_lock_to_stripe_offset(inode, lock);
+        if (stripe < 0)
+                GOTO(iput, 0);
 
-                if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
-                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
-                                   lsm->lsm_oinfo[stripe]->loi_kms, kms);
-                lsm->lsm_oinfo[stripe]->loi_kms = kms;
-                unlock_res_and_lock(lock);
-                lov_stripe_unlock(lsm);
-        iput:
-                iput(inode);
-                break;
-        }
-        default:
-                LBUG();
-        }
+        lov_stripe_lock(lsm);
+        lock_res_and_lock(lock);
+        kms = ldlm_extent_shift_kms(lock,
+                                    lsm->lsm_oinfo[stripe]->loi_kms);
+
+        if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
+                LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
+                           lsm->lsm_oinfo[stripe]->loi_kms, kms);
+        lsm->lsm_oinfo[stripe]->loi_kms = kms;
+        unlock_res_and_lock(lock);
+        lov_stripe_unlock(lsm);
+        ll_queue_done_writing(inode, 0);
+        EXIT;
+iput:
+        iput(inode);
 
-        RETURN(0);
+        return 0;
 }
 
 #if 0
@@ -1015,7 +970,6 @@ static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
         struct lov_stripe_md *lsm;
         struct ost_lvb *lvb;
         int rc, stripe;
-        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
         ENTRY;
 
         if (inode == NULL)
@@ -1032,13 +986,16 @@ static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
         if (stripe < 0)
                 GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
 
-        rc = lustre_pack_reply(req, 2, size, NULL);
+        req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
+        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+                             sizeof(*lvb));
+        rc = req_capsule_server_pack(&req->rq_pill);
         if (rc) {
                 CERROR("lustre_pack_reply: %d\n", rc);
                 GOTO(iput, rc);
         }
 
-        lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
+        lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
         lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
         lvb->lvb_mtime = LTIME_S(inode->i_mtime);
         lvb->lvb_atime = LTIME_S(inode->i_atime);
@@ -1061,23 +1018,27 @@ static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
         return rc;
 }
 
-static void ll_merge_lvb(struct inode *inode)
+static int ll_merge_lvb(struct inode *inode)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ost_lvb lvb;
+        int rc;
+
         ENTRY;
 
         ll_inode_size_lock(inode, 1);
         inode_init_lvb(inode, &lvb);
-        obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
+        rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
         i_size_write(inode, lvb.lvb_size);
         inode->i_blocks = lvb.lvb_blocks;
+
         LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
         LTIME_S(inode->i_atime) = lvb.lvb_atime;
         LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
         ll_inode_size_unlock(inode, 1);
-        EXIT;
+
+        RETURN(rc);
 }
 
 int ll_local_size(struct inode *inode)
@@ -1094,15 +1055,15 @@ int ll_local_size(struct inode *inode)
                 RETURN(0);
 
         rc = obd_match(sbi->ll_dt_exp, lli->lli_smd, LDLM_EXTENT,
-                       &policy, LCK_PR | LCK_PW, &flags, inode, &lockh);
+                       &policy, LCK_PR, &flags, inode, &lockh);
         if (rc < 0)
                 RETURN(rc);
         else if (rc == 0)
                 RETURN(-ENODATA);
 
-        ll_merge_lvb(inode);
-        obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR | LCK_PW, &lockh);
-        RETURN(0);
+        rc = ll_merge_lvb(inode);
+        obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
+        RETURN(rc);
 }
 
 int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
@@ -1118,7 +1079,7 @@ int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
 
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = LCK_PR;
-        einfo.ei_cb_bl = ll_extent_lock_callback;
+        einfo.ei_cb_bl = osc_extent_blocking_cb;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
         einfo.ei_cbdata = NULL;
@@ -1181,7 +1142,7 @@ int ll_glimpse_size(struct inode *inode, int ast_flags)
          *       acquired only if there were no conflicting locks. */
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = LCK_PR;
-        einfo.ei_cb_bl = ll_extent_lock_callback;
+        einfo.ei_cb_bl = osc_extent_blocking_cb;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
         einfo.ei_cbdata = inode;
@@ -1199,10 +1160,10 @@ int ll_glimpse_size(struct inode *inode, int ast_flags)
                 RETURN(rc > 0 ? -EIO : rc);
         }
 
-        ll_merge_lvb(inode);
+        rc = ll_merge_lvb(inode);
 
-        CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %lu\n",
-               i_size_read(inode), inode->i_blocks);
+        CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n",
+               i_size_read(inode), (unsigned long long)inode->i_blocks);
 
         RETURN(rc);
 }
@@ -1236,7 +1197,7 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
 
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = mode;
-        einfo.ei_cb_bl = ll_extent_lock_callback;
+        einfo.ei_cb_bl = osc_extent_blocking_cb;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
         einfo.ei_cbdata = inode;
@@ -1300,6 +1261,332 @@ int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
         RETURN(rc);
 }
 
+static void ll_set_file_contended(struct inode *inode)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+        cfs_time_t now = cfs_time_current();
+
+        spin_lock(&lli->lli_lock);
+        lli->lli_contention_time = now;
+        lli->lli_flags |= LLIF_CONTENDED;
+        spin_unlock(&lli->lli_lock);
+}
+
+void ll_clear_file_contended(struct inode *inode)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+
+        spin_lock(&lli->lli_lock);
+        lli->lli_flags &= ~LLIF_CONTENDED;
+        spin_unlock(&lli->lli_lock);
+}
+
+static int ll_is_file_contended(struct file *file)
+{
+        struct inode *inode = file->f_dentry->d_inode;
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+        ENTRY;
+
+        if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
+                CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
+                       " osc connect flags = 0x"LPX64"\n",
+                       sbi->ll_lco.lco_flags);
+                RETURN(0);
+        }
+        if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
+                RETURN(1);
+        if (lli->lli_flags & LLIF_CONTENDED) {
+                cfs_time_t cur_time = cfs_time_current();
+                cfs_time_t retry_time;
+
+                retry_time = cfs_time_add(
+                        lli->lli_contention_time,
+                        cfs_time_seconds(sbi->ll_contention_time));
+                if (cfs_time_after(cur_time, retry_time)) {
+                        ll_clear_file_contended(inode);
+                        RETURN(0);
+                }
+                RETURN(1);
+        }
+        RETURN(0);
+}
+
+static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
+                                 const char *buf, size_t count,
+                                 loff_t start, loff_t end, int rw)
+{
+        int append;
+        int tree_locked = 0;
+        int rc;
+        struct inode * inode = file->f_dentry->d_inode;
+        ENTRY;
+
+        append = (rw == OBD_BRW_WRITE) && (file->f_flags & O_APPEND);
+
+        if (append || !ll_is_file_contended(file)) {
+                struct ll_lock_tree_node *node;
+                int ast_flags;
+
+                ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
+                if (file->f_flags & O_NONBLOCK)
+                        ast_flags |= LDLM_FL_BLOCK_NOWAIT;
+                node = ll_node_from_inode(inode, start, end,
+                                          (rw == OBD_BRW_WRITE) ? LCK_PW : LCK_PR);
+                if (IS_ERR(node)) {
+                        rc = PTR_ERR(node);
+                        GOTO(out, rc);
+                }
+                tree->lt_fd = LUSTRE_FPRIVATE(file);
+                rc = ll_tree_lock(tree, node, buf, count, ast_flags);
+                if (rc == 0)
+                        tree_locked = 1;
+                else if (rc == -EUSERS)
+                        ll_set_file_contended(inode);
+                else
+                        GOTO(out, rc);
+        }
+        RETURN(tree_locked);
+out:
+        return rc;
+}
+
+/**
+ * Checks if requested extent lock is compatible with a lock under a page.
+ *
+ * Checks if the lock under \a page is compatible with a read or write lock
+ * (specified by \a rw) for an extent [\a start , \a end].
+ *
+ * \param page the page under which lock is considered
+ * \param rw OBD_BRW_READ if requested for reading,
+ *           OBD_BRW_WRITE if requested for writing
+ * \param start start of the requested extent
+ * \param end end of the requested extent
+ * \param cookie transparent parameter for passing locking context
+ *
+ * \post result == 1, *cookie == context, appropriate lock is referenced or
+ * \post result == 0
+ *
+ * \retval 1 owned lock is reused for the request
+ * \retval 0 no lock reused for the request
+ *
+ * \see ll_release_short_lock
+ */
+static int ll_reget_short_lock(struct page *page, int rw,
+                               obd_off start, obd_off end,
+                               void **cookie)
+{
+        struct ll_async_page *llap;
+        struct obd_export *exp;
+        struct inode *inode = page->mapping->host;
+
+        ENTRY;
+
+        exp = ll_i2dtexp(inode);
+        if (exp == NULL)
+                RETURN(0);
+
+        llap = llap_cast_private(page);
+        if (llap == NULL)
+                RETURN(0);
+
+        RETURN(obd_reget_short_lock(exp, ll_i2info(inode)->lli_smd,
+                                    &llap->llap_cookie, rw, start, end,
+                                    cookie));
+}
+
+/**
+ * Releases a reference to a lock taken in a "fast" way.
+ *
+ * Releases a read or a write (specified by \a rw) lock
+ * referenced by \a cookie.
+ *
+ * \param inode inode to which data belong
+ * \param end end of the locked extent
+ * \param rw OBD_BRW_READ if requested for reading,
+ *           OBD_BRW_WRITE if requested for writing
+ * \param cookie transparent parameter for passing locking context
+ *
+ * \post appropriate lock is dereferenced
+ *
+ * \see ll_reget_short_lock
+ */
+static void ll_release_short_lock(struct inode *inode, obd_off end,
+                                  void *cookie, int rw)
+{
+        struct obd_export *exp;
+        int rc;
+
+        exp = ll_i2dtexp(inode);
+        if (exp == NULL)
+                return;
+
+        rc = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
+                                    cookie, rw);
+        if (rc < 0)
+                CERROR("unlock failed (%d)\n", rc);
+}
+
+/**
+ * Checks if requested extent lock is compatible
+ * with a lock under a page in page cache.
+ *
+ * Checks if a lock under some \a page is compatible with a read or write lock
+ * (specified by \a rw) for an extent [\a start , \a end].
+ *
+ * \param file the file under which lock is considered
+ * \param rw OBD_BRW_READ if requested for reading,
+ *           OBD_BRW_WRITE if requested for writing
+ * \param ppos start of the requested extent
+ * \param end end of the requested extent
+ * \param cookie transparent parameter for passing locking context
+ * \param buf userspace buffer for the data
+ *
+ * \post result == 1, *cookie == context, appropriate lock is referenced
+ * \post retuls == 0
+ *
+ * \retval 1 owned lock is reused for the request
+ * \retval 0 no lock reused for the request
+ *
+ * \see ll_file_put_fast_lock
+ */
+static inline int ll_file_get_fast_lock(struct file *file,
+                                        obd_off ppos, obd_off end,
+                                        char *buf, void **cookie, int rw)
+{
+        int rc = 0;
+        struct page *page;
+
+        ENTRY;
+
+        if (!ll_region_mapped((unsigned long)buf, end - ppos)) {
+                page = find_lock_page(file->f_dentry->d_inode->i_mapping,
+                                      ppos >> CFS_PAGE_SHIFT);
+                if (page) {
+                        if (ll_reget_short_lock(page, rw, ppos, end, cookie))
+                                rc = 1;
+
+                        unlock_page(page);
+                        page_cache_release(page);
+                }
+        }
+
+        RETURN(rc);
+}
+
+/**
+ * Releases a reference to a lock taken in a "fast" way.
+ *
+ * Releases a read or a write (specified by \a rw) lock
+ * referenced by \a cookie.
+ *
+ * \param inode inode to which data belong
+ * \param end end of the locked extent
+ * \param rw OBD_BRW_READ if requested for reading,
+ *           OBD_BRW_WRITE if requested for writing
+ * \param cookie transparent parameter for passing locking context
+ *
+ * \post appropriate lock is dereferenced
+ *
+ * \see ll_file_get_fast_lock
+ */
+static inline void ll_file_put_fast_lock(struct inode *inode, obd_off end,
+                                         void *cookie, int rw)
+{
+        ll_release_short_lock(inode, end, cookie, rw);
+}
+
+enum ll_lock_style {
+        LL_LOCK_STYLE_NOLOCK   = 0,
+        LL_LOCK_STYLE_FASTLOCK = 1,
+        LL_LOCK_STYLE_TREELOCK = 2
+};
+
+/**
+ * Checks if requested extent lock is compatible with a lock 
+ * under a page cache page.
+ *
+ * Checks if the lock under \a page is compatible with a read or write lock
+ * (specified by \a rw) for an extent [\a start , \a end].
+ *
+ * \param file file under which I/O is processed
+ * \param rw OBD_BRW_READ if requested for reading,
+ *           OBD_BRW_WRITE if requested for writing
+ * \param ppos start of the requested extent
+ * \param end end of the requested extent
+ * \param cookie transparent parameter for passing locking context
+ *           (only used with LL_LOCK_STYLE_FASTLOCK)
+ * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
+ * \param buf userspace buffer for the data
+ *
+ * \retval LL_LOCK_STYLE_FASTLOCK owned lock is reused through fast lock
+ * \retval LL_LOCK_STYLE_TREELOCK got a lock through tree lock
+ * \retval LL_LOCK_STYLE_NOLOCK got no lock
+ *
+ * \see ll_file_put_lock
+ */
+static inline int ll_file_get_lock(struct file *file, obd_off ppos,
+                                   obd_off end, char *buf, void **cookie,
+                                   struct ll_lock_tree *tree, int rw)
+{
+        int rc;
+
+        ENTRY;
+
+        if (ll_file_get_fast_lock(file, ppos, end, buf, cookie, rw))
+                RETURN(LL_LOCK_STYLE_FASTLOCK);
+
+        rc = ll_file_get_tree_lock(tree, file, buf, ppos - end, ppos, end, rw);
+        /* rc: 1 for tree lock, 0 for no lock, <0 for error */
+        switch (rc) {
+        case 1:
+                RETURN(LL_LOCK_STYLE_TREELOCK);
+        case 0:
+                RETURN(LL_LOCK_STYLE_NOLOCK);
+        }
+
+        /* an error happened if we reached this point, rc = -errno here */
+        RETURN(rc);
+}
+
+/**
+ * Drops the lock taken by ll_file_get_lock.
+ *
+ * Releases a read or a write (specified by \a rw) lock
+ * referenced by \a tree or \a cookie.
+ *
+ * \param inode inode to which data belong
+ * \param end end of the locked extent
+ * \param lockstyle facility through which the lock was taken
+ * \param rw OBD_BRW_READ if requested for reading,
+ *           OBD_BRW_WRITE if requested for writing
+ * \param cookie transparent parameter for passing locking context
+ *           (only used with LL_LOCK_STYLE_FASTLOCK)
+ * \param tree lock tree (only used with LL_LOCK_STYLE_TREELOCK)
+ *
+ * \post appropriate lock is dereferenced
+ *
+ * \see ll_file_get_lock
+ */
+static inline void ll_file_put_lock(struct inode *inode, obd_off end,
+                                    enum ll_lock_style lock_style,
+                                    void *cookie, struct ll_lock_tree *tree,
+                                    int rw)
+
+{
+        switch (lock_style) {
+        case LL_LOCK_STYLE_TREELOCK:
+                ll_tree_unlock(tree);
+                break;
+        case LL_LOCK_STYLE_FASTLOCK:
+                ll_file_put_fast_lock(inode, end, cookie, rw);
+                break;
+        default:
+                CERROR("invalid locking style (%d)\n", lock_style);
+        }
+}
+
 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
                             loff_t *ppos)
 {
@@ -1308,12 +1595,13 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
         struct lov_stripe_md *lsm = lli->lli_smd;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ll_lock_tree tree;
-        struct ll_lock_tree_node *node;
         struct ost_lvb lvb;
         struct ll_ra_read bead;
-        int rc, ra = 0;
-        loff_t end;
+        int ra = 0;
+        obd_off end;
         ssize_t retval, chunk, sum = 0;
+        int lock_style;
+        void *cookie;
 
         __u64 kms;
         ENTRY;
@@ -1351,13 +1639,11 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
                         RETURN(-EFAULT);
                 RETURN(count);
         }
-
 repeat:
         if (sbi->ll_max_rw_chunk != 0) {
                 /* first, let's know the end of the current stripe */
                 end = *ppos;
-                obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, 
-                                (obd_off *)&end);
+                obd_extent_calc(sbi->ll_dt_exp, lsm, OBD_CALC_STRIPE_END, &end);
 
                 /* correct, the end is beyond the request */
                 if (end > *ppos + count - 1)
@@ -1370,16 +1656,10 @@ repeat:
                 end = *ppos + count - 1;
         }
 
-        node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
-        if (IS_ERR(node)){
-                GOTO(out, retval = PTR_ERR(node));
-        }
-
-        tree.lt_fd = LUSTRE_FPRIVATE(file);
-        rc = ll_tree_lock(&tree, node, buf, count,
-                          file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
-        if (rc != 0)
-                GOTO(out, retval = rc);
+        lock_style = ll_file_get_lock(file, (obd_off)(*ppos), end,
+                                      buf, &cookie, &tree, OBD_BRW_READ);
+        if (lock_style < 0)
+                GOTO(out, retval = lock_style);
 
         ll_inode_size_lock(inode, 1);
         /*
@@ -1410,7 +1690,9 @@ repeat:
                 ll_inode_size_unlock(inode, 1);
                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
                 if (retval) {
-                        ll_tree_unlock(&tree);
+                        if (lock_style != LL_LOCK_STYLE_NOLOCK)
+                                ll_file_put_lock(inode, end, lock_style,
+                                                 cookie, &tree, OBD_BRW_READ);
                         goto out;
                 }
         } else {
@@ -1429,23 +1711,28 @@ repeat:
         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
                inode->i_ino, chunk, *ppos, i_size_read(inode));
 
-        /* turn off the kernel's read-ahead */
-        file->f_ra.ra_pages = 0;
+        if (lock_style != LL_LOCK_STYLE_NOLOCK) {
+                /* turn off the kernel's read-ahead */
+                file->f_ra.ra_pages = 0;
 
-        /* initialize read-ahead window once per syscall */
-        if (ra == 0) {
-                ra = 1;
-                bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
-                bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
-                ll_ra_read_in(file, &bead);
-        }
+                /* initialize read-ahead window once per syscall */
+                if (ra == 0) {
+                        ra = 1;
+                        bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
+                        bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
+                        ll_ra_read_in(file, &bead);
+                }
 
-        /* BUG: 5972 */
-        file_accessed(file);
-        retval = generic_file_read(file, buf, chunk, ppos);
-        ll_rw_stats_tally(sbi, current->pid, file, count, 0);
+                /* BUG: 5972 */
+                file_accessed(file);
+                retval = generic_file_read(file, buf, chunk, ppos);
+                ll_file_put_lock(inode, end, lock_style, cookie, &tree, 
+                                 OBD_BRW_READ);
+        } else {
+                retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
+        }
 
-        ll_tree_unlock(&tree);
+        ll_rw_stats_tally(sbi, current->pid, file, chunk, 0);
 
         if (retval > 0) {
                 buf += retval;
@@ -1472,11 +1759,10 @@ static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
         struct ll_lock_tree tree;
-        struct ll_lock_tree_node *node;
         loff_t maxbytes = ll_file_maxbytes(inode);
         loff_t lock_start, lock_end, end;
         ssize_t retval, chunk, sum = 0;
-        int rc;
+        int tree_locked;
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
@@ -1524,16 +1810,11 @@ repeat:
                 lock_start = *ppos;
                 lock_end = *ppos + count - 1;
         }
-        node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
-
-        if (IS_ERR(node))
-                GOTO(out, retval = PTR_ERR(node));
 
-        tree.lt_fd = LUSTRE_FPRIVATE(file);
-        rc = ll_tree_lock(&tree, node, buf, count,
-                          file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
-        if (rc != 0)
-                GOTO(out, retval = rc);
+        tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
+                                            lock_start, lock_end, OBD_BRW_WRITE);
+        if (tree_locked < 0)
+                GOTO(out, retval = tree_locked);
 
         /* This is ok, g_f_w will overwrite this under i_sem if it races
          * with a local truncate, it just makes our maxbyte checking easier.
@@ -1548,18 +1829,23 @@ repeat:
                 send_sig(SIGXFSZ, current, 0);
                 GOTO(out_unlock, retval = -EFBIG);
         }
-        if (*ppos + count > maxbytes)
-                count = maxbytes - *ppos;
+        if (end > maxbytes - 1)
+                end = maxbytes - 1;
 
         /* generic_file_write handles O_APPEND after getting i_mutex */
         chunk = end - *ppos + 1;
         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
                inode->i_ino, chunk, *ppos);
-        retval = generic_file_write(file, buf, chunk, ppos);
-        ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
+        if (tree_locked)
+                retval = generic_file_write(file, buf, chunk, ppos);
+        else
+                retval = ll_file_lockless_io(file, (char*)buf, chunk,
+                                             ppos, WRITE);
+        ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
 
 out_unlock:
-        ll_tree_unlock(&tree);
+        if (tree_locked)
+                ll_tree_unlock(&tree);
 
 out:
         if (retval > 0) {
@@ -1621,6 +1907,7 @@ static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
         if (rc != 0)
                 RETURN(rc);
 
+        ll_clear_file_contended(inode);
         ll_inode_size_lock(inode, 1);
         /*
          * Consistency guarantees: following possibilities exist for the
@@ -1717,7 +2004,6 @@ static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
         obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                         OBD_MD_FLMTIME | OBD_MD_FLCTIME);
 
-        oti.oti_objid = NULL;
         memcpy(lsm2, lsm, lsm_size);
         rc = obd_create(exp, oa, &lsm2, &oti);
 
@@ -1785,7 +2071,8 @@ int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
         oc = ll_mdscapa_get(inode);
         rc = md_getattr_name(sbi->ll_md_exp, ll_inode2fid(inode),
                              oc, filename, strlen(filename) + 1,
-                             OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize, &req);
+                             OBD_MD_FLEASIZE | OBD_MD_FLDIREA, lmmsize,
+                             ll_i2suppgid(inode), &req);
         capa_put(oc);
         if (rc < 0) {
                 CDEBUG(D_INFO, "md_getattr_name failed "
@@ -1793,10 +2080,8 @@ int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
                 GOTO(out, rc);
         }
 
-        body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
+        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
         LASSERT(body != NULL); /* checked by mdc_getattr_name */
-        /* swabbed by mdc_getattr_name */
-        LASSERT_REPSWABBED(req, REPLY_REC_OFF);
 
         lmmsize = body->eadatasize;
 
@@ -1805,9 +2090,8 @@ int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
                 GOTO(out, rc = -ENODATA);
         }
 
-        lmm = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, lmmsize);
+        lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
         LASSERT(lmm != NULL);
-        LASSERT_REPSWABBED(req, REPLY_REC_OFF + 1);
 
         /*
          * This is coming from the MDS, so is probably in
@@ -2207,6 +2491,49 @@ int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
         RETURN(rc);
 }
 
+/**
+ * Get size for inode for which FIEMAP mapping is requested.
+ * Make the FIEMAP get_info call and returns the result.
+ */
+int ll_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
+              int num_bytes)
+{
+        struct obd_export *exp = ll_i2dtexp(inode);
+        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
+        struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
+        int vallen = num_bytes;
+        int rc;
+        ENTRY;
+
+        /* If the stripe_count > 1 and the application does not understand
+         * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
+         */
+        if (lsm->lsm_stripe_count > 1 &&
+            !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
+                return -EOPNOTSUPP;
+
+        fm_key.oa.o_id = lsm->lsm_object_id;
+        fm_key.oa.o_gr = lsm->lsm_object_gr;
+        fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
+        obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLFID | OBD_MD_FLGROUP |
+                        OBD_MD_FLSIZE);
+
+        /* If filesize is 0, then there would be no objects for mapping */
+        if (fm_key.oa.o_size == 0) {
+                fiemap->fm_mapped_extents = 0;
+                RETURN(0);
+        }
+
+        memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
+
+        rc = obd_get_info(exp, sizeof(fm_key), &fm_key, &vallen, fiemap, lsm);
+        if (rc)
+                CERROR("obd_get_info failed: rc = %d\n", rc);
+
+        RETURN(rc);
+}
+
 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
                   unsigned long arg)
 {
@@ -2256,6 +2583,72 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
                 RETURN(ll_lov_getstripe(inode, arg));
         case LL_IOC_RECREATE_OBJ:
                 RETURN(ll_lov_recreate_obj(inode, file, arg));
+        case EXT3_IOC_FIEMAP: {
+                struct ll_user_fiemap *fiemap_s;
+                size_t num_bytes, ret_bytes;
+                unsigned int extent_count;
+                int rc = 0;
+
+                /* Get the extent count so we can calculate the size of
+                 * required fiemap buffer */
+                if (get_user(extent_count,
+                    &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
+                        RETURN(-EFAULT);
+                num_bytes = sizeof(*fiemap_s) + (extent_count *
+                                                 sizeof(struct ll_fiemap_extent));
+                OBD_VMALLOC(fiemap_s, num_bytes);
+                if (fiemap_s == NULL)
+                        RETURN(-ENOMEM);
+
+                if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
+                                   sizeof(*fiemap_s)))
+                        GOTO(error, rc = -EFAULT);
+
+                if (fiemap_s->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
+                        fiemap_s->fm_flags = fiemap_s->fm_flags &
+                                                    ~LUSTRE_FIEMAP_FLAGS_COMPAT;
+                        if (copy_to_user((char *)arg, fiemap_s,
+                                         sizeof(*fiemap_s)))
+                                GOTO(error, rc = -EFAULT);
+
+                        GOTO(error, rc = -EBADR);
+                }
+
+                /* If fm_extent_count is non-zero, read the first extent since
+                 * it is used to calculate end_offset and device from previous
+                 * fiemap call. */
+                if (extent_count) {
+                        if (copy_from_user(&fiemap_s->fm_extents[0],
+                            (char __user *)arg + sizeof(*fiemap_s),
+                            sizeof(struct ll_fiemap_extent)))
+                                GOTO(error, rc = -EFAULT);
+                }
+
+                if (fiemap_s->fm_flags & FIEMAP_FLAG_SYNC) {
+                        int rc;
+
+                        rc = filemap_fdatawrite(inode->i_mapping);
+                        if (rc)
+                                GOTO(error, rc);
+                }
+
+                rc = ll_fiemap(inode, fiemap_s, num_bytes);
+                if (rc)
+                        GOTO(error, rc);
+
+                ret_bytes = sizeof(struct ll_user_fiemap);
+
+                if (extent_count != 0)
+                        ret_bytes += (fiemap_s->fm_mapped_extents *
+                                         sizeof(struct ll_fiemap_extent));
+
+                if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
+                        rc = -EFAULT;
+
+error:
+                OBD_VFREE(fiemap_s, num_bytes);
+                RETURN(rc);
+        }
         case EXT3_IOC_GETFLAGS:
         case EXT3_IOC_SETFLAGS:
                 RETURN(ll_iocontrol(inode, file, cmd, arg));
@@ -2288,22 +2681,6 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
         */
         case LL_IOC_FLUSHCTX:
                 RETURN(ll_flush_ctx(inode));
-        case LL_IOC_GETFACL: {
-                struct rmtacl_ioctl_data ioc;
-
-                if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
-                        RETURN(-EFAULT);
-
-                RETURN(ll_ioctl_getfacl(inode, &ioc));
-        }
-        case LL_IOC_SETFACL: {
-                struct rmtacl_ioctl_data ioc;
-
-                if (copy_from_user(&ioc, (void *)arg, sizeof(ioc)))
-                        RETURN(-EFAULT);
-
-                RETURN(ll_ioctl_setfacl(inode, &ioc));
-        }
         default: {
                 int err;
 
@@ -2517,10 +2894,12 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
 
         rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
                               &flock, &flags, NULL, 0, NULL, &lockh, 0);
-        if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
+        if ((file_lock->fl_flags & FL_FLOCK) &&
+            (rc == 0 || file_lock->fl_type == F_UNLCK))
                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
 #ifdef HAVE_F_OP_FLOCK
-        if ((file_lock->fl_flags & FL_POSIX) && (rc == 0) &&
+        if ((file_lock->fl_flags & FL_POSIX) &&
+            (rc == 0 || file_lock->fl_type == F_UNLCK) &&
             !(flags & LDLM_FL_TEST_LOCK))
                 posix_lock_file_wait(file, file_lock);
 #endif
@@ -2551,13 +2930,30 @@ int ll_have_md_lock(struct inode *inode, __u64 bits)
 
         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
         if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
-                          LCK_CR|LCK_CW|LCK_PR, &lockh)) {
+                          LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh)) {
                 RETURN(1);
         }
-
         RETURN(0);
 }
 
+ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
+                            struct lustre_handle *lockh)
+{
+        ldlm_policy_data_t policy = { .l_inodebits = {bits}};
+        struct lu_fid *fid;
+        ldlm_mode_t rc;
+        int flags;
+        ENTRY;
+
+        fid = &ll_i2info(inode)->lli_fid;
+        CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
+
+        flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
+        rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
+                           LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
+        RETURN(rc);
+}
+
 static int ll_inode_revalidate_fini(struct inode *inode, int rc) {
         if (rc == -ENOENT) { /* Already unlinked. Just update nlink
                               * and return success */
@@ -2623,7 +3019,7 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
                         GOTO (out, rc);
                 }
 
-                rc = ll_revalidate_it_finish(req, DLM_REPLY_REC_OFF, &oit, dentry);
+                rc = ll_revalidate_it_finish(req, &oit, dentry);
                 if (rc != 0) {
                         ll_intent_release(&oit);
                         GOTO(out, rc);
@@ -2640,8 +3036,8 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
                 }
 
                 ll_lookup_finish_locks(&oit, dentry);
-        } else if (!ll_have_md_lock(dentry->d_inode,
-                                    MDS_INODELOCK_UPDATE)) {
+        } else if (!ll_have_md_lock(dentry->d_inode, MDS_INODELOCK_UPDATE |
+                                                     MDS_INODELOCK_LOOKUP)) {
                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
                 obd_valid valid = OBD_MD_FLGETATTR;
                 struct obd_capa *oc;
@@ -2665,8 +3061,7 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
                         RETURN(rc);
                 }
 
-                rc = ll_prep_inode(&inode, req, REPLY_REC_OFF,
-                                   NULL);
+                rc = ll_prep_inode(&inode, req, NULL);
                 if (rc)
                         GOTO(out, rc);
         }