Whamcloud - gitweb
LU-911 osd: zerocopy methods in ldiskfs osd
authorAlex Zhuravlev <bzzz@whamcloud.com>
Wed, 23 Nov 2011 07:55:07 +0000 (10:55 +0300)
committerOleg Drokin <green@whamcloud.com>
Thu, 29 Mar 2012 03:18:25 +0000 (23:18 -0400)
add implementation for zerocopy methods to manipulate data:
grab/release buffers, prepare them for read/write, commit
modified buffers, truncate

Signed-off-by: Alex Zhuravlev <bzzz@whamcloud.com>
Signed-off-by: Bobi Jam <bobijam@whamcloud.com>
Change-Id: Ied662c7c837bb25b71587b69096bc12fe002d115
Reviewed-on: http://review.whamcloud.com/1834
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Mike Pershin <tappro@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/autoconf/lustre-core.m4
lustre/osd-ldiskfs/Makefile.in
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_internal.h
lustre/osd-ldiskfs/osd_io.c [new file with mode: 0644]
lustre/osd-ldiskfs/osd_lproc.c

index e167ac0..9b2635c 100644 (file)
@@ -1991,6 +1991,19 @@ LB_LINUX_TRY_COMPILE([
 ])
 
 #
+# LC_EXPORT_GENERIC_ERROR_REMOVE_PAGE
+#
+AC_DEFUN([LC_EXPORT_GENERIC_ERROR_REMOVE_PAGE],
+         [LB_CHECK_SYMBOL_EXPORT(
+                        [generic_error_remove_page],
+                        [mm/truncate.c],
+                        [AC_DEFINE(HAS_GENERIC_ERROR_REMOVE_PAGE, 1,
+                                [kernel export generic_error_remove_page])],
+                        [])
+         ]
+)
+
+#
 # 2.6.36 fs_struct.lock use spinlock instead of rwlock.
 #
 AC_DEFUN([LC_FS_STRUCT_RWLOCK],
@@ -2304,6 +2317,7 @@ AC_DEFUN([LC_PROG_LINUX],
          LC_BLK_QUEUE_MAX_SEGMENTS
          LC_SET_CPUS_ALLOWED
          LC_CACHE_UPCALL
+         LC_EXPORT_GENERIC_ERROR_REMOVE_PAGE
 
          # 2.6.35
          LC_FILE_FSYNC
index b5538d7..63a2098 100644 (file)
@@ -1,6 +1,6 @@
 MODULES := osd_ldiskfs
 osd_ldiskfs-objs := osd_handler.o osd_oi.o osd_igif.o osd_lproc.o osd_iam.o \
-                    osd_iam_lfix.o osd_iam_lvar.o
+                    osd_iam_lfix.o osd_iam_lvar.o osd_io.o
 
 EXTRA_PRE_CFLAGS := -I@LINUX@/fs -I@LDISKFS_DIR@ -I@LDISKFS_DIR@/ldiskfs
 
index 18a3a86..1ff54ba 100644 (file)
@@ -93,7 +93,6 @@ static const char remote_obj_dir[] = "REM_OBJ_DIR";
 static const struct lu_object_operations      osd_lu_obj_ops;
 static const struct dt_object_operations      osd_obj_ops;
 static const struct dt_object_operations      osd_obj_ea_ops;
-static const struct dt_body_operations        osd_body_ops_new;
 static const struct dt_index_operations       osd_index_iam_ops;
 static const struct dt_index_operations       osd_index_ea_ops;
 
@@ -182,8 +181,8 @@ static inline void osd_qid_set_type(struct osd_thandle *oh, int i, int type)
         oh->ot_id_type |= ((type == GRPQUOTA) ? (1 << i) : 0);
 }
 
-static void osd_declare_qid(struct dt_object *dt, struct osd_thandle *oh,
-                            int type, uid_t id, struct inode *inode)
+void osd_declare_qid(struct dt_object *dt, struct osd_thandle *oh,
+                     int type, uid_t id, struct inode *inode)
 {
 #ifdef CONFIG_QUOTA
         int i, allocated = 0;
@@ -601,10 +600,14 @@ static struct thandle *osd_trans_create(const struct lu_env *env,
                                         struct dt_device *d)
 {
         struct osd_thread_info *oti = osd_oti_get(env);
+        struct osd_iobuf       *iobuf = &oti->oti_iobuf;
         struct osd_thandle     *oh;
         struct thandle         *th;
         ENTRY;
 
+        /* no pending IO in this thread should be left from prev. request */
+        LASSERT(cfs_atomic_read(&iobuf->dr_numreqs) == 0);
+
         th = ERR_PTR(-ENOMEM);
         OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
         if (oh != NULL) {
@@ -716,6 +719,7 @@ static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
         int                     rc = 0;
         struct osd_thandle     *oh;
         struct osd_thread_info *oti = osd_oti_get(env);
+        struct osd_iobuf       *iobuf = &oti->oti_iobuf;
 
         ENTRY;
 
@@ -762,6 +766,19 @@ static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
                 OBD_FREE_PTR(oh);
         }
 
+        /* as we want IO to journal and data IO be concurrent, we don't block
+         * awaiting data IO completion in osd_do_bio(), instead we wait here
+         * once transaction is submitted to the journal. all regular requests
+         * don't do direct IO (except read/write), thus this wait_event becomes
+         * no-op for them.
+         *
+         * IMPORTANT: we have to wait till any IO submitted by the thread is
+         * completed otherwise iobuf may be corrupted by different request
+         */
+        cfs_wait_event(iobuf->dr_wait, cfs_atomic_read(&iobuf->dr_numreqs)==0);
+        if (!rc)
+                rc = iobuf->dr_error;
+
         RETURN(rc);
 }
 
@@ -1476,14 +1493,13 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
         else
                 parent = osd->od_obj_area;
 
-        LASSERT(parent != NULL);
-        LASSERT(osd_dt_obj(parent)->oo_inode->i_op != NULL);
-
 #ifdef HAVE_QUOTA_SUPPORT
         osd_push_ctxt(info->oti_env, save);
 #endif
         inode = ldiskfs_create_inode(oth->ot_handle,
-                                     osd_dt_obj(parent)->oo_inode, mode);
+                                     parent ?  osd_dt_obj(parent)->oo_inode :
+                                               osd_sb(osd)->s_root->d_inode,
+                                     mode);
 #ifdef HAVE_QUOTA_SUPPORT
         osd_pop_ctxt(save);
 #endif
@@ -2667,277 +2683,6 @@ static const struct dt_object_operations osd_obj_ea_ops = {
         .do_data_get          = osd_data_get,
 };
 
-/*
- * Body operations.
- */
-
-/*
- * XXX: Another layering violation for now.
- *
- * We don't want to use ->f_op->read methods, because generic file write
- *
- *         - serializes on ->i_sem, and
- *
- *         - does a lot of extra work like balance_dirty_pages(),
- *
- * which doesn't work for globally shared files like /last-received.
- */
-static int osd_ldiskfs_readlink(struct inode *inode, char *buffer, int buflen)
-{
-        struct ldiskfs_inode_info *ei = LDISKFS_I(inode);
-
-        memcpy(buffer, (char*)ei->i_data, buflen);
-
-        return  buflen;
-}
-
-static int osd_ldiskfs_read(struct inode *inode, void *buf, int size,
-                            loff_t *offs)
-{
-        struct buffer_head *bh;
-        unsigned long block;
-        int osize = size;
-        int blocksize;
-        int csize;
-        int boffs;
-        int err;
-
-        /* prevent reading after eof */
-        spin_lock(&inode->i_lock);
-        if (i_size_read(inode) < *offs + size) {
-                size = i_size_read(inode) - *offs;
-                spin_unlock(&inode->i_lock);
-                if (size < 0) {
-                        CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n",
-                               i_size_read(inode), *offs);
-                        return -EBADR;
-                } else if (size == 0) {
-                        return 0;
-                }
-        } else {
-                spin_unlock(&inode->i_lock);
-        }
-
-        blocksize = 1 << inode->i_blkbits;
-
-        while (size > 0) {
-                block = *offs >> inode->i_blkbits;
-                boffs = *offs & (blocksize - 1);
-                csize = min(blocksize - boffs, size);
-                bh = ldiskfs_bread(NULL, inode, block, 0, &err);
-                if (!bh) {
-                        CERROR("can't read block: %d\n", err);
-                        return err;
-                }
-
-                memcpy(buf, bh->b_data + boffs, csize);
-                brelse(bh);
-
-                *offs += csize;
-                buf += csize;
-                size -= csize;
-        }
-        return osize;
-}
-
-static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
-                        struct lu_buf *buf, loff_t *pos,
-                        struct lustre_capa *capa)
-{
-        struct osd_object      *obj    = osd_dt_obj(dt);
-        struct inode           *inode  = obj->oo_inode;
-        int rc;
-
-        if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
-                RETURN(-EACCES);
-
-        /* Read small symlink from inode body as we need to maintain correct
-         * on-disk symlinks for ldiskfs.
-         */
-        if (S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
-            (buf->lb_len <= sizeof (LDISKFS_I(inode)->i_data)))
-                rc = osd_ldiskfs_readlink(inode, buf->lb_buf, buf->lb_len);
-        else
-                rc = osd_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
-
-        return rc;
-}
-
-static int osd_ldiskfs_writelink(struct inode *inode, char *buffer, int buflen)
-{
-
-        memcpy((char*)&LDISKFS_I(inode)->i_data, (char *)buffer,
-               buflen);
-        LDISKFS_I(inode)->i_disksize = buflen;
-        i_size_write(inode, buflen);
-        inode->i_sb->s_op->dirty_inode(inode);
-
-        return 0;
-}
-
-static int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
-                                    loff_t *offs, handle_t *handle)
-{
-        struct buffer_head *bh = NULL;
-        loff_t offset = *offs;
-        loff_t new_size = i_size_read(inode);
-        unsigned long block;
-        int blocksize = 1 << inode->i_blkbits;
-        int err = 0;
-        int size;
-        int boffs;
-        int dirty_inode = 0;
-
-        while (bufsize > 0) {
-                if (bh != NULL)
-                        brelse(bh);
-
-                block = offset >> inode->i_blkbits;
-                boffs = offset & (blocksize - 1);
-                size = min(blocksize - boffs, bufsize);
-                bh = ldiskfs_bread(handle, inode, block, 1, &err);
-                if (!bh) {
-                        CERROR("can't read/create block: %d\n", err);
-                        break;
-                }
-
-                err = ldiskfs_journal_get_write_access(handle, bh);
-                if (err) {
-                        CERROR("journal_get_write_access() returned error %d\n",
-                               err);
-                        break;
-                }
-                LASSERTF(boffs + size <= bh->b_size,
-                         "boffs %d size %d bh->b_size %lu",
-                         boffs, size, (unsigned long)bh->b_size);
-                memcpy(bh->b_data + boffs, buf, size);
-                err = ldiskfs_journal_dirty_metadata(handle, bh);
-                if (err)
-                        break;
-
-                if (offset + size > new_size)
-                        new_size = offset + size;
-                offset += size;
-                bufsize -= size;
-                buf += size;
-        }
-        if (bh)
-                brelse(bh);
-
-        /* correct in-core and on-disk sizes */
-        if (new_size > i_size_read(inode)) {
-                spin_lock(&inode->i_lock);
-                if (new_size > i_size_read(inode))
-                        i_size_write(inode, new_size);
-                if (i_size_read(inode) > LDISKFS_I(inode)->i_disksize) {
-                        LDISKFS_I(inode)->i_disksize = i_size_read(inode);
-                        dirty_inode = 1;
-                }
-                spin_unlock(&inode->i_lock);
-                if (dirty_inode)
-                        inode->i_sb->s_op->dirty_inode(inode);
-        }
-
-        if (err == 0)
-                *offs = offset;
-        return err;
-}
-
-static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
-                                 const loff_t size, loff_t pos,
-                                 struct thandle *handle)
-{
-        struct osd_thandle *oh;
-        int credits;
-
-        LASSERT(handle != NULL);
-
-        oh = container_of0(handle, struct osd_thandle, ot_super);
-        LASSERT(oh->ot_handle == NULL);
-
-        /* XXX: size == 0 or INT_MAX indicating a catalog header update or
-         *      llog write, see comment in mdd_declare_llog_record().
-         *
-         *      This hack should be removed in 2.3
-         */
-        if (size == DECLARE_LLOG_REWRITE)
-                credits = 2;
-        else if (size == DECLARE_LLOG_WRITE)
-                credits = 6;
-        else
-                credits = osd_dto_credits_noquota[DTO_WRITE_BLOCK];
-
-        OSD_DECLARE_OP(oh, write);
-        oh->ot_credits += credits;
-
-        if (osd_dt_obj(dt)->oo_inode == NULL)
-                return 0;
-
-        osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid,
-                        osd_dt_obj(dt)->oo_inode);
-        osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid,
-                        osd_dt_obj(dt)->oo_inode);
-        return 0;
-}
-
-static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
-                         const struct lu_buf *buf, loff_t *pos,
-                         struct thandle *handle, struct lustre_capa *capa,
-                         int ignore_quota)
-{
-        struct osd_object  *obj   = osd_dt_obj(dt);
-        struct inode       *inode = obj->oo_inode;
-        struct osd_thandle *oh;
-        ssize_t            result = 0;
-#ifdef HAVE_QUOTA_SUPPORT
-        cfs_cap_t           save = cfs_curproc_cap_pack();
-#endif
-
-        LASSERT(handle != NULL);
-
-        if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
-                RETURN(-EACCES);
-
-        oh = container_of(handle, struct osd_thandle, ot_super);
-        LASSERT(oh->ot_handle->h_transaction != NULL);
-#ifdef HAVE_QUOTA_SUPPORT
-        if (ignore_quota)
-                cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
-        else
-                cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
-#endif
-        /* Write small symlink to inode body as we need to maintain correct
-         * on-disk symlinks for ldiskfs.
-         */
-        if(S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
-           (buf->lb_len < sizeof (LDISKFS_I(inode)->i_data)))
-                result = osd_ldiskfs_writelink(inode, buf->lb_buf, buf->lb_len);
-        else
-                result = osd_ldiskfs_write_record(inode, buf->lb_buf,
-                                                  buf->lb_len, pos,
-                                                  oh->ot_handle);
-#ifdef HAVE_QUOTA_SUPPORT
-        cfs_curproc_cap_unpack(save);
-#endif
-        if (result == 0)
-                result = buf->lb_len;
-        return result;
-}
-
-/*
- * in some cases we may need declare methods for objects being created
- * e.g., when we create symlink
- */
-static const struct dt_body_operations osd_body_ops_new = {
-        .dbo_declare_write = osd_declare_write,
-};
-
-const struct dt_body_operations osd_body_ops = {
-        .dbo_read          = osd_read,
-        .dbo_declare_write = osd_declare_write,
-        .dbo_write         = osd_write
-};
-
 static int osd_index_declare_iam_delete(const struct lu_env *env,
                                         struct dt_object *dt,
                                         const struct dt_key *key,
@@ -4304,6 +4049,11 @@ static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
         if (o->od_oi_table != NULL)
                 osd_oi_fini(info, &o->od_oi_table, o->od_oi_count);
 
+        if (o->od_fsops) {
+                fsfilt_put_ops(o->od_fsops);
+                o->od_fsops = NULL;
+        }
+
         RETURN(0);
 }
 
