Whamcloud - gitweb
LU-1405 osd: Kernel panic when running sanityn test_12
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
index c461662..827621b 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011 Whamcloud, Inc.
- *
- */
-/*
- * Copyright (c) 2011 Whamcloud, Inc.
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -94,126 +88,12 @@ static const char dot[] = ".";
 static const char dotdot[] = "..";
 static const char remote_obj_dir[] = "REM_OBJ_DIR";
 
-struct osd_directory {
-        struct iam_container od_container;
-        struct iam_descr     od_descr;
-};
-
-struct osd_object {
-        struct dt_object       oo_dt;
-        /**
-         * Inode for file system object represented by this osd_object. This
-         * inode is pinned for the whole duration of lu_object life.
-         *
-         * Not modified concurrently (either setup early during object
-         * creation, or assigned by osd_object_create() under write lock).
-         */
-        struct inode          *oo_inode;
-        /**
-         * to protect index ops.
-         */
-        struct htree_lock_head *oo_hl_head;
-        cfs_rw_semaphore_t     oo_ext_idx_sem;
-        cfs_rw_semaphore_t     oo_sem;
-        struct osd_directory  *oo_dir;
-        /** protects inode attributes. */
-        cfs_spinlock_t         oo_guard;
-        /**
-         * Following two members are used to indicate the presence of dot and
-         * dotdot in the given directory. This is required for interop mode
-         * (b11826).
-         */
-        int                    oo_compat_dot_created;
-        int                    oo_compat_dotdot_created;
-
-        const struct lu_env   *oo_owner;
-#ifdef CONFIG_LOCKDEP
-        struct lockdep_map     oo_dep_map;
-#endif
-};
-
 static const struct lu_object_operations      osd_lu_obj_ops;
-static const struct lu_device_operations      osd_lu_ops;
-static       struct lu_context_key            osd_key;
 static const struct dt_object_operations      osd_obj_ops;
 static const struct dt_object_operations      osd_obj_ea_ops;
-static const struct dt_body_operations        osd_body_ops;
 static const struct dt_index_operations       osd_index_iam_ops;
 static const struct dt_index_operations       osd_index_ea_ops;
 
-struct osd_thandle {
-        struct thandle          ot_super;
-        handle_t               *ot_handle;
-        struct journal_callback ot_jcb;
-        cfs_list_t              ot_dcb_list;
-        /* Link to the device, for debugging. */
-        struct lu_ref_link     *ot_dev_link;
-
-#if OSD_THANDLE_STATS
-        /** time when this handle was allocated */
-        cfs_time_t oth_alloced;
-
-        /** time when this thanle was started */
-        cfs_time_t oth_started;
-#endif
-};
-
-/*
- * Helpers.
- */
-static int lu_device_is_osd(const struct lu_device *d)
-{
-        return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops);
-}
-
-static struct osd_device *osd_dt_dev(const struct dt_device *d)
-{
-        LASSERT(lu_device_is_osd(&d->dd_lu_dev));
-        return container_of0(d, struct osd_device, od_dt_dev);
-}
-
-static struct osd_device *osd_dev(const struct lu_device *d)
-{
-        LASSERT(lu_device_is_osd(d));
-        return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
-}
-
-static struct osd_device *osd_obj2dev(const struct osd_object *o)
-{
-        return osd_dev(o->oo_dt.do_lu.lo_dev);
-}
-
-static struct super_block *osd_sb(const struct osd_device *dev)
-{
-        return dev->od_mount->lmi_mnt->mnt_sb;
-}
-
-static int osd_object_is_root(const struct osd_object *obj)
-{
-        return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
-}
-
-static struct osd_object *osd_obj(const struct lu_object *o)
-{
-        LASSERT(lu_device_is_osd(o->lo_dev));
-        return container_of0(o, struct osd_object, oo_dt.do_lu);
-}
-
-static struct osd_object *osd_dt_obj(const struct dt_object *d)
-{
-        return osd_obj(&d->do_lu);
-}
-
-static struct lu_device *osd2lu_dev(struct osd_device *osd)
-{
-        return &osd->od_dt_dev.dd_lu_dev;
-}
-
-static journal_t *osd_journal(const struct osd_device *dev)
-{
-        return LDISKFS_SB(osd_sb(dev))->s_journal;
-}
-
 static int osd_has_index(const struct osd_object *obj)
 {
         return obj->oo_dt.do_index_ops != NULL;
@@ -228,8 +108,8 @@ static int osd_object_invariant(const struct lu_object *l)
 static inline void
 osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save)
 {
-        struct md_ucred    *uc = md_ucred(env);
-        struct cred        *tc;
+        struct md_ucred *uc = md_ucred(env);
+        struct cred     *tc;
 
         LASSERT(uc != NULL);
 
@@ -259,11 +139,6 @@ osd_pop_ctxt(struct osd_ctxt *save)
 }
 #endif
 
-static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
-{
-        return lu_context_key_get(&env->le_ctx, &osd_key);
-}
-
 /*
  * Concurrency: doesn't matter
  */
@@ -287,13 +162,55 @@ static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
 static int osd_root_get(const struct lu_env *env,
                         struct dt_device *dev, struct lu_fid *f)
 {
-        struct inode *inode;
-
-        inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
-        LU_IGIF_BUILD(f, inode->i_ino, inode->i_generation);
+        lu_local_obj_fid(f, OSD_FS_ROOT_OID);
         return 0;
 }
 
+static inline int osd_qid_type(struct osd_thandle *oh, int i)
+{
+        return (oh->ot_id_type & (1 << i)) ? GRPQUOTA : USRQUOTA;
+}
+
+static inline void osd_qid_set_type(struct osd_thandle *oh, int i, int type)
+{
+        oh->ot_id_type |= ((type == GRPQUOTA) ? (1 << i) : 0);
+}
+
+void osd_declare_qid(struct dt_object *dt, struct osd_thandle *oh,
+                     int type, uid_t id, struct inode *inode)
+{
+#ifdef CONFIG_QUOTA
+        int i, allocated = 0;
+        struct osd_object *obj;
+
+        LASSERT(dt != NULL);
+        LASSERT(oh != NULL);
+        LASSERTF(oh->ot_id_cnt <= OSD_MAX_UGID_CNT, "count=%u",
+                 oh->ot_id_cnt);
+
+        /* id entry is allocated in the quota file */
+        if (inode && inode->i_dquot[type] && inode->i_dquot[type]->dq_off)
+                allocated = 1;
+
+        for (i = 0; i < oh->ot_id_cnt; i++) {
+                if (oh->ot_id_array[i] == id && osd_qid_type(oh, i) == type)
+                        return;
+        }
+
+        if (unlikely(i >= OSD_MAX_UGID_CNT)) {
+                CERROR("more than %d uid/gids for a transaction?\n", i);
+                return;
+        }
+
+        oh->ot_id_array[i] = id;
+        osd_qid_set_type(oh, i, type);
+        oh->ot_id_cnt++;
+        obj = osd_dt_obj(dt);
+        oh->ot_credits += (allocated || id == 0) ?
+                1 : LDISKFS_QUOTA_INIT_BLOCKS(osd_sb(osd_obj2dev(obj)));
+#endif
+}
+
 /*
  * OSD object methods.
  */
@@ -332,23 +249,15 @@ static struct lu_object *osd_object_alloc(const struct lu_env *env,
 /*
  * retrieve object from backend ext fs.
  **/
-static struct inode *osd_iget(struct osd_thread_info *info,
-                              struct osd_device *dev,
-                              const struct osd_inode_id *id)
+struct inode *osd_iget(struct osd_thread_info *info,
+                       struct osd_device *dev,
+                       const struct osd_inode_id *id)
 {
         struct inode *inode = NULL;
 
-#ifdef HAVE_EXT4_LDISKFS
         inode = ldiskfs_iget(osd_sb(dev), id->oii_ino);
-        if (IS_ERR(inode))
-        /* Newer kernels return an error instead of a NULL pointer */
-                inode = NULL;
-#else
-        inode = iget(osd_sb(dev), id->oii_ino);
-#endif
-        if (inode == NULL) {
-                CERROR("no inode\n");
-                inode = ERR_PTR(-EACCES);
+        if (IS_ERR(inode)) {
+                CERROR("Cannot get inode, rc = %li\n", PTR_ERR(inode));
         } else if (id->oii_gen != OSD_OII_NOGEN &&
                    inode->i_generation != id->oii_gen) {
                 iput(inode);
@@ -383,13 +292,12 @@ static int osd_fid_lookup(const struct lu_env *env,
         struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
         struct osd_device      *dev;
         struct osd_inode_id    *id;
-        struct osd_oi          *oi;
         struct inode           *inode;
         int                     result;
 
         LINVRNT(osd_invariant(obj));
         LASSERT(obj->oo_inode == NULL);
-        LASSERT(fid_is_sane(fid) || osd_fid_is_root(fid));
+        LASSERTF(fid_is_sane(fid) || fid_is_idif(fid), DFID, PFID(fid));
         /*
          * This assertion checks that osd layer sees only local
          * fids. Unfortunately it is somewhat expensive (does a
@@ -400,18 +308,18 @@ static int osd_fid_lookup(const struct lu_env *env,
         ENTRY;
 
         info = osd_oti_get(env);
+        LASSERT(info);
         dev  = osd_dev(ldev);
         id   = &info->oti_id;
-        oi   = &dev->od_oi;
 
         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
                 RETURN(-ENOENT);
 
-        result = osd_oi_lookup(info, oi, fid, id);
+        result = osd_oi_lookup(info, dev, fid, id);
         if (result != 0) {
                 if (result == -ENOENT)
                         result = 0;
-                goto out;
+                GOTO(out, result);
         }
 
         inode = osd_iget(info, dev, id);
@@ -424,7 +332,7 @@ static int osd_fid_lookup(const struct lu_env *env,
                  * place holders for objects yet to be created.
                  */
                 result = PTR_ERR(inode);
-                goto out;
+                GOTO(out, result);
         }
 
         obj->oo_inode = inode;
@@ -446,7 +354,6 @@ static int osd_fid_lookup(const struct lu_env *env,
         }
 out:
         LINVRNT(osd_invariant(obj));
-
         RETURN(result);
 }
 
@@ -474,6 +381,7 @@ static int osd_object_init(const struct lu_env *env, struct lu_object *l,
         LINVRNT(osd_invariant(obj));
 
         result = osd_fid_lookup(env, obj, lu_object_fid(l));
+        obj->oo_dt.do_body_ops = &osd_body_ops_new;
         if (result == 0) {
                 if (obj->oo_inode != NULL)
                         osd_object_init0(obj);
@@ -498,30 +406,6 @@ static void osd_object_free(const struct lu_env *env, struct lu_object *l)
         OBD_FREE_PTR(obj);
 }
 
-/**
- * IAM Iterator
- */
-static struct iam_path_descr *osd_it_ipd_get(const struct lu_env *env,
-                                             const struct iam_container *bag)
-{
-        return bag->ic_descr->id_ops->id_ipd_alloc(bag,
-                                           osd_oti_get(env)->oti_it_ipd);
-}
-
-static struct iam_path_descr *osd_idx_ipd_get(const struct lu_env *env,
-                                              const struct iam_container *bag)
-{
-        return bag->ic_descr->id_ops->id_ipd_alloc(bag,
-                                           osd_oti_get(env)->oti_idx_ipd);
-}
-
-static void osd_ipd_put(const struct lu_env *env,
-                        const struct iam_container *bag,
-                        struct iam_path_descr *ipd)
-{
-        bag->ic_descr->id_ops->id_ipd_free(ipd);
-}
-
 /*
  * Concurrency: no concurrent access is possible that late in object
  * life-cycle.
@@ -641,9 +525,11 @@ static void __osd_th_check_slow(void *oth, struct osd_device *dev,
  * Concurrency: doesn't access mutable data.
  */
 static int osd_param_is_sane(const struct osd_device *dev,
-                             const struct txn_param *param)
+                             const struct thandle *th)
 {
-        return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
+        struct osd_thandle *oh;
+        oh = container_of0(th, struct osd_thandle, ot_super);
+        return oh->ot_credits <= osd_journal(dev)->j_max_transaction_buffers;
 }
 
 /*
@@ -668,9 +554,11 @@ static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
 
         dt_txn_hook_commit(th);
 
-        /* call per-transaction callbacks if any */
-        cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage)
-                dcb->dcb_func(NULL, th, dcb, error);
+       /* call per-transaction callbacks if any */
+       cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage) {
+               cfs_list_del_init(&dcb->dcb_linkage);
+               dcb->dcb_func(NULL, th, dcb, error);
+       }
 
         lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
         lu_device_put(lud);
@@ -681,86 +569,140 @@ static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
         OBD_FREE_PTR(oh);
 }
 
