Whamcloud - gitweb
LU-80 lov: large stripe count support
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
index 0bbdaa9..bffa028 100644 (file)
 /*
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
+ */
+/*
+ * Copyright (c) 2011 Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 /* llo_* api support */
 #include <md_object.h>
 
+#ifdef HAVE_LDISKFS_PDO
+int ldiskfs_pdo = 1;
+CFS_MODULE_PARM(ldiskfs_pdo, "i", int, 0644,
+                "ldiskfs with parallel directory operations");
+#else
+int ldiskfs_pdo = 0;
+#endif
+
 static const char dot[] = ".";
 static const char dotdot[] = "..";
 static const char remote_obj_dir[] = "REM_OBJ_DIR";
@@ -98,6 +112,7 @@ struct osd_object {
         /**
          * to protect index ops.
          */
+        struct htree_lock_head *oo_hl_head;
         cfs_rw_semaphore_t     oo_ext_idx_sem;
         cfs_rw_semaphore_t     oo_sem;
         struct osd_directory  *oo_dir;
@@ -130,6 +145,7 @@ struct osd_thandle {
         struct thandle          ot_super;
         handle_t               *ot_handle;
         struct journal_callback ot_jcb;
+        cfs_list_t              ot_dcb_list;
         /* Link to the device, for debugging. */
         struct lu_ref_link     *ot_dev_link;
 
@@ -213,23 +229,33 @@ static inline void
 osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save)
 {
         struct md_ucred    *uc = md_ucred(env);
+        struct cred        *tc;
 
         LASSERT(uc != NULL);
 
-        save->oc_uid = current->fsuid;
-        save->oc_gid = current->fsgid;
-        save->oc_cap = current->cap_effective;
-        current->fsuid         = uc->mu_fsuid;
-        current->fsgid         = uc->mu_fsgid;
-        current->cap_effective = uc->mu_cap;
+        save->oc_uid = current_fsuid();
+        save->oc_gid = current_fsgid();
+        save->oc_cap = current_cap();
+        if ((tc = prepare_creds())) {
+                tc->fsuid         = uc->mu_fsuid;
+                tc->fsgid         = uc->mu_fsgid;
+                commit_creds(tc);
+        }
+        /* XXX not suboptimal */
+        cfs_curproc_cap_unpack(uc->mu_cap);
 }
 
 static inline void
 osd_pop_ctxt(struct osd_ctxt *save)
 {
-        current->fsuid         = save->oc_uid;
-        current->fsgid         = save->oc_gid;
-        current->cap_effective = save->oc_cap;
+        struct cred *tc;
+
+        if ((tc = prepare_creds())) {
+                tc->fsuid         = save->oc_uid;
+                tc->fsgid         = save->oc_gid;
+                tc->cap_effective = save->oc_cap;
+                commit_creds(tc);
+        }
 }
 #endif
 
@@ -298,8 +324,9 @@ static struct lu_object *osd_object_alloc(const struct lu_env *env,
                 cfs_init_rwsem(&mo->oo_ext_idx_sem);
                 cfs_spin_lock_init(&mo->oo_guard);
                 return l;
-        } else
+        } else {
                 return NULL;
+        }
 }
 
 /*
@@ -337,6 +364,14 @@ static struct inode *osd_iget(struct osd_thread_info *info,
                 CERROR("bad inode %lx\n",inode->i_ino);
                 iput(inode);
                 inode = ERR_PTR(-ENOENT);
+        } else {
+                /* Do not update file c/mtime in ldiskfs.
+                 * NB: we don't have any lock to protect this because we don't
+                 * have reference on osd_object now, but contention with
+                 * another lookup + attr_set can't happen in the tiny window
+                 * between if (...) and set S_NOCMTIME. */
+                if (!(inode->i_flags & S_NOCMTIME))
+                        inode->i_flags |= S_NOCMTIME;
         }
         return inode;
 }
