LU-11071 osd-ldiskfs: support bio integrity with Ubuntu 18
[fs/lustre-release.git] lustre/osd-ldiskfs/osd_io.c
index c3f43e9..55e238b 100644
@@ -23,7 +23,7 @@
  * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2016, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -42,6 +42,8 @@
 #include <linux/types.h>
 /* prerequisite for linux/xattr.h */
 #include <linux/fs.h>
+#include <linux/mm.h>
+#include <linux/pagevec.h>
 
 /*
  * struct OBD_{ALLOC,FREE}*()
 /* ext_depth() */
 #include <ldiskfs/ldiskfs_extents.h>
 
+static inline bool osd_use_page_cache(struct osd_device *d)
+{
+       /* do not use pagecache if write and read caching are disabled */
+       if (d->od_writethrough_cache + d->od_read_cache == 0)
+               return false;
+       /* use pagecache by default */
+       return true;
+}
+
 static int __osd_init_iobuf(struct osd_device *d, struct osd_iobuf *iobuf,
                            int rw, int line, int pages)
 {
@@ -71,7 +82,7 @@ static int __osd_init_iobuf(struct osd_device *d, struct osd_iobuf *iobuf,
        iobuf->dr_error = 0;
        iobuf->dr_dev = d;
        iobuf->dr_frags = 0;
-       iobuf->dr_elapsed = 0;
+       iobuf->dr_elapsed = ktime_set(0, 0);
        /* must be counted before, so assert */
        iobuf->dr_rw = rw;
        iobuf->dr_init_at = line;
@@ -106,6 +117,12 @@ static int __osd_init_iobuf(struct osd_device *d, struct osd_iobuf *iobuf,
        if (unlikely(iobuf->dr_pages == NULL))
                return -ENOMEM;
 
+       lu_buf_realloc(&iobuf->dr_lnb_buf,
+                      pages * sizeof(iobuf->dr_lnbs[0]));
+       iobuf->dr_lnbs = iobuf->dr_lnb_buf.lb_buf;
+       if (unlikely(iobuf->dr_lnbs == NULL))
+               return -ENOMEM;
+
        iobuf->dr_max_pages = pages;
 
        return 0;
@@ -113,10 +130,13 @@ static int __osd_init_iobuf(struct osd_device *d, struct osd_iobuf *iobuf,
 #define osd_init_iobuf(dev, iobuf, rw, pages) \
        __osd_init_iobuf(dev, iobuf, rw, __LINE__, pages)
 
-static void osd_iobuf_add_page(struct osd_iobuf *iobuf, struct page *page)
+static void osd_iobuf_add_page(struct osd_iobuf *iobuf,
+                              struct niobuf_local *lnb)
 {
-        LASSERT(iobuf->dr_npages < iobuf->dr_max_pages);
-        iobuf->dr_pages[iobuf->dr_npages++] = page;
+       LASSERT(iobuf->dr_npages < iobuf->dr_max_pages);
+       iobuf->dr_pages[iobuf->dr_npages] = lnb->lnb_page;
+       iobuf->dr_lnbs[iobuf->dr_npages] = lnb;
+       iobuf->dr_npages++;
 }
 
 void osd_fini_iobuf(struct osd_device *d, struct osd_iobuf *iobuf)
@@ -130,19 +150,19 @@ void osd_fini_iobuf(struct osd_device *d, struct osd_iobuf *iobuf)
                 lprocfs_oh_tally(&d->od_brw_stats.
                                  hist[BRW_R_DIO_FRAGS+rw],
                                  iobuf->dr_frags);
-                lprocfs_oh_tally_log2(&d->od_brw_stats.hist[BRW_R_IO_TIME+rw],
-                                      iobuf->dr_elapsed);
+               lprocfs_oh_tally_log2(&d->od_brw_stats.hist[BRW_R_IO_TIME+rw],
+                                     ktime_to_ms(iobuf->dr_elapsed));
         }
 }
 
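
The dr_elapsed accounting in this patch switches from a jiffies delta to a ktime_t: the start stamp comes from ktime_get(), the delta from ktime_sub(), and conversion to milliseconds happens only when the value is fed to the brw stats. A minimal sketch of that pattern, assuming an in-kernel context where <linux/ktime.h> is available (the helper names below are illustrative, not from the patch):

#include <linux/ktime.h>

/* sketch only: same start/elapsed pattern as dr_start_time/dr_elapsed */
static ktime_t io_start_time;

static void io_record_start(void)
{
	io_start_time = ktime_get();		/* monotonic start stamp */
}

static s64 io_elapsed_ms(void)
{
	ktime_t elapsed = ktime_sub(ktime_get(), io_start_time);

	return ktime_to_ms(elapsed);		/* convert only for reporting */
}
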
-#ifndef REQ_WRITE /* pre-2.6.35 */
-#define __REQ_WRITE BIO_RW
-#endif
-
 #ifdef HAVE_BIO_ENDIO_USES_ONE_ARG
 static void dio_complete_routine(struct bio *bio)
 {
+# ifdef HAVE_BI_STATUS
+       int error = bio->bi_status;
+# else
        int error = bio->bi_error;
+# endif
 #else
 static void dio_complete_routine(struct bio *bio, int error)
 {
@@ -158,16 +178,26 @@ static void dio_complete_routine(struct bio *bio, int error)
                CERROR("***** bio->bi_private is NULL!  This should never "
                       "happen.  Normally, I would crash here, but instead I "
                       "will dump the bio contents to the console.  Please "
-                      "report this to <https://jira.hpdd.intel.com/> , along "
+                      "report this to <https://jira.whamcloud.com/> , along "
                       "with any interesting messages leading up to this point "
                       "(like SCSI errors, perhaps).  Because bi_private is "
                       "NULL, I can't wake up the thread that initiated this "
                       "IO - you will probably have to reboot this node.\n");
-               CERROR("bi_next: %p, bi_flags: %lx, bi_rw: %lu, bi_vcnt: %d, "
-                      "bi_idx: %d, bi->size: %d, bi_end_io: %p, bi_cnt: %d, "
-                      "bi_private: %p\n", bio->bi_next,
+               CERROR("bi_next: %p, bi_flags: %lx, "
+#ifdef HAVE_BI_RW
+                      "bi_rw: %lu,"
+#else
+                      "bi_opf: %u,"
+#endif
+                      "bi_vcnt: %d, bi_idx: %d, bi->size: %d, bi_end_io: %p,"
+                      "bi_cnt: %d, bi_private: %p\n", bio->bi_next,
                        (unsigned long)bio->bi_flags,
-                       bio->bi_rw, bio->bi_vcnt, bio_idx(bio),
+#ifdef HAVE_BI_RW
+                       bio->bi_rw,
+#else
+                       bio->bi_opf,
+#endif
+                       bio->bi_vcnt, bio_idx(bio),
                        bio_sectors(bio) << 9, bio->bi_end_io,
 #ifdef HAVE_BI_CNT
                        atomic_read(&bio->bi_cnt),
@@ -179,7 +209,7 @@ static void dio_complete_routine(struct bio *bio, int error)
        }
 
        /* the check is outside of the cycle for performance reason -bzzz */
-       if (!test_bit(__REQ_WRITE, &bio->bi_rw)) {
+       if (!bio_data_dir(bio)) {
                bio_for_each_segment_all(bvl, bio, iter) {
                        if (likely(error == 0))
                                SetPageUptodate(bvl_to_page(bvl));
@@ -202,7 +232,9 @@ static void dio_complete_routine(struct bio *bio, int error)
         * call to OSD.
         */
        if (atomic_read(&iobuf->dr_numreqs) == 1) {
-               iobuf->dr_elapsed = jiffies - iobuf->dr_start_time;
+               ktime_t now = ktime_get();
+
+               iobuf->dr_elapsed = ktime_sub(now, iobuf->dr_start_time);
                iobuf->dr_elapsed_valid = 1;
        }
        if (atomic_dec_and_test(&iobuf->dr_numreqs))
@@ -242,10 +274,15 @@ static void record_start_io(struct osd_iobuf *iobuf, int size)
 static void osd_submit_bio(int rw, struct bio *bio)
 {
         LASSERTF(rw == 0 || rw == 1, "%x\n", rw);
+#ifdef HAVE_SUBMIT_BIO_2ARGS
         if (rw == 0)
                 submit_bio(READ, bio);
         else
                 submit_bio(WRITE, bio);
+#else
+        bio->bi_opf |= rw;
+        submit_bio(bio);
+#endif
 }
 
 static int can_be_merged(struct bio *bio, sector_t sector)
@@ -256,32 +293,222 @@ static int can_be_merged(struct bio *bio, sector_t sector)
        return bio_end_sector(bio) == sector ? 1 : 0;
 }
 
+/*
+ * This function will change the data written, thus it should only be
+ * used when checking the data integrity feature
+ */
+static void bio_integrity_fault_inject(struct bio *bio)
+{
+       struct bio_vec *bvec;
+       int i;
+       void *kaddr;
+       char *addr;
+
+       bio_for_each_segment_all(bvec, bio, i) {
+               struct page *page = bvec->bv_page;
+
+               kaddr = kmap(page);
+               addr = kaddr;
+               *addr = ~(*addr);
+               kunmap(page);
+               break;
+       }
+}
+
+static int bio_dif_compare(__u16 *expected_guard_buf, void *bio_prot_buf,
+                          unsigned int sectors, int tuple_size)
+{
+       __u16 *expected_guard;
+       __u16 *bio_guard;
+       int i;
+
+       expected_guard = expected_guard_buf;
+       for (i = 0; i < sectors; i++) {
+               bio_guard = (__u16 *)bio_prot_buf;
+               if (*bio_guard != *expected_guard) {
+                       CERROR("unexpected guard tags on sector %d "
+                              "expected guard %u, bio guard "
+                              "%u, sectors %u, tuple size %d\n",
+                              i, *expected_guard, *bio_guard, sectors,
+                              tuple_size);
+                       return -EIO;
+               }
+               expected_guard++;
+               bio_prot_buf += tuple_size;
+       }
+       return 0;
+}
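
For context on the layout bio_dif_compare() assumes: the protection buffer attached to a bio carries one fixed-size tuple per protection interval, and only the leading 16-bit guard word of each tuple is checked against the guards saved from the RPC. A standalone userspace sketch of the same walk, assuming the common 8-byte T10 DIF tuple of guard/application tag/reference tag (an assumption for illustration, not something this patch defines):

#include <stdint.h>
#include <stdio.h>
#include <string.h>

struct pi_tuple {		/* one tuple per protection interval */
	uint16_t guard;		/* CRC over the data interval */
	uint16_t app_tag;
	uint32_t ref_tag;
};

static int guard_compare(const uint16_t *expected, const void *prot_buf,
			 unsigned int sectors, int tuple_size)
{
	const char *p = prot_buf;
	unsigned int i;

	for (i = 0; i < sectors; i++, p += tuple_size) {
		uint16_t guard;

		memcpy(&guard, p, sizeof(guard));	/* leading guard word */
		if (guard != expected[i])
			return -1;			/* mismatch on sector i */
	}
	return 0;
}

int main(void)
{
	struct pi_tuple prot[2] = { { 0x1234, 0, 0 }, { 0xabcd, 0, 1 } };
	uint16_t expected[2] = { 0x1234, 0xabcd };

	printf("%d\n", guard_compare(expected, prot, 2, sizeof(prot[0])));
	return 0;
}
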
+
+static int osd_bio_integrity_compare(struct bio *bio, struct block_device *bdev,
+                                    struct osd_iobuf *iobuf, int index)
+{
+       struct blk_integrity *bi = bdev_get_integrity(bdev);
+       struct bio_integrity_payload *bip = bio->bi_integrity;
+       struct niobuf_local *lnb;
+       unsigned short sector_size = blk_integrity_interval(bi);
+       void *bio_prot_buf = page_address(bip->bip_vec->bv_page) +
+               bip->bip_vec->bv_offset;
+       struct bio_vec *bv;
+       sector_t sector = bio_start_sector(bio);
+       unsigned int i, sectors, total;
+       __u16 *expected_guard;
+       int rc;
+
+       total = 0;
+       bio_for_each_segment_all(bv, bio, i) {
+               lnb = iobuf->dr_lnbs[index];
+               expected_guard = lnb->lnb_guards;
+               sectors = bv->bv_len / sector_size;
+               if (lnb->lnb_guard_rpc) {
+                       rc = bio_dif_compare(expected_guard, bio_prot_buf,
+                                            sectors, bi->tuple_size);
+                       if (rc)
+                               return rc;
+               }
+
+               sector += sectors;
+               bio_prot_buf += sectors * bi->tuple_size;
+               total += sectors * bi->tuple_size;
+               LASSERT(total <= bip_size(bio->bi_integrity));
+               index++;
+       }
+       return 0;
+}
+
+static int osd_bio_integrity_handle(struct osd_device *osd, struct bio *bio,
+                                   struct osd_iobuf *iobuf,
+                                   int start_page_idx, bool fault_inject,
+                                   bool integrity_enabled)
+{
+       struct super_block *sb = osd_sb(osd);
+       int rc;
+#ifdef HAVE_BIO_INTEGRITY_PREP_FN
+       integrity_gen_fn *generate_fn = NULL;
+       integrity_vrfy_fn *verify_fn = NULL;
+#endif
+
+       ENTRY;
+
+       if (!integrity_enabled)
+               RETURN(0);
+
+#ifdef HAVE_BIO_INTEGRITY_PREP_FN
+       rc = osd_get_integrity_profile(osd, &generate_fn, &verify_fn);
+       if (rc)
+               RETURN(rc);
+
+       rc = bio_integrity_prep_fn(bio, generate_fn, verify_fn);
+#else
+       rc = bio_integrity_prep(bio);
+#endif
+       if (rc)
+               RETURN(rc);
+
+       /* Verify and inject fault only when writing */
+       if (iobuf->dr_rw == 1) {
+               if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_OST_INTEGRITY_CMP))) {
+                       rc = osd_bio_integrity_compare(bio, sb->s_bdev, iobuf,
+                                                      start_page_idx);
+                       if (rc)
+                               RETURN(rc);
+               }
+
+               if (unlikely(fault_inject))
+                       bio_integrity_fault_inject(bio);
+       }
+
+       RETURN(0);
+}
+
+#ifdef HAVE_BIO_INTEGRITY_PREP_FN
+#  ifdef HAVE_BIO_ENDIO_USES_ONE_ARG
+static void dio_integrity_complete_routine(struct bio *bio)
+{
+#  else
+static void dio_integrity_complete_routine(struct bio *bio, int error)
+{
+#  endif
+       struct osd_bio_private *bio_private = bio->bi_private;
+
+       bio->bi_private = bio_private->obp_iobuf;
+#  ifdef HAVE_BIO_ENDIO_USES_ONE_ARG
+       dio_complete_routine(bio);
+#  else
+       dio_complete_routine(bio, error);
+#  endif
+
+       OBD_FREE_PTR(bio_private);
+}
+#endif
+
+static int osd_bio_init(struct bio *bio, struct osd_iobuf *iobuf,
+                       bool integrity_enabled, int start_page_idx,
+                       struct osd_bio_private **pprivate)
+{
+#ifdef HAVE_BIO_INTEGRITY_PREP_FN
+       struct osd_bio_private *bio_private;
+
+       ENTRY;
+
+       *pprivate = NULL;
+       if (integrity_enabled) {
+               OBD_ALLOC_GFP(bio_private, sizeof(*bio_private), GFP_NOIO);
+               if (bio_private == NULL)
+                       RETURN(-ENOMEM);
+               bio->bi_end_io = dio_integrity_complete_routine;
+               bio->bi_private = bio_private;
+               bio_private->obp_start_page_idx = start_page_idx;
+               bio_private->obp_iobuf = iobuf;
+               *pprivate = bio_private;
+       } else {
+               bio->bi_end_io = dio_complete_routine;
+               bio->bi_private = iobuf;
+       }
+       RETURN(0);
+#else
+       ENTRY;
+
+       bio->bi_end_io = dio_complete_routine;
+       bio->bi_private = iobuf;
+       RETURN(0);
+#endif
+}
+
 static int osd_do_bio(struct osd_device *osd, struct inode *inode,
                       struct osd_iobuf *iobuf)
 {
-       int            blocks_per_page = PAGE_SIZE >> inode->i_blkbits;
-       struct page  **pages = iobuf->dr_pages;
-       int            npages = iobuf->dr_npages;
-       sector_t      *blocks = iobuf->dr_blocks;
-       int            total_blocks = npages * blocks_per_page;
-       int            sector_bits = inode->i_sb->s_blocksize_bits - 9;
-       unsigned int   blocksize = inode->i_sb->s_blocksize;
-       struct bio    *bio = NULL;
-       struct page   *page;
-       unsigned int   page_offset;
-       sector_t       sector;
-       int            nblocks;
-       int            block_idx;
-       int            page_idx;
-       int            i;
-       int            rc = 0;
+       int blocks_per_page = PAGE_SIZE >> inode->i_blkbits;
+       struct page **pages = iobuf->dr_pages;
+       int npages = iobuf->dr_npages;
+       sector_t *blocks = iobuf->dr_blocks;
+       int total_blocks = npages * blocks_per_page;
+       struct super_block *sb = inode->i_sb;
+       int sector_bits = sb->s_blocksize_bits - 9;
+       unsigned int blocksize = sb->s_blocksize;
+       struct block_device *bdev = sb->s_bdev;
+       struct osd_bio_private *bio_private = NULL;
+       struct bio *bio = NULL;
+       int bio_start_page_idx;
+       struct page *page;
+       unsigned int page_offset;
+       sector_t sector;
+       int nblocks;
+       int block_idx;
+       int page_idx;
+       int i;
+       int rc = 0;
+       bool fault_inject;
+       bool integrity_enabled;
        DECLARE_PLUG(plug);
        ENTRY;
 
+       fault_inject = OBD_FAIL_CHECK(OBD_FAIL_OST_INTEGRITY_FAULT);
         LASSERT(iobuf->dr_npages == npages);
 
-        osd_brw_stats_update(osd, iobuf);
-        iobuf->dr_start_time = cfs_time_current();
+       integrity_enabled = bdev_integrity_enabled(bdev, iobuf->dr_rw);
+
+       osd_brw_stats_update(osd, iobuf);
+       iobuf->dr_start_time = ktime_get();
 
        blk_start_plug(&plug);
         for (page_idx = 0, block_idx = 0;
@@ -322,8 +549,7 @@ static int osd_do_bio(struct osd_device *osd, struct inode *inode,
                                 continue;       /* added this frag OK */
 
                        if (bio != NULL) {
-                               struct request_queue *q =
-                                       bdev_get_queue(bio->bi_bdev);
+                               struct request_queue *q = bio_get_queue(bio);
                                unsigned int bi_size = bio_sectors(bio) << 9;
 
                                /* Dang! I have to fragment this I/O */
@@ -335,10 +561,19 @@ static int osd_do_bio(struct osd_device *osd, struct inode *inode,
                                        bio_phys_segments(q, bio),
                                        queue_max_phys_segments(q),
                                       0, queue_max_hw_segments(q));
+                               rc = osd_bio_integrity_handle(osd, bio,
+                                       iobuf, bio_start_page_idx,
+                                       fault_inject, integrity_enabled);
+                               if (rc) {
+                                       bio_put(bio);
+                                       goto out;
+                               }
+
                                record_start_io(iobuf, bi_size);
                                osd_submit_bio(iobuf->dr_rw, bio);
                        }
 
+                       bio_start_page_idx = page_idx;
                        /* allocate new bio */
                        bio = bio_alloc(GFP_NOIO, min(BIO_MAX_PAGES,
                                                      (npages - page_idx) *
@@ -351,11 +586,19 @@ static int osd_do_bio(struct osd_device *osd, struct inode *inode,
                                 goto out;
                         }
 
-                       bio->bi_bdev = inode->i_sb->s_bdev;
+                       bio_set_dev(bio, bdev);
                        bio_set_sector(bio, sector);
+#ifdef HAVE_BI_RW
                        bio->bi_rw = (iobuf->dr_rw == 0) ? READ : WRITE;
-                       bio->bi_end_io = dio_complete_routine;
-                       bio->bi_private = iobuf;
+#else
+                       bio->bi_opf = (iobuf->dr_rw == 0) ? READ : WRITE;
+#endif
+                       rc = osd_bio_init(bio, iobuf, integrity_enabled,
+                                         bio_start_page_idx, &bio_private);
+                       if (rc) {
+                               bio_put(bio);
+                               goto out;
+                       }
 
                        rc = bio_add_page(bio, page,
                                          blocksize * nblocks, page_offset);
@@ -364,6 +607,15 @@ static int osd_do_bio(struct osd_device *osd, struct inode *inode,
        }
 
        if (bio != NULL) {
+               rc = osd_bio_integrity_handle(osd, bio, iobuf,
+                                             bio_start_page_idx,
+                                             fault_inject,
+                                             integrity_enabled);
+               if (rc) {
+                       bio_put(bio);
+                       goto out;
+               }
+
                record_start_io(iobuf, bio_sectors(bio) << 9);
                osd_submit_bio(iobuf->dr_rw, bio);
                rc = 0;
@@ -376,14 +628,19 @@ out:
         * completion here. instead we proceed with transaction commit in
         * parallel and wait for IO completion once transaction is stopped
         * see osd_trans_stop() for more details -bzzz */
-       if (iobuf->dr_rw == 0) {
+       if (iobuf->dr_rw == 0 || fault_inject) {
                wait_event(iobuf->dr_wait,
                           atomic_read(&iobuf->dr_numreqs) == 0);
                osd_fini_iobuf(osd, iobuf);
        }
 
-       if (rc == 0)
+       if (rc == 0) {
                rc = iobuf->dr_error;
+       } else {
+               if (bio_private)
+                       OBD_FREE_PTR(bio_private);
+       }
+
        RETURN(rc);
 }
 
@@ -407,6 +664,8 @@ static int osd_map_remote_to_local(loff_t offset, ssize_t len, int *nrpages,
                lnb->lnb_flags = 0;
                lnb->lnb_page = NULL;
                lnb->lnb_rc = 0;
+               lnb->lnb_guard_rpc = 0;
+               lnb->lnb_guard_disk = 0;
 
                 LASSERTF(plen <= len, "plen %u, len %lld\n", plen,
                          (long long) len);
@@ -419,22 +678,54 @@ static int osd_map_remote_to_local(loff_t offset, ssize_t len, int *nrpages,
         RETURN(0);
 }
 
-static struct page *osd_get_page(struct dt_object *dt, loff_t offset,
-                                gfp_t gfp_mask)
+static struct page *osd_get_page(const struct lu_env *env, struct dt_object *dt,
+                                loff_t offset, gfp_t gfp_mask)
 {
+       struct osd_thread_info *oti = osd_oti_get(env);
        struct inode *inode = osd_dt_obj(dt)->oo_inode;
        struct osd_device *d = osd_obj2dev(osd_dt_obj(dt));
        struct page *page;
+       int cur = oti->oti_dio_pages_used;
 
         LASSERT(inode);
 
-       page = find_or_create_page(inode->i_mapping, offset >> PAGE_SHIFT,
-                                  gfp_mask);
+       if (osd_use_page_cache(d)) {
+               page = find_or_create_page(inode->i_mapping,
+                                          offset >> PAGE_SHIFT,
+                                          gfp_mask);
+
+               if (likely(page))
+                       LASSERT(!test_bit(PG_private_2, &page->flags));
+               else
+                       lprocfs_counter_add(d->od_stats, LPROC_OSD_NO_PAGE, 1);
+       } else {
+
+               LASSERT(oti->oti_dio_pages);
 
-        if (unlikely(page == NULL))
-                lprocfs_counter_add(d->od_stats, LPROC_OSD_NO_PAGE, 1);
+               if (unlikely(!oti->oti_dio_pages[cur])) {
+                       LASSERT(cur < PTLRPC_MAX_BRW_PAGES);
+                       page = alloc_page(gfp_mask);
+                       if (!page)
+                               return NULL;
+                       oti->oti_dio_pages[cur] = page;
+               }
 
-        return page;
+               page = oti->oti_dio_pages[cur];
+               LASSERT(!test_bit(PG_private_2, &page->flags));
+               set_bit(PG_private_2, &page->flags);
+               oti->oti_dio_pages_used++;
+
+               LASSERT(!PageLocked(page));
+               lock_page(page);
+
+               LASSERT(!page->mapping);
+               LASSERT(!PageWriteback(page));
+               ClearPageUptodate(page);
+
+               page->index = offset >> PAGE_SHIFT;
+       }
+
+       return page;
 }
 
 /*
@@ -471,18 +762,45 @@ static struct page *osd_get_page(struct dt_object *dt, loff_t offset,
 static int osd_bufs_put(const struct lu_env *env, struct dt_object *dt,
                        struct niobuf_local *lnb, int npages)
 {
+       struct osd_thread_info *oti = osd_oti_get(env);
+       struct pagevec pvec;
        int i;
 
+#ifdef HAVE_PAGEVEC_INIT_ONE_PARAM
+       pagevec_init(&pvec);
+#else
+       pagevec_init(&pvec, 0);
+#endif
+
        for (i = 0; i < npages; i++) {
-               if (lnb[i].lnb_page == NULL)
+               struct page *page = lnb[i].lnb_page;
+
+               if (page == NULL)
                        continue;
-               LASSERT(PageLocked(lnb[i].lnb_page));
-               unlock_page(lnb[i].lnb_page);
-               put_page(lnb[i].lnb_page);
+               LASSERT(PageLocked(page));
+
+               /* if the page isn't cached, then reset uptodate
+                * to prevent reuse */
+               if (test_bit(PG_private_2, &page->flags)) {
+                       clear_bit(PG_private_2, &page->flags);
+                       ClearPageUptodate(page);
+                       unlock_page(page);
+                       oti->oti_dio_pages_used--;
+               } else {
+                       unlock_page(page);
+                       if (pagevec_add(&pvec, page) == 0)
+                               pagevec_release(&pvec);
+               }
                dt_object_put(env, dt);
+
                lnb[i].lnb_page = NULL;
        }
 
+       LASSERTF(oti->oti_dio_pages_used == 0, "%d\n", oti->oti_dio_pages_used);
+
+       /* Release any partial pagevec */
+       pagevec_release(&pvec);
+
        RETURN(0);
 }
 
@@ -514,19 +832,29 @@ static int osd_bufs_get(const struct lu_env *env, struct dt_object *dt,
                        loff_t pos, ssize_t len, struct niobuf_local *lnb,
                        enum dt_bufs_type rw)
 {
+       struct osd_thread_info *oti = osd_oti_get(env);
        struct osd_object *obj = osd_dt_obj(dt);
        int npages, i, rc = 0;
        gfp_t gfp_mask;
 
        LASSERT(obj->oo_inode);
 
+       if (!osd_use_page_cache(osd_obj2dev(obj))) {
+               if (unlikely(!oti->oti_dio_pages)) {
+                       OBD_ALLOC(oti->oti_dio_pages,
+                                 sizeof(struct page *) * PTLRPC_MAX_BRW_PAGES);
+                       if (!oti->oti_dio_pages)
+                               return -ENOMEM;
+               }
+       }
+
        osd_map_remote_to_local(pos, len, &npages, lnb);
 
        /* this could also try less hard for DT_BUFS_TYPE_READAHEAD pages */
        gfp_mask = rw & DT_BUFS_TYPE_LOCAL ? (GFP_NOFS | __GFP_HIGHMEM) :
                                             GFP_HIGHUSER;
        for (i = 0; i < npages; i++, lnb++) {
-               lnb->lnb_page = osd_get_page(dt, lnb->lnb_file_offset,
+               lnb->lnb_page = osd_get_page(env, dt, lnb->lnb_file_offset,
                                             gfp_mask);
                if (lnb->lnb_page == NULL)
                        GOTO(cleanup, rc = -ENOMEM);
@@ -760,8 +1088,13 @@ map:
                        if (pblock != 0) {
                                /* unmap any possible underlying metadata from
                                 * the block device mapping.  bug 6998. */
+#ifndef HAVE_CLEAN_BDEV_ALIASES
                                unmap_underlying_metadata(inode->i_sb->s_bdev,
                                                          *(bp->blocks));
+#else
+                               clean_bdev_aliases(inode->i_sb->s_bdev,
+                                                  *(bp->blocks), 1);
+#endif
                        }
                        bp->blocks++;
                        bp->num--;
@@ -950,9 +1283,15 @@ cont_map:
                                         * mapping.  bug 6998. */
                                        if ((map.m_flags & LDISKFS_MAP_NEW) &&
                                            create)
+#ifndef HAVE_CLEAN_BDEV_ALIASES
                                                unmap_underlying_metadata(
                                                        inode->i_sb->s_bdev,
                                                        map.m_pblk + c);
+#else
+                                               clean_bdev_aliases(
+                                                       inode->i_sb->s_bdev,
+                                                       map.m_pblk + c, 1);
+#endif
                                }
                        }
                        rc = 0;
@@ -1022,7 +1361,7 @@ static int osd_write_prep(const struct lu_env *env, struct dt_object *dt,
                        continue;
 
                if (maxidx >= lnb[i].lnb_page->index) {
-                       osd_iobuf_add_page(iobuf, lnb[i].lnb_page);
+                       osd_iobuf_add_page(iobuf, &lnb[i]);
                } else {
                        long off;
                        char *p = kmap(lnb[i].lnb_page);
@@ -1139,7 +1478,9 @@ static int osd_declare_write_commit(const struct lu_env *env,
                    lnb[i - 1].lnb_file_offset + lnb[i - 1].lnb_len)
                        extents++;
 
-               if (!osd_is_mapped(dt, lnb[i].lnb_file_offset, &extent))
+               if (osd_is_mapped(dt, lnb[i].lnb_file_offset, &extent))
+                       lnb[i].lnb_flags |= OBD_BRW_MAPPED;
+               else
                        quota_space += PAGE_SIZE;
 
                /* ignore quota for the whole request if any page is from
@@ -1218,6 +1559,9 @@ static int osd_declare_write_commit(const struct lu_env *env,
        if (flags & QUOTA_FL_OVER_PRJQUOTA)
                lnb[0].lnb_flags |= OBD_BRW_OVER_PRJQUOTA;
 
+       if (rc == 0)
+               rc = osd_trunc_lock(osd_dt_obj(dt), oh, true);
+
        RETURN(rc);
 }
 
@@ -1232,7 +1576,6 @@ static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
         struct osd_device  *osd = osd_obj2dev(osd_dt_obj(dt));
         loff_t isize;
         int rc = 0, i;
-       struct osd_fextent extent = { 0 };
 
         LASSERT(inode);
 
@@ -1245,7 +1588,7 @@ static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
 
         for (i = 0; i < npages; i++) {
                if (lnb[i].lnb_rc == -ENOSPC &&
-                   osd_is_mapped(dt, lnb[i].lnb_file_offset, &extent)) {
+                   (lnb[i].lnb_flags & OBD_BRW_MAPPED)) {
                        /* Allow the write to proceed if overwriting an
                         * existing block */
                        lnb[i].lnb_rc = 0;
@@ -1275,7 +1618,7 @@ static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
 
                SetPageUptodate(lnb[i].lnb_page);
 
-               osd_iobuf_add_page(iobuf, lnb[i].lnb_page);
+               osd_iobuf_add_page(iobuf, &lnb[i]);
         }
 
        osd_trans_exec_op(env, thandle, OSD_OT_WRITE);
@@ -1371,7 +1714,7 @@ static int osd_read_prep(const struct lu_env *env, struct dt_object *dt,
                        cache_hits++;
                } else {
                        cache_misses++;
-                       osd_iobuf_add_page(iobuf, lnb[i].lnb_page);
+                       osd_iobuf_add_page(iobuf, &lnb[i]);
                }
 
                if (cache == 0)
@@ -1504,8 +1847,7 @@ static inline int osd_extents_enabled(struct super_block *sb,
        if (inode != NULL) {
                if (LDISKFS_I(inode)->i_flags & LDISKFS_EXTENTS_FL)
                        return 1;
-       } else if (LDISKFS_HAS_INCOMPAT_FEATURE(sb,
-                               LDISKFS_FEATURE_INCOMPAT_EXTENTS)) {
+       } else if (ldiskfs_has_feature_extents(sb)) {
                return 1;
        }
        return 0;
@@ -1645,6 +1987,10 @@ out:
                                           i_gid_read(inode),
                                           i_projid_read(inode), 0,
                                           oh, obj, NULL, OSD_QID_BLK);
+
+       if (rc == 0)
+               rc = osd_trunc_lock(obj, oh, true);
+
        RETURN(rc);
 }
 
@@ -1757,7 +2103,7 @@ int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
 
 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
                         const struct lu_buf *buf, loff_t *pos,
-                        struct thandle *handle, int ignore_quota)
+                        struct thandle *handle)
 {
        struct inode            *inode = osd_dt_obj(dt)->oo_inode;
        struct osd_thandle      *oh;
@@ -1825,18 +2171,23 @@ static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt,
        rc = osd_declare_inode_qid(env, i_uid_read(inode), i_gid_read(inode),
                                   i_projid_read(inode), 0, oh, osd_dt_obj(dt),
                                   NULL, OSD_QID_BLK);
+
+       if (rc == 0)
+               rc = osd_trunc_lock(osd_dt_obj(dt), oh, false);
+
        RETURN(rc);
 }
 
 static int osd_punch(const struct lu_env *env, struct dt_object *dt,
                     __u64 start, __u64 end, struct thandle *th)
 {
+       struct osd_object *obj = osd_dt_obj(dt);
+       struct osd_device *osd = osd_obj2dev(obj);
+       struct inode *inode = obj->oo_inode;
+       struct osd_access_lock *al;
        struct osd_thandle *oh;
-       struct osd_object  *obj = osd_dt_obj(dt);
-       struct inode       *inode = obj->oo_inode;
-       handle_t           *h;
-       tid_t               tid;
-       int                rc = 0, rc2 = 0;
+       int rc = 0, found = 0;
+       bool grow = false;
        ENTRY;
 
        LASSERT(end == OBD_OBJECT_EOF);
@@ -1849,49 +2200,51 @@ static int osd_punch(const struct lu_env *env, struct dt_object *dt,
        oh = container_of(th, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle->h_transaction != NULL);
 
-       osd_trans_exec_op(env, th, OSD_OT_PUNCH);
+       /* we used to skip truncate to current size to
+        * optimize truncates on OST. with DoM we can
+        * get attr_set to set a specific size (MDS_REINT)
+        * and then get a truncate RPC which essentially
+        * would be skipped. this is bad, so disable
+        * this optimization on MDS until the clients stop
+        * sending MDS_REINT (LU-11033) -bzzz */
+       if (osd->od_is_ost && i_size_read(inode) == start)
+               RETURN(0);
 
-       tid = oh->ot_handle->h_transaction->t_tid;
+       osd_trans_exec_op(env, th, OSD_OT_PUNCH);
 
        spin_lock(&inode->i_lock);
+       if (i_size_read(inode) < start)
+               grow = true;
        i_size_write(inode, start);
        spin_unlock(&inode->i_lock);
        ll_truncate_pagecache(inode, start);
-#ifdef HAVE_INODEOPS_TRUNCATE
-       if (inode->i_op->truncate) {
-               inode->i_op->truncate(inode);
-       } else
-#endif
-               ldiskfs_truncate(inode);
-
-       /*
-        * For a partial-page truncate, flush the page to disk immediately to
-        * avoid data corruption during direct disk write.  b=17397
-        */
-       if ((start & ~PAGE_MASK) != 0)
-                rc = filemap_fdatawrite_range(inode->i_mapping, start, start+1);
 
-        h = journal_current_handle();
-        LASSERT(h != NULL);
-        LASSERT(h == oh->ot_handle);
+       /* optimize grow case */
+       if (grow) {
+               osd_execute_truncate(obj);
+               GOTO(out, rc);
+       }
 
-       /* do not check credits with osd_trans_exec_check() as the truncate
-        * can restart the transaction internally and we restart the
-        * transaction in this case */
+       /* add to orphan list to ensure truncate completion
+        * if this transaction succeeds. ldiskfs_truncate()
+        * will take the inode out of the list */
+       rc = ldiskfs_orphan_add(oh->ot_handle, inode);
+       if (rc != 0)
+               GOTO(out, rc);
 
-        if (tid != h->h_transaction->t_tid) {
-                int credits = oh->ot_credits;
-                /*
-                 * transaction has changed during truncate
-                 * we need to restart the handle with our credits
-                 */
-                if (h->h_buffer_credits < credits) {
-                        if (ldiskfs_journal_extend(h, credits))
-                                rc2 = ldiskfs_journal_restart(h, credits);
-                }
-        }
+       list_for_each_entry(al, &oh->ot_trunc_locks, tl_list) {
+               if (obj != al->tl_obj)
+                       continue;
+               LASSERT(al->tl_shared == 0);
+               found = 1;
+               /* do actual truncate in osd_trans_stop() */
+               al->tl_truncate = 1;
+               break;
+       }
+       LASSERT(found);
 
-        RETURN(rc == 0 ? rc2 : rc);
+out:
+       RETURN(rc);
 }
 
 static int fiemap_check_ranges(struct inode *inode,
@@ -1970,8 +2323,8 @@ static int osd_ladvise(const struct lu_env *env, struct dt_object *dt,
                if (end == 0)
                        break;
                invalidate_mapping_pages(inode->i_mapping,
-                                        start >> PAGE_CACHE_SHIFT,
-                                        (end - 1) >> PAGE_CACHE_SHIFT);
+                                        start >> PAGE_SHIFT,
+                                        (end - 1) >> PAGE_SHIFT);
                break;
        default:
                rc = -ENOTSUPP;
@@ -2004,3 +2357,120 @@ const struct dt_body_operations osd_body_ops = {
        .dbo_fiemap_get                 = osd_fiemap_get,
        .dbo_ladvise                    = osd_ladvise,
 };
+
+/**
+ * Get a truncate lock
+ *
+ * In order to take a multi-transaction truncate out of the main transaction
+ * we let the caller grab a lock on the object passed. The lock can be shared
+ * (for writes) or exclusive (for truncate). It's not allowed to mix truncate
+ * and write in the same transaction handle (do not confuse with a big ldiskfs
+ * transaction containing lots of handles).
+ * The lock must be taken at declaration.
+ *
+ * \param obj          object to lock
+ * \param oh           transaction
+ * \param shared       shared or exclusive
+ *
+ * \retval 0           lock is granted
+ * \retval -ENOMEM     no memory to allocate lock
+ */
+int osd_trunc_lock(struct osd_object *obj, struct osd_thandle *oh, bool shared)
+{
+       struct osd_access_lock *al, *tmp;
+
+       LASSERT(obj);
+       LASSERT(oh);
+
+       list_for_each_entry(tmp, &oh->ot_trunc_locks, tl_list) {
+               if (tmp->tl_obj != obj)
+                       continue;
+               LASSERT(tmp->tl_shared == shared);
+               /* found same lock */
+               return 0;
+       }
+
+       OBD_ALLOC_PTR(al);
+       if (unlikely(al == NULL))
+               return -ENOMEM;
+       al->tl_obj = obj;
+       al->tl_truncate = false;
+       if (shared)
+               down_read(&obj->oo_ext_idx_sem);
+       else
+               down_write(&obj->oo_ext_idx_sem);
+       al->tl_shared = shared;
+
+       list_add(&al->tl_list, &oh->ot_trunc_locks);
+
+       return 0;
+}
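
The split between the two modes can be read straight off the declare-time call sites added earlier in this patch; condensed from the hunks above (osd_declare_write_commit() and osd_declare_punch() respectively):

	/* writes declare a shared truncate lock ... */
	if (rc == 0)
		rc = osd_trunc_lock(osd_dt_obj(dt), oh, true);

	/* ... while punch/truncate declares it exclusive */
	if (rc == 0)
		rc = osd_trunc_lock(osd_dt_obj(dt), oh, false);
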
+
+void osd_trunc_unlock_all(struct list_head *list)
+{
+       struct osd_access_lock *al, *tmp;
+       list_for_each_entry_safe(al, tmp, list, tl_list) {
+               if (al->tl_shared)
+                       up_read(&al->tl_obj->oo_ext_idx_sem);
+               else
+                       up_write(&al->tl_obj->oo_ext_idx_sem);
+               list_del(&al->tl_list);
+               OBD_FREE_PTR(al);
+       }
+}
+
+void osd_execute_truncate(struct osd_object *obj)
+{
+       struct osd_device *d = osd_obj2dev(obj);
+       struct inode *inode = obj->oo_inode;
+       __u64 size;
+
+       /* simulate a crash before (or in the middle of) the delayed truncate */
+       if (OBD_FAIL_CHECK(OBD_FAIL_OSD_FAIL_AT_TRUNCATE)) {
+               struct ldiskfs_inode_info *ei = LDISKFS_I(inode);
+               struct ldiskfs_sb_info *sbi = LDISKFS_SB(inode->i_sb);
+
+               mutex_lock(&sbi->s_orphan_lock);
+               list_del_init(&ei->i_orphan);
+               mutex_unlock(&sbi->s_orphan_lock);
+               return;
+       }
+
+#ifdef HAVE_INODEOPS_TRUNCATE
+       if (inode->i_op->truncate)
+               inode->i_op->truncate(inode);
+       else
+#endif
+               ldiskfs_truncate(inode);
+
+       /*
+        * For a partial-page truncate, flush the page to disk immediately to
+        * avoid data corruption during direct disk write.  b=17397
+        */
+       size = i_size_read(inode);
+       if ((size & ~PAGE_MASK) == 0)
+               return;
+       if (osd_use_page_cache(d)) {
+               filemap_fdatawrite_range(inode->i_mapping, size, size + 1);
+       } else {
+               /* Notice we use the "wait" version to ensure I/O is complete */
+               filemap_write_and_wait_range(inode->i_mapping, size, size + 1);
+               invalidate_mapping_pages(inode->i_mapping, size >> PAGE_SHIFT,
+                                        size >> PAGE_SHIFT);
+       }
+}
+
+void osd_process_truncates(struct list_head *list)
+{
+       struct osd_access_lock *al;
+
+       LASSERT(journal_current_handle() == NULL);
+
+       list_for_each_entry(al, list, tl_list) {
+               if (al->tl_shared)
+                       continue;
+               if (!al->tl_truncate)
+                       continue;
+               osd_execute_truncate(al->tl_obj);
+       }
+}
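
Both helpers are intended to run at transaction stop, after the journal handle has been closed (hence the journal_current_handle() == NULL assertion above). A hedged sketch of the expected ordering; the real call site is osd_trans_stop(), which is outside this file's diff, and the wrapper below is illustrative only:

/* sketch only: drain delayed truncates, then drop the truncate locks */
static void trans_stop_truncates(struct list_head *trunc_list)
{
	/* exclusive entries flagged tl_truncate run the deferred ldiskfs_truncate() */
	osd_process_truncates(trunc_list);
	/* then release shared and exclusive locks and free the list entries */
	osd_trunc_unlock_all(trunc_list);
}
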