+static struct thandle *osd_trans_create(const struct lu_env *env,
+                                        struct dt_device *d)
+{
+        struct osd_thread_info *oti = osd_oti_get(env);
+        struct osd_iobuf       *iobuf = &oti->oti_iobuf;
+        struct osd_thandle     *oh;
+        struct thandle         *th;
+        ENTRY;
+
+        /* on pending IO in this thread should left from prev. request */
+        LASSERT(cfs_atomic_read(&iobuf->dr_numreqs) == 0);
+
+        th = ERR_PTR(-ENOMEM);
+        OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
+        if (oh != NULL) {
+                th = &oh->ot_super;
+                th->th_dev = d;
+                th->th_result = 0;
+                th->th_tags = LCT_TX_HANDLE;
+                oh->ot_credits = 0;
+                oti->oti_dev = osd_dt_dev(d);
+                CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
+                osd_th_alloced(oh);
+        }
+        RETURN(th);
+}
+
 /*
  * Concurrency: shouldn't matter.
  */
-static struct thandle *osd_trans_start(const struct lu_env *env,
-                                       struct dt_device *d,
-                                       struct txn_param *p)
+int osd_trans_start(const struct lu_env *env, struct dt_device *d,
+                    struct thandle *th)
 {
+        struct osd_thread_info *oti = osd_oti_get(env);
         struct osd_device  *dev = osd_dt_dev(d);
         handle_t           *jh;
         struct osd_thandle *oh;
-        struct thandle     *th;
-        int hook_res;
+        int rc;
 
         ENTRY;
 
-        hook_res = dt_txn_hook_start(env, d, p);
-        if (hook_res != 0)
-                RETURN(ERR_PTR(hook_res));
+        LASSERT(current->journal_info == NULL);
 
-        if (osd_param_is_sane(dev, p)) {
-                OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
-                if (oh != NULL) {
-                        struct osd_thread_info *oti = osd_oti_get(env);
+        oh = container_of0(th, struct osd_thandle, ot_super);
+        LASSERT(oh != NULL);
+        LASSERT(oh->ot_handle == NULL);
 
-                        /*
-                         * XXX temporary stuff. Some abstraction layer should
-                         * be used.
-                         */
-                        oti->oti_dev = dev;
-                        CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
-                        osd_th_alloced(oh);
-                        jh = ldiskfs_journal_start_sb(osd_sb(dev), p->tp_credits);
-                        osd_th_started(oh);
-                        if (!IS_ERR(jh)) {
-                                oh->ot_handle = jh;
-                                th = &oh->ot_super;
-                                th->th_dev = d;
-                                th->th_result = 0;
-                                th->th_sync = 0;
-                                lu_device_get(&d->dd_lu_dev);
-                                oh->ot_dev_link = lu_ref_add
-                                        (&d->dd_lu_dev.ld_reference,
-                                         "osd-tx", th);
-                                /* add commit callback */
-                                lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
-                                lu_context_enter(&th->th_ctx);
-                                LASSERT(oti->oti_txns == 0);
-                                LASSERT(oti->oti_r_locks == 0);
-                                LASSERT(oti->oti_w_locks == 0);
-                                oti->oti_txns++;
-                        } else {
-                                OBD_FREE_PTR(oh);
-                                th = (void *)jh;
-                        }
-                } else
-                        th = ERR_PTR(-ENOMEM);
-        } else {
-                CERROR("Invalid transaction parameters\n");
-                th = ERR_PTR(-EINVAL);
+        rc = dt_txn_hook_start(env, d, th);
+        if (rc != 0)
+                GOTO(out, rc);
+
+        if (!osd_param_is_sane(dev, th)) {
+                CWARN("%s: too many transaction credits (%d > %d)\n",
+                      d->dd_lu_dev.ld_obd->obd_name, oh->ot_credits,
+                      osd_journal(dev)->j_max_transaction_buffers);
+                /* XXX Limit the credits to 'max_transaction_buffers', and
+                 *     let the underlying filesystem to catch the error if
+                 *     we really need so many credits.
+                 *
+                 *     This should be removed when we can calculate the
+                 *     credits precisely. */
+                oh->ot_credits = osd_journal(dev)->j_max_transaction_buffers;
+#ifdef OSD_TRACK_DECLARES
+                CERROR("  attr_set: %d, punch: %d, xattr_set: %d,\n",
+                       oh->ot_declare_attr_set, oh->ot_declare_punch,
+                       oh->ot_declare_xattr_set);
+                CERROR("  create: %d, ref_add: %d, ref_del: %d, write: %d\n",
+                       oh->ot_declare_create, oh->ot_declare_ref_add,
+                       oh->ot_declare_ref_del, oh->ot_declare_write);
+                CERROR("  insert: %d, delete: %d, destroy: %d\n",
+                       oh->ot_declare_insert, oh->ot_declare_delete,
+                       oh->ot_declare_destroy);
+#endif
         }
 
-        RETURN(th);
+        /*
+         * XXX temporary stuff. Some abstraction layer should
+         * be used.
+         */
+        jh = ldiskfs_journal_start_sb(osd_sb(dev), oh->ot_credits);
+        osd_th_started(oh);
+        if (!IS_ERR(jh)) {
+                oh->ot_handle = jh;
+                LASSERT(oti->oti_txns == 0);
+                lu_context_init(&th->th_ctx, th->th_tags);
+                lu_context_enter(&th->th_ctx);
+
+                lu_device_get(&d->dd_lu_dev);
+                oh->ot_dev_link = lu_ref_add(&d->dd_lu_dev.ld_reference,
+                                             "osd-tx", th);
+
+                /*
+                 * XXX: current rule is that we first start tx,
+                 *      then lock object(s), but we can't use
+                 *      this rule for data (due to locking specifics
+                 *      in ldiskfs). also in long-term we'd like to
+                 *      use usually-used (locks;tx) ordering. so,
+                 *      UGLY thing is that we'll use one ordering for
+                 *      data (ofd) and reverse ordering for metadata
+                 *      (mdd). then at some point we'll fix the latter
+                 */
+                if (lu_device_is_md(&d->dd_lu_dev)) {
+                        LASSERT(oti->oti_r_locks == 0);
+                        LASSERT(oti->oti_w_locks == 0);
+                }
+
+                oti->oti_txns++;
+                rc = 0;
+        } else {
+                rc = PTR_ERR(jh);
+        }
+out:
+        RETURN(rc);
 }
 
 /*
  * Concurrency: shouldn't matter.
  */
-static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
+static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
 {
-        int result;
-        struct osd_thandle *oh;
+        int                     rc = 0;
+        struct osd_thandle     *oh;
         struct osd_thread_info *oti = osd_oti_get(env);
+        struct osd_iobuf       *iobuf = &oti->oti_iobuf;
 
         ENTRY;
 
         oh = container_of0(th, struct osd_thandle, ot_super);
+
         if (oh->ot_handle != NULL) {
                 handle_t *hdl = oh->ot_handle;
 
                 hdl->h_sync = th->th_sync;
+
                 /*
                  * add commit callback
                  * notice we don't do this in osd_trans_start()
@@ -771,20 +713,47 @@ static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
 
                 LASSERT(oti->oti_txns == 1);
                 oti->oti_txns--;
-                LASSERT(oti->oti_r_locks == 0);
-                LASSERT(oti->oti_w_locks == 0);
-                result = dt_txn_hook_stop(env, th);
-                if (result != 0)
-                        CERROR("Failure in transaction hook: %d\n", result);
+                /*
+                 * XXX: current rule is that we first start tx,
+                 *      then lock object(s), but we can't use
+                 *      this rule for data (due to locking specifics
+                 *      in ldiskfs). also in long-term we'd like to
+                 *      use usually-used (locks;tx) ordering. so,
+                 *      UGLY thing is that we'll use one ordering for
+                 *      data (ofd) and reverse ordering for metadata
+                 *      (mdd). then at some point we'll fix the latter
+                 */
+                if (lu_device_is_md(&th->th_dev->dd_lu_dev)) {
+                        LASSERT(oti->oti_r_locks == 0);
+                        LASSERT(oti->oti_w_locks == 0);
+                }
+                rc = dt_txn_hook_stop(env, th);
+                if (rc != 0)
+                        CERROR("Failure in transaction hook: %d\n", rc);
                 oh->ot_handle = NULL;
                 OSD_CHECK_SLOW_TH(oh, oti->oti_dev,
-                                  result = ldiskfs_journal_stop(hdl));
-                if (result != 0)
-                        CERROR("Failure to stop transaction: %d\n", result);
+                                  rc = ldiskfs_journal_stop(hdl));
+                if (rc != 0)
+                        CERROR("Failure to stop transaction: %d\n", rc);
         } else {
                 OBD_FREE_PTR(oh);
         }
-        EXIT;
+
+        /* as we want IO to journal and data IO be concurrent, we don't block
+         * awaiting data IO completion in osd_do_bio(), instead we wait here
+         * once transaction is submitted to the journal. all reqular requests
+         * don't do direct IO (except read/write), thus this wait_event becomes
+         * no-op for them.
+         *
+         * IMPORTANT: we have to wait till any IO submited by the thread is
+         * completed otherwise iobuf may be corrupted by different request
+         */
+        cfs_wait_event(iobuf->dr_wait,
+                       cfs_atomic_read(&iobuf->dr_numreqs) == 0);
+        if (!rc)
+                rc = iobuf->dr_error;
+
+        RETURN(rc);
 }
 
 static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
@@ -798,35 +767,6 @@ static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
 }
 
 /*
- * Concurrency: no concurrent access is possible that late in object
- * life-cycle.
- */
-static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
-{
-        const struct lu_fid    *fid = lu_object_fid(&obj->oo_dt.do_lu);
-        struct osd_device      *osd = osd_obj2dev(obj);
-        struct osd_thread_info *oti = osd_oti_get(env);
-        struct txn_param       *prm = &oti->oti_txn;
-        struct lu_env          *env_del_obj = &oti->oti_obj_delete_tx_env;
-        struct thandle         *th;
-        int result;
-
-        lu_env_init(env_del_obj, LCT_DT_THREAD);
-        txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS +
-                            OSD_TXN_INODE_DELETE_CREDITS);
-        th = osd_trans_start(env_del_obj, &osd->od_dt_dev, prm);
-        if (!IS_ERR(th)) {
-                result = osd_oi_delete(osd_oti_get(env_del_obj),
-                                       &osd->od_oi, fid, th);
-                osd_trans_stop(env_del_obj, th);
-        } else
-                result = PTR_ERR(th);
-
-        lu_env_fini(env_del_obj);
-        return result;
-}
-
-/*
  * Called just before object is freed. Releases all resources except for
  * object itself (that is released by osd_object_free()).
  *
@@ -846,16 +786,6 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
 
         osd_index_fini(obj);
         if (inode != NULL) {
-                int result;
-
-                if (osd_inode_unlinked(inode)) {
-                        result = osd_inode_remove(env, obj);
-                        if (result != 0)
-                                LU_OBJECT_DEBUG(D_ERROR, env, l,
-                                                "Failed to cleanup: %d\n",
-                                                result);
-                }
-
                 iput(inode);
                 obj->oo_inode = NULL;
         }
@@ -867,11 +797,6 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
 static void osd_object_release(const struct lu_env *env,
                                struct lu_object *l)
 {
-        struct osd_object *o = osd_obj(l);
-
-        LASSERT(!lu_object_is_dying(l->lo_header));
-        if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
-                cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
 }
 
 /*
@@ -898,24 +823,39 @@ static int osd_object_print(const struct lu_env *env, void *cookie,
  * Concurrency: shouldn't matter.
  */
 int osd_statfs(const struct lu_env *env, struct dt_device *d,
-               cfs_kstatfs_t *sfs)
+               struct obd_statfs *sfs)
 {
-        struct osd_device *osd = osd_dt_dev(d);
+        struct osd_device  *osd = osd_dt_dev(d);
         struct super_block *sb = osd_sb(osd);
+        struct kstatfs     *ksfs;
         int result = 0;
 
+        /* osd_lproc.c call this without env, allocate ksfs for that case */
+        if (unlikely(env == NULL)) {
+                OBD_ALLOC_PTR(ksfs);
+                if (ksfs == NULL)
+                        return -ENOMEM;
+        } else {
+                ksfs = &osd_oti_get(env)->oti_ksfs;
+        }
+
         cfs_spin_lock(&osd->od_osfs_lock);
         /* cache 1 second */
         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
-                result = ll_do_statfs(sb, &osd->od_kstatfs);
-                if (likely(result == 0)) /* N.B. statfs can't really fail */
+                result = ll_do_statfs(sb, ksfs);
+                if (likely(result == 0)) /* N.B. statfs can't really fail */
                         osd->od_osfs_age = cfs_time_current_64();
+                        statfs_pack(&osd->od_statfs, ksfs);
+                }
         }
 
         if (likely(result == 0))