@@ -373,27 +408,43 @@ static int osd_fid_lookup(const struct lu_env *env,
                 RETURN(-ENOENT);
 
         result = osd_oi_lookup(info, oi, fid, id);
-        if (result == 0) {
-                inode = osd_iget(info, dev, id);
-                if (!IS_ERR(inode)) {
-                        obj->oo_inode = inode;
-                        LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
-                        if (dev->od_iop_mode) {
-                                obj->oo_compat_dot_created = 1;
-                                obj->oo_compat_dotdot_created = 1;
-                        }
+        if (result != 0) {
+                if (result == -ENOENT)
                         result = 0;
-                } else
-                        /*
-                         * If fid wasn't found in oi, inode-less object is
-                         * created, for which lu_object_exists() returns
-                         * false. This is used in a (frequent) case when
-                         * objects are created as locking anchors or
-                         * place holders for objects yet to be created.
-                         */
-                        result = PTR_ERR(inode);
-        } else if (result == -ENOENT)
-                result = 0;
+                goto out;
+        }
+
+        inode = osd_iget(info, dev, id);
+        if (IS_ERR(inode)) {
+                /*
+                 * If fid wasn't found in oi, inode-less object is
+                 * created, for which lu_object_exists() returns
+                 * false. This is used in a (frequent) case when
+                 * objects are created as locking anchors or
+                 * place holders for objects yet to be created.
+                 */
+                result = PTR_ERR(inode);
+                goto out;
+        }
+
+        obj->oo_inode = inode;
+        LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
+        if (dev->od_iop_mode) {
+                obj->oo_compat_dot_created = 1;
+                obj->oo_compat_dotdot_created = 1;
+        }
+
+        if (!S_ISDIR(inode->i_mode) || !ldiskfs_pdo) /* done */
+                goto out;
+
+        LASSERT(obj->oo_hl_head == NULL);
+        obj->oo_hl_head = ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
+        if (obj->oo_hl_head == NULL) {
+                obj->oo_inode = NULL;
+                iput(inode);
+                result = -ENOMEM;
+        }
+out:
         LINVRNT(osd_invariant(obj));
 
         RETURN(result);
@@ -442,6 +493,8 @@ static void osd_object_free(const struct lu_env *env, struct lu_object *l)
         LINVRNT(osd_invariant(obj));
 
         dt_object_fini(&obj->oo_dt);
+        if (obj->oo_hl_head != NULL)
+                ldiskfs_htree_lock_head_free(obj->oo_hl_head);
         OBD_FREE_PTR(obj);
 }
 
@@ -540,8 +593,8 @@ static long interval_to_usec(cfs_time_t start, cfs_time_t end)
 /**
  * Check whether the we deal with this handle for too long.
  */
-static void __osd_th_check_slow(const struct osd_thandle *oth,
-                                struct osd_device *dev,
+static void __osd_th_check_slow(void *oth, struct osd_device *dev,
+                                cfs_time_t alloced, cfs_time_t started,
                                 cfs_time_t closed)
 {
         cfs_time_t now = cfs_time_current();
@@ -549,31 +602,31 @@ static void __osd_th_check_slow(const struct osd_thandle *oth,
         LASSERT(dev != NULL);
 
         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_STARTING,
-                            interval_to_usec(oth->oth_alloced, oth->oth_started));
+                            interval_to_usec(alloced, started));
         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_OPEN,
-                            interval_to_usec(oth->oth_started, closed));
+                            interval_to_usec(started, closed));
         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_CLOSING,
                             interval_to_usec(closed, now));
 
-        if (cfs_time_before(cfs_time_add(oth->oth_alloced, cfs_time_seconds(30)),
-                            now)) {
+        if (cfs_time_before(cfs_time_add(alloced, cfs_time_seconds(30)), now)) {
                 CWARN("transaction handle %p was open for too long: "
                       "now "CFS_TIME_T" ,"
                       "alloced "CFS_TIME_T" ,"
                       "started "CFS_TIME_T" ,"
                       "closed "CFS_TIME_T"\n",
-                      oth, now, oth->oth_alloced,
-                      oth->oth_started, closed);
+                      oth, now, alloced, started, closed);
                 libcfs_debug_dumpstack(NULL);
         }
 }
 
