Whamcloud - gitweb
LU-1067 obdecho: Recheck client env ctx for echo md client.
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
index 3645cdc..dfc5b15 100644 (file)
 /*
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
+ */
+/*
+ * Copyright (c) 2011 Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 /* llo_* api support */
 #include <md_object.h>
 
+/*
+ * ldiskfs_pdo: switch for parallel directory operations.
+ *
+ * When non-zero, directory objects get an htree lock head allocated (see the
+ * "S_ISDIR(...) && ldiskfs_pdo" checks in this file). With HAVE_LDISKFS_PDO
+ * defined it is initialized to 1 and registered as a module parameter with
+ * mode 0644; otherwise it is initialized to 0 and no module parameter is
+ * registered.
+ */
+#ifdef HAVE_LDISKFS_PDO
+int ldiskfs_pdo = 1;
+CFS_MODULE_PARM(ldiskfs_pdo, "i", int, 0644,
+                "ldiskfs with parallel directory operations");
+#else
+int ldiskfs_pdo = 0;
+#endif
+
 static const char dot[] = ".";
 static const char dotdot[] = "..";
 static const char remote_obj_dir[] = "REM_OBJ_DIR";
@@ -98,6 +112,7 @@ struct osd_object {
         /**
          * to protect index ops.
          */
+        struct htree_lock_head *oo_hl_head;
         cfs_rw_semaphore_t     oo_ext_idx_sem;
         cfs_rw_semaphore_t     oo_sem;
         struct osd_directory  *oo_dir;
@@ -123,15 +138,48 @@ static       struct lu_context_key            osd_key;
 static const struct dt_object_operations      osd_obj_ops;
 static const struct dt_object_operations      osd_obj_ea_ops;
 static const struct dt_body_operations        osd_body_ops;
+static const struct dt_body_operations        osd_body_ops_new;
 static const struct dt_index_operations       osd_index_iam_ops;
 static const struct dt_index_operations       osd_index_ea_ops;
 
+/*
+ * Per-transaction bookkeeping of declared operations.
+ *
+ * OSD_DECLARE_OP(oh, op) asserts that no journal handle is attached to the
+ * osd_thandle \a oh yet (ot_handle == NULL) and increments its
+ * ot_declare_<op> counter.
+ *
+ * OSD_EXEC_OP(handle, op) converts the struct thandle \a handle into its
+ * enclosing osd_thandle and decrements the ot_declare_<op> counter, never
+ * going below zero.
+ *
+ * Both macros expand to a single do { } while (0) statement, so
+ * "MACRO(...);" is safe in an unbraced if/else body, and every macro
+ * argument is parenthesized. The local in OSD_EXEC_OP() has a name that is
+ * unlikely to clash with a caller variable used in \a handle.
+ *
+ * Without OSD_TRACK_DECLARES both macros are no-op statements.
+ */
+#define OSD_TRACK_DECLARES
+#ifdef OSD_TRACK_DECLARES
+#define OSD_DECLARE_OP(oh, op)                                          \
+do {                                                                    \
+        LASSERT((oh)->ot_handle == NULL);                               \
+        ((oh)->ot_declare_ ##op)++;                                     \
+} while (0)
+#define OSD_EXEC_OP(handle, op)                                         \
+do {                                                                    \
+        struct osd_thandle *osd_exec_oh;                                \
+                                                                        \
+        osd_exec_oh = container_of0((handle), struct osd_thandle,       \
+                                    ot_super);                          \
+        if ((osd_exec_oh->ot_declare_ ##op) > 0)                        \
+                (osd_exec_oh->ot_declare_ ##op)--;                      \
+} while (0)
+#else
+#define OSD_DECLARE_OP(oh, op)  do { } while (0)
+#define OSD_EXEC_OP(oh, op)     do { } while (0)
+#endif
+
 struct osd_thandle {
         struct thandle          ot_super;
         handle_t               *ot_handle;
         struct journal_callback ot_jcb;
+        cfs_list_t              ot_dcb_list;
         /* Link to the device, for debugging. */
         struct lu_ref_link     *ot_dev_link;
+        int                     ot_credits;
+
+#ifdef OSD_TRACK_DECLARES
+        unsigned char           ot_declare_attr_set;
+        unsigned char           ot_declare_punch;
+        unsigned char           ot_declare_xattr_set;
+        unsigned char           ot_declare_create;
+        unsigned char           ot_declare_destroy;
+        unsigned char           ot_declare_ref_add;
+        unsigned char           ot_declare_ref_del;
+        unsigned char           ot_declare_write;
+        unsigned char           ot_declare_insert;
+        unsigned char           ot_declare_delete;
+#endif
 
 #if OSD_THANDLE_STATS
         /** time when this handle was allocated */
@@ -142,6 +190,25 @@ struct osd_thandle {
 #endif
 };
 
+/**
+ * Basic transaction credit op.
+ *
+ * The values index the osd_dto_credits_noquota[] and osd_dto_credits_quota[]
+ * tables, both of which are sized with DTO_NR.
+ */
+enum dt_txn_op {
+        DTO_INDEX_INSERT,  /**< insert of an index entry */
+        DTO_INDEX_DELETE,  /**< removal of an index entry */
+        DTO_INDEX_UPDATE,  /**< marked "unused now" in the credit tables */
+        DTO_OBJECT_CREATE, /**< object creation */
+        DTO_OBJECT_DELETE, /**< object removal */
+        DTO_ATTR_SET_BASE, /**< attribute update of the inode */
+        DTO_XATTR_SET,     /**< extended attribute update */
+        DTO_LOG_REC, /**< XXX temporary: dt layer knows nothing about llog. */
+        DTO_WRITE_BASE,    /**< inode change during a write */
+        DTO_WRITE_BLOCK,
+        DTO_ATTR_SET_CHOWN,
+
+        DTO_NR             /**< number of ops; not an op itself */
+};
+
 /*
  * Helpers.
  */
@@ -223,9 +290,10 @@ osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save)
         if ((tc = prepare_creds())) {
                 tc->fsuid         = uc->mu_fsuid;
                 tc->fsgid         = uc->mu_fsgid;
-                tc->cap_effective = uc->mu_cap;
                 commit_creds(tc);
         }
+        /* XXX not suboptimal */
+        cfs_curproc_cap_unpack(uc->mu_cap);
 }
 
 static inline void
@@ -307,8 +375,9 @@ static struct lu_object *osd_object_alloc(const struct lu_env *env,
                 cfs_init_rwsem(&mo->oo_ext_idx_sem);
                 cfs_spin_lock_init(&mo->oo_guard);
                 return l;
-        } else
+        } else {
                 return NULL;
+        }
 }
 
 /*
@@ -346,6 +415,14 @@ static struct inode *osd_iget(struct osd_thread_info *info,
                 CERROR("bad inode %lx\n",inode->i_ino);
                 iput(inode);
                 inode = ERR_PTR(-ENOENT);
+        } else {
+                /* Do not update file c/mtime in ldiskfs.
+                 * NB: we don't have any lock to protect this because we don't
+                 * have reference on osd_object now, but contention with
+                 * another lookup + attr_set can't happen in the tiny window
+                 * between if (...) and set S_NOCMTIME. */
+                if (!(inode->i_flags & S_NOCMTIME))
+                        inode->i_flags |= S_NOCMTIME;
         }
         return inode;
 }
@@ -357,13 +434,12 @@ static int osd_fid_lookup(const struct lu_env *env,
         struct lu_device       *ldev = obj->oo_dt.do_lu.lo_dev;
         struct osd_device      *dev;
         struct osd_inode_id    *id;
-        struct osd_oi          *oi;
         struct inode           *inode;
         int                     result;
 
         LINVRNT(osd_invariant(obj));
         LASSERT(obj->oo_inode == NULL);
-        LASSERT(fid_is_sane(fid) || osd_fid_is_root(fid));
+        LASSERTF(fid_is_sane(fid) || osd_fid_is_root(fid), DFID, PFID(fid));
         /*
          * This assertion checks that osd layer sees only local
          * fids. Unfortunately it is somewhat expensive (does a
@@ -376,33 +452,48 @@ static int osd_fid_lookup(const struct lu_env *env,
         info = osd_oti_get(env);
         dev  = osd_dev(ldev);
         id   = &info->oti_id;
-        oi   = &dev->od_oi;
 
         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
                 RETURN(-ENOENT);
 
-        result = osd_oi_lookup(info, oi, fid, id);
-        if (result == 0) {
-                inode = osd_iget(info, dev, id);
-                if (!IS_ERR(inode)) {
-                        obj->oo_inode = inode;
-                        LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
-                        if (dev->od_iop_mode) {
-                                obj->oo_compat_dot_created = 1;
-                                obj->oo_compat_dotdot_created = 1;
-                        }
+        result = osd_oi_lookup(info, osd_fid2oi(dev, fid), fid, id);
+        if (result != 0) {
+                if (result == -ENOENT)
                         result = 0;
-                } else
-                        /*
-                         * If fid wasn't found in oi, inode-less object is
-                         * created, for which lu_object_exists() returns
-                         * false. This is used in a (frequent) case when
-                         * objects are created as locking anchors or
-                         * place holders for objects yet to be created.
-                         */
-                        result = PTR_ERR(inode);
-        } else if (result == -ENOENT)
-                result = 0;
+                goto out;
+        }
+
+        inode = osd_iget(info, dev, id);
+        if (IS_ERR(inode)) {
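+                /*
+                 * osd_iget() failed for the id found in the OI: propagate
+                 * its error to the caller.
+                 *
+                 * This differs from the -ENOENT case of osd_oi_lookup()
+                 * above: if fid wasn't found in oi, inode-less object is
+                 * created, for which lu_object_exists() returns
+                 * false. This is used in a (frequent) case when
+                 * objects are created as locking anchors or
+                 * place holders for objects yet to be created.
+                 */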
+                result = PTR_ERR(inode);
+                goto out;
+        }
+
+        obj->oo_inode = inode;
+        LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
+        if (dev->od_iop_mode) {
+                obj->oo_compat_dot_created = 1;
+                obj->oo_compat_dotdot_created = 1;
+        }
+
+        if (!S_ISDIR(inode->i_mode) || !ldiskfs_pdo) /* done */
+                goto out;
+
+        LASSERT(obj->oo_hl_head == NULL);
+        obj->oo_hl_head = ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
+        if (obj->oo_hl_head == NULL) {
+                obj->oo_inode = NULL;
+                iput(inode);
+                result = -ENOMEM;
+        }
+out:
         LINVRNT(osd_invariant(obj));
 
         RETURN(result);
@@ -432,6 +523,7 @@ static int osd_object_init(const struct lu_env *env, struct lu_object *l,
         LINVRNT(osd_invariant(obj));
 
         result = osd_fid_lookup(env, obj, lu_object_fid(l));
+        obj->oo_dt.do_body_ops = &osd_body_ops_new;
         if (result == 0) {
                 if (obj->oo_inode != NULL)
                         osd_object_init0(obj);
@@ -451,6 +543,8 @@ static void osd_object_free(const struct lu_env *env, struct lu_object *l)
         LINVRNT(osd_invariant(obj));
 
         dt_object_fini(&obj->oo_dt);
+        if (obj->oo_hl_head != NULL)
+                ldiskfs_htree_lock_head_free(obj->oo_hl_head);
         OBD_FREE_PTR(obj);
 }
 
@@ -597,36 +691,38 @@ static void __osd_th_check_slow(void *oth, struct osd_device *dev,
  * Concurrency: doesn't access mutable data.
  */
 static int osd_param_is_sane(const struct osd_device *dev,
-                             const struct txn_param *param)
+                             const struct thandle *th)
 {
-        return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
+        struct osd_thandle *oh;
+        oh = container_of0(th, struct osd_thandle, ot_super);
+        return oh->ot_credits <= osd_journal(dev)->j_max_transaction_buffers;
 }
 
 /*
  * Concurrency: shouldn't matter.
  */
+#ifdef HAVE_LDISKFS_JOURNAL_CALLBACK_ADD
+static void osd_trans_commit_cb(struct super_block *sb,
+                                struct journal_callback *jcb, int error)
+#else
 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
+#endif
 {
         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
         struct thandle     *th  = &oh->ot_super;
-        struct dt_device   *dev = th->th_dev;
-        struct lu_device   *lud = &dev->dd_lu_dev;
+        struct lu_device   *lud = &th->th_dev->dd_lu_dev;
+        struct dt_txn_commit_cb *dcb, *tmp;
 
-        LASSERT(dev != NULL);
         LASSERT(oh->ot_handle == NULL);
 
-        if (error) {
+        if (error)
                 CERROR("transaction @0x%p commit error: %d\n", th, error);
-        } else {
-                struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
-                /*
-                 * This od_env_for_commit is only for commit usage.  see
-                 * "struct dt_device"
-                 */
-                lu_context_enter(&env->le_ctx);
-                dt_txn_hook_commit(env, th);
-                lu_context_exit(&env->le_ctx);
-        }
+
+        dt_txn_hook_commit(th);
+
+        /* call per-transaction callbacks if any */
+        cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage)
+                dcb->dcb_func(NULL, th, dcb, error);
 
         lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
         lu_device_put(lud);
@@ -637,129 +733,176 @@ static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
         OBD_FREE_PTR(oh);
 }
 
+/*
+ * Allocate a transaction handle for device \a d.
+ *
+ * The handle starts with zero reserved credits and an empty list of commit
+ * callbacks; this function does not start a journal transaction itself.
+ * The device is also recorded in the per-thread info of \a env.
+ *
+ * \retval pointer to the struct thandle embedded in the new osd_thandle
+ * \retval ERR_PTR(-ENOMEM) if the allocation failed
+ */
+static struct thandle *osd_trans_create(const struct lu_env *env,
+                                        struct dt_device *d)
+{
+        struct osd_thread_info *oti = osd_oti_get(env);
+        struct osd_thandle     *oh;
+        struct thandle         *th;
+        ENTRY;
+
+        OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
+        if (oh == NULL)
+                RETURN(ERR_PTR(-ENOMEM));
+
+        /* generic part of the handle, visible to upper layers */
+        th = &oh->ot_super;
+        th->th_dev    = d;
+        th->th_result = 0;
+        th->th_tags   = LCT_TX_HANDLE;
+
+        /* osd private part: nothing declared, no callbacks yet */
+        oh->ot_credits = 0;
+        oti->oti_dev = osd_dt_dev(d);
+        CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
+
+        osd_th_alloced(oh);
+        RETURN(th);
+}
+
 /*
  * Concurrency: shouldn't matter.
  */
-static struct thandle *osd_trans_start(const struct lu_env *env,
-                                       struct dt_device *d,
-                                       struct txn_param *p)
+int osd_trans_start(const struct lu_env *env, struct dt_device *d,
+                    struct thandle *th)
 {
+        struct osd_thread_info *oti = osd_oti_get(env);
         struct osd_device  *dev = osd_dt_dev(d);
         handle_t           *jh;
         struct osd_thandle *oh;
-        struct thandle     *th;
-        int hook_res;
+        int rc;
 
         ENTRY;
 
-        hook_res = dt_txn_hook_start(env, d, p);
-        if (hook_res != 0)
-                RETURN(ERR_PTR(hook_res));
+        LASSERT(current->journal_info == NULL);
 
-        if (osd_param_is_sane(dev, p)) {
-                OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
-                if (oh != NULL) {
-                        struct osd_thread_info *oti = osd_oti_get(env);
+        oh = container_of0(th, struct osd_thandle, ot_super);
+        LASSERT(oh != NULL);
+        LASSERT(oh->ot_handle == NULL);
 
-                        /*
-                         * XXX temporary stuff. Some abstraction layer should
-                         * be used.
-                         */
-                        oti->oti_dev = dev;
-                        osd_th_alloced(oh);
-                        jh = ldiskfs_journal_start_sb(osd_sb(dev), p->tp_credits);
-                        osd_th_started(oh);
-                        if (!IS_ERR(jh)) {
-                                oh->ot_handle = jh;
-                                th = &oh->ot_super;
-                                th->th_dev = d;
-                                th->th_result = 0;
-                                jh->h_sync = p->tp_sync;
-                                lu_device_get(&d->dd_lu_dev);
-                                oh->ot_dev_link = lu_ref_add
-                                        (&d->dd_lu_dev.ld_reference,
-                                         "osd-tx", th);
-                                /* add commit callback */
-                                lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
-                                lu_context_enter(&th->th_ctx);
-                                osd_journal_callback_set(jh, osd_trans_commit_cb,
-                                                         (struct journal_callback *)&oh->ot_jcb);
-                                        LASSERT(oti->oti_txns == 0);
-                                        LASSERT(oti->oti_r_locks == 0);
-                                        LASSERT(oti->oti_w_locks == 0);
-                                        oti->oti_txns++;
-                        } else {
-                                OBD_FREE_PTR(oh);
-                                th = (void *)jh;
-                        }
-                } else
-                        th = ERR_PTR(-ENOMEM);
-        } else {
-                CERROR("Invalid transaction parameters\n");
-                th = ERR_PTR(-EINVAL);
+        rc = dt_txn_hook_start(env, d, th);
+        if (rc != 0)
+                GOTO(out, rc);
+
+        oh->ot_credits += LDISKFS_QUOTA_INIT_BLOCKS(osd_sb(dev));
+
+        if (!osd_param_is_sane(dev, th)) {
+                CWARN("%s: too many transaction credits (%d > %d)\n",
+                      d->dd_lu_dev.ld_obd->obd_name, oh->ot_credits,
+                      osd_journal(dev)->j_max_transaction_buffers);
+#ifdef OSD_TRACK_DECLARES
+                CERROR("  attr_set: %d, punch: %d, xattr_set: %d,\n",
+                       oh->ot_declare_attr_set, oh->ot_declare_punch,
+                       oh->ot_declare_xattr_set);
+                CERROR("  create: %d, ref_add: %d, ref_del: %d, write: %d\n",
+                       oh->ot_declare_create, oh->ot_declare_ref_add,
+                       oh->ot_declare_ref_del, oh->ot_declare_write);
+                CERROR("  insert: %d, delete: %d\n",
+                       oh->ot_declare_insert, oh->ot_declare_delete);
+#endif
         }
 
-        RETURN(th);
+        /*
+         * XXX temporary stuff. Some abstraction layer should
+         * be used.
+         */
+        jh = ldiskfs_journal_start_sb(osd_sb(dev), oh->ot_credits);
+        osd_th_started(oh);
+        if (!IS_ERR(jh)) {
+                oh->ot_handle = jh;
+                LASSERT(oti->oti_txns == 0);
+                lu_context_init(&th->th_ctx, th->th_tags);
+                lu_context_enter(&th->th_ctx);
+
+                lu_device_get(&d->dd_lu_dev);
+                oh->ot_dev_link = lu_ref_add(&d->dd_lu_dev.ld_reference,
+                                             "osd-tx", th);
+
+                /*
+                 * XXX: current rule is that we first start tx,
+                 *      then lock object(s), but we can't use
+                 *      this rule for data (due to locking specifics
+                 *      in ldiskfs). also in long-term we'd like to
+                 *      use usually-used (locks;tx) ordering. so,
+                 *      UGLY thing is that we'll use one ordering for
+                 *      data (ofd) and reverse ordering for metadata
+                 *      (mdd). then at some point we'll fix the latter
+                 */
+                if (lu_device_is_md(&d->dd_lu_dev)) {
+                        LASSERT(oti->oti_r_locks == 0);
+                        LASSERT(oti->oti_w_locks == 0);
+                }
+
+                oti->oti_txns++;
+                rc = 0;
+        } else {
+                rc = PTR_ERR(jh);
+        }
+out:
+        RETURN(rc);
 }
 
 /*
  * Concurrency: shouldn't matter.
  */
-static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
+static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
 {
-        int result;
-        struct osd_thandle *oh;
+        int                     rc = 0;
+        struct osd_thandle     *oh;
         struct osd_thread_info *oti = osd_oti_get(env);
 
         ENTRY;
 
         oh = container_of0(th, struct osd_thandle, ot_super);
+
         if (oh->ot_handle != NULL) {
                 handle_t *hdl = oh->ot_handle;
 
+                hdl->h_sync = th->th_sync;
+
+                /*
+                 * add commit callback
+                 * notice we don't do this in osd_trans_start()
+                 * as underlying transaction can change during truncate
+                 */
+                osd_journal_callback_set(hdl, osd_trans_commit_cb,
+                                         &oh->ot_jcb);
+
                 LASSERT(oti->oti_txns == 1);
                 oti->oti_txns--;
-                LASSERT(oti->oti_r_locks == 0);
-                LASSERT(oti->oti_w_locks == 0);
-                result = dt_txn_hook_stop(env, th);
-                if (result != 0)
-                        CERROR("Failure in transaction hook: %d\n", result);
+                /*
+                 * XXX: current rule is that we first start tx,
+                 *      then lock object(s), but we can't use
+                 *      this rule for data (due to locking specifics
+                 *      in ldiskfs). also in long-term we'd like to
+                 *      use usually-used (locks;tx) ordering. so,
+                 *      UGLY thing is that we'll use one ordering for
+                 *      data (ofd) and reverse ordering for metadata
+                 *      (mdd). then at some point we'll fix the latter
+                 */
+                if (lu_device_is_md(&th->th_dev->dd_lu_dev)) {
+                        LASSERT(oti->oti_r_locks == 0);
+                        LASSERT(oti->oti_w_locks == 0);
+                }
+                rc = dt_txn_hook_stop(env, th);
+                if (rc != 0)
+                        CERROR("Failure in transaction hook: %d\n", rc);
                 oh->ot_handle = NULL;
                 OSD_CHECK_SLOW_TH(oh, oti->oti_dev,
-                                  result = ldiskfs_journal_stop(hdl));
-                if (result != 0)
-                        CERROR("Failure to stop transaction: %d\n", result);
+                                  rc = ldiskfs_journal_stop(hdl));
+                if (rc != 0)
+                        CERROR("Failure to stop transaction: %d\n", rc);
+        } else {
+                OBD_FREE_PTR(oh);
         }
-        EXIT;
+
+        RETURN(rc);
 }
 
-/*
- * Concurrency: no concurrent access is possible that late in object
- * life-cycle.
- */
-static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
+static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
 {
-        const struct lu_fid    *fid = lu_object_fid(&obj->oo_dt.do_lu);
-        struct osd_device      *osd = osd_obj2dev(obj);
-        struct osd_thread_info *oti = osd_oti_get(env);
-        struct txn_param       *prm = &oti->oti_txn;
-        struct lu_env          *env_del_obj = &oti->oti_obj_delete_tx_env;
-        struct thandle         *th;
-        int result;
+        struct osd_thandle *oh = container_of0(th, struct osd_thandle,
+                                               ot_super);
 
-        lu_env_init(env_del_obj, LCT_DT_THREAD);
-        txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS +
-                            OSD_TXN_INODE_DELETE_CREDITS);
-        th = osd_trans_start(env_del_obj, &osd->od_dt_dev, prm);
-        if (!IS_ERR(th)) {
-                result = osd_oi_delete(osd_oti_get(env_del_obj),
-                                       &osd->od_oi, fid, th);
-                osd_trans_stop(env_del_obj, th);
-        } else
-                result = PTR_ERR(th);
+        cfs_list_add(&dcb->dcb_linkage, &oh->ot_dcb_list);
 
-        lu_env_fini(env_del_obj);
-        return result;
+        return 0;
 }
 
 /*
@@ -782,16 +925,6 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
 
         osd_index_fini(obj);
         if (inode != NULL) {
-                int result;
-
-                if (osd_inode_unlinked(inode)) {
-                        result = osd_inode_remove(env, obj);
-                        if (result != 0)
-                                LU_OBJECT_DEBUG(D_ERROR, env, l,
-                                                "Failed to cleanup: %d\n",
-                                                result);
-                }
-
                 iput(inode);
                 obj->oo_inode = NULL;
         }
@@ -803,11 +936,6 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
 static void osd_object_release(const struct lu_env *env,
                                struct lu_object *l)
 {
-        struct osd_object *o = osd_obj(l);
-
-        LASSERT(!lu_object_is_dying(l->lo_header));
-        if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
-                cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
 }
 
 /*
@@ -862,12 +990,27 @@ static void osd_conf_get(const struct lu_env *env,
                          const struct dt_device *dev,
                          struct dt_device_param *param)
 {
+        struct super_block *sb = osd_sb(osd_dt_dev(dev));
+
         /*
          * XXX should be taken from not-yet-existing fs abstraction layer.
          */
-        param->ddp_max_name_len  = LDISKFS_NAME_LEN;
-        param->ddp_max_nlink     = LDISKFS_LINK_MAX;
-        param->ddp_block_shift   = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
+        param->ddp_max_name_len = LDISKFS_NAME_LEN;
+        param->ddp_max_nlink    = LDISKFS_LINK_MAX;
+        param->ddp_block_shift  = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
+        param->ddp_mntopts      = 0;
+        if (test_opt(sb, XATTR_USER))
+                param->ddp_mntopts |= MNTOPT_USERXATTR;
+        if (test_opt(sb, POSIX_ACL))
+                param->ddp_mntopts |= MNTOPT_ACL;
+
+#if defined(LDISKFS_FEATURE_INCOMPAT_EA_INODE)
+        if (LDISKFS_HAS_INCOMPAT_FEATURE(sb, LDISKFS_FEATURE_INCOMPAT_EA_INODE))
+                param->ddp_max_ea_size = LDISKFS_XATTR_MAX_LARGE_EA_SIZE;
+        else
+#endif
+                param->ddp_max_ea_size = sb->s_blocksize;
+
 }
 
 /**
@@ -917,20 +1060,18 @@ static int osd_commit_async(const struct lu_env *env,
 /*
  * Concurrency: shouldn't matter.
  */
-lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
 
 static void osd_ro(const struct lu_env *env, struct dt_device *d)
 {
+        struct super_block *sb = osd_sb(osd_dt_dev(d));
         ENTRY;
 
         CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
 
-        __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
-                          fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
+        __lvfs_set_rdonly(sb->s_bdev, LDISKFS_SB(sb)->journal_bdev);
         EXIT;
 }
 
-
 /*
  * Concurrency: serialization provided by callers.
  */
@@ -984,7 +1125,7 @@ static const int osd_dto_credits_noquota[DTO_NR] = {
         /**
          * Unused now
          */
-        [DTO_IDNEX_UPDATE]  = 16,
+        [DTO_INDEX_UPDATE]  = 16,
         /**
          * Create a object. The same as create object in EXT3.
          * DATA_TRANS_BLOCKS(14) +
@@ -993,14 +1134,13 @@ static const int osd_dto_credits_noquota[DTO_NR] = {
          */
         [DTO_OBJECT_CREATE] = 25,
         /**
-         * Unused now
+         * XXX: real credits to be fixed
          */
         [DTO_OBJECT_DELETE] = 25,
         /**
-         * Attr set credits.
-         * 3(inode bits, group, GDT)
+         * Attr set credits (inode)
          */
-        [DTO_ATTR_SET_BASE] = 3,
+        [DTO_ATTR_SET_BASE] = 1,
         /**
          * Xattr set. The same as xattr of EXT3.
          * DATA_TRANS_BLOCKS(14)
@@ -1010,7 +1150,7 @@ static const int osd_dto_credits_noquota[DTO_NR] = {
         [DTO_XATTR_SET]     = 14,
         [DTO_LOG_REC]       = 14,
         /**
-         * creadits for inode change during write.
+         * credits for inode change during write.
          */
         [DTO_WRITE_BASE]    = 3,
         /**
@@ -1044,7 +1184,7 @@ static const int osd_dto_credits_quota[DTO_NR] = {
         /**
          * Unused now.
          */
-        [DTO_IDNEX_UPDATE]  = 16,
+        [DTO_INDEX_UPDATE]  = 16,
         /*
          * Create a object. Same as create object in EXT3 filesystem.
          * DATA_TRANS_BLOCKS(16) +
@@ -1091,30 +1231,17 @@ static const int osd_dto_credits_quota[DTO_NR] = {
         [DTO_ATTR_SET_CHOWN]= 68,
 };
 
-static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
-                          enum dt_txn_op op)
-{
-        LASSERT(ARRAY_SIZE(osd_dto_credits_noquota) ==
-                ARRAY_SIZE(osd_dto_credits_quota));
-        LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits_noquota));
-#ifdef HAVE_QUOTA_SUPPORT
-        if (test_opt(osd_sb(osd_dt_dev(d)), QUOTA))
-                return osd_dto_credits_quota[op];
-        else
-#endif
-                return osd_dto_credits_noquota[op];
-}
-
 static const struct dt_device_operations osd_dt_ops = {
         .dt_root_get       = osd_root_get,
         .dt_statfs         = osd_statfs,
+        .dt_trans_create   = osd_trans_create,
         .dt_trans_start    = osd_trans_start,
         .dt_trans_stop     = osd_trans_stop,
+        .dt_trans_cb_add   = osd_trans_cb_add,
         .dt_conf_get       = osd_conf_get,
         .dt_sync           = osd_sync,
         .dt_ro             = osd_ro,
         .dt_commit_async   = osd_commit_async,
-        .dt_credit_get     = osd_credit_get,
         .dt_init_capa_ctxt = osd_init_capa_ctxt,
         .dt_init_quota_ctxt= osd_init_quota_ctxt,
 };
@@ -1342,6 +1469,25 @@ static int osd_attr_get(const struct lu_env *env,
         return 0;
 }
 
+/*
+ * Declare an attribute update of \a dt in the transaction \a handle, which
+ * must not have a journal handle attached yet.
+ *
+ * Increments the attr_set declaration counter of the handle and reserves
+ * osd_dto_credits_noquota[DTO_ATTR_SET_BASE] credits in it. \a env and
+ * \a attr are not used here.
+ *
+ * \retval 0 always
+ */
+static int osd_declare_attr_set(const struct lu_env *env,
+                                struct dt_object *dt,
+                                const struct lu_attr *attr,
+                                struct thandle *handle)
+{
+        struct osd_thandle *oh;
+
+        LASSERT(dt != NULL);
+        LASSERT(handle != NULL);
+        /*
+         * The invariant is checked on the object behind \a dt; no "obj"
+         * variable exists in this function. The object is passed inline
+         * rather than through a local so that no variable is left unused.
+         * NOTE(review): assumes osd_invariant() may be a macro that drops
+         * its argument in some builds - confirm.
+         */
+        LASSERT(osd_invariant(osd_obj(&dt->do_lu)));
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, attr_set);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
+
+        return 0;
+}
+
 static int osd_inode_setattr(const struct lu_env *env,
                              struct inode *inode, const struct lu_attr *attr)
 {
@@ -1366,7 +1512,7 @@ static int osd_inode_setattr(const struct lu_env *env,
                 iattr.ia_uid = attr->la_uid;
                 iattr.ia_gid = attr->la_gid;
                 osd_push_ctxt(env, save);
-                rc = DQUOT_TRANSFER(inode, &iattr) ? -EDQUOT : 0;
+                rc = ll_vfs_dq_transfer(inode, &iattr) ? -EDQUOT : 0;
                 osd_pop_ctxt(save);
                 if (rc != 0)
                         return rc;
@@ -1402,8 +1548,11 @@ static int osd_inode_setattr(const struct lu_env *env,
         if (bits & LA_RDEV)
                 inode->i_rdev   = attr->la_rdev;
 
-        if (bits & LA_FLAGS)
-                inode->i_flags = ll_ext_to_inode_flags(attr->la_flags);
+        if (bits & LA_FLAGS) {
+                /* always keep S_NOCMTIME */
+                inode->i_flags = ll_ext_to_inode_flags(attr->la_flags) |
+                                 S_NOCMTIME;
+        }
         return 0;
 }
 
@@ -1423,12 +1572,14 @@ static int osd_attr_set(const struct lu_env *env,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
                 return -EACCES;
 
+        OSD_EXEC_OP(handle, attr_set);
+
         cfs_spin_lock(&obj->oo_guard);
         rc = osd_inode_setattr(env, obj->oo_inode, attr);
         cfs_spin_unlock(&obj->oo_guard);
 
         if (!rc)
-                mark_inode_dirty(obj->oo_inode);
+                obj->oo_inode->i_sb->s_op->dirty_inode(obj->oo_inode);
         return rc;
 }
 
@@ -1447,6 +1598,8 @@ static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
                            struct lu_attr *attr, struct thandle *th)
 {
         osd_object_init0(obj);
+        if (obj->oo_inode && (obj->oo_inode->i_state & I_NEW))
+                unlock_new_inode(obj->oo_inode);
         return 0;
 }
 
@@ -1487,6 +1640,13 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
 
         LINVRNT(osd_invariant(obj));
         LASSERT(obj->oo_inode == NULL);
+        LASSERT(obj->oo_hl_head == NULL);
+
+        if (S_ISDIR(mode) && ldiskfs_pdo) {
+                obj->oo_hl_head =ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
+                if (obj->oo_hl_head == NULL)
+                        return -ENOMEM;
+        }
 
         oth = container_of(th, struct osd_thandle, ot_super);
         LASSERT(oth->ot_handle->h_transaction != NULL);
@@ -1508,10 +1668,19 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
         osd_pop_ctxt(save);
 #endif
         if (!IS_ERR(inode)) {
+                /* Do not update file c/mtime in ldiskfs.
+                 * NB: don't need any lock because no contention at this
+                 * early stage */
+                inode->i_flags |= S_NOCMTIME;
                 obj->oo_inode = inode;
                 result = 0;
-        } else
+        } else {
+                if (obj->oo_hl_head != NULL) {
+                        ldiskfs_htree_lock_head_free(obj->oo_hl_head);
+                        obj->oo_hl_head = NULL;
+                }
                 result = PTR_ERR(inode);
+        }
         LINVRNT(osd_invariant(obj));
         return result;
 }
@@ -1719,10 +1888,37 @@ static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
         id->oii_ino = obj->oo_inode->i_ino;
         id->oii_gen = obj->oo_inode->i_generation;
 
-        return osd_oi_insert(info, &osd->od_oi, fid, id, th,
+        return osd_oi_insert(info, osd_fid2oi(osd, fid), fid, id, th,
                              uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
 }
 
+static int osd_declare_object_create(const struct lu_env *env,
+                                     struct dt_object *dt,
+                                     struct lu_attr *attr,
+                                     struct dt_allocation_hint *hint,
+                                     struct dt_object_format *dof,
+                                     struct thandle *handle)
+{       /* Declare step of create: counts ops and adds credits only. */
+        struct osd_thandle *oh;
+
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, insert); /* presumably the OI entry; confirm */
+        OSD_DECLARE_OP(oh, create);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_CREATE];
+        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
+
+        /* Object type is not checked: the "." and ".." inserts of a directory
+         * are always declared. NOTE(review): 2 ops, 1 insert credit added. */
+        OSD_DECLARE_OP(oh, insert);
+        OSD_DECLARE_OP(oh, insert);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
+        return 0; /* always succeeds */
+}
+
 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
                              struct lu_attr *attr,
                              struct dt_allocation_hint *hint,
@@ -1741,6 +1937,8 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
+        OSD_EXEC_OP(th, create);
+
         result = __osd_object_create(info, obj, attr, hint, dof, th);
         if (result == 0)
                 result = __osd_oi_insert(env, obj, fid, th);
@@ -1751,6 +1949,74 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
 }
 
 /**
+ * Declare step for destroying the on-disk representation of the object:
+ * only op counters (destroy, delete) and credits are updated here.
+ * Concurrency: must be locked
+ */
+static int osd_declare_object_destroy(const struct lu_env *env,
+                                      struct dt_object *dt,
+                                      struct thandle *th)
+{
+        struct osd_object      *obj = osd_dt_obj(dt);
+        struct inode           *inode = obj->oo_inode;
+        struct osd_thandle     *oh;
+        ENTRY;
+
+        oh = container_of0(th, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+        LASSERT(inode); /* object must be backed by an inode */
+        LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
+
+        OSD_DECLARE_OP(oh, destroy);
+        OSD_DECLARE_OP(oh, delete);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_DELETE];
+        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
+
+        RETURN(0); /* always succeeds */
+}
+
+static int osd_object_destroy(const struct lu_env *env,
+                              struct dt_object *dt,
+                              struct thandle *th)
+{       /* Deletes the fid's OI entry and sets LU_OBJECT_HEARD_BANSHEE. */
+        const struct lu_fid    *fid = lu_object_fid(&dt->do_lu);
+        struct osd_object      *obj = osd_dt_obj(dt);
+        struct inode           *inode = obj->oo_inode;
+        struct osd_device      *osd = osd_obj2dev(obj);
+        struct osd_thandle     *oh;
+        int                     result;
+        ENTRY;
+
+        oh = container_of0(th, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle);
+        LASSERT(inode);
+
+        if (S_ISDIR(inode->i_mode)) {
+                LASSERT(osd_inode_unlinked(inode) ||
+                        inode->i_nlink == 1);
+                cfs_spin_lock(&obj->oo_guard);
+                inode->i_nlink = 0; /* directory link count forced to 0 */
+                cfs_spin_unlock(&obj->oo_guard);
+                inode->i_sb->s_op->dirty_inode(inode);
+        } else {
+                LASSERT(osd_inode_unlinked(inode));
+        }
+
+        OSD_EXEC_OP(th, destroy);
+
+        result = osd_oi_delete(osd_oti_get(env),
+                               osd_fid2oi(osd, fid), fid, th);
+
+        /* XXX: add to ext3 orphan list */
+        /* rc = ext3_orphan_add(handle_t *handle, struct inode *inode) */
+
+        /* not needed in the cache anymore */
+        set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
+
+        RETURN(0); /* NOTE(review): osd_oi_delete() result is dropped */
+}
+
+/**
  * Helper function for osd_xattr_set()
  */
 static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
@@ -1760,7 +2026,6 @@ static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
         struct inode           *inode    = obj->oo_inode;
         struct osd_thread_info *info     = osd_oti_get(env);
         struct dentry          *dentry   = &info->oti_child_dentry;
-        struct timespec        *t        = &info->oti_time;
         int                     fs_flags = 0;
         int  rc;
 
@@ -1775,14 +2040,8 @@ static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
                 fs_flags |= XATTR_CREATE;
 
         dentry->d_inode = inode;
-        *t = inode->i_ctime;
         rc = inode->i_op->setxattr(dentry, name, buf->lb_buf,
                                    buf->lb_len, fs_flags);
-        /* ctime should not be updated with server-side time. */
-        cfs_spin_lock(&obj->oo_guard);
-        inode->i_ctime = *t;
-        cfs_spin_unlock(&obj->oo_guard);
-        mark_inode_dirty(inode);
         return rc;
 }
 
@@ -1950,6 +2209,8 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
+        OSD_EXEC_OP(th, create);
+
         result = __osd_object_create(info, obj, attr, hint, dof, th);
 
         /* objects under osd root shld have igif fid, so dont add fid EA */
@@ -1964,12 +2225,30 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
         RETURN(result);
 }
 
+static int osd_declare_object_ref_add(const struct lu_env *env,
+                               struct dt_object *dt,
+                               struct thandle *handle)
+{       /* Declare step of ref_add: counts the op and adds credits only. */
+        struct osd_thandle *oh;
+
+        /* object may not exist yet, so no dt_object_exists() assertion */
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, ref_add);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
+
+        return 0; /* always succeeds */
+}
+
 /*
  * Concurrency: @dt is write locked.
  */
-static void osd_object_ref_add(const struct lu_env *env,
-                               struct dt_object *dt,
-                               struct thandle *th)
+static int osd_object_ref_add(const struct lu_env *env,
+                              struct dt_object *dt,
+                              struct thandle *th)
 {
         struct osd_object *obj = osd_dt_obj(dt);
         struct inode *inode = obj->oo_inode;
@@ -1979,20 +2258,57 @@ static void osd_object_ref_add(const struct lu_env *env,
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
+        OSD_EXEC_OP(th, ref_add);
+
+        /*
+         * DIR_NLINK feature is set for compatibility reasons if:
+         * 1) nlinks > LDISKFS_LINK_MAX, or
+         * 2) nlinks == 2, since this indicates i_nlink was previously 1.
+         *
+         * It is easier to always set this flag (rather than check and set),
+         * since it has less overhead, and the superblock will be dirtied
+         * at some point. Both e2fsprogs and any Lustre-supported ldiskfs
+         * do not actually care whether this flag is set or not.
+         */
         cfs_spin_lock(&obj->oo_guard);
-        LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
         inode->i_nlink++;
+        if (S_ISDIR(inode->i_mode) && inode->i_nlink > 1) {
+                if (inode->i_nlink >= LDISKFS_LINK_MAX ||
+                    inode->i_nlink == 2)
+                        inode->i_nlink = 1;
+        }
+        LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
         cfs_spin_unlock(&obj->oo_guard);
-        mark_inode_dirty(inode);
+        inode->i_sb->s_op->dirty_inode(inode);
         LINVRNT(osd_invariant(obj));
+
+        return 0;
+}
+
+static int osd_declare_object_ref_del(const struct lu_env *env,
+                               struct dt_object *dt,
+                               struct thandle *handle)
+{       /* Declare step of ref_del: counts the op and adds credits only. */
+        struct osd_thandle *oh;
+
+        LASSERT(dt_object_exists(dt)); /* object must already exist */
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, ref_del);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
+
+        return 0; /* always succeeds */
 }
 
 /*
  * Concurrency: @dt is write locked.
  */
-static void osd_object_ref_del(const struct lu_env *env,
-                               struct dt_object *dt,
-                               struct thandle *th)
+static int osd_object_ref_del(const struct lu_env *env,
+                              struct dt_object *dt,
+                              struct thandle *th)
 {
         struct osd_object *obj = osd_dt_obj(dt);
         struct inode *inode = obj->oo_inode;
@@ -2002,12 +2318,35 @@ static void osd_object_ref_del(const struct lu_env *env,
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
+        OSD_EXEC_OP(th, ref_del);
+
         cfs_spin_lock(&obj->oo_guard);
         LASSERT(inode->i_nlink > 0);
         inode->i_nlink--;
+        /* If this is/was a many-subdir directory (nlink > LDISKFS_LINK_MAX)
+         * then the nlink count is 1. Don't let it be set to 0 or the directory
+         * inode will be deleted incorrectly. */
+        if (S_ISDIR(inode->i_mode) && inode->i_nlink == 0)
+                inode->i_nlink++;
         cfs_spin_unlock(&obj->oo_guard);
-        mark_inode_dirty(inode);
+        inode->i_sb->s_op->dirty_inode(inode);
         LINVRNT(osd_invariant(obj));
+
+        return 0;
+}
+
+/*
+ * Copy the 64-bit ldiskfs i_fs_version of @dt's inode into *ver; returns 0.
+ */
+static int osd_object_version_get(const struct lu_env *env,
+                                  struct dt_object *dt, dt_obj_version_t *ver)
+{
+        struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+        CDEBUG(D_INODE, "Get version "LPX64" for inode %lu\n",
+               LDISKFS_I(inode)->i_fs_version, inode->i_ino);
+        *ver = LDISKFS_I(inode)->i_fs_version; /* result goes out via @ver */
+        return 0;
 }
 
 /*
@@ -2024,6 +2363,15 @@ static int osd_xattr_get(const struct lu_env *env,
         struct osd_thread_info *info   = osd_oti_get(env);
         struct dentry          *dentry = &info->oti_obj_dentry;
 
+        /* version get is not real XATTR but uses xattr API */
+        if (strcmp(name, XATTR_NAME_VERSION) == 0) {
+                /* for version we are just using xattr API but change inode
+                 * field instead */
+                LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
+                osd_object_version_get(env, dt, buf->lb_buf);
+                return sizeof(dt_obj_version_t);
+        }
+
         LASSERT(dt_object_exists(dt));
         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
@@ -2035,6 +2383,47 @@ static int osd_xattr_get(const struct lu_env *env,
         return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
 }
 
+
+static int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
+                                 const struct lu_buf *buf, const char *name,
+                                 int fl, struct thandle *handle)
+{       /* Declare step of xattr set: counts the op and adds credits only. */
+        struct osd_thandle *oh;
+
+        LASSERT(handle != NULL);
+
+        if (strcmp(name, XATTR_NAME_VERSION) == 0) {
+                /* version is an inode field, not a real xattr: no credits */
+                return 0;
+        }
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, xattr_set);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET];
+
+        return 0; /* always succeeds */
+}
+
+/*
+ * Store *new_version as the 64-bit version of @dt's inode and dirty it.
+ */
+static void osd_object_version_set(const struct lu_env *env,
+                                   struct dt_object *dt,
+                                   dt_obj_version_t *new_version)
+{
+        struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+        CDEBUG(D_INODE, "Set version "LPX64" (old "LPX64") for inode %lu\n",
+               *new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
+
+        LDISKFS_I(inode)->i_fs_version = *new_version;
+        /* The version is set after all other inode operations have finished,
+         * so the inode has to be marked dirty here. */
+        inode->i_sb->s_op->dirty_inode(inode);
+}
+
 /*
  * Concurrency: @dt is write locked.
  */
@@ -2044,9 +2433,19 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
 {
         LASSERT(handle != NULL);
 
+        /* version set is not real XATTR */
+        if (strcmp(name, XATTR_NAME_VERSION) == 0) {
+                /* for version we are just using xattr API but change inode
+                 * field instead */
+                LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
+                osd_object_version_set(env, dt, buf->lb_buf);
+                return sizeof(dt_obj_version_t);
+        }
+
         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
                 return -EACCES;
 
+        OSD_EXEC_OP(handle, xattr_set);
         return __osd_xattr_set(env, dt, buf, name, fl);
 }
 
@@ -2074,6 +2473,25 @@ static int osd_xattr_list(const struct lu_env *env,
         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
 }
 
+static int osd_declare_xattr_del(const struct lu_env *env,
+                                 struct dt_object *dt,
+                                const char *name,
+                                struct thandle *handle)
+{       /* Declare step of xattr delete; @name is not consulted here. */
+        struct osd_thandle *oh;
+
+        LASSERT(dt_object_exists(dt));
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, xattr_set); /* del shares the xattr_set counter */
+        oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET];
+
+        return 0; /* always succeeds */
+}
+
 /*
  * Concurrency: @dt is write locked.
  */
@@ -2087,7 +2505,6 @@ static int osd_xattr_del(const struct lu_env *env,
         struct inode           *inode  = obj->oo_inode;
         struct osd_thread_info *info   = osd_oti_get(env);
         struct dentry          *dentry = &info->oti_obj_dentry;
-        struct timespec        *t      = &info->oti_time;
         int                     rc;
 
         LASSERT(dt_object_exists(dt));
@@ -2098,14 +2515,10 @@ static int osd_xattr_del(const struct lu_env *env,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
                 return -EACCES;
 
+        OSD_EXEC_OP(handle, xattr_set);
+
         dentry->d_inode = inode;
-        *t = inode->i_ctime;
         rc = inode->i_op->removexattr(dentry, name);
-        /* ctime should not be updated with server-side time. */
-        cfs_spin_lock(&obj->oo_guard);
-        inode->i_ctime = *t;
-        cfs_spin_unlock(&obj->oo_guard);
-        mark_inode_dirty(inode);
         return rc;
 }
 
@@ -2216,35 +2629,6 @@ static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
         RETURN(rc);
 }
 
-/*
- * Get the 64-bit version for an inode.
- */
-static dt_obj_version_t osd_object_version_get(const struct lu_env *env,
-                                               struct dt_object *dt)
-{
-        struct inode *inode = osd_dt_obj(dt)->oo_inode;
-
-        CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n",
-               LDISKFS_I(inode)->i_fs_version, inode->i_ino);
-        return LDISKFS_I(inode)->i_fs_version;
-}
-
-/*
- * Set the 64-bit version and return the old version.
- */
-static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt,
-                                   dt_obj_version_t new_version)
-{
-        struct inode *inode = osd_dt_obj(dt)->oo_inode;
-
-        CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n",
-               new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
-        LDISKFS_I(inode)->i_fs_version = new_version;
-        /** Version is set after all inode operations are finished,
-         *  so we should mark it dirty here */
-        inode->i_sb->s_op->dirty_inode(inode);
-}
-
 static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
                         void **data)
 {
@@ -2293,18 +2677,30 @@ static int osd_iam_container_init(const struct lu_env *env,
                                   struct osd_object *obj,
                                   struct osd_directory *dir)
 {
+        struct iam_container *bag = &dir->od_container;
         int result;
-        struct iam_container *bag;
 
-        bag    = &dir->od_container;
         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
-        if (result == 0) {
-                result = iam_container_setup(bag);
-                if (result == 0)
-                        obj->oo_dt.do_index_ops = &osd_index_iam_ops;
-                else
-                        iam_container_fini(bag);
+        if (result != 0)
+                return result;
+
+        result = iam_container_setup(bag);
+        if (result != 0)
+                goto out;
+
+        if (osd_obj2dev(obj)->od_iop_mode) {
+                u32 ptr = bag->ic_descr->id_ops->id_root_ptr(bag);
+
+                bag->ic_root_bh = ldiskfs_bread(NULL, obj->oo_inode,
+                                                ptr, 0, &result);
         }
+
+ out:
+        if (result == 0)
+                obj->oo_dt.do_index_ops = &osd_index_iam_ops;
+        else
+                iam_container_fini(bag);
+
         return result;
 }
 
@@ -2361,10 +2757,12 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                         else
                                 result = 0;
                         cfs_up_write(&obj->oo_ext_idx_sem);
-                } else
+                } else {
                         result = -ENOMEM;
-        } else
+                }
+        } else {
                 result = 0;
+        }
 
         if (result == 0 && ea_dir == 0) {
                 if (!osd_iam_index_probe(env, obj, feat))
@@ -2376,27 +2774,33 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
 }
 
 static const struct dt_object_operations osd_obj_ops = {
-        .do_read_lock    = osd_object_read_lock,
-        .do_write_lock   = osd_object_write_lock,
-        .do_read_unlock  = osd_object_read_unlock,
-        .do_write_unlock = osd_object_write_unlock,
-        .do_write_locked = osd_object_write_locked,
-        .do_attr_get     = osd_attr_get,
-        .do_attr_set     = osd_attr_set,
-        .do_ah_init      = osd_ah_init,
-        .do_create       = osd_object_create,
-        .do_index_try    = osd_index_try,
-        .do_ref_add      = osd_object_ref_add,
-        .do_ref_del      = osd_object_ref_del,
-        .do_xattr_get    = osd_xattr_get,
-        .do_xattr_set    = osd_xattr_set,
-        .do_xattr_del    = osd_xattr_del,
-        .do_xattr_list   = osd_xattr_list,
-        .do_capa_get     = osd_capa_get,
-        .do_object_sync  = osd_object_sync,
-        .do_version_get  = osd_object_version_get,
-        .do_version_set  = osd_object_version_set,
-        .do_data_get     = osd_data_get,
+        .do_read_lock         = osd_object_read_lock,
+        .do_write_lock        = osd_object_write_lock,
+        .do_read_unlock       = osd_object_read_unlock,
+        .do_write_unlock      = osd_object_write_unlock,
+        .do_write_locked      = osd_object_write_locked,
+        .do_attr_get          = osd_attr_get,
+        .do_declare_attr_set  = osd_declare_attr_set,
+        .do_attr_set          = osd_attr_set,
+        .do_ah_init           = osd_ah_init,
+        .do_declare_create    = osd_declare_object_create,
+        .do_create            = osd_object_create, /* ea table differs here */
+        .do_declare_destroy   = osd_declare_object_destroy,
+        .do_destroy           = osd_object_destroy,
+        .do_index_try         = osd_index_try,
+        .do_declare_ref_add   = osd_declare_object_ref_add,
+        .do_ref_add           = osd_object_ref_add,
+        .do_declare_ref_del   = osd_declare_object_ref_del,
+        .do_ref_del           = osd_object_ref_del,
+        .do_xattr_get         = osd_xattr_get, /* also serves version get */
+        .do_declare_xattr_set = osd_declare_xattr_set,
+        .do_xattr_set         = osd_xattr_set, /* also serves version set */
+        .do_declare_xattr_del = osd_declare_xattr_del,
+        .do_xattr_del         = osd_xattr_del,
+        .do_xattr_list        = osd_xattr_list,
+        .do_capa_get          = osd_capa_get,
+        .do_object_sync       = osd_object_sync,
+        .do_data_get          = osd_data_get,
 };
 
 /**
@@ -2404,27 +2808,33 @@ static const struct dt_object_operations osd_obj_ops = {
  * (i.e. to run 2.0 mds on 1.8 disk) (b11826)
  */
 static const struct dt_object_operations osd_obj_ea_ops = {
-        .do_read_lock    = osd_object_read_lock,
-        .do_write_lock   = osd_object_write_lock,
-        .do_read_unlock  = osd_object_read_unlock,
-        .do_write_unlock = osd_object_write_unlock,
-        .do_write_locked = osd_object_write_locked,
-        .do_attr_get     = osd_attr_get,
-        .do_attr_set     = osd_attr_set,
-        .do_ah_init      = osd_ah_init,
-        .do_create       = osd_object_ea_create,
-        .do_index_try    = osd_index_try,
-        .do_ref_add      = osd_object_ref_add,
-        .do_ref_del      = osd_object_ref_del,
-        .do_xattr_get    = osd_xattr_get,
-        .do_xattr_set    = osd_xattr_set,
-        .do_xattr_del    = osd_xattr_del,
-        .do_xattr_list   = osd_xattr_list,
-        .do_capa_get     = osd_capa_get,
-        .do_object_sync  = osd_object_sync,
-        .do_version_get  = osd_object_version_get,
-        .do_version_set  = osd_object_version_set,
-        .do_data_get     = osd_data_get,
+        .do_read_lock         = osd_object_read_lock,
+        .do_write_lock        = osd_object_write_lock,
+        .do_read_unlock       = osd_object_read_unlock,
+        .do_write_unlock      = osd_object_write_unlock,
+        .do_write_locked      = osd_object_write_locked,
+        .do_attr_get          = osd_attr_get,
+        .do_declare_attr_set  = osd_declare_attr_set,
+        .do_attr_set          = osd_attr_set,
+        .do_ah_init           = osd_ah_init,
+        .do_declare_create    = osd_declare_object_create,
+        .do_create            = osd_object_ea_create, /* unlike osd_obj_ops */
+        .do_declare_destroy   = osd_declare_object_destroy,
+        .do_destroy           = osd_object_destroy,
+        .do_index_try         = osd_index_try,
+        .do_declare_ref_add   = osd_declare_object_ref_add,
+        .do_ref_add           = osd_object_ref_add,
+        .do_declare_ref_del   = osd_declare_object_ref_del,
+        .do_ref_del           = osd_object_ref_del,
+        .do_xattr_get         = osd_xattr_get, /* also serves version get */
+        .do_declare_xattr_set = osd_declare_xattr_set,
+        .do_xattr_set         = osd_xattr_set, /* also serves version set */
+        .do_declare_xattr_del = osd_declare_xattr_del,
+        .do_xattr_del         = osd_xattr_del,
+        .do_xattr_list        = osd_xattr_list,
+        .do_capa_get          = osd_capa_get,
+        .do_object_sync       = osd_object_sync,
+        .do_data_get          = osd_data_get,
 };
 
 /*
@@ -2603,6 +3013,23 @@ static int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
         return err;
 }
 
+static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
+                                 const loff_t size, loff_t pos,
+                                 struct thandle *handle)
+{       /* Declare step of a write; @size and @pos are not used here. */
+        struct osd_thandle *oh;
+
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, write);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_WRITE_BLOCK];
+
+        return 0; /* always succeeds */
+}
+
 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
                          const struct lu_buf *buf, loff_t *pos,
                          struct thandle *handle, struct lustre_capa *capa,
@@ -2613,7 +3040,7 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
         struct osd_thandle *oh;
         ssize_t            result = 0;
 #ifdef HAVE_QUOTA_SUPPORT
-        cfs_cap_t           save = current->cap_effective;
+        cfs_cap_t           save = cfs_curproc_cap_pack();
 #endif
 
         LASSERT(handle != NULL);
@@ -2625,9 +3052,9 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
         LASSERT(oh->ot_handle->h_transaction != NULL);
 #ifdef HAVE_QUOTA_SUPPORT
         if (ignore_quota)
-                current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
+                cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
         else
-                current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
+                cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
 #endif
         /* Write small symlink to inode body as we need to maintain correct
          * on-disk symlinks for ldiskfs.
@@ -2640,18 +3067,42 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
                                                   buf->lb_len, pos,
                                                   oh->ot_handle);
 #ifdef HAVE_QUOTA_SUPPORT
-        current->cap_effective = save;
+        cfs_curproc_cap_unpack(save);
 #endif
         if (result == 0)
                 result = buf->lb_len;
         return result;
 }
 
+/*
+ * In some cases we may need declare methods for objects being created,
+ * e.g. when we create a symlink.
+ */
+static const struct dt_body_operations osd_body_ops_new = {
+        .dbo_declare_write = osd_declare_write, /* only op provided */
+};
+
 static const struct dt_body_operations osd_body_ops = {
-        .dbo_read  = osd_read,
-        .dbo_write = osd_write
+        .dbo_read          = osd_read,
+        .dbo_declare_write = osd_declare_write, /* declare step of dbo_write */
+        .dbo_write         = osd_write
 };
 
+static int osd_index_declare_iam_delete(const struct lu_env *env,
+                                        struct dt_object *dt,
+                                        const struct dt_key *key,
+                                        struct thandle *handle)
+{       /* Declare step of an IAM index delete; @dt and @key are unused. */
+        struct osd_thandle    *oh;
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, delete);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
+
+        return 0; /* always succeeds */
+}
 
 /**
  *      delete a (key, value) pair from index \a dt specified by \a key
@@ -2685,6 +3136,8 @@ static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
                 RETURN(-EACCES);
 
+        OSD_EXEC_OP(handle, delete);
+
         ipd = osd_idx_ipd_get(env, bag);
         if (unlikely(ipd == NULL))
                 RETURN(-ENOMEM);
@@ -2699,6 +3152,25 @@ static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
         RETURN(rc);
 }
 
+static int osd_index_declare_ea_delete(const struct lu_env *env,
+                                       struct dt_object *dt,
+                                       const struct dt_key *key,
+                                       struct thandle *handle)
+{       /* Declare step of an EA index delete; @key is not consulted. */
+        struct osd_thandle *oh;
+
+        LASSERT(dt_object_exists(dt)); /* index object must already exist */
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, delete);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
+
+        return 0; /* always succeeds */
+}
+
 static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
                                           struct dt_rec *fid)
 {
@@ -2732,6 +3204,7 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
         struct osd_thandle         *oh;
         struct ldiskfs_dir_entry_2 *de;
         struct buffer_head         *bh;
+        struct htree_lock          *hlock = NULL;
 
         int rc;
 
@@ -2741,6 +3214,8 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
         LASSERT(dt_object_exists(dt));
         LASSERT(handle != NULL);
 
+        OSD_EXEC_OP(handle, delete);
+
         oh = container_of(handle, struct osd_thandle, ot_super);
         LASSERT(oh->ot_handle != NULL);
         LASSERT(oh->ot_handle->h_transaction != NULL);
@@ -2751,28 +3226,27 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
         dentry = osd_child_dentry_get(env, obj,
                                       (char *)key, strlen((char *)key));
 
-        cfs_down_write(&obj->oo_ext_idx_sem);
-        bh = ll_ldiskfs_find_entry(dir, dentry, &de);
-        if (bh) {
-                struct osd_thread_info *oti = osd_oti_get(env);
-                struct timespec *ctime = &oti->oti_time;
-                struct timespec *mtime = &oti->oti_time2;
+        if (obj->oo_hl_head != NULL) {
+                hlock = osd_oti_get(env)->oti_hlock;
+                ldiskfs_htree_lock(hlock, obj->oo_hl_head,
+                                   dir, LDISKFS_HLOCK_DEL);
+        } else {
+                cfs_down_write(&obj->oo_ext_idx_sem);
+        }
 
-                *ctime = dir->i_ctime;
-                *mtime = dir->i_mtime;
+        bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
+        if (bh) {
                 rc = ldiskfs_delete_entry(oh->ot_handle,
-                                dir, de, bh);
-                /* xtime should not be updated with server-side time. */
-                cfs_spin_lock(&obj->oo_guard);
-                dir->i_ctime = *ctime;
-                dir->i_mtime = *mtime;
-                cfs_spin_unlock(&obj->oo_guard);
-                mark_inode_dirty(dir);
+                                          dir, de, bh);
                 brelse(bh);
-        } else
+        } else {
                 rc = -ENOENT;
+        }
+        if (hlock != NULL)
+                ldiskfs_htree_unlock(hlock);
+        else
+                cfs_up_write(&obj->oo_ext_idx_sem);
 
-        cfs_up_write(&obj->oo_ext_idx_sem);
         LASSERT(osd_invariant(obj));
         RETURN(rc);
 }
@@ -2836,6 +3310,26 @@ static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
         RETURN(rc);
 }
 
+static int osd_index_declare_iam_insert(const struct lu_env *env,
+                                        struct dt_object *dt,
+                                        const struct dt_rec *rec,
+                                        const struct dt_key *key,
+                                        struct thandle *handle)
+{       /* Declare step of an IAM index insert; @rec and @key are unused. */
+        struct osd_thandle *oh;
+
+        LASSERT(dt_object_exists(dt)); /* index object must already exist */
+        LASSERT(handle != NULL);
+
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        OSD_DECLARE_OP(oh, insert);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
+
+        return 0; /* always succeeds */
+}
+
 /**
  *      Inserts (key, value) pair in \a dt index object.
  *
@@ -2857,7 +3351,7 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
         struct osd_thandle    *oh;
         struct iam_container  *bag = &obj->oo_dir->od_container;
 #ifdef HAVE_QUOTA_SUPPORT
-        cfs_cap_t              save = current->cap_effective;
+        cfs_cap_t              save = cfs_curproc_cap_pack();
 #endif
         struct osd_thread_info *oti = osd_oti_get(env);
         struct iam_rec *iam_rec = (struct iam_rec *)oti->oti_ldp;
@@ -2873,6 +3367,8 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
                 return -EACCES;
 
+        OSD_EXEC_OP(th, insert);
+
         ipd = osd_idx_ipd_get(env, bag);
         if (unlikely(ipd == NULL))
                 RETURN(-ENOMEM);
@@ -2882,9 +3378,9 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
         LASSERT(oh->ot_handle->h_transaction != NULL);
 #ifdef HAVE_QUOTA_SUPPORT
         if (ignore_quota)
-                current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
+                cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
         else
-                current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
+                cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
 #endif
         if (S_ISDIR(obj->oo_inode->i_mode))
                 osd_fid_pack((struct osd_fid_pack *)iam_rec, rec, &oti->oti_fid);
@@ -2893,7 +3389,7 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
                         iam_rec, ipd);
 #ifdef HAVE_QUOTA_SUPPORT
-        current->cap_effective = save;
+        cfs_curproc_cap_unpack(save);
 #endif
         osd_ipd_put(env, bag, ipd);
         LINVRNT(osd_invariant(obj));
@@ -2913,6 +3409,7 @@ static int __osd_ea_add_rec(struct osd_thread_info *info,
                             struct inode  *cinode,
                             const char *name,
                             const struct dt_rec *fid,
+                            struct htree_lock *hlock,
                             struct thandle *th)
 {
         struct ldiskfs_dentry_param *ldp;
@@ -2927,13 +3424,13 @@ static int __osd_ea_add_rec(struct osd_thread_info *info,
         child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
 
         if (fid_is_igif((struct lu_fid *)fid) ||
-            fid_seq((struct lu_fid *)fid) >= FID_SEQ_NORMAL) {
+            fid_is_norm((struct lu_fid *)fid)) {
                 ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
                 osd_get_ldiskfs_dirent_param(ldp, fid);
                 child->d_fsdata = (void*) ldp;
         } else
                 child->d_fsdata = NULL;
-        rc = ldiskfs_add_entry(oth->ot_handle, child, cinode);
+        rc = osd_ldiskfs_add_entry(oth->ot_handle, child, cinode, hlock);
 
         RETURN(rc);
 }
@@ -2991,11 +3488,11 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info,
                 /* in case of rename, dotdot is already created */
                 if (dir->oo_compat_dotdot_created) {
                         return __osd_ea_add_rec(info, dir, parent_dir, name,
-                                                dot_dot_fid, th);
+                                                dot_dot_fid, NULL, th);
                 }
 
-                result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, inode,
-                                                dot_ldp, dot_dot_ldp);
+                result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir,
+                                                inode, dot_ldp, dot_dot_ldp);
                 if (result == 0)
                        dir->oo_compat_dotdot_created = 1;
         }
@@ -3016,15 +3513,37 @@ static int osd_ea_add_rec(const struct lu_env *env,
                           struct thandle *th)
 {
         struct osd_thread_info    *info   = osd_oti_get(env);
+        struct htree_lock         *hlock;
         int rc;
 
+        hlock = pobj->oo_hl_head != NULL ? info->oti_hlock : NULL;
+
         if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
-                                                   name[2] =='\0')))
+                                                   name[2] =='\0'))) {
+                if (hlock != NULL) {
+                        ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
+                                           pobj->oo_inode, 0);
+                } else {
+                        cfs_down_write(&pobj->oo_ext_idx_sem);
+                }
                 rc = osd_add_dot_dotdot(info, pobj, cinode, name,
                      (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
                                         fid, th);
+        } else {
+                if (hlock != NULL) {
+                        ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
+                                           pobj->oo_inode, LDISKFS_HLOCK_ADD);
+                } else {
+                        cfs_down_write(&pobj->oo_ext_idx_sem);
+                }
+
+                rc = __osd_ea_add_rec(info, pobj, cinode, name, fid,
+                                      hlock, th);
+        }
+        if (hlock != NULL)
+                ldiskfs_htree_unlock(hlock);
         else
-                rc = __osd_ea_add_rec(info, pobj, cinode, name, fid, th);
+                cfs_up_write(&pobj->oo_ext_idx_sem);
 
         return rc;
 }
@@ -3045,6 +3564,7 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
         struct ldiskfs_dir_entry_2 *de;
         struct buffer_head         *bh;
         struct lu_fid              *fid = (struct lu_fid *) rec;
+        struct htree_lock          *hlock = NULL;
         int ino;
         int rc;
 
@@ -3053,8 +3573,15 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
         dentry = osd_child_dentry_get(env, obj,
                                       (char *)key, strlen((char *)key));
 
-        cfs_down_read(&obj->oo_ext_idx_sem);
-        bh = ll_ldiskfs_find_entry(dir, dentry, &de);
+        if (obj->oo_hl_head != NULL) {
+                hlock = osd_oti_get(env)->oti_hlock;
+                ldiskfs_htree_lock(hlock, obj->oo_hl_head,
+                                   dir, LDISKFS_HLOCK_LOOKUP);
+        } else {
+                cfs_down_read(&obj->oo_ext_idx_sem);
+        }
+
+        bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
         if (bh) {
                 ino = le32_to_cpu(de->inode);
                 rc = osd_get_fid_from_dentry(de, rec);
@@ -3063,10 +3590,14 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
                 brelse(bh);
                 if (rc != 0)
                         rc = osd_ea_fid_get(env, obj, ino, fid);
-        } else
+        } else {
                 rc = -ENOENT;
+        }
 
-        cfs_up_read(&obj->oo_ext_idx_sem);
+        if (hlock != NULL)
+                ldiskfs_htree_unlock(hlock);
+        else
+                cfs_up_read(&obj->oo_ext_idx_sem);
         RETURN (rc);
 }
 
@@ -3126,6 +3657,26 @@ static inline void osd_object_put(const struct lu_env *env,
         lu_object_put(env, &obj->oo_dt.do_lu);
 }
 
+/**
+ * Declares an upcoming insert into the EA-based (directory entry) index
+ * \a dt on the transaction handle \a handle.
+ *
+ * Nothing is inserted here: the function bumps the handle's "insert"
+ * declare counter (OSD_DECLARE_OP) and adds the DTO_INDEX_INSERT entry of
+ * osd_dto_credits_noquota[] to the handle's credit total.
+ *
+ * \param  env     not used by this function
+ * \param  dt      index object; asserted to exist
+ * \param  rec     not used by this function
+ * \param  key     not used by this function
+ * \param  handle  transaction handle; asserted non-NULL and asserted to
+ *                 have oh->ot_handle == NULL (presumably meaning the
+ *                 transaction has not been started yet -- TODO confirm)
+ *
+ * \retval 0       always
+ */
+static int osd_index_declare_ea_insert(const struct lu_env *env,
+                                       struct dt_object *dt,
+                                       const struct dt_rec *rec,
+                                       const struct dt_key *key,
+                                       struct thandle *handle)
+{
+        struct osd_thandle *oh;
+
+        LASSERT(dt_object_exists(dt));
+        LASSERT(handle != NULL);
+
+        /* recover the osd-private handle that embeds \a handle as ot_super */
+        oh = container_of0(handle, struct osd_thandle, ot_super);
+        LASSERT(oh->ot_handle == NULL);
+
+        /* count the declaration, then account the credits for one insert */
+        OSD_DECLARE_OP(oh, insert);
+        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
+
+        return 0;
+}
+
 /**
  * Index add function for interoperability mode (b11826).
  * It will add the directory entry.This entry is needed to
@@ -3147,7 +3698,7 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
         const char               *name  = (const char *)key;
         struct osd_object        *child;
 #ifdef HAVE_QUOTA_SUPPORT
-        cfs_cap_t                 save  = current->cap_effective;
+        cfs_cap_t                 save  = cfs_curproc_cap_pack();
 #endif
         int rc;
 
@@ -3162,32 +3713,17 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
 
         child = osd_object_find(env, dt, fid);
         if (!IS_ERR(child)) {
-                struct inode *inode = obj->oo_inode;
-                struct osd_thread_info *oti = osd_oti_get(env);
-                struct timespec *ctime = &oti->oti_time;
-                struct timespec *mtime = &oti->oti_time2;
-
-                *ctime = inode->i_ctime;
-                *mtime = inode->i_mtime;
 #ifdef HAVE_QUOTA_SUPPORT
                 if (ignore_quota)
-                        current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
+                        cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
                 else
-                        current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
+                        cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
 #endif
-                cfs_down_write(&obj->oo_ext_idx_sem);
                 rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th);
-                cfs_up_write(&obj->oo_ext_idx_sem);
 #ifdef HAVE_QUOTA_SUPPORT
-                current->cap_effective = save;
+                cfs_curproc_cap_unpack(save);
 #endif
                 osd_object_put(env, child);
-                /* xtime should not be updated with server-side time. */
-                cfs_spin_lock(&obj->oo_guard);
-                inode->i_ctime = *ctime;
-                inode->i_mtime = *mtime;
-                cfs_spin_unlock(&obj->oo_guard);
-                mark_inode_dirty(inode);
         } else {
                 rc = PTR_ERR(child);
         }
@@ -3203,8 +3739,9 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
  */
 
 static struct dt_it *osd_it_iam_init(const struct lu_env *env,
-                                 struct dt_object *dt,
-                                 struct lustre_capa *capa)
+                                     struct dt_object *dt,
+                                     __u32 unused,
+                                     struct lustre_capa *capa)
 {
         struct osd_it_iam         *it;
         struct osd_thread_info *oti = osd_oti_get(env);
@@ -3366,13 +3903,14 @@ static inline void osd_it_pack_dirent(struct lu_dirent *ent,
  */
 static int osd_it_iam_rec(const struct lu_env *env,
                           const struct dt_it *di,
-                          struct lu_dirent *lde,
+                          struct dt_rec *dtrec,
                           __u32 attr)
 {
         struct osd_it_iam *it        = (struct osd_it_iam *)di;
         struct osd_thread_info *info = osd_oti_get(env);
         struct lu_fid     *fid       = &info->oti_fid;
         const struct osd_fid_pack *rec;
+        struct lu_dirent *lde = (struct lu_dirent *)dtrec;
         char *name;
         int namelen;
         __u64 hash;
@@ -3431,9 +3969,11 @@ static int osd_it_iam_load(const struct lu_env *env,
 }
 
 static const struct dt_index_operations osd_index_iam_ops = {
-        .dio_lookup = osd_index_iam_lookup,
-        .dio_insert = osd_index_iam_insert,
-        .dio_delete = osd_index_iam_delete,
+        .dio_lookup         = osd_index_iam_lookup,
+        .dio_declare_insert = osd_index_declare_iam_insert,
+        .dio_insert         = osd_index_iam_insert,
+        .dio_declare_delete = osd_index_declare_iam_delete,
+        .dio_delete         = osd_index_iam_delete,
         .dio_it     = {
                 .init     = osd_it_iam_init,
                 .fini     = osd_it_iam_fini,
@@ -3456,6 +3996,7 @@ static const struct dt_index_operations osd_index_iam_ops = {
  */
 static struct dt_it *osd_it_ea_init(const struct lu_env *env,
                                     struct dt_object *dt,
+                                    __u32 attr,
                                     struct lustre_capa *capa)
 {
         struct osd_object       *obj  = osd_dt_obj(dt);
@@ -3477,6 +4018,10 @@ static struct dt_it *osd_it_ea_init(const struct lu_env *env,
         it->oie_obj             = obj;
         it->oie_file.f_pos      = 0;
         it->oie_file.f_dentry   = obj_dentry;
+        if (attr & LUDA_64BITHASH)
+                it->oie_file.f_flags = O_64BITHASH;
+        else
+                it->oie_file.f_flags = O_32BITHASH;
         it->oie_file.f_mapping    = obj->oo_inode->i_mapping;
         it->oie_file.f_op         = obj->oo_inode->i_fop;
         it->oie_file.private_data = NULL;
@@ -3596,22 +4141,34 @@ static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
  * \retval   0 on success
  * \retval -ve on error
  */
-static int osd_ldiskfs_it_fill(const struct dt_it *di)
+static int osd_ldiskfs_it_fill(const struct lu_env *env,
+                               const struct dt_it *di)
 {
         struct osd_it_ea   *it    = (struct osd_it_ea *)di;
         struct osd_object  *obj   = it->oie_obj;
         struct inode       *inode = obj->oo_inode;
-        int                result = 0;
+        struct htree_lock  *hlock = NULL;
+        int                 result = 0;
 
         ENTRY;
         it->oie_dirent = it->oie_buf;
         it->oie_rd_dirent = 0;
 
-        cfs_down_read(&obj->oo_ext_idx_sem);
+        if (obj->oo_hl_head != NULL) {
+                hlock = osd_oti_get(env)->oti_hlock;
+                ldiskfs_htree_lock(hlock, obj->oo_hl_head,
+                                   inode, LDISKFS_HLOCK_READDIR);
+        } else {
+                cfs_down_read(&obj->oo_ext_idx_sem);
+        }
+
         result = inode->i_fop->readdir(&it->oie_file, it,
                                        (filldir_t) osd_ldiskfs_filldir);
 
-        cfs_up_read(&obj->oo_ext_idx_sem);
+        if (hlock != NULL)
+                ldiskfs_htree_unlock(hlock);
+        else
+                cfs_up_read(&obj->oo_ext_idx_sem);
 
         if (it->oie_rd_dirent == 0) {
                 result = -EIO;
@@ -3652,7 +4209,7 @@ static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di)
                 if (it->oie_file.f_pos == LDISKFS_HTREE_EOF)
                         rc = +1;
                 else
-                        rc = osd_ldiskfs_it_fill(di);
+                        rc = osd_ldiskfs_it_fill(env, di);
         }
 
         RETURN(rc);
@@ -3701,12 +4258,13 @@ static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
  */
 static inline int osd_it_ea_rec(const struct lu_env *env,
                                 const struct dt_it *di,
-                                struct lu_dirent *lde,
+                                struct dt_rec *dtrec,
                                 __u32 attr)
 {
         struct osd_it_ea        *it     = (struct osd_it_ea *)di;
         struct osd_object       *obj    = it->oie_obj;
         struct lu_fid           *fid    = &it->oie_dirent->oied_fid;
+        struct lu_dirent        *lde    = (struct lu_dirent *)dtrec;
         int    rc = 0;
 
         ENTRY;
@@ -3757,7 +4315,7 @@ static int osd_it_ea_load(const struct lu_env *env,
         ENTRY;
         it->oie_file.f_pos = hash;
 
-        rc =  osd_ldiskfs_it_fill(di);
+        rc =  osd_ldiskfs_it_fill(env, di);
         if (rc == 0)
                 rc = +1;
 
@@ -3799,9 +4357,11 @@ static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt,
  * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826)
  */
 static const struct dt_index_operations osd_index_ea_ops = {
-        .dio_lookup = osd_index_ea_lookup,
-        .dio_insert = osd_index_ea_insert,
-        .dio_delete = osd_index_ea_delete,
+        .dio_lookup         = osd_index_ea_lookup,
+        .dio_declare_insert = osd_index_declare_ea_insert,
+        .dio_insert         = osd_index_ea_insert,
+        .dio_declare_delete = osd_index_declare_ea_delete,
+        .dio_delete         = osd_index_ea_delete,
         .dio_it     = {
                 .init     = osd_it_ea_init,
                 .fini     = osd_it_ea_fini,
@@ -3822,19 +4382,26 @@ static void *osd_key_init(const struct lu_context *ctx,
         struct osd_thread_info *info;
 
         OBD_ALLOC_PTR(info);
-        if (info != NULL) {
-                OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
-                if (info->oti_it_ea_buf != NULL) {
-                        info->oti_env = container_of(ctx, struct lu_env,
-                                                     le_ctx);
-                } else {
-                        OBD_FREE_PTR(info);
-                        info = ERR_PTR(-ENOMEM);
-                }
-        } else {
-                info = ERR_PTR(-ENOMEM);
-        }
+        if (info == NULL)
+                return ERR_PTR(-ENOMEM);
+
+        OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
+        if (info->oti_it_ea_buf == NULL)
+                goto out_free_info;
+
+        info->oti_env = container_of(ctx, struct lu_env, le_ctx);
+
+        info->oti_hlock = ldiskfs_htree_lock_alloc();
+        if (info->oti_hlock == NULL)
+                goto out_free_ea;
+
         return info;
+
+ out_free_ea:
+        OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
+ out_free_info:
+        OBD_FREE_PTR(info);
+        return ERR_PTR(-ENOMEM);
 }
 
 static void osd_key_fini(const struct lu_context *ctx,
@@ -3842,6 +4409,8 @@ static void osd_key_fini(const struct lu_context *ctx,
 {
         struct osd_thread_info *info = data;
 
+        if (info->oti_hlock != NULL)
+                ldiskfs_htree_lock_free(info->oti_hlock);
         OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
         OBD_FREE_PTR(info);
 }
@@ -3870,17 +4439,7 @@ static struct lu_context_key osd_key = {
+/**
+ * Device initialization hook for the osd device \a d.
+ *
+ * After this change the function only calls osd_procfs_init() for the
+ * device under \a name and returns that call's result; \a env and \a next
+ * are not used.
+ */
 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
                            const char *name, struct lu_device *next)
 {
-        int rc;
-        struct lu_context *ctx;
-
-        /* context for commit hooks */
-        ctx = &osd_dev(d)->od_env_for_commit.le_ctx;
-        rc = lu_context_init(ctx, LCT_MD_THREAD|LCT_REMEMBER|LCT_NOREF);
-        if (rc == 0) {
-                rc = osd_procfs_init(osd_dev(d), name);
-                ctx->lc_cookie = 0x3;
-        }
-        return rc;
+        return osd_procfs_init(osd_dev(d), name);
 }
 
 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
@@ -3891,7 +4450,8 @@ static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
                 lu_object_put(env, &o->od_obj_area->do_lu);
                 o->od_obj_area = NULL;
         }
-        osd_oi_fini(info, &o->od_oi);
+        if (o->od_oi_table != NULL)
+                osd_oi_fini(info, &o->od_oi_table, o->od_oi_count);
 
         RETURN(0);
 }
@@ -3954,7 +4514,6 @@ static struct lu_device *osd_device_fini(const struct lu_env *env,
                                  osd_dev(d)->od_mount->lmi_mnt);
         osd_dev(d)->od_mount = NULL;
 
-        lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx);
         RETURN(NULL);
 }
 
@@ -4044,11 +4603,14 @@ static int osd_prepare(const struct lu_env *env,
 
         ENTRY;
         /* 1. initialize oi before any file create or file open */
-        result = osd_oi_init(oti, &osd->od_oi,
+        result = osd_oi_init(oti, &osd->od_oi_table,
                              &osd->od_dt_dev, lu2md_dev(pdev));
-        if (result != 0)
+        if (result < 0)
                 RETURN(result);
 
+        LASSERT(result > 0);
+        osd->od_oi_count = result;
+
         lmi = osd->od_mount;
         lsi = s2lsi(lmi->lmi_sb);
         ldd = lsi->lsi_ldd;