-                *sfs = osd->od_kstatfs;
+                *sfs = osd->od_statfs;
         cfs_spin_unlock(&osd->od_osfs_lock);
 
+        if (unlikely(env == NULL))
+                OBD_FREE_PTR(ksfs);
+
         return result;
 }
 
@@ -939,6 +879,14 @@ static void osd_conf_get(const struct lu_env *env,
                 param->ddp_mntopts |= MNTOPT_USERXATTR;
         if (test_opt(sb, POSIX_ACL))
                 param->ddp_mntopts |= MNTOPT_ACL;
+
+#if defined(LDISKFS_FEATURE_INCOMPAT_EA_INODE)
+        if (LDISKFS_HAS_INCOMPAT_FEATURE(sb, LDISKFS_FEATURE_INCOMPAT_EA_INODE))
+                param->ddp_max_ea_size = LDISKFS_XATTR_MAX_LARGE_EA_SIZE;
+        else
+#endif
+                param->ddp_max_ea_size = sb->s_blocksize;
+
 }
 
 /**
@@ -988,20 +936,19 @@ static int osd_commit_async(const struct lu_env *env,
 /*
  * Concurrency: shouldn't matter.
  */
-lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
 
-static void osd_ro(const struct lu_env *env, struct dt_device *d)
+static int osd_ro(const struct lu_env *env, struct dt_device *d)
 {
+        struct super_block *sb = osd_sb(osd_dt_dev(d));
+        int rc;
         ENTRY;
 
         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
 
-        __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
-                          fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
-        EXIT;
+        rc = __lvfs_set_rdonly(sb->s_bdev, LDISKFS_SB(sb)->journal_bdev);
+        RETURN(rc);
 }
 
-
 /*
  * Concurrency: serialization provided by callers.
  */
@@ -1042,7 +989,7 @@ static void osd_init_quota_ctxt(const struct lu_env *env, struct dt_device *d,
  * Note: we do not count into QUOTA here.
  * If we mount with --data_journal we may need more.
  */
-static const int osd_dto_credits_noquota[DTO_NR] = {
+const int osd_dto_credits_noquota[DTO_NR] = {
         /**
          * Insert/Delete.
          * INDEX_EXTRA_TRANS_BLOCKS(8) +
@@ -1055,7 +1002,7 @@ static const int osd_dto_credits_noquota[DTO_NR] = {
         /**
          * Unused now
          */
-        [DTO_IDNEX_UPDATE]  = 16,
+        [DTO_INDEX_UPDATE]  = 16,
         /**
          * Create a object. The same as create object in EXT3.
          * DATA_TRANS_BLOCKS(14) +
@@ -1064,14 +1011,13 @@ static const int osd_dto_credits_noquota[DTO_NR] = {
          */
         [DTO_OBJECT_CREATE] = 25,
         /**
-         * Unused now
+         * XXX: real credits to be fixed
          */
         [DTO_OBJECT_DELETE] = 25,
         /**
-         * Attr set credits.
-         * 3(inode bits, group, GDT)
+         * Attr set credits (inode)
          */
-        [DTO_ATTR_SET_BASE] = 3,
+        [DTO_ATTR_SET_BASE] = 1,
         /**
          * Xattr set. The same as xattr of EXT3.
          * DATA_TRANS_BLOCKS(14)
@@ -1081,7 +1027,7 @@ static const int osd_dto_credits_noquota[DTO_NR] = {
         [DTO_XATTR_SET]     = 14,
         [DTO_LOG_REC]       = 14,
         /**
-         * creadits for inode change during write.
+         * credits for inode change during write.
          */
         [DTO_WRITE_BASE]    = 3,
         /**
@@ -1095,90 +1041,10 @@ static const int osd_dto_credits_noquota[DTO_NR] = {
         [DTO_ATTR_SET_CHOWN]= 0
 };
 
-/**
- * Note: we count into QUOTA here.
- * If we mount with --data_journal we may need more.
- */
-static const int osd_dto_credits_quota[DTO_NR] = {
-        /**
-         * INDEX_EXTRA_TRANS_BLOCKS(8) +
-         * SINGLEDATA_TRANS_BLOCKS(8) +
-         * 2 * QUOTA_TRANS_BLOCKS(2)
-         */
-        [DTO_INDEX_INSERT]  = 20,
-        /**
-         * INDEX_EXTRA_TRANS_BLOCKS(8) +
-         * SINGLEDATA_TRANS_BLOCKS(8) +
-         * 2 * QUOTA_TRANS_BLOCKS(2)
-         */
-        [DTO_INDEX_DELETE]  = 20,
-        /**
-         * Unused now.
-         */
-        [DTO_IDNEX_UPDATE]  = 16,
-        /*
-         * Create a object. Same as create object in EXT3 filesystem.
-         * DATA_TRANS_BLOCKS(16) +
-         * INDEX_EXTRA_BLOCKS(8) +
-         * 3(inode bits, groups, GDT) +
-         * 2 * QUOTA_INIT_BLOCKS(25)
-         */
-        [DTO_OBJECT_CREATE] = 77,
-        /*
-         * Unused now.
-         * DATA_TRANS_BLOCKS(16) +
-         * INDEX_EXTRA_BLOCKS(8) +
-         * 3(inode bits, groups, GDT) +
-         * QUOTA(?)
-         */
-        [DTO_OBJECT_DELETE] = 27,
-        /**
-         * Attr set credits.
-         * 3 (inode bit, group, GDT) +
-         */
-        [DTO_ATTR_SET_BASE] = 3,
-        /**
-         * Xattr set. The same as xattr of EXT3.
-         * DATA_TRANS_BLOCKS(16)
-         * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS are
-         *           also counted in. Do not know why?
-         */
-        [DTO_XATTR_SET]     = 16,
-        [DTO_LOG_REC]       = 16,
-        /**
-         * creadits for inode change during write.
-         */
-        [DTO_WRITE_BASE]    = 3,
-        /**
-         * credits for single block write.
-         */
-        [DTO_WRITE_BLOCK]   = 16,
-        /**
-         * Attr set credits for chown.
-         * It is added to already set setattr credits
-         * 2 * QUOTA_INIT_BLOCKS(25) +
-         * 2 * QUOTA_DEL_BLOCKS(9)
-         */
-        [DTO_ATTR_SET_CHOWN]= 68,
-};
-
-static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
-                          enum dt_txn_op op)
-{
-        LASSERT(ARRAY_SIZE(osd_dto_credits_noquota) ==
-                ARRAY_SIZE(osd_dto_credits_quota));
-        LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits_noquota));
-#ifdef HAVE_QUOTA_SUPPORT
-        if (test_opt(osd_sb(osd_dt_dev(d)), QUOTA))
-                return osd_dto_credits_quota[op];
-        else
-#endif
-                return osd_dto_credits_noquota[op];
-}
-
 static const struct dt_device_operations osd_dt_ops = {
         .dt_root_get       = osd_root_get,
         .dt_statfs         = osd_statfs,
+        .dt_trans_create   = osd_trans_create,
         .dt_trans_start    = osd_trans_start,
         .dt_trans_stop     = osd_trans_stop,
         .dt_trans_cb_add   = osd_trans_cb_add,
@@ -1186,7 +1052,6 @@ static const struct dt_device_operations osd_dt_ops = {
         .dt_sync           = osd_sync,
         .dt_ro             = osd_ro,
         .dt_commit_async   = osd_commit_async,
-        .dt_credit_get     = osd_credit_get,
         .dt_init_capa_ctxt = osd_init_capa_ctxt,
         .dt_init_quota_ctxt= osd_init_quota_ctxt,
 };
@@ -1315,8 +1180,8 @@ static int capa_is_sane(const struct lu_env *env,
         RETURN(0);
 }
 
-static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
-                           struct lustre_capa *capa, __u64 opc)
+int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
+                    struct lustre_capa *capa, __u64 opc)
 {
         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
@@ -1414,6 +1279,42 @@ static int osd_attr_get(const struct lu_env *env,
         return 0;
 }
 
+static int osd_declare_attr_set(const struct lu_env *env,
+                                struct dt_object *dt,
+                                const struct lu_attr *attr,
+                                struct thandle *handle)
+{
+        struct osd_thandle *oh;
+        struct osd_object *obj;
+
+        LASSERT(dt != NULL);
+        LASSERT(handle != NULL);
+
+        obj = osd_dt_obj(dt);
+        LASSERT(osd_invariant(obj));
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, attr_set);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
+
+        if (attr && attr->la_valid & LA_UID) {
+                if (obj->oo_inode)
+                        osd_declare_qid(dt, oh, USRQUOTA, obj->oo_inode->i_uid,
+                                        obj->oo_inode);
+                osd_declare_qid(dt, oh, USRQUOTA, attr->la_uid, NULL);
+        }
+        if (attr && attr->la_valid & LA_GID) {
+                if (obj->oo_inode)
+                        osd_declare_qid(dt, oh, GRPQUOTA, obj->oo_inode->i_gid,
+                                        obj->oo_inode);
+                osd_declare_qid(dt, oh, GRPQUOTA, attr->la_gid, NULL);
+        }
+
+        return 0;
+}
+
 static int osd_inode_setattr(const struct lu_env *env,
                              struct inode *inode, const struct lu_attr *attr)
 {
@@ -1423,28 +1324,6 @@ static int osd_inode_setattr(const struct lu_env *env,
 
         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
 
-#ifdef HAVE_QUOTA_SUPPORT
-        if ((bits & LA_UID && attr->la_uid != inode->i_uid) ||
-            (bits & LA_GID && attr->la_gid != inode->i_gid)) {
-                struct osd_ctxt *save = &osd_oti_get(env)->oti_ctxt;
-                struct iattr iattr;
-                int rc;
-
-                iattr.ia_valid = 0;
-                if (bits & LA_UID)
-                        iattr.ia_valid |= ATTR_UID;
-                if (bits & LA_GID)
-                        iattr.ia_valid |= ATTR_GID;
-                iattr.ia_uid = attr->la_uid;
-                iattr.ia_gid = attr->la_gid;
-                osd_push_ctxt(env, save);
-                rc = ll_vfs_dq_transfer(inode, &iattr) ? -EDQUOT : 0;
-                osd_pop_ctxt(save);
-                if (rc != 0)
-                        return rc;
-        }
-#endif
-
         if (bits & LA_ATIME)
                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
         if (bits & LA_CTIME)
@@ -1489,6 +1368,7 @@ static int osd_attr_set(const struct lu_env *env,
                         struct lustre_capa *capa)
 {
         struct osd_object *obj = osd_dt_obj(dt);
+        struct inode      *inode;
         int rc;
 
         LASSERT(handle != NULL);
@@ -1498,12 +1378,37 @@ static int osd_attr_set(const struct lu_env *env,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
                 return -EACCES;
 
+        OSD_EXEC_OP(handle, attr_set);
+
+        inode = obj->oo_inode;
+#ifdef HAVE_QUOTA_SUPPORT
+        if ((attr->la_valid & LA_UID && attr->la_uid != inode->i_uid) ||
+            (attr->la_valid & LA_GID && attr->la_gid != inode->i_gid)) {
+                struct osd_ctxt *save = &osd_oti_get(env)->oti_ctxt;
+                struct iattr iattr;
+                int rc;
+
+                iattr.ia_valid = 0;
+                if (attr->la_valid & LA_UID)
+                        iattr.ia_valid |= ATTR_UID;
+                if (attr->la_valid & LA_GID)
+                        iattr.ia_valid |= ATTR_GID;
+                iattr.ia_uid = attr->la_uid;
+                iattr.ia_gid = attr->la_gid;
+                osd_push_ctxt(env, save);
+                rc = ll_vfs_dq_transfer(inode, &iattr) ? -EDQUOT : 0;
+                osd_pop_ctxt(save);
+                if (rc != 0)
+                        return rc;
+        }
+#endif
+
         cfs_spin_lock(&obj->oo_guard);
-        rc = osd_inode_setattr(env, obj->oo_inode, attr);
+        rc = osd_inode_setattr(env, inode, attr);
         cfs_spin_unlock(&obj->oo_guard);
 
         if (!rc)
-                obj->oo_inode->i_sb->s_op->dirty_inode(obj->oo_inode);
+                inode->i_sb->s_op->dirty_inode(inode);
         return rc;
 }
 
@@ -1527,27 +1432,13 @@ static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
         return 0;
 }
 
-static struct dentry * osd_child_dentry_get(const struct lu_env *env,
-                                            struct osd_object *obj,
-                                            const char *name,
-                                            const int namelen)
+struct dentry *osd_child_dentry_get(const struct lu_env *env,
+                                    struct osd_object *obj,
+                                    const char *name, const int namelen)
 {
-        struct osd_thread_info *info   = osd_oti_get(env);
-        struct dentry *child_dentry = &info->oti_child_dentry;
-        struct dentry *obj_dentry = &info->oti_obj_dentry;
-
-        obj_dentry->d_inode = obj->oo_inode;
-        obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
-        obj_dentry->d_name.hash = 0;
-
-        child_dentry->d_name.hash = 0;
-        child_dentry->d_parent = obj_dentry;
-        child_dentry->d_name.name = name;
-        child_dentry->d_name.len = namelen;
-        return child_dentry;
+        return osd_child_dentry_by_inode(env, obj->oo_inode, name, namelen);
 }
 
-
 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
                       cfs_umode_t mode,
                       struct dt_allocation_hint *hint,
@@ -1556,7 +1447,7 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
         int result;
         struct osd_device  *osd = osd_obj2dev(obj);
         struct osd_thandle *oth;
-        struct dt_object   *parent;
+        struct dt_object   *parent = NULL;
         struct inode       *inode;
 #ifdef HAVE_QUOTA_SUPPORT
         struct osd_ctxt    *save = &info->oti_ctxt;
@@ -1577,17 +1468,14 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
 
         if (hint && hint->dah_parent)
                 parent = hint->dah_parent;
-        else
-                parent = osd->od_obj_area;
-
-        LASSERT(parent != NULL);
-        LASSERT(osd_dt_obj(parent)->oo_inode->i_op != NULL);
 
 #ifdef HAVE_QUOTA_SUPPORT
         osd_push_ctxt(info->oti_env, save);
 #endif
         inode = ldiskfs_create_inode(oth->ot_handle,
-                                     osd_dt_obj(parent)->oo_inode, mode);
+                                     parent ? osd_dt_obj(parent)->oo_inode :
+                                              osd_sb(osd)->s_root->d_inode,
+                                     mode);
 #ifdef HAVE_QUOTA_SUPPORT
         osd_pop_ctxt(save);
 #endif
@@ -1812,10 +1700,50 @@ static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
         id->oii_ino = obj->oo_inode->i_ino;
         id->oii_gen = obj->oo_inode->i_generation;
 
-        return osd_oi_insert(info, &osd->od_oi, fid, id, th,
+        return osd_oi_insert(info, osd, fid, id, th,
                              uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
 }
 
+static int osd_declare_object_create(const struct lu_env *env,
+                                     struct dt_object *dt,
+                                     struct lu_attr *attr,
+                                     struct dt_allocation_hint *hint,
+                                     struct dt_object_format *dof,
+                                     struct thandle *handle)
+{
+        struct osd_thandle *oh;
+
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, create);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_CREATE];
+        /* XXX: So far, only normal fid needs be inserted into the oi,
+         *      things could be changed later. Revise following code then. */
+        if (fid_is_norm(lu_object_fid(&dt->do_lu))) {
+                OSD_DECLARE_OP(oh, insert);
+                oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
+        }
+        /* If this is directory, then we expect . and .. to be inserted as
+         * well. The one directory block always needs to be created for the
+         * directory, so we could use DTO_WRITE_BASE here (GDT, block bitmap,
+         * block), there is no danger of needing a tree for the first block.
+         */
+        if (attr && S_ISDIR(attr->la_mode)) {
+                OSD_DECLARE_OP(oh, insert);
+                OSD_DECLARE_OP(oh, insert);
+                oh->ot_credits += osd_dto_credits_noquota[DTO_WRITE_BASE];
+        }
+
+        if (attr) {
+                osd_declare_qid(dt, oh, USRQUOTA, attr->la_uid, NULL);
+                osd_declare_qid(dt, oh, GRPQUOTA, attr->la_gid, NULL);
+        }
+        return 0;
+}
+
 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
                              struct lu_attr *attr,
                              struct dt_allocation_hint *hint,