-#define OSD_CHECK_SLOW_TH(oth, dev, expr)               \
-{                                                       \
-        cfs_time_t __closed = cfs_time_current();       \
-                                                        \
-        expr;                                           \
-        __osd_th_check_slow(oth, dev, __closed);        \
+#define OSD_CHECK_SLOW_TH(oth, dev, expr)                               \
+{                                                                       \
+        cfs_time_t __closed = cfs_time_current();                       \
+        cfs_time_t __alloced = oth->oth_alloced;                        \
+        cfs_time_t __started = oth->oth_started;                        \
+                                                                        \
+        expr;                                                           \
+        __osd_th_check_slow(oth, dev, __alloced, __started, __closed);  \
 }
 
 #else /* OSD_THANDLE_STATS */
@@ -596,28 +649,28 @@ static int osd_param_is_sane(const struct osd_device *dev,
 /*
  * Concurrency: shouldn't matter.
  */
+#ifdef HAVE_LDISKFS_JOURNAL_CALLBACK_ADD
+static void osd_trans_commit_cb(struct super_block *sb,
+                                struct journal_callback *jcb, int error)
+#else
 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
+#endif
 {
         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
         struct thandle     *th  = &oh->ot_super;
-        struct dt_device   *dev = th->th_dev;
-        struct lu_device   *lud = &dev->dd_lu_dev;
+        struct lu_device   *lud = &th->th_dev->dd_lu_dev;
+        struct dt_txn_commit_cb *dcb, *tmp;
 
-        LASSERT(dev != NULL);
         LASSERT(oh->ot_handle == NULL);
 
-        if (error) {
+        if (error)
                 CERROR("transaction @0x%p commit error: %d\n", th, error);
-        } else {
-                struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
-                /*
-                 * This od_env_for_commit is only for commit usage.  see
-                 * "struct dt_device"
-                 */
-                lu_context_enter(&env->le_ctx);
-                dt_txn_hook_commit(env, th);
-                lu_context_exit(&env->le_ctx);
-        }
+
+        dt_txn_hook_commit(th);
+
+        /* call per-transaction callbacks if any */
+        cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage)
+                dcb->dcb_func(NULL, th, dcb, error);
 
         lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
         lu_device_put(lud);
@@ -657,6 +710,7 @@ static struct thandle *osd_trans_start(const struct lu_env *env,
                          * be used.
                          */
                         oti->oti_dev = dev;
+                        CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
                         osd_th_alloced(oh);
                         jh = ldiskfs_journal_start_sb(osd_sb(dev), p->tp_credits);
                         osd_th_started(oh);
@@ -665,7 +719,7 @@ static struct thandle *osd_trans_start(const struct lu_env *env,
                                 th = &oh->ot_super;
                                 th->th_dev = d;
                                 th->th_result = 0;
-                                jh->h_sync = p->tp_sync;
+                                th->th_sync = 0;
                                 lu_device_get(&d->dd_lu_dev);
                                 oh->ot_dev_link = lu_ref_add
                                         (&d->dd_lu_dev.ld_reference,
@@ -673,12 +727,10 @@ static struct thandle *osd_trans_start(const struct lu_env *env,
                                 /* add commit callback */
                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
                                 lu_context_enter(&th->th_ctx);
-                                osd_journal_callback_set(jh, osd_trans_commit_cb,
-                                                         (struct journal_callback *)&oh->ot_jcb);
-                                        LASSERT(oti->oti_txns == 0);
-                                        LASSERT(oti->oti_r_locks == 0);
-                                        LASSERT(oti->oti_w_locks == 0);
-                                        oti->oti_txns++;
+                                LASSERT(oti->oti_txns == 0);
+                                LASSERT(oti->oti_r_locks == 0);
+                                LASSERT(oti->oti_w_locks == 0);
+                                oti->oti_txns++;
                         } else {
                                 OBD_FREE_PTR(oh);
                                 th = (void *)jh;
@@ -708,6 +760,15 @@ static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
         if (oh->ot_handle != NULL) {
                 handle_t *hdl = oh->ot_handle;
 
+                hdl->h_sync = th->th_sync;
+                /*
+                 * add commit callback
+                 * notice we don't do this in osd_trans_start()
+                 * as underlying transaction can change during truncate
+                 */
+                osd_journal_callback_set(hdl, osd_trans_commit_cb,
+                                         &oh->ot_jcb);
+
                 LASSERT(oti->oti_txns == 1);
                 oti->oti_txns--;
                 LASSERT(oti->oti_r_locks == 0);
@@ -720,10 +781,22 @@ static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
                                   result = ldiskfs_journal_stop(hdl));
                 if (result != 0)
                         CERROR("Failure to stop transaction: %d\n", result);
+        } else {
+                OBD_FREE_PTR(oh);
         }
         EXIT;
 }
 
+static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
+{
+        struct osd_thandle *oh = container_of0(th, struct osd_thandle,
+                                               ot_super);
+
+        cfs_list_add(&dcb->dcb_linkage, &oh->ot_dcb_list);
+
+        return 0;
+}
+
 /*
  * Concurrency: no concurrent access is possible that late in object
  * life-cycle.
@@ -853,12 +926,27 @@ static void osd_conf_get(const struct lu_env *env,
                          const struct dt_device *dev,
                          struct dt_device_param *param)
 {
+        struct super_block *sb = osd_sb(osd_dt_dev(dev));
+
         /*
          * XXX should be taken from not-yet-existing fs abstraction layer.
          */
-        param->ddp_max_name_len  = LDISKFS_NAME_LEN;
-        param->ddp_max_nlink     = LDISKFS_LINK_MAX;
-        param->ddp_block_shift   = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
+        param->ddp_max_name_len = LDISKFS_NAME_LEN;
+        param->ddp_max_nlink    = LDISKFS_LINK_MAX;
+        param->ddp_block_shift  = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
+        param->ddp_mntopts      = 0;
+        if (test_opt(sb, XATTR_USER))
+                param->ddp_mntopts |= MNTOPT_USERXATTR;
+        if (test_opt(sb, POSIX_ACL))
+                param->ddp_mntopts |= MNTOPT_ACL;
+
+#if defined(LDISKFS_FEATURE_INCOMPAT_EA_INODE)
+        if (LDISKFS_HAS_INCOMPAT_FEATURE(sb, LDISKFS_FEATURE_INCOMPAT_EA_INODE))
+                param->ddp_max_ea_size = LDISKFS_XATTR_MAX_LARGE_EA_SIZE;
+        else
+#endif
+                param->ddp_max_ea_size = sb->s_blocksize;
+
 }
 
 /**
@@ -1101,6 +1189,7 @@ static const struct dt_device_operations osd_dt_ops = {
         .dt_statfs         = osd_statfs,
         .dt_trans_start    = osd_trans_start,
         .dt_trans_stop     = osd_trans_stop,
+        .dt_trans_cb_add   = osd_trans_cb_add,
         .dt_conf_get       = osd_conf_get,
         .dt_sync           = osd_sync,
         .dt_ro             = osd_ro,
@@ -1357,7 +1446,7 @@ static int osd_inode_setattr(const struct lu_env *env,
                 iattr.ia_uid = attr->la_uid;
                 iattr.ia_gid = attr->la_gid;
                 osd_push_ctxt(env, save);
-                rc = DQUOT_TRANSFER(inode, &iattr) ? -EDQUOT : 0;
+                rc = ll_vfs_dq_transfer(inode, &iattr) ? -EDQUOT : 0;
                 osd_pop_ctxt(save);
                 if (rc != 0)
                         return rc;
@@ -1393,8 +1482,11 @@ static int osd_inode_setattr(const struct lu_env *env,
         if (bits & LA_RDEV)
                 inode->i_rdev   = attr->la_rdev;
 
-        if (bits & LA_FLAGS)
-                inode->i_flags = ll_ext_to_inode_flags(attr->la_flags);
+        if (bits & LA_FLAGS) {
+                /* always keep S_NOCMTIME */
+                inode->i_flags = ll_ext_to_inode_flags(attr->la_flags) |
+                                 S_NOCMTIME;
+        }
         return 0;
 }
 
@@ -1419,7 +1511,7 @@ static int osd_attr_set(const struct lu_env *env,
         cfs_spin_unlock(&obj->oo_guard);
 
         if (!rc)
-                mark_inode_dirty(obj->oo_inode);
+                obj->oo_inode->i_sb->s_op->dirty_inode(obj->oo_inode);
         return rc;
 }
 
@@ -1438,6 +1530,8 @@ static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
                            struct lu_attr *attr, struct thandle *th)
 {
         osd_object_init0(obj);
+        if (obj->oo_inode && (obj->oo_inode->i_state & I_NEW))
+                unlock_new_inode(obj->oo_inode);
         return 0;
 }
 