@@ -4316,6 +4066,13 @@ static int osd_mount(const struct lu_env *env,
         struct lustre_sb_info    *lsi;
 
         ENTRY;
+
+        o->od_fsops = fsfilt_get_ops(mt_str(LDD_MT_LDISKFS));
+        if (o->od_fsops == NULL) {
+                CERROR("Can't find fsfilt_ldiskfs\n");
+                RETURN(-ENOTSUPP);
+        }
+
         if (o->od_mount != NULL) {
                 CERROR("Already mounted (%s)\n", dev);
                 RETURN(-EEXIST);
index b5c6114..068c9af 100644 (file)
@@ -65,6 +65,8 @@
          jbd2_journal_callback_set(handle, func, jcb)
 #endif
 
+/* fsfilt_{get|put}_ops */
+#include <lustre_fsfilt.h>
 
 /* LUSTRE_OSD_NAME */
 #include <obd.h>
@@ -221,6 +223,16 @@ struct osd_device {
          * It will be initialized, using mount param.
          */
         __u32                     od_iop_mode;
+
+        struct fsfilt_operations *od_fsops;
+
+        unsigned long long        od_readcache_max_filesize;
+        int                       od_read_cache;
+        int                       od_writethrough_cache;
+
+        struct brw_stats          od_brw_stats;
+        cfs_atomic_t              od_r_in_flight;
+        cfs_atomic_t              od_w_in_flight;
 };
 
 #define OSD_TRACK_DECLARES
@@ -311,12 +323,20 @@ enum dt_txn_op {
 
 #ifdef LPROCFS
 enum {
+        LPROC_OSD_READ_BYTES    = 0,
+        LPROC_OSD_WRITE_BYTES   = 1,
+        LPROC_OSD_GET_PAGE      = 2,
+        LPROC_OSD_NO_PAGE       = 3,
+        LPROC_OSD_CACHE_ACCESS  = 4,
+        LPROC_OSD_CACHE_HIT     = 5,
+        LPROC_OSD_CACHE_MISS    = 6,
+
 #if OSD_THANDLE_STATS
         LPROC_OSD_THANDLE_STARTING,
         LPROC_OSD_THANDLE_OPEN,
         LPROC_OSD_THANDLE_CLOSING,
 #endif
-        LPROC_OSD_NR
+        LPROC_OSD_LAST,
 };
 #endif
 
@@ -375,6 +395,25 @@ struct osd_it_iam {
         struct iam_iterator    oi_it;
 };
 
+#define MAX_BLOCKS_PER_PAGE (CFS_PAGE_SIZE / 512)
+
+struct osd_iobuf {
+        cfs_waitq_t        dr_wait;
+        cfs_atomic_t       dr_numreqs;  /* number of reqs being processed */
+        int                dr_max_pages;
+        int                dr_npages;
+        int                dr_error;
+        int                dr_frags;
+        unsigned int       dr_ignore_quota:1;
+        unsigned int       dr_elapsed_valid:1; /* we really did count time */
+        unsigned int       dr_rw:1;
+        struct page       *dr_pages[PTLRPC_MAX_BRW_PAGES];
+        unsigned long      dr_blocks[PTLRPC_MAX_BRW_PAGES*MAX_BLOCKS_PER_PAGE];
+        unsigned long      dr_start_time;
+        unsigned long      dr_elapsed;  /* how long io took */
+        struct osd_device *dr_dev;
+};
+
 struct osd_thread_info {
         const struct lu_env   *oti_env;
         /**
@@ -446,6 +485,10 @@ struct osd_thread_info {
         struct lu_buf          oti_buf;
         /** used in osd_ea_fid_set() to set fid into common ea */
         struct lustre_mdt_attrs oti_mdt_attrs;
+        /** 0-copy IO */
+        struct osd_iobuf       oti_iobuf;
+        struct inode           oti_inode;
+        int                    oti_created[PTLRPC_MAX_BRW_PAGES];
 #ifdef HAVE_QUOTA_SUPPORT
         struct osd_ctxt        oti_ctxt;
 #endif
@@ -465,11 +508,17 @@ int osd_procfs_fini(struct osd_device *osd);
 void osd_lprocfs_time_start(const struct lu_env *env);
 void osd_lprocfs_time_end(const struct lu_env *env,
                           struct osd_device *osd, int op);
+void osd_brw_stats_update(struct osd_device *osd, struct osd_iobuf *iobuf);
+
 #endif
 int osd_statfs(const struct lu_env *env, struct dt_device *dev,
                cfs_kstatfs_t *sfs);
 int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
                     struct lustre_capa *capa, __u64 opc);
+void osd_declare_qid(struct dt_object *dt, struct osd_thandle *oh,
+                     int type, uid_t id, struct inode *inode);
+int generic_error_remove_page(struct address_space *mapping,
+                                     struct page *page);
 
 /*
  * Invariants, assertions.
@@ -591,5 +640,7 @@ static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
         return lu_context_key_get(&env->le_ctx, &osd_key);
 }
 
+extern const struct dt_body_operations osd_body_ops_new;
+
 #endif /* __KERNEL__ */
 #endif /* _OSD_INTERNAL_H */
diff --git a/lustre/osd-ldiskfs/osd_io.c b/lustre/osd-ldiskfs/osd_io.c
new file mode 100644 (file)
index 0000000..08f26ab
--- /dev/null
@@ -0,0 +1,1219 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/osd/osd_io.c
+ *
+ * body operations
+ *
+ * Author: Nikita Danilov <nikita@clusterfs.com>
+ * Author: Alex Zhuravlev <bzzz@whamcloud.com>
+ *
+ */
+
+/* LUSTRE_VERSION_CODE */
+#include <lustre_ver.h>
+/* prerequisite for linux/xattr.h */
+#include <linux/types.h>
+/* prerequisite for linux/xattr.h */
+#include <linux/fs.h>
+
+/* ext_depth() */
+#include <ldiskfs/ldiskfs.h>
+#include <ldiskfs/ldiskfs_jbd2.h>
+#include <ldiskfs/ldiskfs_extents.h>
+
+/*
+ * struct OBD_{ALLOC,FREE}*()
+ * OBD_FAIL_CHECK
+ */
+#include <obd_support.h>
+
+#include "osd_internal.h"
+
+#ifndef HAVE_PAGE_CONSTANT
+#define mapping_cap_page_constant_write(mapping) 0
+#define SetPageConstant(page) do {} while (0)
+#define ClearPageConstant(page) do {} while (0)
+#endif
+
+#ifndef HAS_GENERIC_ERROR_REMOVE_PAGE
+int generic_error_remove_page(struct address_space *mapping, struct page *page)
+{
+        if (mapping == NULL)
+                return -EINVAL;
+
+        if (mapping != page->mapping)
+                return -EIO;
+        /*
+         * Only punch for normal data pages for now.
+         * Handling other types like directories would need more auditing.
+         */
+        if (!S_ISREG(mapping->host->i_mode))
+                return -EIO;
+
+        if (page_mapped(page)) {
+                unmap_mapping_range(mapping,
+                                    (loff_t)page->index << PAGE_CACHE_SHIFT,
+                                    PAGE_CACHE_SIZE, 0);
+        }
+        truncate_complete_page(mapping, page);
+        return 0;
+}
+#endif
+
+static void osd_init_iobuf(struct osd_device *d, struct osd_iobuf *iobuf,int rw)
+{
+        cfs_waitq_init(&iobuf->dr_wait);
+        cfs_atomic_set(&iobuf->dr_numreqs, 0);
+        iobuf->dr_max_pages = PTLRPC_MAX_BRW_PAGES;
+        iobuf->dr_npages = 0;
+        iobuf->dr_error = 0;
+        iobuf->dr_dev = d;
+        iobuf->dr_frags = 0;
+        iobuf->dr_elapsed = 0;
+        /* must be counted before, so assert */
+        LASSERT(iobuf->dr_elapsed_valid == 0);
+        iobuf->dr_rw = rw;
+}
+
+static void osd_iobuf_add_page(struct osd_iobuf *iobuf, struct page *page)
+{
+        LASSERT(iobuf->dr_npages < iobuf->dr_max_pages);
+        iobuf->dr_pages[iobuf->dr_npages++] = page;
+}
+
+void osd_fini_iobuf(struct osd_device *d, struct osd_iobuf *iobuf)
+{
+        int rw = iobuf->dr_rw;
+
+        if (iobuf->dr_elapsed_valid) {
+                iobuf->dr_elapsed_valid = 0;
+                LASSERT(iobuf->dr_dev == d);
+                LASSERT(iobuf->dr_frags > 0);
+                lprocfs_oh_tally(&d->od_brw_stats.
+                                 hist[BRW_R_DIO_FRAGS+rw],
+                                 iobuf->dr_frags);
+                lprocfs_oh_tally_log2(&d->od_brw_stats.hist[BRW_R_IO_TIME+rw],
+                                      iobuf->dr_elapsed);
+        }
+}
+
+#ifdef HAVE_BIO_ENDIO_2ARG
+#define DIO_RETURN(a)
+static void dio_complete_routine(struct bio *bio, int error)
+#else
+#define DIO_RETURN(a)   return(a)
+static int dio_complete_routine(struct bio *bio, unsigned int done, int error)
+#endif
+{
+        struct osd_iobuf *iobuf = bio->bi_private;
+        struct bio_vec *bvl;
+        int i;
+
+        /* CAVEAT EMPTOR: possibly in IRQ context
+         * DO NOT record procfs stats here!!! */
+
+        if (unlikely(iobuf == NULL)) {
+                CERROR("***** bio->bi_private is NULL!  This should never "
+                       "happen.  Normally, I would crash here, but instead I "
+                       "will dump the bio contents to the console.  Please "
+                       "report this to <http://jira.whamcloud.com/> , along "
+                       "with any interesting messages leading up to this point "
+                       "(like SCSI errors, perhaps).  Because bi_private is "
+                       "NULL, I can't wake up the thread that initiated this "
+                       "IO - you will probably have to reboot this node.\n");
+                CERROR("bi_next: %p, bi_flags: %lx, bi_rw: %lu, bi_vcnt: %d, "
+                       "bi_idx: %d, bi->size: %d, bi_end_io: %p, bi_cnt: %d, "
+                       "bi_private: %p\n", bio->bi_next, bio->bi_flags,
+                       bio->bi_rw, bio->bi_vcnt, bio->bi_idx, bio->bi_size,
+                       bio->bi_end_io, cfs_atomic_read(&bio->bi_cnt),
+                       bio->bi_private);
+                DIO_RETURN(0);
+        }
+
+        /* the check is outside of the cycle for performance reason -bzzz */
+        if (!cfs_test_bit(BIO_RW, &bio->bi_rw)) {
+                bio_for_each_segment(bvl, bio, i) {
+                        if (likely(error == 0))
+                                SetPageUptodate(bvl->bv_page);
+                        LASSERT(PageLocked(bvl->bv_page));
+                        ClearPageConstant(bvl->bv_page);
+                }
+                cfs_atomic_dec(&iobuf->dr_dev->od_r_in_flight);
+        } else {
+                struct page *p = iobuf->dr_pages[0];
+                if (p->mapping) {
+                        if (mapping_cap_page_constant_write(p->mapping)) {
+                                bio_for_each_segment(bvl, bio, i) {
+                                        ClearPageConstant(bvl->bv_page);
+                                }
+                        }
+                }
+                cfs_atomic_dec(&iobuf->dr_dev->od_w_in_flight);
+        }
+
+        /* any real error is good enough -bzzz */
+        if (error != 0 && iobuf->dr_error == 0)
+                iobuf->dr_error = error;
+
+        if (cfs_atomic_dec_and_test(&iobuf->dr_numreqs)) {
+                iobuf->dr_elapsed = jiffies - iobuf->dr_start_time;
+                iobuf->dr_elapsed_valid = 1;
+                cfs_waitq_signal(&iobuf->dr_wait);
+        }
+
+        /* Completed bios used to be chained off iobuf->dr_bios and freed in
+         * filter_clear_dreq().  It was then possible to exhaust the biovec-256
+         * mempool when serious on-disk fragmentation was encountered,
+         * deadlocking the OST.  The bios are now released as soon as complete
+         * so the pool cannot be exhausted while IOs are competing. bug 10076 */
+        bio_put(bio);
+        DIO_RETURN(0);
+}
+
+static void record_start_io(struct osd_iobuf *iobuf, int size)
+{
+        struct osd_device    *osd = iobuf->dr_dev;
+        struct obd_histogram *h = osd->od_brw_stats.hist;
+
+        iobuf->dr_frags++;
+        cfs_atomic_inc(&iobuf->dr_numreqs);
+
+        if (iobuf->dr_rw == 0) {
+                cfs_atomic_inc(&osd->od_r_in_flight);
+                lprocfs_oh_tally(&h[BRW_R_RPC_HIST],
+                                 cfs_atomic_read(&osd->od_r_in_flight));
+                lprocfs_oh_tally_log2(&h[BRW_R_DISK_IOSIZE], size);
+        } else if (iobuf->dr_rw == 1) {
+                cfs_atomic_inc(&osd->od_w_in_flight);
+                lprocfs_oh_tally(&h[BRW_W_RPC_HIST],
+                                 cfs_atomic_read(&osd->od_w_in_flight));
+                lprocfs_oh_tally_log2(&h[BRW_W_DISK_IOSIZE], size);
+        } else {
+                LBUG();
+        }
+}
+
+static void osd_submit_bio(int rw, struct bio *bio)
+{
+        LASSERTF(rw == 0 || rw == 1, "%x\n", rw);
+        if (rw == 0)
+                submit_bio(READ, bio);
+        else
+                submit_bio(WRITE, bio);
+}
+
+static int can_be_merged(struct bio *bio, sector_t sector)
+{
+        unsigned int size;
+
+        if (!bio)
+                return 0;
+
+        size = bio->bi_size >> 9;
+        return bio->bi_sector + size == sector ? 1 : 0;
+}
+
+static int osd_do_bio(struct osd_device *osd, struct inode *inode,
+                      struct osd_iobuf *iobuf)
+{
+        int            blocks_per_page = CFS_PAGE_SIZE >> inode->i_blkbits;
+        struct page  **pages = iobuf->dr_pages;
+        int            npages = iobuf->dr_npages;
+        unsigned long *blocks = iobuf->dr_blocks;
+        int            total_blocks = npages * blocks_per_page;
+        int            sector_bits = inode->i_sb->s_blocksize_bits - 9;
+        unsigned int   blocksize = inode->i_sb->s_blocksize;
+        struct bio    *bio = NULL;
+        struct page   *page;
+        unsigned int   page_offset;
+        sector_t       sector;
+        int            nblocks;
+        int            block_idx;
+        int            page_idx;
+        int            i;
+        int            rc = 0;
+        ENTRY;
+
+        LASSERT(iobuf->dr_npages == npages);
+
+        osd_brw_stats_update(osd, iobuf);
+        iobuf->dr_start_time = cfs_time_current();
+
+        for (page_idx = 0, block_idx = 0;
+             page_idx < npages;
+             page_idx++, block_idx += blocks_per_page) {
+
+                page = pages[page_idx];
+                LASSERT(block_idx + blocks_per_page <= total_blocks);
+
+                for (i = 0, page_offset = 0;
+                     i < blocks_per_page;
+                     i += nblocks, page_offset += blocksize * nblocks) {
+
+                        nblocks = 1;
+
+                        if (blocks[block_idx + i] == 0) {  /* hole */
+                                LASSERTF(iobuf->dr_rw == 0,
+                                         "page_idx %u, block_idx %u, i %u\n",
+                                         page_idx, block_idx, i);
+                                memset(kmap(page) + page_offset, 0, blocksize);
+                                kunmap(page);
+                                continue;
+                        }
+
+                        sector = (sector_t)blocks[block_idx + i] << sector_bits;
+
+                        /* Additional contiguous file blocks? */
+                        while (i + nblocks < blocks_per_page &&
+                               (sector + (nblocks << sector_bits)) ==
+                               ((sector_t)blocks[block_idx + i + nblocks] <<
+                                sector_bits))
+                                nblocks++;
+
+                        /* I set the page to be constant only if it
+                         * is mapped to a contiguous underlying disk block(s).
+                         * It will then make sure the corresponding device
+                         * cache of raid5 will be overwritten by this page.
+                         * - jay */
+                        if (iobuf->dr_rw && (nblocks == blocks_per_page) &&
+                            mapping_cap_page_constant_write(inode->i_mapping))
+                                SetPageConstant(page);
+
+                        if (bio != NULL &&
+                            can_be_merged(bio, sector) &&
+                            bio_add_page(bio, page,
+                                         blocksize * nblocks, page_offset) != 0)
+                                continue;       /* added this frag OK */
+
+                        if (bio != NULL) {
+                                struct request_queue *q =
+                                        bdev_get_queue(bio->bi_bdev);
+
+                                /* Dang! I have to fragment this I/O */
+                                CDEBUG(D_INODE, "bio++ sz %d vcnt %d(%d) "
+                                       "sectors %d(%d) psg %d(%d) hsg %d(%d)\n",
+                                       bio->bi_size,
+                                       bio->bi_vcnt, bio->bi_max_vecs,
+                                       bio->bi_size >> 9, queue_max_sectors(q),
+                                       bio_phys_segments(q, bio),
+                                       queue_max_phys_segments(q),
+                                       bio_hw_segments(q, bio),
+                                       queue_max_hw_segments(q));
+
+                                record_start_io(iobuf, bio->bi_size);
+                                osd_submit_bio(iobuf->dr_rw, bio);
+                        }
+
+                        /* allocate new bio, limited by max BIO size, b=9945 */
+                        bio = bio_alloc(GFP_NOIO, max(BIO_MAX_PAGES,
+                                                      (npages - page_idx) *
+                                                      blocks_per_page));
+                        if (bio == NULL) {
+                                CERROR("Can't allocate bio %u*%u = %u pages\n",
+                                       (npages - page_idx), blocks_per_page,
+                                       (npages - page_idx) * blocks_per_page);
+                                rc = -ENOMEM;
+                                goto out;
+                        }
+
+                        bio->bi_bdev = inode->i_sb->s_bdev;
+                        bio->bi_sector = sector;
+                        bio->bi_end_io = dio_complete_routine;
+                        bio->bi_private = iobuf;
+
+                        rc = bio_add_page(bio, page,
+                                          blocksize * nblocks, page_offset);
+                        LASSERT(rc != 0);
+                }
+        }
+
+        if (bio != NULL) {
+                record_start_io(iobuf, bio->bi_size);
+                osd_submit_bio(iobuf->dr_rw, bio);
+                rc = 0;
+        }
+
+ out:
+        /* in order to achieve better IO throughput, we don't wait for writes
+         * completion here. instead we proceed with transaction commit in
+         * parallel and wait for IO completion once transaction is stopped
+         * see osd_trans_stop() for more details -bzzz */
+        if (iobuf->dr_rw == 0) {
+                cfs_wait_event(iobuf->dr_wait,
+                               cfs_atomic_read(&iobuf->dr_numreqs) == 0);
+        }
+
+        if (rc == 0)
+                rc = iobuf->dr_error;
+        RETURN(rc);
+}
+
+/*
+ * Split the byte range [offset, offset + len) into per-page fragments,
+ * initializing one struct niobuf_local per page touched.  Sets *nrpages
+ * to the number of entries filled in; always returns 0.
+ */
+static int osd_map_remote_to_local(loff_t offset, ssize_t len, int *nrpages,
+                                   struct niobuf_local *lnb)
+{
+        ENTRY;
+
+        for (*nrpages = 0; len > 0; lnb++, (*nrpages)++) {
+                int pgoff = offset & (CFS_PAGE_SIZE - 1);
+                int count = CFS_PAGE_SIZE - pgoff;
+
+                /* clamp the fragment to the remaining length */
+                if (count > len)
+                        count = len;
+
+                LASSERTF(count <= len, "count %u, len %lld\n", count,
+                         (long long) len);
+
+                lnb->offset = offset;
+                lnb->len    = count;
+                lnb->flags  = 0;
+                lnb->page   = NULL;
+                lnb->rc     = 0;
+
+                offset += count;
+                len    -= count;
+        }
+
+        RETURN(0);
+}
+
+/*
+ * Find or create the page-cache page backing file @offset of the
+ * object's inode.  On allocation failure the LPROC_OSD_NO_PAGE counter
+ * is bumped and NULL is returned; otherwise the page comes back locked
+ * and referenced from find_or_create_page().
+ */
+struct page *osd_get_page(struct dt_object *dt, loff_t offset, int rw)
+{
+        struct osd_object *obj = osd_dt_obj(dt);
+        struct osd_device *dev = osd_obj2dev(obj);
+        struct page       *pg;
+
+        LASSERT(obj->oo_inode);
+
+        pg = find_or_create_page(obj->oo_inode->i_mapping,
+                                 offset >> CFS_PAGE_SHIFT,
+                                 GFP_NOFS | __GFP_HIGHMEM);
+        if (unlikely(pg == NULL))
+                lprocfs_counter_add(dev->od_stats, LPROC_OSD_NO_PAGE, 1);
+
+        return pg;
+}
+
+/*
+ * Lock ordering of the "locks" involved in these paths:
+ *   journal_start
+ *   i_alloc_sem
+ *   i_mutex
+ *   page lock
+ *
+ * osd write path:
+ *   lock page(s)
+ *   journal_start
+ *   truncate_sem
+ *
+ * ext4 vmtruncate:
+ *   lock pages, unlock
+ *   journal_start
+ *   lock partial page
+ *   i_data_sem
+ */
+/**
+ * Grab (find or create) locked page-cache pages covering [pos, pos+len)
+ * and record them in the lnb array prepared by osd_map_remote_to_local().
+ * One lu_object reference is taken per page; pages and references are
+ * released in osd_bufs_put().
+ *
+ * \retval number of pages set up on success
+ * \retval -ENOMEM if a page could not be allocated
+ */
+int osd_bufs_get(const struct lu_env *env, struct dt_object *d, loff_t pos,
+                 ssize_t len, struct niobuf_local *lnb, int rw,
+                 struct lustre_capa *capa)
+{
+        struct osd_object   *obj    = osd_dt_obj(d);
+        int npages, i, rc = 0;
+
+        LASSERT(obj->oo_inode);
+
+        osd_map_remote_to_local(pos, len, &npages, lnb);
+
+        for (i = 0; i < npages; i++, lnb++) {
+
+                /* We still set up for ungranted pages so that granted pages
+                 * can be written to disk as they were promised, and portals
+                 * needs to keep the pages all aligned properly. */
+                lnb->dentry = (void *) obj;
+
+                /* NOTE(review): on failure here, pages set up by earlier
+                 * iterations stay locked and referenced -- presumably the
+                 * caller unwinds via osd_bufs_put(); confirm */
+                lnb->page = osd_get_page(d, lnb->offset, rw);
+                if (lnb->page == NULL)
+                        GOTO(cleanup, rc = -ENOMEM);
+
+                /* DLM locking protects us from write and truncate competing
+                 * for same region, but truncate can leave dirty page in the
+                 * cache. it's possible the writeout on a such a page is in
+                 * progress when we access it. it's also possible that during
+                 * this writeout we put new (partial) data, but then won't
+                 * be able to proceed in filter_commitrw_write(). thus let's
+                 * just wait for writeout completion, should be rare enough.
+                 * -bzzz */
+                wait_on_page_writeback(lnb->page);
+                BUG_ON(PageWriteback(lnb->page));
+
+                /* pin the object for the lifetime of this buffer */
+                lu_object_get(&d->do_lu);
+        }
+        rc = i;
+
+cleanup:
+        RETURN(rc);
+}
+
+/**
+ * Release buffers attached by osd_bufs_get(): record the IO stats for
+ * the request, then unlock each page, drop its page-cache reference and
+ * release the per-page object reference.
+ */
+static int osd_bufs_put(const struct lu_env *env, struct dt_object *dt,
+                        struct niobuf_local *lnb, int npages)
+{
+        struct osd_thread_info *oti = osd_oti_get(env);
+        struct osd_iobuf       *iobuf = &oti->oti_iobuf;
+        struct osd_device      *d = osd_obj2dev(osd_dt_obj(dt));
+        int                     i;
+
+        /* to do IO stats, notice we do this here because
+         * osd_do_bio() doesn't wait for write to complete */
+        osd_fini_iobuf(d, iobuf);
+
+        for (i = 0; i < npages; i++) {
+                if (lnb[i].page == NULL)
+                        continue;
+                LASSERT(PageLocked(lnb[i].page));
+                unlock_page(lnb[i].page);
+                page_cache_release(lnb[i].page);
+                /* drop the reference taken per page in osd_bufs_get() */
+                lu_object_put(env, &dt->do_lu);
+                lnb[i].page = NULL;
+        }
+        RETURN(0);
+}
+
+/**
+ * Prepare pages for a bulk write: drop cached copies when caching is
+ * disabled, mark the pages not-uptodate until the bulk transfer lands,
+ * and for partially-covered pages either read the existing data in or
+ * zero the uncovered head/tail when the page lies beyond EOF.
+ *
+ * \param lnb  array of npages niobuf_local entries with locked pages
+ * \retval 0 on success, negative errno on mapping/IO failure
+ */
+static int osd_write_prep(const struct lu_env *env, struct dt_object *dt,
+                          struct niobuf_local *lnb, int npages)
+{
+        struct osd_thread_info *oti   = osd_oti_get(env);
+        struct osd_iobuf       *iobuf = &oti->oti_iobuf;
+        struct inode           *inode = osd_dt_obj(dt)->oo_inode;
+        struct osd_device      *osd   = osd_obj2dev(osd_dt_obj(dt));
+        struct timeval          start;
+        struct timeval          end;
+        unsigned long           timediff;
+        ssize_t                 isize;
+        __s64                   maxidx;
+        int                     rc = 0;
+        int                     i;
+        int                     cache = 0;
+
+        LASSERT(inode);
+
+        osd_init_iobuf(osd, iobuf, 0);
+
+        isize = i_size_read(inode);
+        /* index of the last page backed by existing file data */
+        maxidx = ((isize + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT) - 1;
+
+        /* keep pages cached only with writethrough cache enabled and
+         * only for files below the configured size limit */
+        if (osd->od_writethrough_cache)
+                cache = 1;
+        if (isize > osd->od_readcache_max_filesize)
+                cache = 0;
+
+        cfs_gettimeofday(&start);
+        for (i = 0; i < npages; i++) {
+
+                if (cache == 0)
+                        generic_error_remove_page(inode->i_mapping,
+                                                  lnb[i].page);
+
+                /*
+                 * till commit the content of the page is undefined
+                 * we'll set it uptodate once bulk is done. otherwise
+                 * subsequent reads can access non-stable data
+                 */
+                ClearPageUptodate(lnb[i].page);
+
+                if (lnb[i].len == CFS_PAGE_SIZE)
+                        continue;
+
+                if (maxidx >= lnb[i].page->index) {
+                        /* page overlaps existing data: read it in first */
+                        osd_iobuf_add_page(iobuf, lnb[i].page);
+                } else {
+                        long off;
+                        char *p = kmap(lnb[i].page);
+
+                        /* zero the head of the page before the fragment.
+                         * use the in-page offset: lnb[i].offset is the
+                         * absolute file offset, so it must be masked or
+                         * the memset runs past the end of the page */
+                        off = lnb[i].offset & ~CFS_PAGE_MASK;
+                        if (off)
+                                memset(p, 0, off);
+                        /* zero the tail of the page after the fragment */
+                        off = lnb[i].offset + lnb[i].len;
+                        off &= ~CFS_PAGE_MASK;
+                        if (off)
+                                memset(p + off, 0, CFS_PAGE_SIZE - off);
+                        kunmap(lnb[i].page);
+                }
+        }
+        cfs_gettimeofday(&end);
+        timediff = cfs_timeval_sub(&end, &start, NULL);
+        lprocfs_counter_add(osd->od_stats, LPROC_OSD_GET_PAGE, timediff);
+
+        /* read in the partially-covered in-file pages collected above */
+        if (iobuf->dr_npages) {
+                rc = osd->od_fsops->fs_map_inode_pages(inode, iobuf->dr_pages,
+                                                       iobuf->dr_npages,
+                                                       iobuf->dr_blocks,
+                                                       oti->oti_created,
+                                                       0, NULL);
+                if (likely(rc == 0)) {
+                        rc = osd_do_bio(osd, inode, iobuf);
+                        /* do IO stats for preparation reads */
+                        osd_fini_iobuf(osd, iobuf);
+                }
+        }
+        RETURN(rc);
+}
+
+/**
+ * Declare journal credits for committing @npages of bulk write data:
+ * tree index/leaf updates per contiguous write extent, plus bitmap and
+ * group-descriptor blocks for newly allocated data blocks (capped by
+ * the number of such blocks that exist in the filesystem).
+ *
+ * \retval 0 always
+ */
+static int osd_declare_write_commit(const struct lu_env *env,
+                                    struct dt_object *dt,
+                                    struct niobuf_local *lnb, int npages,
+                                    struct thandle *handle)
+{
+        const struct osd_device *osd = osd_obj2dev(osd_dt_obj(dt));
+        struct inode            *inode = osd_dt_obj(dt)->oo_inode;
+        struct osd_thandle      *oh;
+        int                      extents = 1;
+        int                      depth;
+        int                      i;
+        int                      newblocks;
+
+        LASSERT(handle != NULL);
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        /* credits may only be declared before the handle is started */
+        LASSERT(oh->ot_handle == NULL);
+
+        newblocks = npages;
+
+        /* calculate number of extents (probably better to pass nb) */
+        for (i = 1; i < npages; i++)
+                if (lnb[i].offset !=
+                    lnb[i - 1].offset + lnb[i - 1].len)
+                        extents++;
+
+        /*
+         * each extent can go into new leaf causing a split
+         * 5 is max tree depth: inode + 4 index blocks
+         * with blockmaps, depth is 3 at most
+         */
+        if (LDISKFS_I(inode)->i_flags & LDISKFS_EXTENTS_FL) {
+                /*
+                 * many concurrent threads may grow tree by the time
+                 * our transaction starts. so, consider 2 is a min depth
+                 */
+                depth = ext_depth(inode);
+                depth = max(depth, 1) + 1;
+                newblocks += depth;
+                oh->ot_credits++; /* inode */
+                oh->ot_credits += depth * 2 * extents;
+        } else {
+                depth = 3;
+                newblocks += depth;
+                oh->ot_credits++; /* inode */
+                oh->ot_credits += depth * extents;
+        }
+
+        /* each new block can go in different group (bitmap + gd) */
+
+        /* we can't dirty more bitmap blocks than exist */
+        if (newblocks > LDISKFS_SB(osd_sb(osd))->s_groups_count)
+                oh->ot_credits += LDISKFS_SB(osd_sb(osd))->s_groups_count;
+        else
+                oh->ot_credits += newblocks;
+
+        /* we can't dirty more gd blocks than exist */
+        if (newblocks > LDISKFS_SB(osd_sb(osd))->s_gdb_count)
+                oh->ot_credits += LDISKFS_SB(osd_sb(osd))->s_gdb_count;
+        else
+                oh->ot_credits += newblocks;
+
+        RETURN(0);
+}
+
+/*
+ * Check whether the block backing file @offset is allocated.
+ * Returns 1 when mapped, 0 when it is a hole or when the filesystem
+ * exposes no ->bmap method (in which case we cannot tell).
+ */
+static int osd_is_mapped(struct inode *inode, obd_size offset)
+{
+        sector_t (*bmap)(struct address_space *, sector_t);
+
+        bmap = inode->i_mapping->a_ops->bmap;
+        if (bmap == NULL)
+                return 0;       /* can't know if we are overwriting */
+
+        return bmap(inode->i_mapping, offset >> inode->i_blkbits) != 0;
+}
+
+/**
+ * Commit bulk write data: map the prepared pages to disk blocks
+ * (allocating new blocks inside the caller's transaction), extend
+ * i_size/i_disksize if the write grows the file, and submit the bios.
+ * Write completion is not waited for here (see osd_do_bio()); IO stats
+ * are gathered later in osd_bufs_put().
+ */
+static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
+                            struct niobuf_local *lnb, int npages,
+                            struct thandle *thandle)
+{
+        struct osd_thread_info *oti = osd_oti_get(env);
+        struct osd_iobuf *iobuf = &oti->oti_iobuf;
+        struct inode *inode = osd_dt_obj(dt)->oo_inode;
+        struct osd_device  *osd = osd_obj2dev(osd_dt_obj(dt));
+        loff_t isize;
+        int rc = 0, i;
+
+        LASSERT(inode);
+
+        osd_init_iobuf(osd, iobuf, 1);
+        isize = i_size_read(inode);
+
+        for (i = 0; i < npages; i++) {
+                if (lnb[i].rc == -ENOSPC &&
+                    osd_is_mapped(inode, lnb[i].offset)) {
+                        /* Allow the write to proceed if overwriting an
+                         * existing block */
+                        lnb[i].rc = 0;
+                }
+
+                if (lnb[i].rc) { /* ENOSPC, network RPC error, etc. */
+                        CDEBUG(D_INODE, "Skipping [%d] == %d\n", i,
+                               lnb[i].rc);
+                        LASSERT(lnb[i].page);
+                        generic_error_remove_page(inode->i_mapping,lnb[i].page);
+                        continue;
+                }
+
+                LASSERT(PageLocked(lnb[i].page));
+                LASSERT(!PageWriteback(lnb[i].page));
+
+                /* track the highest byte this bulk will have written */
+                if (lnb[i].offset + lnb[i].len > isize)
+                        isize = lnb[i].offset + lnb[i].len;
+
+                /*
+                 * Since write and truncate are serialized by oo_sem, even
+                 * partial-page truncate should not leave dirty pages in the
+                 * page cache.
+                 */
+                LASSERT(!PageDirty(lnb[i].page));
+
+                SetPageUptodate(lnb[i].page);
+
+                osd_iobuf_add_page(iobuf, lnb[i].page);
+        }
+
+        if (OBD_FAIL_CHECK(OBD_FAIL_OST_MAPBLK_ENOSPC)) {
+                /* fault injection: pretend block allocation failed */
+                rc = -ENOSPC;
+        } else if (iobuf->dr_npages > 0) {
+                /* map/allocate disk blocks for all pages in one call */
+                rc = osd->od_fsops->fs_map_inode_pages(inode, iobuf->dr_pages,
+                                                       iobuf->dr_npages,
+                                                       iobuf->dr_blocks,
+                                                       oti->oti_created,
+                                                       1, NULL);
+        } else {
+                /* no pages to write, no transno is needed */
+                thandle->th_local = 1;
+        }
+
+        if (likely(rc == 0)) {
+                /* extend in-core and on-disk size under the transaction */
+                if (isize > i_size_read(inode)) {
+                        i_size_write(inode, isize);
+                        LDISKFS_I(inode)->i_disksize = isize;
+                        inode->i_sb->s_op->dirty_inode(inode);
+                }
+
+                rc = osd_do_bio(osd, inode, iobuf);
+                /* we don't do stats here as in read path because
+                 * write is async: we'll do this in osd_put_bufs() */
+        }
+
+        if (unlikely(rc != 0)) {
+                /* if write fails, we should drop pages from the cache */
+                for (i = 0; i < npages; i++) {
+                        if (lnb[i].page == NULL)
+                                continue;
+                        LASSERT(PageLocked(lnb[i].page));
+                        generic_error_remove_page(inode->i_mapping,lnb[i].page);
+                }
+        }
+
+        RETURN(rc);
+}
+
+/**
+ * Prepare pages for a bulk read: compute per-page byte counts clipped
+ * at EOF, then collect the pages that are not already uptodate and
+ * read them in with a bio.  IO stats for these preparation reads are
+ * finished in osd_bufs_put().
+ *
+ * \retval 0 on success, negative errno on mapping/IO failure
+ */
+static int osd_read_prep(const struct lu_env *env, struct dt_object *dt,
+                         struct niobuf_local *lnb, int npages)
+{
+        struct osd_thread_info *oti = osd_oti_get(env);
+        struct osd_iobuf *iobuf = &oti->oti_iobuf;
+        struct inode *inode = osd_dt_obj(dt)->oo_inode;
+        struct osd_device *osd = osd_obj2dev(osd_dt_obj(dt));
+        struct timeval start, end;
+        unsigned long timediff;
+        int rc = 0, i, cache = 0;
+
+        LASSERT(inode);
+
+        osd_init_iobuf(osd, iobuf, 0);
+
+        /* cache reads only when enabled and the file is small enough */
+        if (osd->od_read_cache)
+                cache = 1;
+        if (i_size_read(inode) > osd->od_readcache_max_filesize)
+                cache = 0;
+
+        cfs_gettimeofday(&start);
+        for (i = 0; i < npages; i++) {
+
+                if (i_size_read(inode) <= lnb[i].offset)
+                        /* If there's no more data, abort early.
+                         * lnb->rc == 0, so it's easy to detect later. */
+                        break;
+
+                /* NOTE(review): with the "- 1" a file ending exactly one
+                 * byte short of offset+len still reports a full lnb rc;
+                 * looks like an off-by-one -- confirm before changing */
+                if (i_size_read(inode) <
+                    lnb[i].offset + lnb[i].len - 1)
+                        lnb[i].rc = i_size_read(inode) - lnb[i].offset;
+                else
+                        lnb[i].rc = lnb[i].len;
+
+                lprocfs_counter_add(osd->od_stats, LPROC_OSD_CACHE_ACCESS, 1);
+                if (PageUptodate(lnb[i].page)) {
+                        lprocfs_counter_add(osd->od_stats,
+                                            LPROC_OSD_CACHE_HIT, 1);
+                } else {
+                        lprocfs_counter_add(osd->od_stats,
+                                            LPROC_OSD_CACHE_MISS, 1);
+                        osd_iobuf_add_page(iobuf, lnb[i].page);
+                }
+                if (cache == 0)
+                        generic_error_remove_page(inode->i_mapping,lnb[i].page);
+        }
+        cfs_gettimeofday(&end);
+        timediff = cfs_timeval_sub(&end, &start, NULL);
+        lprocfs_counter_add(osd->od_stats, LPROC_OSD_GET_PAGE, timediff);
+
+        if (iobuf->dr_npages) {
+                rc = osd->od_fsops->fs_map_inode_pages(inode, iobuf->dr_pages,
+                                                       iobuf->dr_npages,
+                                                       iobuf->dr_blocks,
+                                                       oti->oti_created,
+                                                       0, NULL);
+                /* don't clobber a mapping error with the bio result,
+                 * matching osd_write_prep() */
+                if (likely(rc == 0))
+                        rc = osd_do_bio(osd, inode, iobuf);
+
+                /* IO stats will be done in osd_bufs_put() */
+        }
+
+        RETURN(rc);
+}
+
+/*
+ * XXX: Another layering violation for now.
+ *
+ * We don't want to use ->f_op->read/->f_op->write methods, because the
+ * generic file write path
+ *
+ *         - serializes on ->i_sem, and
+ *
+ *         - does a lot of extra work like balance_dirty_pages(),
+ *
+ * which doesn't work for globally shared files like /last_rcvd.
+ */
+/* Copy a fast symlink target stored in the inode body (i_data) into
+ * @buffer; returns the number of bytes copied (@buflen). */
+static int osd_ldiskfs_readlink(struct inode *inode, char *buffer, int buflen)
+{
+        memcpy(buffer, (char *)LDISKFS_I(inode)->i_data, buflen);
+        return buflen;
+}
+
+/**
+ * Read @size bytes at *offs from the inode through buffer heads,
+ * clipping the request at EOF; *offs is advanced past the bytes copied.
+ *
+ * \retval 0 when *offs is at EOF
+ * \retval -EBADR when *offs lies beyond EOF
+ * \retval negative errno on read error
+ * \retval the originally requested @size otherwise
+ *         (NOTE(review): osize is saved before the EOF clip, so a read
+ *         straddling EOF still reports the full request -- callers seem
+ *         to issue exact-sized reads; confirm)
+ */
+static int osd_ldiskfs_read(struct inode *inode, void *buf, int size,
+                            loff_t *offs)
+{
+        struct buffer_head *bh;
+        unsigned long block;
+        int osize = size;
+        int blocksize;
+        int csize;
+        int boffs;
+        int err;
+
+        /* prevent reading after eof */
+        cfs_spin_lock(&inode->i_lock);
+        if (i_size_read(inode) < *offs + size) {
+                size = i_size_read(inode) - *offs;
+                cfs_spin_unlock(&inode->i_lock);
+                if (size < 0) {
+                        CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n",
+                               i_size_read(inode), *offs);
+                        return -EBADR;
+                } else if (size == 0) {
+                        return 0;
+                }
+        } else {
+                cfs_spin_unlock(&inode->i_lock);
+        }
+
+        blocksize = 1 << inode->i_blkbits;
+        while (size > 0) {
+                /* copy at most one block per iteration, honoring the
+                 * intra-block offset of the current position */
+                block = *offs >> inode->i_blkbits;
+                boffs = *offs & (blocksize - 1);
+                csize = min(blocksize - boffs, size);
+                bh = ldiskfs_bread(NULL, inode, block, 0, &err);
+                if (!bh) {
+                        CERROR("%s: error reading offset %llu (block %lu): "
+                               "rc = %d\n",
+                               inode->i_sb->s_id, *offs, block, err);
+                        return err;
+                }
+
+                memcpy(buf, bh->b_data + boffs, csize);
+                brelse(bh);
+
+                *offs += csize;
+                buf += csize;
+                size -= csize;
+        }
+        return osize;
+}
+
+/**
+ * dt body read method: check the capability, then read either from the
+ * inode body (small symlinks, kept there to stay compatible with
+ * ldiskfs fast symlinks) or via osd_ldiskfs_read().
+ *
+ * \retval bytes read on success, negative errno on failure
+ */
+static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
+                        struct lu_buf *buf, loff_t *pos,
+                        struct lustre_capa *capa)
+{
+        struct inode *inode = osd_dt_obj(dt)->oo_inode;
+        int           rc;
+
+        /* use plain return: RETURN() without ENTRY would unbalance the
+         * debug trace */
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
+                return -EACCES;
+
+        /* Read small symlink from inode body as we need to maintain correct
+         * on-disk symlinks for ldiskfs.
+         */
+        if (S_ISLNK(dt->do_lu.lo_header->loh_attr) &&
+            (buf->lb_len <= sizeof(LDISKFS_I(inode)->i_data)))
+                rc = osd_ldiskfs_readlink(inode, buf->lb_buf, buf->lb_len);
+        else
+                rc = osd_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
+
+        return rc;
+}
+
+/**
+ * Declare journal credits for a dt body write of @size bytes at @pos,
+ * and declare the quota change for the inode's owner and group.
+ *
+ * \retval 0 always
+ */
+static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
+                                 const loff_t size, loff_t pos,
+                                 struct thandle *handle)
+{
+        struct osd_thandle *oh;
+        int                 credits;
+
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        /* credits may only be declared before the handle is started */
+        LASSERT(oh->ot_handle == NULL);
+
+        /* XXX: size == 0 or INT_MAX indicating a catalog header update or
+         *      llog write, see comment in mdd_declare_llog_record().
+         *
+         *      This hack will be removed with llog over OSD landing
+         */
+        if (size == DECLARE_LLOG_REWRITE)
+                credits = 2;
+        else if (size == DECLARE_LLOG_WRITE)
+                credits = 6;
+        else
+                credits = osd_dto_credits_noquota[DTO_WRITE_BLOCK];
+
+        OSD_DECLARE_OP(oh, write);
+        oh->ot_credits += credits;
+
+        /* object not instantiated yet: nothing to charge quota against */
+        if (osd_dt_obj(dt)->oo_inode == NULL)
+                return 0;
+
+        osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid,
+                        osd_dt_obj(dt)->oo_inode);
+        osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid,
+                        osd_dt_obj(dt)->oo_inode);
+        return 0;
+}
+
+/* Store a fast symlink target in the inode body and update both the
+ * in-core and on-disk sizes.  The caller must hold an open journal
+ * handle covering the inode update.  Always returns 0. */
+static int osd_ldiskfs_writelink(struct inode *inode, char *buffer, int buflen)
+{
+        struct ldiskfs_inode_info *ei = LDISKFS_I(inode);
+
+        memcpy((char *)&ei->i_data, (char *)buffer, buflen);
+        ei->i_disksize = buflen;
+        i_size_write(inode, buflen);
+        inode->i_sb->s_op->dirty_inode(inode);
+
+        return 0;
+}
+
+/**
+ * Write @bufsize bytes at *offs into the inode through the journal:
+ * for each block, read or allocate a buffer head, take journal write
+ * access, copy the data and dirty the metadata.  If the write grew the
+ * file, in-core and on-disk i_size are extended under i_lock.  On
+ * success *offs is advanced past the written data.
+ *
+ * \param handle  an open journal handle with sufficient credits
+ * \retval 0 on success, negative errno on failure
+ */
+static int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
+                                    loff_t *offs, handle_t *handle)
+{
+        struct buffer_head *bh        = NULL;
+        loff_t              offset    = *offs;
+        loff_t              new_size  = i_size_read(inode);
+        unsigned long       block;
+        int                 blocksize = 1 << inode->i_blkbits;
+        int                 err = 0;
+        int                 size;
+        int                 boffs;
+        int                 dirty_inode = 0;
+
+        while (bufsize > 0) {
+                /* release the buffer head of the previous iteration */
+                if (bh != NULL)
+                        brelse(bh);
+
+                /* write at most one block per iteration */
+                block = offset >> inode->i_blkbits;
+                boffs = offset & (blocksize - 1);
+                size = min(blocksize - boffs, bufsize);
+                /* create=1: allocate the block if it does not exist */
+                bh = ldiskfs_bread(handle, inode, block, 1, &err);
+                if (!bh) {
+                        CERROR("%s: error reading offset %llu (block %lu): "
+                               "rc = %d\n",
+                               inode->i_sb->s_id, offset, block, err);
+                        break;
+                }
+
+                err = ldiskfs_journal_get_write_access(handle, bh);
+                if (err) {
+                        CERROR("journal_get_write_access() returned error %d\n",
+                               err);
+                        break;
+                }
+                LASSERTF(boffs + size <= bh->b_size,
+                         "boffs %d size %d bh->b_size %lu",
+                         boffs, size, (unsigned long)bh->b_size);
+                memcpy(bh->b_data + boffs, buf, size);
+                err = ldiskfs_journal_dirty_metadata(handle, bh);
+                if (err)
+                        break;
+
+                if (offset + size > new_size)
+                        new_size = offset + size;
+                offset += size;
+                bufsize -= size;
+                buf += size;
+        }
+        if (bh)
+                brelse(bh);
+
+        /* correct in-core and on-disk sizes */
+        if (new_size > i_size_read(inode)) {
+                cfs_spin_lock(&inode->i_lock);
+                /* re-check under the lock: someone may have grown it */
+                if (new_size > i_size_read(inode))
+                        i_size_write(inode, new_size);
+                if (i_size_read(inode) > LDISKFS_I(inode)->i_disksize) {
+                        LDISKFS_I(inode)->i_disksize = i_size_read(inode);
+                        dirty_inode = 1;
+                }
+                cfs_spin_unlock(&inode->i_lock);
+                /* dirty outside the spinlock: dirty_inode may sleep */
+                if (dirty_inode)
+                        inode->i_sb->s_op->dirty_inode(inode);
+        }
+
+        if (err == 0)
+                *offs = offset;
+        return err;
+}
+
+/**
+ * dt body write method: check the capability, raise or lower the quota
+ * override capability per @ignore_quota, then write either into the
+ * inode body (small symlinks) or through the journal via
+ * osd_ldiskfs_write_record().
+ *
+ * \retval bytes written on success, negative errno on failure
+ */
+static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
+                         const struct lu_buf *buf, loff_t *pos,
+                         struct thandle *handle, struct lustre_capa *capa,
+                         int ignore_quota)
+{
+        struct inode       *inode = osd_dt_obj(dt)->oo_inode;
+        struct osd_thandle *oh;
+        ssize_t             result;
+#ifdef HAVE_QUOTA_SUPPORT
+        cfs_cap_t           save = cfs_curproc_cap_pack();
+#endif
+
+        LASSERT(dt_object_exists(dt));
+
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
+                return -EACCES;
+
+        LASSERT(handle != NULL);
+
+        /* XXX: don't check: one declared chunk can be used many times */
+        /* OSD_EXEC_OP(handle, write); */
+
+        oh = container_of(handle, struct osd_thandle, ot_super);
+        /* the transaction must already be started */
+        LASSERT(oh->ot_handle->h_transaction != NULL);
+#ifdef HAVE_QUOTA_SUPPORT
+        if (ignore_quota)
+                cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
+        else
+                cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
+#endif
+        /* Write small symlink to inode body as we need to maintain correct
+         * on-disk symlinks for ldiskfs.
+         */
+        if (S_ISLNK(dt->do_lu.lo_header->loh_attr) &&
+            (buf->lb_len < sizeof(LDISKFS_I(inode)->i_data)))
+                result = osd_ldiskfs_writelink(inode, buf->lb_buf, buf->lb_len);
+        else
+                result = osd_ldiskfs_write_record(inode, buf->lb_buf,
+                                                  buf->lb_len, pos,
+                                                  oh->ot_handle);
+#ifdef HAVE_QUOTA_SUPPORT
+        /* restore the caller's capability set */
+        cfs_curproc_cap_unpack(save);
+#endif
+        if (result == 0)
+                result = buf->lb_len;
+        return result;
+}
+
+/*
+ * Declare journal credits for a punch (truncate).  A truncate may free
+ * an unbounded number of blocks and cannot fit a single transaction, so
+ * we reserve only enough to update i_size and put the inode on the
+ * orphan list; the truncate itself extends or restarts the handle as
+ * needed (see osd_punch()).
+ */
+static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt,
+                             __u64 start, __u64 end, struct thandle *th)
+{
+        struct osd_thandle *oh;
+        ENTRY;
+
+        LASSERT(th);
+        oh = container_of(th, struct osd_thandle, ot_super);
+
+        OSD_DECLARE_OP(oh, punch);
+
+        /* i_size update plus orphan-list manipulation */
+        oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE] + 3;
+
+        RETURN(0);
+}
+
+/**
+ * Truncate the object to @start bytes; only truncate-to-EOF punches
+ * are supported (@end must be OBD_OBJECT_EOF).  vmtruncate() may force
+ * ldiskfs to restart the transaction internally, in which case the
+ * handle is re-extended with the credits we originally declared.
+ */
+static int osd_punch(const struct lu_env *env, struct dt_object *dt,
+                     __u64 start, __u64 end, struct thandle *th,
+                     struct lustre_capa *capa)
+{
+        struct osd_thandle *oh;
+        struct osd_object  *obj = osd_dt_obj(dt);
+        struct inode       *inode = obj->oo_inode;
+        handle_t           *h;
+        tid_t               tid;
+        int                 rc, rc2 = 0;
+        ENTRY;
+
+        LASSERT(end == OBD_OBJECT_EOF);
+        LASSERT(dt_object_exists(dt));
+        LASSERT(osd_invariant(obj));
+
+        LASSERT(th);
+        oh = container_of(th, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle->h_transaction != NULL);
+
+        OSD_EXEC_OP(th, punch);
+
+        /* remember the transaction id to detect a restart below */
+        tid = oh->ot_handle->h_transaction->t_tid;
+
+        rc = vmtruncate(inode, start);
+
+        /*
+         * For a partial-page truncate, flush the page to disk immediately to
+         * avoid data corruption during direct disk write.  b=17397
+         */
+        if (rc == 0 && (start & ~CFS_PAGE_MASK) != 0)
+                rc = filemap_fdatawrite_range(inode->i_mapping, start, start+1);
+
+        h = journal_current_handle();
+        LASSERT(h != NULL);
+        LASSERT(h == oh->ot_handle);
+
+        if (tid != h->h_transaction->t_tid) {
+                int credits = oh->ot_credits;
+                /*
+                 * transaction has changed during truncate
+                 * we need to restart the handle with our credits
+                 */
+                if (h->h_buffer_credits < credits) {
+                        if (ldiskfs_journal_extend(h, credits))
+                                rc2 = ldiskfs_journal_restart(h, credits);
+                }
+        }
+
+        RETURN(rc == 0 ? rc2 : rc);
+}
+
+/**
+ * Fetch the object's extent map by invoking the filesystem's FIEMAP
+ * ioctl on a file/dentry constructed from thread-local scratch space.
+ * The address limit is lifted (set_fs) so the kernel-space @fm buffer
+ * passes the ioctl's user-pointer checks.
+ */
+static int osd_fiemap_get(const struct lu_env *env, struct dt_object *dt,
+                          struct ll_user_fiemap *fm)
+{
+        struct inode *inode = osd_dt_obj(dt)->oo_inode;
+        struct osd_thread_info *info   = osd_oti_get(env);
+        struct dentry          *dentry = &info->oti_obj_dentry;
+        struct file            *file   = &info->oti_file;
+        mm_segment_t            saved_fs;
+        int rc;
+
+        LASSERT(inode);
+        /* fake up just enough of a struct file for the ioctl */
+        dentry->d_inode = inode;
+        file->f_dentry = dentry;
+        file->f_mapping = inode->i_mapping;
+        file->f_op = inode->i_fop;
+
+        saved_fs = get_fs();
+        set_fs(get_ds());
+        /* ldiskfs_ioctl does not have a inode argument */
+        if (inode->i_fop->unlocked_ioctl)
+                rc = inode->i_fop->unlocked_ioctl(file, FSFILT_IOC_FIEMAP,
+                                                  (long)fm);
+        else
+                rc = -ENOTTY;
+        set_fs(saved_fs);
+        return rc;
+}
+
+/*
+ * Body methods for objects that are being created within the current
+ * transaction (e.g. a symlink whose target is written at creation time):
+ * only the write declaration is needed before the object fully exists.
+ */
+const struct dt_body_operations osd_body_ops_new = {
+        .dbo_declare_write = osd_declare_write,
+};
+
+/* full set of body (data) methods for existing objects */
+const struct dt_body_operations osd_body_ops = {
+        .dbo_read                 = osd_read,
+        .dbo_declare_write        = osd_declare_write,
+        .dbo_write                = osd_write,
+        .dbo_bufs_get             = osd_bufs_get,
+        .dbo_bufs_put             = osd_bufs_put,
+        .dbo_write_prep           = osd_write_prep,
+        .dbo_declare_write_commit = osd_declare_write_commit,
+        .dbo_write_commit         = osd_write_commit,
+        .dbo_read_prep            = osd_read_prep,
+        /* NOTE(review): the punch members use a "do_" prefix while every
+         * other member uses "dbo_" -- verify this matches the member names
+         * declared in struct dt_body_operations */
+        .do_declare_punch         = osd_declare_punch,
+        .do_punch                 = osd_punch,
+        .dbo_fiemap_get           = osd_fiemap_get,
+};
+
index 34a3a20..2bc3f37 100644 (file)
 
 #ifdef LPROCFS
 