@@ -1834,6 +1762,8 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
+        OSD_EXEC_OP(th, create);
+
         result = __osd_object_create(info, obj, attr, hint, dof, th);
         if (result == 0)
                 result = __osd_oi_insert(env, obj, fid, th);
@@ -1844,6 +1774,77 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
 }
 
 /**
+ * Called to destroy on-disk representation of the object
+ *
+ * Concurrency: must be locked
+ */
+static int osd_declare_object_destroy(const struct lu_env *env,
+                                      struct dt_object *dt,
+                                      struct thandle *th)
+{
+        struct osd_object  *obj = osd_dt_obj(dt);
+        struct inode       *inode = obj->oo_inode;
+        struct osd_thandle *oh;
+
+        ENTRY;
+
+        oh = container_of0(th, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+        LASSERT(inode);
+
+        OSD_DECLARE_OP(oh, destroy);
+        OSD_DECLARE_OP(oh, delete);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_DELETE];
+        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
+
+        osd_declare_qid(dt, oh, USRQUOTA, inode->i_uid, inode);
+        osd_declare_qid(dt, oh, GRPQUOTA, inode->i_gid, inode);
+
+        RETURN(0);
+}
+
+static int osd_object_destroy(const struct lu_env *env,
+                              struct dt_object *dt,
+                              struct thandle *th)
+{
+        const struct lu_fid    *fid = lu_object_fid(&dt->do_lu);
+        struct osd_object      *obj = osd_dt_obj(dt);
+        struct inode           *inode = obj->oo_inode;
+        struct osd_device      *osd = osd_obj2dev(obj);
+        struct osd_thandle     *oh;
+        int                     result;
+        ENTRY;
+
+        oh = container_of0(th, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle);
+        LASSERT(inode);
+        LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
+
+        if (S_ISDIR(inode->i_mode)) {
+                LASSERT(osd_inode_unlinked(inode) ||
+                        inode->i_nlink == 1);
+                cfs_spin_lock(&obj->oo_guard);
+                inode->i_nlink = 0;
+                cfs_spin_unlock(&obj->oo_guard);
+                inode->i_sb->s_op->dirty_inode(inode);
+        } else {
+                LASSERT(osd_inode_unlinked(inode));
+        }
+
+        OSD_EXEC_OP(th, destroy);
+
+        result = osd_oi_delete(osd_oti_get(env), osd, fid, th);
+
+        /* XXX: add to ext3 orphan list */
+        /* rc = ext3_orphan_add(handle_t *handle, struct inode *inode) */
+
+        /* not needed in the cache anymore */
+        set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
+
+        RETURN(0);
+}
+
+/**
  * Helper function for osd_xattr_set()
  */
 static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
@@ -1854,7 +1855,7 @@ static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
         struct osd_thread_info *info     = osd_oti_get(env);
         struct dentry          *dentry   = &info->oti_child_dentry;
         int                     fs_flags = 0;
-        int  rc;
+        int                     rc;
 
         LASSERT(dt_object_exists(dt));
         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
@@ -1906,17 +1907,6 @@ static inline void osd_igif_get(const struct lu_env *env, struct inode  *inode,
 }
 
 /**
- * Helper function to pack the fid, ldiskfs stores fid in packed format.
- */
-void osd_fid_pack(struct osd_fid_pack *pack, const struct dt_rec *fid,
-                  struct lu_fid *befider)
-{
-        fid_cpu_to_be(befider, (struct lu_fid *)fid);
-        memcpy(pack->fp_area, befider, sizeof(*befider));
-        pack->fp_len =  sizeof(*befider) + 1;
-}
-
-/**
  * ldiskfs supports fid in dirent, it is passed in dentry->d_fsdata.
  * lustre 1.8 also uses d_fsdata for passing other info to ldiskfs.
  * To have compatilibility with 1.8 ldiskfs driver we need to have
@@ -1934,23 +1924,6 @@ void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param,
                       (struct lu_fid *)fid);
 }
 
-int osd_fid_unpack(struct lu_fid *fid, const struct osd_fid_pack *pack)
-{
-        int result;
-
-        result = 0;
-        switch (pack->fp_len) {
-        case sizeof *fid + 1:
-                memcpy(fid, pack->fp_area, sizeof *fid);
-                fid_be_to_cpu(fid, fid);
-                break;
-        default:
-                CERROR("Unexpected packed fid size: %d\n", pack->fp_len);
-                result = -EIO;
-        }
-        return result;
-}
-
 /**
  * Try to read the fid from inode ea into dt_rec, if return value
  * i.e. rc is +ve, then we got fid, otherwise we will have to form igif
@@ -2019,15 +1992,15 @@ out:
  * \retval -ve, on error
  */
 static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
-                             struct lu_attr *attr,
-                             struct dt_allocation_hint *hint,
-                             struct dt_object_format *dof,
-                             struct thandle *th)
+                                struct lu_attr *attr,
+                                struct dt_allocation_hint *hint,
+                                struct dt_object_format *dof,
+                                struct thandle *th)
 {
         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
         struct osd_object      *obj    = osd_dt_obj(dt);
         struct osd_thread_info *info   = osd_oti_get(env);
-        int result;
+        int                     result;
 
         ENTRY;
 
@@ -2036,8 +2009,9 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
-        result = __osd_object_create(info, obj, attr, hint, dof, th);
+        OSD_EXEC_OP(th, create);
 
+        result = __osd_object_create(info, obj, attr, hint, dof, th);
         /* objects under osd root shld have igif fid, so dont add fid EA */
         if (result == 0 && fid_seq(fid) >= FID_SEQ_NORMAL)
                 result = osd_ea_fid_set(env, dt, fid);
@@ -2050,75 +2024,200 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
         RETURN(result);
 }
 
+static int osd_declare_object_ref_add(const struct lu_env *env,
+                                      struct dt_object *dt,
+                                      struct thandle *handle)
+{
+        struct osd_thandle *oh;
+
+        /* it's possible that object doesn't exist yet */
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, ref_add);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
+
+        return 0;
+}
+
 /*
  * Concurrency: @dt is write locked.
  */