@@ -1478,6 +1572,13 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
 
         LINVRNT(osd_invariant(obj));
         LASSERT(obj->oo_inode == NULL);
+        LASSERT(obj->oo_hl_head == NULL);
+
+        if (S_ISDIR(mode) && ldiskfs_pdo) {
+                obj->oo_hl_head =ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
+                if (obj->oo_hl_head == NULL)
+                        return -ENOMEM;
+        }
 
         oth = container_of(th, struct osd_thandle, ot_super);
         LASSERT(oth->ot_handle->h_transaction != NULL);
@@ -1499,10 +1600,19 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
         osd_pop_ctxt(save);
 #endif
         if (!IS_ERR(inode)) {
+                /* Do not update file c/mtime in ldiskfs.
+                 * NB: don't need any lock because no contention at this
+                 * early stage */
+                inode->i_flags |= S_NOCMTIME;
                 obj->oo_inode = inode;
                 result = 0;
-        } else
+        } else {
+                if (obj->oo_hl_head != NULL) {
+                        ldiskfs_htree_lock_head_free(obj->oo_hl_head);
+                        obj->oo_hl_head = NULL;
+                }
                 result = PTR_ERR(inode);
+        }
         LINVRNT(osd_invariant(obj));
         return result;
 }
@@ -1751,7 +1861,6 @@ static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
         struct inode           *inode    = obj->oo_inode;
         struct osd_thread_info *info     = osd_oti_get(env);
         struct dentry          *dentry   = &info->oti_child_dentry;
-        struct timespec        *t        = &info->oti_time;
         int                     fs_flags = 0;
         int  rc;
 
@@ -1766,14 +1875,8 @@ static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
                 fs_flags |= XATTR_CREATE;
 
         dentry->d_inode = inode;
-        *t = inode->i_ctime;
         rc = inode->i_op->setxattr(dentry, name, buf->lb_buf,
                                    buf->lb_len, fs_flags);
-        /* ctime should not be updated with server-side time. */
-        cfs_spin_lock(&obj->oo_guard);
-        inode->i_ctime = *t;
-        cfs_spin_unlock(&obj->oo_guard);
-        mark_inode_dirty(inode);
         return rc;
 }
 
@@ -1944,7 +2047,7 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
         result = __osd_object_create(info, obj, attr, hint, dof, th);
 
         /* objects under osd root shld have igif fid, so dont add fid EA */
-        if (result == 0 && fid_seq(fid) >= FID_SEQ_DISTRIBUTED_START)
+        if (result == 0 && fid_seq(fid) >= FID_SEQ_NORMAL)
                 result = osd_ea_fid_set(env, dt, fid);
 
         if (result == 0)
@@ -1974,7 +2077,7 @@ static void osd_object_ref_add(const struct lu_env *env,
         LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
         inode->i_nlink++;
         cfs_spin_unlock(&obj->oo_guard);
-        mark_inode_dirty(inode);
+        inode->i_sb->s_op->dirty_inode(inode);
         LINVRNT(osd_invariant(obj));
 }
 