-static const char *osd_counter_names[LPROC_OSD_NR] = {
+/*
+ * Fold one completed bulk I/O (described by @iobuf) into the device-wide
+ * brw_stats histograms: total pages, discontiguous pages and
+ * discontiguous on-disk blocks.
+ *
+ * @iobuf->dr_rw selects read (0) or write (1); indexing "BRW_R_x + rw"
+ * presumably relies on each BRW_W_x being BRW_R_x + 1 -- confirm against
+ * the brw_stats enum.
+ */
+void osd_brw_stats_update(struct osd_device *osd, struct osd_iobuf *iobuf)
+{
+        struct brw_stats *s = &osd->od_brw_stats;
+        unsigned long    *last_block = NULL;
+        struct page     **pages = iobuf->dr_pages;
+        struct page      *last_page = NULL;
+        unsigned long     discont_pages = 0;
+        unsigned long     discont_blocks = 0;
+        unsigned long    *blocks = iobuf->dr_blocks;
+        int               i, nr_pages = iobuf->dr_npages;
+        int               blocks_per_page;
+        int               rw = iobuf->dr_rw;
+
+        if (unlikely(nr_pages == 0))
+                return;
+
+        blocks_per_page = CFS_PAGE_SIZE >> osd_sb(osd)->s_blocksize_bits;
+
+        lprocfs_oh_tally_log2(&s->hist[BRW_R_PAGES+rw], nr_pages);
+
+        while (nr_pages-- > 0) {
+                /* a gap in the file page index means a discontiguous page */
+                if (last_page && (*pages)->index != (last_page->index + 1))
+                        discont_pages++;
+                last_page = *pages;
+                pages++;
+                for (i = 0; i < blocks_per_page; i++) {
+                        /* likewise, a gap in physical block numbers */
+                        if (last_block && *blocks != (*last_block + 1))
+                                discont_blocks++;
+                        last_block = blocks++;
+                }
+        }
+
+        lprocfs_oh_tally(&s->hist[BRW_R_DISCONT_PAGES+rw], discont_pages);
+        lprocfs_oh_tally(&s->hist[BRW_R_DISCONT_BLOCKS+rw], discont_blocks);
+}
+
+/* integer percentage of @a relative to @b; 0 when @b is 0.
+ * Arguments and the expansion are fully parenthesized so expression
+ * arguments (e.g. pct(x + y, n)) expand with the intended precedence. */
+#define pct(a, b) ((b) ? (a) * 100 / (b) : 0)
+
+/*
+ * Print one read/write histogram pair as a two-column percentage table.
+ *
+ * @scale: 0 prints the raw bucket index; otherwise bucket i is labelled
+ * "scale << i", with a K suffix for 10 <= i < 20 and M for i >= 20.
+ * Leading all-empty buckets are skipped, and printing stops as soon as
+ * both cumulative counts reach their totals.
+ */
+static void display_brw_stats(struct seq_file *seq, char *name, char *units,
+        struct obd_histogram *read, struct obd_histogram *write, int scale)
+{
+        unsigned long read_tot, write_tot, r, w, read_cum = 0, write_cum = 0;
+        int i;
+
+        seq_printf(seq, "\n%26s read      |     write\n", " ");
+        seq_printf(seq, "%-22s %-5s %% cum %% |  %-11s %% cum %%\n",
+                   name, units, units);
+
+        read_tot = lprocfs_oh_sum(read);
+        write_tot = lprocfs_oh_sum(write);
+        for (i = 0; i < OBD_HIST_MAX; i++) {
+                r = read->oh_buckets[i];
+                w = write->oh_buckets[i];
+                read_cum += r;
+                write_cum += w;
+                /* nothing seen yet in either histogram -- skip the row */
+                if (read_cum == 0 && write_cum == 0)
+                        continue;
+
+                if (!scale)
+                        seq_printf(seq, "%u", i);
+                else if (i < 10)
+                        seq_printf(seq, "%u", scale << i);
+                else if (i < 20)
+                        seq_printf(seq, "%uK", scale << (i-10));
+                else
+                        seq_printf(seq, "%uM", scale << (i-20));
+
+                seq_printf(seq, ":\t\t%10lu %3lu %3lu   | %4lu %3lu %3lu\n",
+                           r, pct(r, read_tot), pct(read_cum, read_tot),
+                           w, pct(w, write_tot), pct(write_cum, write_tot));
+
+                /* all samples accounted for -- remaining buckets are empty */
+                if (read_cum == read_tot && write_cum == write_tot)
+                        break;
+        }
+}
+
+/*
+ * Emit all brw_stats histograms for one device into @seq, preceded by a
+ * snapshot timestamp.  The read is not locked against concurrent
+ * histogram updates (see the comment below), so the output is only a
+ * best-effort snapshot.
+ */
+static void brw_stats_show(struct seq_file *seq, struct brw_stats *brw_stats)
+{
+        struct timeval now;
+
+        /* this sampling races with updates */
+        cfs_gettimeofday(&now);
+        seq_printf(seq, "snapshot_time:         %lu.%lu (secs.usecs)\n",
+                   now.tv_sec, now.tv_usec);
+
+        display_brw_stats(seq, "pages per bulk r/w", "rpcs",
+                          &brw_stats->hist[BRW_R_PAGES],
+                          &brw_stats->hist[BRW_W_PAGES], 1);
+
+        display_brw_stats(seq, "discontiguous pages", "rpcs",
+                          &brw_stats->hist[BRW_R_DISCONT_PAGES],
+                          &brw_stats->hist[BRW_W_DISCONT_PAGES], 0);
+
+        display_brw_stats(seq, "discontiguous blocks", "rpcs",
+                          &brw_stats->hist[BRW_R_DISCONT_BLOCKS],
+                          &brw_stats->hist[BRW_W_DISCONT_BLOCKS], 0);
+
+        display_brw_stats(seq, "disk fragmented I/Os", "ios",
+                          &brw_stats->hist[BRW_R_DIO_FRAGS],
+                          &brw_stats->hist[BRW_W_DIO_FRAGS], 0);
+
+        display_brw_stats(seq, "disk I/Os in flight", "ios",
+                          &brw_stats->hist[BRW_R_RPC_HIST],
+                          &brw_stats->hist[BRW_W_RPC_HIST], 0);
+
+        /* buckets are jiffies; scale converts them to milliseconds */
+        display_brw_stats(seq, "I/O time (1/1000s)", "ios",
+                          &brw_stats->hist[BRW_R_IO_TIME],
+                          &brw_stats->hist[BRW_W_IO_TIME], 1000 / CFS_HZ);
+
+        display_brw_stats(seq, "disk I/O size", "ios",
+                          &brw_stats->hist[BRW_R_DISK_IOSIZE],
+                          &brw_stats->hist[BRW_W_DISK_IOSIZE], 1);
+}
+
+#undef pct
+
+/* seq_file "show" handler for the per-device brw_stats proc entry */
+static int osd_brw_stats_seq_show(struct seq_file *seq, void *v)
+{
+        struct osd_device *dev = (struct osd_device *)seq->private;
+
+        brw_stats_show(seq, &dev->od_brw_stats);
+        return 0;
+}
+
+/*
+ * Any write to the brw_stats proc file clears every histogram; the
+ * written bytes themselves are ignored and the whole buffer is consumed.
+ */
+static ssize_t osd_brw_stats_seq_write(struct file *file, const char *buf,
+                                       size_t len, loff_t *off)
+{
+        struct seq_file      *seq  = file->private_data;
+        struct osd_device    *osd  = seq->private;
+        struct obd_histogram *hist = osd->od_brw_stats.hist;
+        int                   c;
+
+        for (c = 0; c < BRW_LAST; c++)
+                lprocfs_oh_clear(&hist[c]);
+
+        return len;
+}
+
+LPROC_SEQ_FOPS(osd_brw_stats);
+
+/*
+ * Initialize per-device statistics: the brw_stats histogram spinlocks,
+ * the lprocfs "stats" counter set and the "brw_stats" seq file under the
+ * device's proc entry.
+ *
+ * Returns 0 on success, -ENOMEM if the counter set cannot be allocated,
+ * or the error from registering the stats file.
+ */
+static int osd_stats_init(struct osd_device *osd)
+{
+        int i, result;
+        ENTRY;
+
+        for (i = 0; i < BRW_LAST; i++)
+                cfs_spin_lock_init(&osd->od_brw_stats.hist[i].oh_lock);
+
+        osd->od_stats = lprocfs_alloc_stats(LPROC_OSD_LAST, 0);
+        if (osd->od_stats != NULL) {
+                result = lprocfs_register_stats(osd->od_proc_entry, "stats",
+                                                osd->od_stats);
+                if (result)
+                        GOTO(out, result);
+
+                lprocfs_counter_init(osd->od_stats, LPROC_OSD_GET_PAGE,
+                                     LPROCFS_CNTR_AVGMINMAX|LPROCFS_CNTR_STDDEV,
+                                     "get_page", "usec");
+                lprocfs_counter_init(osd->od_stats, LPROC_OSD_NO_PAGE,
+                                     LPROCFS_CNTR_AVGMINMAX,
+                                     "get_page_failures", "num");
+                lprocfs_counter_init(osd->od_stats, LPROC_OSD_CACHE_ACCESS,
+                                     LPROCFS_CNTR_AVGMINMAX,
+                                     "cache_access", "pages");
+                lprocfs_counter_init(osd->od_stats, LPROC_OSD_CACHE_HIT,
+                                     LPROCFS_CNTR_AVGMINMAX,
+                                     "cache_hit", "pages");
+                lprocfs_counter_init(osd->od_stats, LPROC_OSD_CACHE_MISS,
+                                     LPROCFS_CNTR_AVGMINMAX,
+                                     "cache_miss", "pages");
+#if OSD_THANDLE_STATS
+                /* transaction-handle timing counters, compiled in only for
+                 * OSD_THANDLE_STATS builds */
+                lprocfs_counter_init(osd->od_stats, LPROC_OSD_THANDLE_STARTING,
+                                     LPROCFS_CNTR_AVGMINMAX,
+                                     "thandle starting", "usec");
+                lprocfs_counter_init(osd->od_stats, LPROC_OSD_THANDLE_OPEN,
+                                     LPROCFS_CNTR_AVGMINMAX,
+                                     "thandle open", "usec");
+                lprocfs_counter_init(osd->od_stats, LPROC_OSD_THANDLE_CLOSING,
+                                     LPROCFS_CNTR_AVGMINMAX,
+                                     "thandle closing", "usec");
+#endif
+                /* NOTE(review): lprocfs_seq_create() return value is ignored;
+                 * failure would only lose the brw_stats file -- confirm that
+                 * is intentional */
+                lprocfs_seq_create(osd->od_proc_entry, "brw_stats",
+                                   0444, &osd_brw_stats_fops, osd);
+        } else
+                result = -ENOMEM;
+
+out:
+        RETURN(result);
+}
+
+static const char *osd_counter_names[] = {
 #if OSD_THANDLE_STATS
         [LPROC_OSD_THANDLE_STARTING] = "thandle starting",
         [LPROC_OSD_THANDLE_OPEN]     = "thandle open",
@@ -87,6 +277,9 @@ int osd_procfs_init(struct osd_device *osd, const char *name)
         rc = lu_time_init(&osd->od_stats,
                           osd->od_proc_entry,
                           osd_counter_names, ARRAY_SIZE(osd_counter_names));
+
+        rc = osd_stats_init(osd);
+
         EXIT;
 out:
         if (rc)