-static void osd_object_ref_add(const struct lu_env *env,
-                               struct dt_object *dt,
-                               struct thandle *th)
+static int osd_object_ref_add(const struct lu_env *env,
+                              struct dt_object *dt, struct thandle *th)
 {
         struct osd_object *obj = osd_dt_obj(dt);
-        struct inode *inode = obj->oo_inode;
+        struct inode      *inode = obj->oo_inode;
 
         LINVRNT(osd_invariant(obj));
         LASSERT(dt_object_exists(dt));
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
+        OSD_EXEC_OP(th, ref_add);
+
+        /*
+         * DIR_NLINK feature is set for compatibility reasons if:
+         * 1) nlinks > LDISKFS_LINK_MAX, or
+         * 2) nlinks == 2, since this indicates i_nlink was previously 1.
+         *
+         * It is easier to always set this flag (rather than check and set),
+         * since it has less overhead, and the superblock will be dirtied
+         * at some point. Both e2fsprogs and any Lustre-supported ldiskfs
+         * do not actually care whether this flag is set or not.
+         */
         cfs_spin_lock(&obj->oo_guard);
-        LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
         inode->i_nlink++;
+        if (S_ISDIR(inode->i_mode) && inode->i_nlink > 1) {
+                if (inode->i_nlink >= LDISKFS_LINK_MAX ||
+                    inode->i_nlink == 2)
+                        inode->i_nlink = 1;
+        }
+        LASSERT(inode->i_nlink <= LDISKFS_LINK_MAX);
         cfs_spin_unlock(&obj->oo_guard);
         inode->i_sb->s_op->dirty_inode(inode);
         LINVRNT(osd_invariant(obj));
+
+        return 0;
+}
+
+static int osd_declare_object_ref_del(const struct lu_env *env,
+                                      struct dt_object *dt,
+                                      struct thandle *handle)
+{
+        struct osd_thandle *oh;
+
+        LASSERT(dt_object_exists(dt));
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, ref_del);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
+
+        return 0;
 }
 
 /*
  * Concurrency: @dt is write locked.
  */
-static void osd_object_ref_del(const struct lu_env *env,
-                               struct dt_object *dt,
-                               struct thandle *th)
+static int osd_object_ref_del(const struct lu_env *env, struct dt_object *dt,
+                              struct thandle *th)
 {
         struct osd_object *obj = osd_dt_obj(dt);
-        struct inode *inode = obj->oo_inode;
+        struct inode      *inode = obj->oo_inode;
 
         LINVRNT(osd_invariant(obj));
         LASSERT(dt_object_exists(dt));
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
+        OSD_EXEC_OP(th, ref_del);
+
         cfs_spin_lock(&obj->oo_guard);
         LASSERT(inode->i_nlink > 0);
         inode->i_nlink--;
+        /* If this is/was a many-subdir directory (nlink > LDISKFS_LINK_MAX)
+         * then the nlink count is 1. Don't let it be set to 0 or the directory
+         * inode will be deleted incorrectly. */
+        if (S_ISDIR(inode->i_mode) && inode->i_nlink == 0)
+                inode->i_nlink++;
         cfs_spin_unlock(&obj->oo_guard);
         inode->i_sb->s_op->dirty_inode(inode);
         LINVRNT(osd_invariant(obj));
+
+        return 0;
+}
+
+/*
+ * Get the 64-bit version for an inode.
+ */
+static int osd_object_version_get(const struct lu_env *env,
+                                  struct dt_object *dt, dt_obj_version_t *ver)
+{
+        struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+        CDEBUG(D_INODE, "Get version "LPX64" for inode %lu\n",
+               LDISKFS_I(inode)->i_fs_version, inode->i_ino);
+        *ver = LDISKFS_I(inode)->i_fs_version;
+        return 0;
+}
+
+/*
+ * Concurrency: @dt is read locked.
+ */
+static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
+                         struct lu_buf *buf, const char *name,
+                         struct lustre_capa *capa)
+{
+        struct osd_object      *obj    = osd_dt_obj(dt);
+        struct inode           *inode  = obj->oo_inode;
+        struct osd_thread_info *info   = osd_oti_get(env);
+        struct dentry          *dentry = &info->oti_obj_dentry;
+
+        /* version get is not real XATTR but uses xattr API */
+        if (strcmp(name, XATTR_NAME_VERSION) == 0) {
+                /* for version we are just using xattr API but change inode
+                 * field instead */
+                LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
+                osd_object_version_get(env, dt, buf->lb_buf);
+                return sizeof(dt_obj_version_t);
+        }
+
+        LASSERT(dt_object_exists(dt));
+        LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
+        LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
+
+        if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
+                return -EACCES;
+
+        dentry->d_inode = inode;
+        return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
+}
+
+
+static int osd_declare_xattr_set(const struct lu_env *env,
+                                 struct dt_object *dt,
+                                 const struct lu_buf *buf, const char *name,
+                                 int fl, struct thandle *handle)
+{
+        struct osd_thandle *oh;
+
+        LASSERT(handle != NULL);
+
+        if (strcmp(name, XATTR_NAME_VERSION) == 0) {
+                /* no credits for version */
+                return 0;
+        }
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, xattr_set);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET];
+
+        return 0;
 }
 
 /*
- * Concurrency: @dt is read locked.
+ * Set the 64-bit version for object
  */
-static int osd_xattr_get(const struct lu_env *env,
-                         struct dt_object *dt,
-                         struct lu_buf *buf,
-                         const char *name,
-                         struct lustre_capa *capa)
+static void osd_object_version_set(const struct lu_env *env,
+                                   struct dt_object *dt,
+                                   dt_obj_version_t *new_version)
 {
-        struct osd_object      *obj    = osd_dt_obj(dt);
-        struct inode           *inode  = obj->oo_inode;
-        struct osd_thread_info *info   = osd_oti_get(env);
-        struct dentry          *dentry = &info->oti_obj_dentry;
-
-        LASSERT(dt_object_exists(dt));
-        LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
-        LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
+        struct inode *inode = osd_dt_obj(dt)->oo_inode;
 
-        if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
-                return -EACCES;
+        CDEBUG(D_INODE, "Set version "LPX64" (old "LPX64") for inode %lu\n",
+               *new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
 
-        dentry->d_inode = inode;
-        return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
+        LDISKFS_I(inode)->i_fs_version = *new_version;
+        /** Version is set after all inode operations are finished,
+         *  so we should mark it dirty here */
+        inode->i_sb->s_op->dirty_inode(inode);
 }
 
 /*
@@ -2130,19 +2229,27 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
 {
         LASSERT(handle != NULL);
 
+        /* version set is not real XATTR */
+        if (strcmp(name, XATTR_NAME_VERSION) == 0) {
+                /* for version we are just using xattr API but change inode
+                 * field instead */
+                LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
+                osd_object_version_set(env, dt, buf->lb_buf);
+                return sizeof(dt_obj_version_t);
+        }
+
         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
                 return -EACCES;
 
+        OSD_EXEC_OP(handle, xattr_set);
         return __osd_xattr_set(env, dt, buf, name, fl);
 }
 
 /*
  * Concurrency: @dt is read locked.
  */
-static int osd_xattr_list(const struct lu_env *env,
-                          struct dt_object *dt,
-                          struct lu_buf *buf,
-                          struct lustre_capa *capa)
+static int osd_xattr_list(const struct lu_env *env, struct dt_object *dt,
+                          struct lu_buf *buf, struct lustre_capa *capa)
 {
         struct osd_object      *obj    = osd_dt_obj(dt);
         struct inode           *inode  = obj->oo_inode;
@@ -2160,13 +2267,29 @@ static int osd_xattr_list(const struct lu_env *env,
         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
 }
 
+static int osd_declare_xattr_del(const struct lu_env *env,
+                                 struct dt_object *dt, const char *name,
+                                 struct thandle *handle)
+{
+        struct osd_thandle *oh;
+
+        LASSERT(dt_object_exists(dt));
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, xattr_set);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET];
+
+        return 0;
+}
+
 /*
  * Concurrency: @dt is write locked.
  */
-static int osd_xattr_del(const struct lu_env *env,
-                         struct dt_object *dt,
-                         const char *name,
-                         struct thandle *handle,
+static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
+                         const char *name, struct thandle *handle,
                          struct lustre_capa *capa)
 {
         struct osd_object      *obj    = osd_dt_obj(dt);
@@ -2183,6 +2306,8 @@ static int osd_xattr_del(const struct lu_env *env,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
                 return -EACCES;
 
+        OSD_EXEC_OP(handle, xattr_set);
+
         dentry->d_inode = inode;
         rc = inode->i_op->removexattr(dentry, name);
         return rc;
@@ -2277,12 +2402,13 @@ static struct obd_capa *osd_capa_get(const struct lu_env *env,
 
 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
 {
-        int rc;
         struct osd_object      *obj    = osd_dt_obj(dt);
         struct inode           *inode  = obj->oo_inode;
         struct osd_thread_info *info   = osd_oti_get(env);
         struct dentry          *dentry = &info->oti_obj_dentry;
         struct file            *file   = &info->oti_file;
+        int                     rc;
+
         ENTRY;
 
         dentry->d_inode = inode;
@@ -2295,35 +2421,6 @@ static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
         RETURN(rc);
 }
 
-/*
- * Get the 64-bit version for an inode.
- */
-static dt_obj_version_t osd_object_version_get(const struct lu_env *env,
-                                               struct dt_object *dt)
-{
-        struct inode *inode = osd_dt_obj(dt)->oo_inode;
-
-        CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n",
-               LDISKFS_I(inode)->i_fs_version, inode->i_ino);
-        return LDISKFS_I(inode)->i_fs_version;
-}
-
-/*
- * Set the 64-bit version and return the old version.
- */
-static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt,
-                                   dt_obj_version_t new_version)
-{
-        struct inode *inode = osd_dt_obj(dt)->oo_inode;
-
-        CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n",
-               new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
-        LDISKFS_I(inode)->i_fs_version = new_version;
-        /** Version is set after all inode operations are finished,
-         *  so we should mark it dirty here */
-        inode->i_sb->s_op->dirty_inode(inode);
-}
-
 static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
                         void **data)
 {
@@ -2469,27 +2566,33 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
 }
 
 static const struct dt_object_operations osd_obj_ops = {
-        .do_read_lock    = osd_object_read_lock,
-        .do_write_lock   = osd_object_write_lock,
-        .do_read_unlock  = osd_object_read_unlock,
-        .do_write_unlock = osd_object_write_unlock,
-        .do_write_locked = osd_object_write_locked,
-        .do_attr_get     = osd_attr_get,
-        .do_attr_set     = osd_attr_set,
-        .do_ah_init      = osd_ah_init,
-        .do_create       = osd_object_create,
-        .do_index_try    = osd_index_try,
-        .do_ref_add      = osd_object_ref_add,
-        .do_ref_del      = osd_object_ref_del,
-        .do_xattr_get    = osd_xattr_get,
-        .do_xattr_set    = osd_xattr_set,
-        .do_xattr_del    = osd_xattr_del,
-        .do_xattr_list   = osd_xattr_list,
-        .do_capa_get     = osd_capa_get,
-        .do_object_sync  = osd_object_sync,
-        .do_version_get  = osd_object_version_get,
-        .do_version_set  = osd_object_version_set,
-        .do_data_get     = osd_data_get,
+        .do_read_lock         = osd_object_read_lock,
+        .do_write_lock        = osd_object_write_lock,
+        .do_read_unlock       = osd_object_read_unlock,
+        .do_write_unlock      = osd_object_write_unlock,
+        .do_write_locked      = osd_object_write_locked,
+        .do_attr_get          = osd_attr_get,
+        .do_declare_attr_set  = osd_declare_attr_set,
+        .do_attr_set          = osd_attr_set,
+        .do_ah_init           = osd_ah_init,
+        .do_declare_create    = osd_declare_object_create,
+        .do_create            = osd_object_create,
+        .do_declare_destroy   = osd_declare_object_destroy,
+        .do_destroy           = osd_object_destroy,
+        .do_index_try         = osd_index_try,
+        .do_declare_ref_add   = osd_declare_object_ref_add,
+        .do_ref_add           = osd_object_ref_add,
+        .do_declare_ref_del   = osd_declare_object_ref_del,
+        .do_ref_del           = osd_object_ref_del,
+        .do_xattr_get         = osd_xattr_get,
+        .do_declare_xattr_set = osd_declare_xattr_set,
+        .do_xattr_set         = osd_xattr_set,
+        .do_declare_xattr_del = osd_declare_xattr_del,
+        .do_xattr_del         = osd_xattr_del,
+        .do_xattr_list        = osd_xattr_list,
+        .do_capa_get          = osd_capa_get,
+        .do_object_sync       = osd_object_sync,
+        .do_data_get          = osd_data_get,
 };
 
 /**
@@ -2497,255 +2600,51 @@ static const struct dt_object_operations osd_obj_ops = {
  * (i.e. to run 2.0 mds on 1.8 disk) (b11826)
  */
 static const struct dt_object_operations osd_obj_ea_ops = {
-        .do_read_lock    = osd_object_read_lock,
-        .do_write_lock   = osd_object_write_lock,
-        .do_read_unlock  = osd_object_read_unlock,
-        .do_write_unlock = osd_object_write_unlock,
-        .do_write_locked = osd_object_write_locked,
-        .do_attr_get     = osd_attr_get,
-        .do_attr_set     = osd_attr_set,
-        .do_ah_init      = osd_ah_init,
-        .do_create       = osd_object_ea_create,
-        .do_index_try    = osd_index_try,
-        .do_ref_add      = osd_object_ref_add,
-        .do_ref_del      = osd_object_ref_del,
-        .do_xattr_get    = osd_xattr_get,
-        .do_xattr_set    = osd_xattr_set,
-        .do_xattr_del    = osd_xattr_del,
-        .do_xattr_list   = osd_xattr_list,
-        .do_capa_get     = osd_capa_get,
-        .do_object_sync  = osd_object_sync,
-        .do_version_get  = osd_object_version_get,
-        .do_version_set  = osd_object_version_set,
-        .do_data_get     = osd_data_get,
+        .do_read_lock         = osd_object_read_lock,
+        .do_write_lock        = osd_object_write_lock,
+        .do_read_unlock       = osd_object_read_unlock,
+        .do_write_unlock      = osd_object_write_unlock,
+        .do_write_locked      = osd_object_write_locked,
+        .do_attr_get          = osd_attr_get,
+        .do_declare_attr_set  = osd_declare_attr_set,
+        .do_attr_set          = osd_attr_set,
+        .do_ah_init           = osd_ah_init,
+        .do_declare_create    = osd_declare_object_create,
+        .do_create            = osd_object_ea_create,
+        .do_declare_destroy   = osd_declare_object_destroy,
+        .do_destroy           = osd_object_destroy,
+        .do_index_try         = osd_index_try,
+        .do_declare_ref_add   = osd_declare_object_ref_add,
+        .do_ref_add           = osd_object_ref_add,
+        .do_declare_ref_del   = osd_declare_object_ref_del,
+        .do_ref_del           = osd_object_ref_del,
+        .do_xattr_get         = osd_xattr_get,
+        .do_declare_xattr_set = osd_declare_xattr_set,
+        .do_xattr_set         = osd_xattr_set,
+        .do_declare_xattr_del = osd_declare_xattr_del,
+        .do_xattr_del         = osd_xattr_del,
+        .do_xattr_list        = osd_xattr_list,
+        .do_capa_get          = osd_capa_get,
+        .do_object_sync       = osd_object_sync,
+        .do_data_get          = osd_data_get,
 };
 