@@ -1997,7 +2100,7 @@ static void osd_object_ref_del(const struct lu_env *env,
         LASSERT(inode->i_nlink > 0);
         inode->i_nlink--;
         cfs_spin_unlock(&obj->oo_guard);
-        mark_inode_dirty(inode);
+        inode->i_sb->s_op->dirty_inode(inode);
         LINVRNT(osd_invariant(obj));
 }
 
@@ -2078,7 +2181,6 @@ static int osd_xattr_del(const struct lu_env *env,
         struct inode           *inode  = obj->oo_inode;
         struct osd_thread_info *info   = osd_oti_get(env);
         struct dentry          *dentry = &info->oti_obj_dentry;
-        struct timespec        *t      = &info->oti_time;
         int                     rc;
 
         LASSERT(dt_object_exists(dt));
@@ -2090,13 +2192,7 @@ static int osd_xattr_del(const struct lu_env *env,
                 return -EACCES;
 
         dentry->d_inode = inode;
-        *t = inode->i_ctime;
         rc = inode->i_op->removexattr(dentry, name);
-        /* ctime should not be updated with server-side time. */
-        cfs_spin_lock(&obj->oo_guard);
-        inode->i_ctime = *t;
-        cfs_spin_unlock(&obj->oo_guard);
-        mark_inode_dirty(inode);
         return rc;
 }
 
@@ -2284,18 +2380,30 @@ static int osd_iam_container_init(const struct lu_env *env,
                                   struct osd_object *obj,
                                   struct osd_directory *dir)
 {
+        struct iam_container *bag = &dir->od_container;
         int result;
-        struct iam_container *bag;
 
-        bag    = &dir->od_container;
         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
-        if (result == 0) {
-                result = iam_container_setup(bag);
-                if (result == 0)
-                        obj->oo_dt.do_index_ops = &osd_index_iam_ops;
-                else
-                        iam_container_fini(bag);
+        if (result != 0)
+                return result;
+
+        result = iam_container_setup(bag);
+        if (result != 0)
+                goto out;
+
+        if (osd_obj2dev(obj)->od_iop_mode) {
+                u32 ptr = bag->ic_descr->id_ops->id_root_ptr(bag);
+
+                bag->ic_root_bh = ldiskfs_bread(NULL, obj->oo_inode,
+                                                ptr, 0, &result);
         }
+
+ out:
+        if (result == 0)
+                obj->oo_dt.do_index_ops = &osd_index_iam_ops;
+        else
+                iam_container_fini(bag);
+
         return result;
 }
 
@@ -2352,10 +2460,12 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                         else
                                 result = 0;
                         cfs_up_write(&obj->oo_ext_idx_sem);
-                } else
+                } else {
                         result = -ENOMEM;
-        } else
+                }
+        } else {
                 result = 0;
+        }
 
         if (result == 0 && ea_dir == 0) {
                 if (!osd_iam_index_probe(env, obj, feat))
@@ -2604,7 +2714,7 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
         struct osd_thandle *oh;
         ssize_t            result = 0;
 #ifdef HAVE_QUOTA_SUPPORT
-        cfs_cap_t           save = current->cap_effective;
+        cfs_cap_t           save = cfs_curproc_cap_pack();
 #endif
 
         LASSERT(handle != NULL);
@@ -2616,9 +2726,9 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
         LASSERT(oh->ot_handle->h_transaction != NULL);
 #ifdef HAVE_QUOTA_SUPPORT
         if (ignore_quota)
-                current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
+                cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
         else
-                current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
+                cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
 #endif
         /* Write small symlink to inode body as we need to maintain correct
          * on-disk symlinks for ldiskfs.
@@ -2631,7 +2741,7 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
                                                   buf->lb_len, pos,
                                                   oh->ot_handle);
 #ifdef HAVE_QUOTA_SUPPORT
-        current->cap_effective = save;
+        cfs_curproc_cap_unpack(save);
 #endif
         if (result == 0)
                 result = buf->lb_len;
@@ -2723,6 +2833,7 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
         struct osd_thandle         *oh;
         struct ldiskfs_dir_entry_2 *de;
         struct buffer_head         *bh;
+        struct htree_lock          *hlock = NULL;
 
         int rc;
 
@@ -2742,28 +2853,27 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
         dentry = osd_child_dentry_get(env, obj,
                                       (char *)key, strlen((char *)key));
 
-        cfs_down_write(&obj->oo_ext_idx_sem);
-        bh = ll_ldiskfs_find_entry(dir, dentry, &de);
-        if (bh) {
-                struct osd_thread_info *oti = osd_oti_get(env);
-                struct timespec *ctime = &oti->oti_time;
-                struct timespec *mtime = &oti->oti_time2;
+        if (obj->oo_hl_head != NULL) {
+                hlock = osd_oti_get(env)->oti_hlock;
+                ldiskfs_htree_lock(hlock, obj->oo_hl_head,
+                                   dir, LDISKFS_HLOCK_DEL);
+        } else {
+                cfs_down_write(&obj->oo_ext_idx_sem);
+        }
 
-                *ctime = dir->i_ctime;
-                *mtime = dir->i_mtime;
+        bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
+        if (bh) {
                 rc = ldiskfs_delete_entry(oh->ot_handle,
-                                dir, de, bh);
-                /* xtime should not be updated with server-side time. */
-                cfs_spin_lock(&obj->oo_guard);
-                dir->i_ctime = *ctime;
-                dir->i_mtime = *mtime;
-                cfs_spin_unlock(&obj->oo_guard);
-                mark_inode_dirty(dir);
+                                          dir, de, bh);
                 brelse(bh);
-        } else
+        } else {
                 rc = -ENOENT;
+        }
+        if (hlock != NULL)
+                ldiskfs_htree_unlock(hlock);
+        else
+                cfs_up_write(&obj->oo_ext_idx_sem);
 
-        cfs_up_write(&obj->oo_ext_idx_sem);
         LASSERT(osd_invariant(obj));
         RETURN(rc);
 }
@@ -2848,7 +2958,7 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
         struct osd_thandle    *oh;
         struct iam_container  *bag = &obj->oo_dir->od_container;
 #ifdef HAVE_QUOTA_SUPPORT
-        cfs_cap_t              save = current->cap_effective;
+        cfs_cap_t              save = cfs_curproc_cap_pack();
 #endif
         struct osd_thread_info *oti = osd_oti_get(env);
         struct iam_rec *iam_rec = (struct iam_rec *)oti->oti_ldp;
@@ -2873,9 +2983,9 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
         LASSERT(oh->ot_handle->h_transaction != NULL);
 #ifdef HAVE_QUOTA_SUPPORT
         if (ignore_quota)
-                current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
+                cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
         else
-                current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
+                cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
 #endif
         if (S_ISDIR(obj->oo_inode->i_mode))
                 osd_fid_pack((struct osd_fid_pack *)iam_rec, rec, &oti->oti_fid);
@@ -2884,7 +2994,7 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
                         iam_rec, ipd);
 #ifdef HAVE_QUOTA_SUPPORT
-        current->cap_effective = save;
+        cfs_curproc_cap_unpack(save);
 #endif
         osd_ipd_put(env, bag, ipd);
         LINVRNT(osd_invariant(obj));
@@ -2904,6 +3014,7 @@ static int __osd_ea_add_rec(struct osd_thread_info *info,
                             struct inode  *cinode,
                             const char *name,
                             const struct dt_rec *fid,
+                            struct htree_lock *hlock,
                             struct thandle *th)
 {
         struct ldiskfs_dentry_param *ldp;
@@ -2918,13 +3029,13 @@ static int __osd_ea_add_rec(struct osd_thread_info *info,
         child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
 
         if (fid_is_igif((struct lu_fid *)fid) ||
-            fid_seq((struct lu_fid *)fid) >= FID_SEQ_DISTRIBUTED_START) {
+            fid_is_norm((struct lu_fid *)fid)) {
                 ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
                 osd_get_ldiskfs_dirent_param(ldp, fid);
                 child->d_fsdata = (void*) ldp;
         } else
                 child->d_fsdata = NULL;
-        rc = ldiskfs_add_entry(oth->ot_handle, child, cinode);
+        rc = osd_ldiskfs_add_entry(oth->ot_handle, child, cinode, hlock);
 
         RETURN(rc);
 }
@@ -2972,7 +3083,7 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info,
 
                 if (!dir->oo_compat_dot_created)
                         return -EINVAL;
-                if (fid_seq((struct lu_fid *) dot_fid) >= FID_SEQ_DISTRIBUTED_START) {
+                if (fid_seq((struct lu_fid *)dot_fid) >= FID_SEQ_NORMAL) {
                         osd_get_ldiskfs_dirent_param(dot_ldp, dot_fid);
                         osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid);
                 } else {
@@ -2982,11 +3093,11 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info,
                 /* in case of rename, dotdot is already created */
                 if (dir->oo_compat_dotdot_created) {
                         return __osd_ea_add_rec(info, dir, parent_dir, name,
-                                                dot_dot_fid, th);
+                                                dot_dot_fid, NULL, th);
                 }
 
-                result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, inode,
-                                                dot_ldp, dot_dot_ldp);
+                result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir,
+                                                inode, dot_ldp, dot_dot_ldp);
                 if (result == 0)
                        dir->oo_compat_dotdot_created = 1;
         }
@@ -3007,15 +3118,37 @@ static int osd_ea_add_rec(const struct lu_env *env,
                           struct thandle *th)
 {
         struct osd_thread_info    *info   = osd_oti_get(env);
+        struct htree_lock         *hlock;
         int rc;
 
+        hlock = pobj->oo_hl_head != NULL ? info->oti_hlock : NULL;
+
         if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
-                                                   name[2] =='\0')))
+                                                   name[2] =='\0'))) {
+                if (hlock != NULL) {
+                        ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
+                                           pobj->oo_inode, 0);
+                } else {
+                        cfs_down_write(&pobj->oo_ext_idx_sem);
+                }
                 rc = osd_add_dot_dotdot(info, pobj, cinode, name,
                      (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
                                         fid, th);
+        } else {
+                if (hlock != NULL) {
+                        ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
+                                           pobj->oo_inode, LDISKFS_HLOCK_ADD);
+                } else {
+                        cfs_down_write(&pobj->oo_ext_idx_sem);
+                }
+
+                rc = __osd_ea_add_rec(info, pobj, cinode, name, fid,
+                                      hlock, th);
+        }
+        if (hlock != NULL)
+                ldiskfs_htree_unlock(hlock);
         else