-/*
- * Body operations.
- */
-
-/*
- * XXX: Another layering violation for now.
- *
- * We don't want to use ->f_op->read methods, because generic file write
- *
- *         - serializes on ->i_sem, and
- *
- *         - does a lot of extra work like balance_dirty_pages(),
- *
- * which doesn't work for globally shared files like /last-received.
- */
-static int osd_ldiskfs_readlink(struct inode *inode, char *buffer, int buflen)
-{
-        struct ldiskfs_inode_info *ei = LDISKFS_I(inode);
-
-        memcpy(buffer, (char*)ei->i_data, buflen);
-
-        return  buflen;
-}
-
-static int osd_ldiskfs_read(struct inode *inode, void *buf, int size,
-                            loff_t *offs)
-{
-        struct buffer_head *bh;
-        unsigned long block;
-        int osize = size;
-        int blocksize;
-        int csize;
-        int boffs;
-        int err;
-
-        /* prevent reading after eof */
-        spin_lock(&inode->i_lock);
-        if (i_size_read(inode) < *offs + size) {
-                size = i_size_read(inode) - *offs;
-                spin_unlock(&inode->i_lock);
-                if (size < 0) {
-                        CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n",
-                               i_size_read(inode), *offs);
-                        return -EBADR;
-                } else if (size == 0) {
-                        return 0;
-                }
-        } else {
-                spin_unlock(&inode->i_lock);
-        }
-
-        blocksize = 1 << inode->i_blkbits;
-
-        while (size > 0) {
-                block = *offs >> inode->i_blkbits;
-                boffs = *offs & (blocksize - 1);
-                csize = min(blocksize - boffs, size);
-                bh = ldiskfs_bread(NULL, inode, block, 0, &err);
-                if (!bh) {
-                        CERROR("can't read block: %d\n", err);
-                        return err;
-                }
-
-                memcpy(buf, bh->b_data + boffs, csize);
-                brelse(bh);
-
-                *offs += csize;
-                buf += csize;
-                size -= csize;
-        }
-        return osize;
-}
-
-static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
-                        struct lu_buf *buf, loff_t *pos,
-                        struct lustre_capa *capa)
+static int osd_index_declare_iam_delete(const struct lu_env *env,
+                                        struct dt_object *dt,
+                                        const struct dt_key *key,
+                                        struct thandle *handle)
 {
-        struct osd_object      *obj    = osd_dt_obj(dt);
-        struct inode           *inode  = obj->oo_inode;
-        int rc;
-
-        if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
-                RETURN(-EACCES);
-
-        /* Read small symlink from inode body as we need to maintain correct
-         * on-disk symlinks for ldiskfs.
-         */
-        if (S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
-            (buf->lb_len <= sizeof (LDISKFS_I(inode)->i_data)))
-                rc = osd_ldiskfs_readlink(inode, buf->lb_buf, buf->lb_len);
-        else
-                rc = osd_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
-
-        return rc;
-}
+        struct osd_thandle    *oh;
 
-static int osd_ldiskfs_writelink(struct inode *inode, char *buffer, int buflen)
-{
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
 
-        memcpy((char*)&LDISKFS_I(inode)->i_data, (char *)buffer,
-               buflen);
-        LDISKFS_I(inode)->i_disksize = buflen;
-        i_size_write(inode, buflen);
-        inode->i_sb->s_op->dirty_inode(inode);
+        OSD_DECLARE_OP(oh, delete);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
 
         return 0;
 }
 
-static int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
-                                    loff_t *offs, handle_t *handle)
-{
-        struct buffer_head *bh = NULL;
-        loff_t offset = *offs;
-        loff_t new_size = i_size_read(inode);
-        unsigned long block;
-        int blocksize = 1 << inode->i_blkbits;
-        int err = 0;
-        int size;
-        int boffs;
-        int dirty_inode = 0;
-
-        while (bufsize > 0) {
-                if (bh != NULL)
-                        brelse(bh);
-
-                block = offset >> inode->i_blkbits;
-                boffs = offset & (blocksize - 1);
-                size = min(blocksize - boffs, bufsize);
-                bh = ldiskfs_bread(handle, inode, block, 1, &err);
-                if (!bh) {
-                        CERROR("can't read/create block: %d\n", err);
-                        break;
-                }
-
-                err = ldiskfs_journal_get_write_access(handle, bh);
-                if (err) {
-                        CERROR("journal_get_write_access() returned error %d\n",
-                               err);
-                        break;
-                }
-                LASSERTF(boffs + size <= bh->b_size,
-                         "boffs %d size %d bh->b_size %lu",
-                         boffs, size, (unsigned long)bh->b_size);
-                memcpy(bh->b_data + boffs, buf, size);
-                err = ldiskfs_journal_dirty_metadata(handle, bh);
-                if (err)
-                        break;
-
-                if (offset + size > new_size)
-                        new_size = offset + size;
-                offset += size;
-                bufsize -= size;
-                buf += size;
-        }
-        if (bh)
-                brelse(bh);
-
-        /* correct in-core and on-disk sizes */
-        if (new_size > i_size_read(inode)) {
-                spin_lock(&inode->i_lock);
-                if (new_size > i_size_read(inode))
-                        i_size_write(inode, new_size);
-                if (i_size_read(inode) > LDISKFS_I(inode)->i_disksize) {
-                        LDISKFS_I(inode)->i_disksize = i_size_read(inode);
-                        dirty_inode = 1;
-                }
-                spin_unlock(&inode->i_lock);
-                if (dirty_inode)
-                        inode->i_sb->s_op->dirty_inode(inode);
-        }
-
-        if (err == 0)
-                *offs = offset;
-        return err;
-}
-
-static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
-                         const struct lu_buf *buf, loff_t *pos,
-                         struct thandle *handle, struct lustre_capa *capa,
-                         int ignore_quota)
-{
-        struct osd_object  *obj   = osd_dt_obj(dt);
-        struct inode       *inode = obj->oo_inode;
-        struct osd_thandle *oh;
-        ssize_t            result = 0;
-#ifdef HAVE_QUOTA_SUPPORT
-        cfs_cap_t           save = cfs_curproc_cap_pack();
-#endif
-
-        LASSERT(handle != NULL);
-
-        if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
-                RETURN(-EACCES);
-
-        oh = container_of(handle, struct osd_thandle, ot_super);
-        LASSERT(oh->ot_handle->h_transaction != NULL);
-#ifdef HAVE_QUOTA_SUPPORT
-        if (ignore_quota)
-                cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
-        else
-                cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
-#endif
-        /* Write small symlink to inode body as we need to maintain correct
-         * on-disk symlinks for ldiskfs.
-         */
-        if(S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
-           (buf->lb_len < sizeof (LDISKFS_I(inode)->i_data)))
-                result = osd_ldiskfs_writelink(inode, buf->lb_buf, buf->lb_len);
-        else
-                result = osd_ldiskfs_write_record(inode, buf->lb_buf,
-                                                  buf->lb_len, pos,
-                                                  oh->ot_handle);
-#ifdef HAVE_QUOTA_SUPPORT
-        cfs_curproc_cap_unpack(save);
-#endif
-        if (result == 0)
-                result = buf->lb_len;
-        return result;
-}
-
-static const struct dt_body_operations osd_body_ops = {
-        .dbo_read  = osd_read,
-        .dbo_write = osd_write
-};
-
-
 /**
  *      delete a (key, value) pair from index \a dt specified by \a key
  *
@@ -2759,14 +2658,15 @@ static const struct dt_body_operations osd_body_ops = {
  */
 
 static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
-                                const struct dt_key *key, struct thandle *handle,
+                                const struct dt_key *key,
+                                struct thandle *handle,
                                 struct lustre_capa *capa)
 {
         struct osd_object     *obj = osd_dt_obj(dt);
         struct osd_thandle    *oh;
         struct iam_path_descr *ipd;
         struct iam_container  *bag = &obj->oo_dir->od_container;
-        int rc;
+        int                    rc;
 
         ENTRY;
 
@@ -2778,6 +2678,8 @@ static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
                 RETURN(-EACCES);
 
+        OSD_EXEC_OP(handle, delete);
+
         ipd = osd_idx_ipd_get(env, bag);
         if (unlikely(ipd == NULL))
                 RETURN(-ENOMEM);
@@ -2792,11 +2694,36 @@ static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
         RETURN(rc);
 }
 