-                rc = __osd_ea_add_rec(info, pobj, cinode, name, fid, th);
+                cfs_up_write(&pobj->oo_ext_idx_sem);
 
         return rc;
 }
@@ -3036,6 +3169,7 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
         struct ldiskfs_dir_entry_2 *de;
         struct buffer_head         *bh;
         struct lu_fid              *fid = (struct lu_fid *) rec;
+        struct htree_lock          *hlock = NULL;
         int ino;
         int rc;
 
@@ -3044,8 +3178,15 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
         dentry = osd_child_dentry_get(env, obj,
                                       (char *)key, strlen((char *)key));
 
-        cfs_down_read(&obj->oo_ext_idx_sem);
-        bh = ll_ldiskfs_find_entry(dir, dentry, &de);
+        if (obj->oo_hl_head != NULL) {
+                hlock = osd_oti_get(env)->oti_hlock;
+                ldiskfs_htree_lock(hlock, obj->oo_hl_head,
+                                   dir, LDISKFS_HLOCK_LOOKUP);
+        } else {
+                cfs_down_read(&obj->oo_ext_idx_sem);
+        }
+
+        bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
         if (bh) {
                 ino = le32_to_cpu(de->inode);
                 rc = osd_get_fid_from_dentry(de, rec);
@@ -3054,10 +3195,14 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
                 brelse(bh);
                 if (rc != 0)
                         rc = osd_ea_fid_get(env, obj, ino, fid);
-        } else
+        } else {
                 rc = -ENOENT;
+        }
 
-        cfs_up_read(&obj->oo_ext_idx_sem);
+        if (hlock != NULL)
+                ldiskfs_htree_unlock(hlock);
+        else
+                cfs_up_read(&obj->oo_ext_idx_sem);
         RETURN (rc);
 }
 
@@ -3138,7 +3283,7 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
         const char               *name  = (const char *)key;
         struct osd_object        *child;
 #ifdef HAVE_QUOTA_SUPPORT
-        cfs_cap_t                 save  = current->cap_effective;
+        cfs_cap_t                 save  = cfs_curproc_cap_pack();
 #endif
         int rc;
 
@@ -3153,32 +3298,17 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
 
         child = osd_object_find(env, dt, fid);
         if (!IS_ERR(child)) {
-                struct inode *inode = obj->oo_inode;
-                struct osd_thread_info *oti = osd_oti_get(env);
-                struct timespec *ctime = &oti->oti_time;
-                struct timespec *mtime = &oti->oti_time2;
-
-                *ctime = inode->i_ctime;
-                *mtime = inode->i_mtime;
 #ifdef HAVE_QUOTA_SUPPORT
                 if (ignore_quota)
-                        current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
+                        cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
                 else
-                        current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
+                        cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
 #endif
-                cfs_down_write(&obj->oo_ext_idx_sem);
                 rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th);
-                cfs_up_write(&obj->oo_ext_idx_sem);
 #ifdef HAVE_QUOTA_SUPPORT
-                current->cap_effective = save;
+                cfs_curproc_cap_unpack(save);
 #endif
                 osd_object_put(env, child);
-                /* xtime should not be updated with server-side time. */
-                cfs_spin_lock(&obj->oo_guard);
-                inode->i_ctime = *ctime;
-                inode->i_mtime = *mtime;
-                cfs_spin_unlock(&obj->oo_guard);
-                mark_inode_dirty(inode);
         } else {
                 rc = PTR_ERR(child);
         }