+static int osd_index_declare_ea_delete(const struct lu_env *env,
+                                       struct dt_object *dt,
+                                       const struct dt_key *key,
+                                       struct thandle *handle)
+{
+        struct osd_thandle *oh;
+
+        LASSERT(dt_object_exists(dt));
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, delete);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
+
+        LASSERT(osd_dt_obj(dt)->oo_inode);
+        osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid,
+                        osd_dt_obj(dt)->oo_inode);
+        osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid,
+                        osd_dt_obj(dt)->oo_inode);
+
+        return 0;
+}
+
 static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
                                           struct dt_rec *fid)
 {
         struct osd_fid_pack *rec;
-        int rc = -ENODATA;
+        int                  rc = -ENODATA;
 
         if (de->file_type & LDISKFS_DIRENT_LUFID) {
                 rec = (struct osd_fid_pack *) (de->name + de->name_len + 1);
@@ -2816,7 +2743,8 @@ static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
  * \retval -ve, on error
  */
 static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
-                               const struct dt_key *key, struct thandle *handle,
+                               const struct dt_key *key,
+                               struct thandle *handle,
                                struct lustre_capa *capa)
 {
         struct osd_object          *obj    = osd_dt_obj(dt);
@@ -2826,8 +2754,7 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
         struct ldiskfs_dir_entry_2 *de;
         struct buffer_head         *bh;
         struct htree_lock          *hlock = NULL;
-
-        int rc;
+        int                         rc;
 
         ENTRY;
 
@@ -2835,6 +2762,8 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
         LASSERT(dt_object_exists(dt));
         LASSERT(handle != NULL);
 
+        OSD_EXEC_OP(handle, delete);
+
         oh = container_of(handle, struct osd_thandle, ot_super);
         LASSERT(oh->ot_handle != NULL);
         LASSERT(oh->ot_handle->h_transaction != NULL);
@@ -2885,13 +2814,14 @@ static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
                                 struct dt_rec *rec, const struct dt_key *key,
                                 struct lustre_capa *capa)
 {
-        struct osd_object     *obj = osd_dt_obj(dt);
-        struct iam_path_descr *ipd;
-        struct iam_container  *bag = &obj->oo_dir->od_container;
+        struct osd_object      *obj = osd_dt_obj(dt);
+        struct iam_path_descr  *ipd;
+        struct iam_container   *bag = &obj->oo_dir->od_container;
         struct osd_thread_info *oti = osd_oti_get(env);
         struct iam_iterator    *it = &oti->oti_idx_it;
-        struct iam_rec *iam_rec;
-        int rc;
+        struct iam_rec         *iam_rec;
+        int                     rc;
+
         ENTRY;
 
         LASSERT(osd_invariant(obj));
@@ -2929,6 +2859,26 @@ static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
         RETURN(rc);
 }
 
+static int osd_index_declare_iam_insert(const struct lu_env *env,
+                                        struct dt_object *dt,
+                                        const struct dt_rec *rec,
+                                        const struct dt_key *key,
+                                        struct thandle *handle)
+{
+        struct osd_thandle *oh;
+
+        LASSERT(dt_object_exists(dt));
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, insert);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
+
+        return 0;
+}
+
 /**
  *      Inserts (key, value) pair in \a dt index object.
  *
@@ -2941,9 +2891,9 @@ static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
  *      \retval -ve failure
  */
 static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
-                                const struct dt_rec *rec, const struct dt_key *key,
-                                struct thandle *th, struct lustre_capa *capa,
-                                int ignore_quota)
+                                const struct dt_rec *rec,
+                                const struct dt_key *key, struct thandle *th,
+                                struct lustre_capa *capa, int ignore_quota)
 {
         struct osd_object     *obj = osd_dt_obj(dt);
         struct iam_path_descr *ipd;
@@ -2953,8 +2903,8 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
         cfs_cap_t              save = cfs_curproc_cap_pack();
 #endif
         struct osd_thread_info *oti = osd_oti_get(env);
-        struct iam_rec *iam_rec = (struct iam_rec *)oti->oti_ldp;
-        int rc;
+        struct iam_rec         *iam_rec = (struct iam_rec *)oti->oti_ldp;
+        int                     rc;
 
         ENTRY;
 
@@ -2966,6 +2916,8 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
                 return -EACCES;
 
+        OSD_EXEC_OP(th, insert);
+
         ipd = osd_idx_ipd_get(env, bag);
         if (unlikely(ipd == NULL))
                 RETURN(-ENOMEM);
@@ -3002,17 +2954,14 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
  * \retval -ve, on error
  */
 static int __osd_ea_add_rec(struct osd_thread_info *info,
-                            struct osd_object *pobj,
-                            struct inode  *cinode,
-                            const char *name,
-                            const struct dt_rec *fid,
-                            struct htree_lock *hlock,
-                            struct thandle *th)
+                            struct osd_object *pobj, struct inode  *cinode,
+                            const char *name, const struct dt_rec *fid,
+                            struct htree_lock *hlock, struct thandle *th)
 {
         struct ldiskfs_dentry_param *ldp;
-        struct dentry      *child;
-        struct osd_thandle *oth;
-        int rc;
+        struct dentry               *child;
+        struct osd_thandle          *oth;
+        int                          rc;
 
         oth = container_of(th, struct osd_thandle, ot_super);
         LASSERT(oth->ot_handle != NULL);
@@ -3020,13 +2969,19 @@ static int __osd_ea_add_rec(struct osd_thread_info *info,
 
         child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
 
+        /* XXX: remove fid_is_igif() check here.
+         * IGIF check is just to handle insertion of .. when it is 'ROOT',
+         * it is IGIF now but needs FID in dir entry as well for readdir
+         * to work.
+         * LU-838 should fix that and remove fid_is_igif() check */
         if (fid_is_igif((struct lu_fid *)fid) ||
             fid_is_norm((struct lu_fid *)fid)) {
                 ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
                 osd_get_ldiskfs_dirent_param(ldp, fid);
-                child->d_fsdata = (void*) ldp;
-        } else
+                child->d_fsdata = (void *)ldp;
+        } else {
                 child->d_fsdata = NULL;
+        }
         rc = osd_ldiskfs_add_entry(oth->ot_handle, child, cinode, hlock);
 
         RETURN(rc);
@@ -3051,10 +3006,10 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info,
                               const struct dt_rec *dot_dot_fid,
                               struct thandle *th)
 {
-        struct inode            *inode  = dir->oo_inode;
+        struct inode                *inode = dir->oo_inode;
         struct ldiskfs_dentry_param *dot_ldp;
         struct ldiskfs_dentry_param *dot_dot_ldp;
-        struct osd_thandle      *oth;
+        struct osd_thandle          *oth;
         int result = 0;
 
         oth = container_of(th, struct osd_thandle, ot_super);
@@ -3075,7 +3030,7 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info,
 
                 if (!dir->oo_compat_dot_created)
                         return -EINVAL;
-                if (fid_seq((struct lu_fid *)dot_fid) >= FID_SEQ_NORMAL) {
+                if (!fid_is_igif((struct lu_fid *)dot_fid)) {
                         osd_get_ldiskfs_dirent_param(dot_ldp, dot_fid);
                         osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid);
                 } else {
@@ -3102,16 +3057,13 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info,
  * It will call the appropriate osd_add* function and return the
  * value, return by respective functions.
  */
-static int osd_ea_add_rec(const struct lu_env *env,
-                          struct osd_object *pobj,
-                          struct inode *cinode,
-                          const char *name,
-                          const struct dt_rec *fid,
-                          struct thandle *th)
-{
-        struct osd_thread_info    *info   = osd_oti_get(env);
-        struct htree_lock         *hlock;
-        int rc;
+static int osd_ea_add_rec(const struct lu_env *env, struct osd_object *pobj,
+                          struct inode *cinode, const char *name,
+                          const struct dt_rec *fid, struct thandle *th)
+{
+        struct osd_thread_info *info   = osd_oti_get(env);
+        struct htree_lock      *hlock;
+        int                     rc;
 
         hlock = pobj->oo_hl_head != NULL ? info->oti_hlock : NULL;
 
@@ -3162,8 +3114,8 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
         struct buffer_head         *bh;
         struct lu_fid              *fid = (struct lu_fid *) rec;
         struct htree_lock          *hlock = NULL;
-        int ino;
-        int rc;
+        int                         ino;
+        int                         rc;
 
         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
 
@@ -3210,10 +3162,10 @@ struct osd_object *osd_object_find(const struct lu_env *env,
                                    struct dt_object *dt,
                                    const struct lu_fid *fid)
 {
-        struct lu_device         *ludev = dt->do_lu.lo_dev;
-        struct osd_object        *child = NULL;
-        struct lu_object         *luch;
-        struct lu_object         *lo;
+        struct lu_device  *ludev = dt->do_lu.lo_dev;
+        struct osd_object *child = NULL;
+        struct lu_object  *luch;
+        struct lu_object  *lo;
 
         luch = lu_object_find(env, ludev, fid, NULL);
         if (!IS_ERR(luch)) {
@@ -3254,6 +3206,32 @@ static inline void osd_object_put(const struct lu_env *env,
         lu_object_put(env, &obj->oo_dt.do_lu);
 }
 
+static int osd_index_declare_ea_insert(const struct lu_env *env,
+                                       struct dt_object *dt,
+                                       const struct dt_rec *rec,
+                                       const struct dt_key *key,
+                                       struct thandle *handle)
+{
+        struct osd_thandle *oh;
+
+        LASSERT(dt_object_exists(dt));
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, insert);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
+
+        LASSERT(osd_dt_obj(dt)->oo_inode);
+        osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid,
+                        osd_dt_obj(dt)->oo_inode);
+        osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid,
+                        osd_dt_obj(dt)->oo_inode);
+
+        return 0;
+}
+
 /**
  * Index add function for interoperability mode (b11826).
  * It will add the directory entry.This entry is needed to
@@ -3270,14 +3248,14 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
                                const struct dt_key *key, struct thandle *th,
                                struct lustre_capa *capa, int ignore_quota)
 {
-        struct osd_object        *obj   = osd_dt_obj(dt);
-        struct lu_fid            *fid   = (struct lu_fid *) rec;
-        const char               *name  = (const char *)key;
-        struct osd_object        *child;
+        struct osd_object *obj   = osd_dt_obj(dt);
+        struct lu_fid     *fid   = (struct lu_fid *) rec;
+        const char        *name  = (const char *)key;
+        struct osd_object *child;
 #ifdef HAVE_QUOTA_SUPPORT
-        cfs_cap_t                 save  = cfs_curproc_cap_pack();
+        cfs_cap_t          save  = cfs_curproc_cap_pack();
 #endif
-        int rc;
+        int                rc;
 
         ENTRY;
 
@@ -3320,12 +3298,12 @@ static struct dt_it *osd_it_iam_init(const struct lu_env *env,
                                      __u32 unused,
                                      struct lustre_capa *capa)
 {
-        struct osd_it_iam         *it;
+        struct osd_it_iam      *it;
         struct osd_thread_info *oti = osd_oti_get(env);
-        struct osd_object     *obj = osd_dt_obj(dt);
-        struct lu_object      *lo  = &dt->do_lu;
-        struct iam_path_descr *ipd;
-        struct iam_container  *bag = &obj->oo_dir->od_container;
+        struct osd_object      *obj = osd_dt_obj(dt);
+        struct lu_object       *lo  = &dt->do_lu;
+        struct iam_path_descr  *ipd;
+        struct iam_container   *bag = &obj->oo_dir->od_container;
 
         LASSERT(lu_object_exists(lo));
 
@@ -3350,7 +3328,7 @@ static struct dt_it *osd_it_iam_init(const struct lu_env *env,
 
 static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di)
 {
-        struct osd_it_iam     *it = (struct osd_it_iam *)di;
+        struct osd_it_iam *it = (struct osd_it_iam *)di;
         struct osd_object *obj = it->oi_obj;
 
         iam_it_fini(&it->oi_it);
@@ -3370,7 +3348,7 @@ static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di)
  */
 
 static int osd_it_iam_get(const struct lu_env *env,
-                      struct dt_it *di, const struct dt_key *key)
+                          struct dt_it *di, const struct dt_key *key)
 {
         struct osd_it_iam *it = (struct osd_it_iam *)di;
 
@@ -3430,13 +3408,11 @@ static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di)
         return iam_it_key_size(&it->oi_it);
 }
 
-static inline void osd_it_append_attrs(struct lu_dirent*ent,
-                                       __u32 attr,
-                                       int len,
-                                       __u16 type)
+static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
+                                       int len, __u16 type)
 {
-        struct luda_type        *lt;
-        const unsigned           align = sizeof(struct luda_type) - 1;
+        struct luda_type *lt;
+        const unsigned    align = sizeof(struct luda_type) - 1;
 
         /* check if file type is required */
         if (attr & LUDA_TYPE) {
@@ -3455,12 +3431,9 @@ static inline void osd_it_append_attrs(struct lu_dirent*ent,
  */
 
 static inline void osd_it_pack_dirent(struct lu_dirent *ent,
-                                      struct lu_fid *fid,
-                                      __u64 offset,
-                                      char *name,
-                                      __u16 namelen,
-                                      __u16 type,
-                                      __u32 attr)
+                                      struct lu_fid *fid, __u64 offset,
+                                      char *name, __u16 namelen,
+                                      __u16 type, __u32 attr)
 {
         fid_cpu_to_le(&ent->lde_fid, fid);
         ent->lde_attrs = LUDA_FID;
@@ -3480,13 +3453,13 @@ static inline void osd_it_pack_dirent(struct lu_dirent *ent,
  */
 static int osd_it_iam_rec(const struct lu_env *env,
                           const struct dt_it *di,
-                          struct lu_dirent *lde,
-                          __u32 attr)
+                          struct dt_rec *dtrec, __u32 attr)
 {
         struct osd_it_iam *it        = (struct osd_it_iam *)di;
         struct osd_thread_info *info = osd_oti_get(env);
         struct lu_fid     *fid       = &info->oti_fid;
         const struct osd_fid_pack *rec;
+        struct lu_dirent *lde = (struct lu_dirent *)dtrec;
         char *name;
         int namelen;
         __u64 hash;
@@ -3537,7 +3510,7 @@ static __u64 osd_it_iam_store(const struct lu_env *env, const struct dt_it *di)
  */
 
 static int osd_it_iam_load(const struct lu_env *env,
-                       const struct dt_it *di, __u64 hash)
+                           const struct dt_it *di, __u64 hash)
 {
         struct osd_it_iam *it = (struct osd_it_iam *)di;
 
@@ -3545,9 +3518,11 @@ static int osd_it_iam_load(const struct lu_env *env,
 }
 
 static const struct dt_index_operations osd_index_iam_ops = {
-        .dio_lookup = osd_index_iam_lookup,
-        .dio_insert = osd_index_iam_insert,
-        .dio_delete = osd_index_iam_delete,
+        .dio_lookup         = osd_index_iam_lookup,
+        .dio_declare_insert = osd_index_declare_iam_insert,
+        .dio_insert         = osd_index_iam_insert,
+        .dio_declare_delete = osd_index_declare_iam_delete,
+        .dio_delete         = osd_index_iam_delete,
         .dio_it     = {
                 .init     = osd_it_iam_init,
                 .fini     = osd_it_iam_fini,
@@ -3800,8 +3775,8 @@ static struct dt_key *osd_it_ea_key(const struct lu_env *env,
                                     const struct dt_it *di)
 {
         struct osd_it_ea *it = (struct osd_it_ea *)di;
-        ENTRY;
-        RETURN((struct dt_key *)it->oie_dirent->oied_name);
+
+        return (struct dt_key *)it->oie_dirent->oied_name;
 }
 
 /**
@@ -3814,8 +3789,8 @@ static struct dt_key *osd_it_ea_key(const struct lu_env *env,
 static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
 {
         struct osd_it_ea *it = (struct osd_it_ea *)di;
-        ENTRY;
-        RETURN(it->oie_dirent->oied_namelen);
+
+        return it->oie_dirent->oied_namelen;
 }
 
 
@@ -3832,12 +3807,12 @@ static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
  */
 static inline int osd_it_ea_rec(const struct lu_env *env,
                                 const struct dt_it *di,
-                                struct lu_dirent *lde,
-                                __u32 attr)
+                                struct dt_rec *dtrec, __u32 attr)
 {
         struct osd_it_ea        *it     = (struct osd_it_ea *)di;
         struct osd_object       *obj    = it->oie_obj;
         struct lu_fid           *fid    = &it->oie_dirent->oied_fid;
+        struct lu_dirent        *lde    = (struct lu_dirent *)dtrec;
         int    rc = 0;
 
         ENTRY;
@@ -3865,8 +3840,8 @@ static inline int osd_it_ea_rec(const struct lu_env *env,
 static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di)
 {
         struct osd_it_ea *it = (struct osd_it_ea *)di;
-        ENTRY;
-        RETURN(it->oie_dirent->oied_off);
+
+        return it->oie_dirent->oied_off;
 }
 
 /**
@@ -3930,9 +3905,11 @@ static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt,
  * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826)
  */
 static const struct dt_index_operations osd_index_ea_ops = {
-        .dio_lookup = osd_index_ea_lookup,
-        .dio_insert = osd_index_ea_insert,
-        .dio_delete = osd_index_ea_delete,
+        .dio_lookup         = osd_index_ea_lookup,
+        .dio_declare_insert = osd_index_declare_ea_insert,
+        .dio_insert         = osd_index_ea_insert,
+        .dio_declare_delete = osd_index_declare_ea_delete,
+        .dio_delete         = osd_index_ea_delete,
         .dio_it     = {
                 .init     = osd_it_ea_init,
                 .fini     = osd_it_ea_fini,
@@ -3999,8 +3976,8 @@ static void osd_key_exit(const struct lu_context *ctx,
 /* type constructor/destructor: osd_type_init, osd_type_fini */
 LU_TYPE_INIT_FINI(osd, &osd_key);
 
-static struct lu_context_key osd_key = {
-        .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD,
+struct lu_context_key osd_key = {
+        .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD | LCT_MG_THREAD | LCT_LOCAL,
         .lct_init = osd_key_init,
         .lct_fini = osd_key_fini,
         .lct_exit = osd_key_exit
@@ -4016,12 +3993,16 @@ static int osd_device_init(const struct lu_env *env, struct lu_device *d,
 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
 {
         struct osd_thread_info *info = osd_oti_get(env);
+
         ENTRY;
-        if (o->od_obj_area != NULL) {
-                lu_object_put(env, &o->od_obj_area->do_lu);
-                o->od_obj_area = NULL;
+
+        if (o->od_oi_table != NULL)
+                osd_oi_fini(info, o);
+
+        if (o->od_fsops) {
+                fsfilt_put_ops(o->od_fsops);
+                o->od_fsops = NULL;
         }
-        osd_oi_fini(info, &o->od_oi);
 
         RETURN(0);
 }
@@ -4033,8 +4014,16 @@ static int osd_mount(const struct lu_env *env,
         const char               *dev  = lustre_cfg_string(cfg, 0);
         struct lustre_disk_data  *ldd;
         struct lustre_sb_info    *lsi;
+        int                       rc = 0;
 
         ENTRY;
+
+        o->od_fsops = fsfilt_get_ops(mt_str(LDD_MT_LDISKFS));
+        if (o->od_fsops == NULL) {
+                CERROR("Can't find fsfilt_ldiskfs\n");
+                RETURN(-ENOTSUPP);
+        }
+
         if (o->od_mount != NULL) {
                 CERROR("Already mounted (%s)\n", dev);
                 RETURN(-EEXIST);
@@ -4050,18 +4039,24 @@ static int osd_mount(const struct lu_env *env,
         LASSERT(lmi != NULL);
         /* save lustre_mount_info in dt_device */
         o->od_mount = lmi;
+        o->od_mnt = lmi->lmi_mnt;
 
         lsi = s2lsi(lmi->lmi_sb);
         ldd = lsi->lsi_ldd;
 
         if (ldd->ldd_flags & LDD_F_IAM_DIR) {
                 o->od_iop_mode = 0;
-                LCONSOLE_WARN("OSD: IAM mode enabled\n");
+                LCONSOLE_WARN("%s: OSD: IAM mode enabled\n", dev);
         } else
                 o->od_iop_mode = 1;
 
-        o->od_obj_area = NULL;
-        RETURN(0);
+        if (ldd->ldd_flags & LDD_F_SV_TYPE_OST) {
+                rc = osd_compat_init(o);
+                if (rc)
+                        CERROR("%s: can't initialize compats: %d\n", dev, rc);
+        }
+
+        RETURN(rc);
 }
 
 static struct lu_device *osd_device_fini(const struct lu_env *env,
@@ -4070,6 +4065,8 @@ static struct lu_device *osd_device_fini(const struct lu_env *env,
         int rc;
         ENTRY;
 
+        osd_compat_fini(osd_dev(d));
+
         shrink_dcache_sb(osd_sb(osd_dev(d)));
         osd_sync(env, lu2dt_dev(d));
 
@@ -4159,46 +4156,25 @@ static int osd_recovery_complete(const struct lu_env *env,
         RETURN(0);
 }
 
-static int osd_prepare(const struct lu_env *env,
-                       struct lu_device *pdev,
+static int osd_prepare(const struct lu_env *env, struct lu_device *pdev,
                        struct lu_device *dev)
 {
-        struct osd_device *osd = osd_dev(dev);
-        struct lustre_sb_info *lsi;
-        struct lustre_disk_data *ldd;
-        struct lustre_mount_info  *lmi;
+        struct osd_device      *osd = osd_dev(dev);
         struct osd_thread_info *oti = osd_oti_get(env);
-        struct dt_object *d;
-        int result;
+        int                     result;
 
         ENTRY;
+
         /* 1. initialize oi before any file create or file open */
-        result = osd_oi_init(oti, &osd->od_oi,
-                             &osd->od_dt_dev, lu2md_dev(pdev));
-        if (result != 0)
+        result = osd_oi_init(oti, osd);
+        if (result < 0)
                 RETURN(result);
 
-        lmi = osd->od_mount;
-        lsi = s2lsi(lmi->lmi_sb);
-        ldd = lsi->lsi_ldd;
+        if (!lu_device_is_md(pdev))
+                RETURN(0);
 
         /* 2. setup local objects */
         result = llo_local_objects_setup(env, lu2md_dev(pdev), lu2dt_dev(dev));
-        if (result)
-                goto out;
-
-        /* 3. open remote object dir */
-        d = dt_store_open(env, lu2dt_dev(dev), "",
-                          remote_obj_dir, &oti->oti_fid);
-        if (!IS_ERR(d)) {
-                osd->od_obj_area = d;
-                result = 0;
-        } else {
-                result = PTR_ERR(d);
-                osd->od_obj_area = NULL;
-        }
-
-out:
         RETURN(result);
 }
 
@@ -4211,7 +4187,7 @@ static const struct lu_object_operations osd_lu_obj_ops = {
         .loo_object_invariant = osd_object_invariant
 };
 
-static const struct lu_device_operations osd_lu_ops = {
+const struct lu_device_operations osd_lu_ops = {
         .ldo_object_alloc      = osd_object_alloc,
         .ldo_process_config    = osd_process_config,
         .ldo_recovery_complete = osd_recovery_complete,
@@ -4236,7 +4212,7 @@ static struct lu_device_type osd_device_type = {
         .ldt_tags     = LU_DEVICE_DT,
         .ldt_name     = LUSTRE_OSD_NAME,
         .ldt_ops      = &osd_device_type_ops,
-        .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
+        .ldt_ctx_tags = LCT_LOCAL,
 };
 
 /*
@@ -4246,19 +4222,11 @@ static struct obd_ops osd_obd_device_ops = {
         .o_owner = THIS_MODULE
 };
 
-static struct lu_local_obj_desc llod_osd_rem_obj_dir = {
-        .llod_name      = remote_obj_dir,
-        .llod_oid       = OSD_REM_OBJ_DIR_OID,
-        .llod_is_index  = 1,
-        .llod_feat      = &dt_directory_features,
-};
-
 static int __init osd_mod_init(void)
 {
         struct lprocfs_static_vars lvars;
 
         osd_oi_mod_init();
-        llo_local_obj_register(&llod_osd_rem_obj_dir);
         lprocfs_osd_init_vars(&lvars);
         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
                                    LUSTRE_OSD_NAME, &osd_device_type);
@@ -4266,7 +4234,6 @@ static int __init osd_mod_init(void)
 
 static void __exit osd_mod_exit(void)
 {
-        llo_local_obj_unregister(&llod_osd_rem_obj_dir);
         class_unregister_type(LUSTRE_OSD_NAME);
 }
 
@@ -4274,4 +4241,4 @@ MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")");
 MODULE_LICENSE("GPL");
 
-cfs_module(osd, "0.0.2", osd_mod_init, osd_mod_exit);
+cfs_module(osd, "0.1.0", osd_mod_init, osd_mod_exit);