@@ -3194,8 +3324,9 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
  */
 
 static struct dt_it *osd_it_iam_init(const struct lu_env *env,
-                                 struct dt_object *dt,
-                                 struct lustre_capa *capa)
+                                     struct dt_object *dt,
+                                     __u32 unused,
+                                     struct lustre_capa *capa)
 {
         struct osd_it_iam         *it;
         struct osd_thread_info *oti = osd_oti_get(env);
@@ -3447,6 +3578,7 @@ static const struct dt_index_operations osd_index_iam_ops = {
  */
 static struct dt_it *osd_it_ea_init(const struct lu_env *env,
                                     struct dt_object *dt,
+                                    __u32 attr,
                                     struct lustre_capa *capa)
 {
         struct osd_object       *obj  = osd_dt_obj(dt);
@@ -3468,6 +3600,10 @@ static struct dt_it *osd_it_ea_init(const struct lu_env *env,
         it->oie_obj             = obj;
         it->oie_file.f_pos      = 0;
         it->oie_file.f_dentry   = obj_dentry;
+        if (attr & LUDA_64BITHASH)
+                it->oie_file.f_flags = O_64BITHASH;
+        else
+                it->oie_file.f_flags = O_32BITHASH;
         it->oie_file.f_mapping    = obj->oo_inode->i_mapping;
         it->oie_file.f_op         = obj->oo_inode->i_fop;
         it->oie_file.private_data = NULL;
@@ -3587,22 +3723,34 @@ static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
  * \retval   0 on success
  * \retval -ve on error
  */
-static int osd_ldiskfs_it_fill(const struct dt_it *di)
+static int osd_ldiskfs_it_fill(const struct lu_env *env,
+                               const struct dt_it *di)
 {
         struct osd_it_ea   *it    = (struct osd_it_ea *)di;
         struct osd_object  *obj   = it->oie_obj;
         struct inode       *inode = obj->oo_inode;
-        int                result = 0;
+        struct htree_lock  *hlock = NULL;
+        int                 result = 0;
 
         ENTRY;
         it->oie_dirent = it->oie_buf;
         it->oie_rd_dirent = 0;
 
-        cfs_down_read(&obj->oo_ext_idx_sem);
+        if (obj->oo_hl_head != NULL) {
+                hlock = osd_oti_get(env)->oti_hlock;
+                ldiskfs_htree_lock(hlock, obj->oo_hl_head,
+                                   inode, LDISKFS_HLOCK_READDIR);
+        } else {
+                cfs_down_read(&obj->oo_ext_idx_sem);
+        }
+
         result = inode->i_fop->readdir(&it->oie_file, it,
                                        (filldir_t) osd_ldiskfs_filldir);
 
-        cfs_up_read(&obj->oo_ext_idx_sem);
+        if (hlock != NULL)
+                ldiskfs_htree_unlock(hlock);
+        else
+                cfs_up_read(&obj->oo_ext_idx_sem);
 
         if (it->oie_rd_dirent == 0) {
                 result = -EIO;
@@ -3643,7 +3791,7 @@ static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di)
                 if (it->oie_file.f_pos == LDISKFS_HTREE_EOF)
                         rc = +1;
                 else
-                        rc = osd_ldiskfs_it_fill(di);
+                        rc = osd_ldiskfs_it_fill(env, di);
         }
 
         RETURN(rc);
@@ -3748,7 +3896,7 @@ static int osd_it_ea_load(const struct lu_env *env,
         ENTRY;
         it->oie_file.f_pos = hash;
 
-        rc =  osd_ldiskfs_it_fill(di);
+        rc =  osd_ldiskfs_it_fill(env, di);
         if (rc == 0)
                 rc = +1;
 
@@ -3813,19 +3961,26 @@ static void *osd_key_init(const struct lu_context *ctx,
         struct osd_thread_info *info;
 
         OBD_ALLOC_PTR(info);
-        if (info != NULL) {
-                OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
-                if (info->oti_it_ea_buf != NULL) {
-                        info->oti_env = container_of(ctx, struct lu_env,
-                                                     le_ctx);
-                } else {
-                        OBD_FREE_PTR(info);
-                        info = ERR_PTR(-ENOMEM);
-                }
-        } else {
-                info = ERR_PTR(-ENOMEM);
-        }
+        if (info == NULL)
+                return ERR_PTR(-ENOMEM);
+
+        OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
+        if (info->oti_it_ea_buf == NULL)
+                goto out_free_info;
+
+        info->oti_env = container_of(ctx, struct lu_env, le_ctx);
+
+        info->oti_hlock = ldiskfs_htree_lock_alloc();
+        if (info->oti_hlock == NULL)
+                goto out_free_ea;
+
         return info;
+
+ out_free_ea:
+        OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
+ out_free_info:
+        OBD_FREE_PTR(info);
+        return ERR_PTR(-ENOMEM);
 }
 
 static void osd_key_fini(const struct lu_context *ctx,
@@ -3833,6 +3988,8 @@ static void osd_key_fini(const struct lu_context *ctx,
 {
         struct osd_thread_info *info = data;
 
+        if (info->oti_hlock != NULL)
+                ldiskfs_htree_lock_free(info->oti_hlock);
         OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
         OBD_FREE_PTR(info);
 }
@@ -3861,17 +4018,7 @@ static struct lu_context_key osd_key = {
 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
                            const char *name, struct lu_device *next)
 {
-        int rc;
-        struct lu_context *ctx;
-
-        /* context for commit hooks */
-        ctx = &osd_dev(d)->od_env_for_commit.le_ctx;
-        rc = lu_context_init(ctx, LCT_MD_THREAD|LCT_REMEMBER|LCT_NOREF);
-        if (rc == 0) {
-                rc = osd_procfs_init(osd_dev(d), name);
-                ctx->lc_cookie = 0x3;
-        }
-        return rc;
+        return osd_procfs_init(osd_dev(d), name);
 }
 
 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
@@ -3945,7 +4092,6 @@ static struct lu_device *osd_device_fini(const struct lu_env *env,
                                  osd_dev(d)->od_mount->lmi_mnt);
         osd_dev(d)->od_mount = NULL;
 
-        lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx);
         RETURN(NULL);
 }