Whamcloud - gitweb
add per-thread debugging flags. Use then to control CDEBUG().
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
index 906a352..aef052a 100644 (file)
@@ -41,6 +41,7 @@
 #include <linux/ldiskfs_fs.h>
 #include <lustre_mds.h>
 #include <lustre/lustre_idl.h>
+#include <lustre_fid.h>
 
 #include "mdd_internal.h"
 
@@ -48,25 +49,27 @@ static const char dot[] = ".";
 static const char dotdot[] = "..";
 
 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
-                        const char *name, const struct lu_fid* fid, int mask);
+                        const char *name, struct lu_fid* fid, int mask);
 static int
 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
-                    const char *name, const struct lu_fid* fid, int mask)
+                    const char *name, struct lu_fid* fid, int mask)
 {
         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
         struct dynlock_handle *dlh;
         int rc;
 
         dlh = mdd_pdo_read_lock(env, mdd_obj, name);
+        if (dlh == NULL)
+                return -ENOMEM;
         rc = __mdd_lookup(env, pobj, name, fid, mask);
         mdd_pdo_read_unlock(env, mdd_obj, dlh);
 
-       return rc;
+        return rc;
 }
 
 static int mdd_lookup(const struct lu_env *env,
                       struct md_object *pobj, const char *name,
-                      struct lu_fid* fid)
+                      struct lu_fid* fid, struct md_op_spec *spec)
 {
         int rc;
         ENTRY;
@@ -82,6 +85,16 @@ static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
 }
 
 /*
+ * For root fid use special function, whcih does not compare version component
+ * of fid. Vresion component is different for root fids on all MDTs.
+ */
+static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
+{
+        return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
+                fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
+}
+
+/*
  * return 1: if lf is the fid of the ancestor of p1;
  * return 0: if not;
  *
@@ -104,15 +117,15 @@ static int mdd_is_parent(const struct lu_env *env,
         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
         pfid = &mdd_env_info(env)->mti_fid;
 
-        /* Do not lookup ".." in root, they do not exist there. */
-        if (lu_fid_eq(mdo2fid(p1), &mdd->mdd_root_fid))
+        /* Check for root first. */
+        if (mdd_is_root(mdd, mdo2fid(p1)))
                 RETURN(0);
 
         for(;;) {
                 rc = mdd_parent_fid(env, p1, pfid);
                 if (rc)
                         GOTO(out, rc);
-                if (lu_fid_eq(pfid, &mdd->mdd_root_fid))
+                if (mdd_is_root(mdd, pfid))
                         GOTO(out, rc = 0);
                 if (lu_fid_eq(pfid, lf))
                         GOTO(out, rc = 1);
@@ -159,14 +172,20 @@ static int mdd_is_subdir(const struct lu_env *env,
                 RETURN(0);
 
         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
-
+        if (rc == 0) {
+                /* found root */
+                fid_zero(sfid);
+        } else if (rc == 1) {
+                /* found @fid is parent */
+                *sfid = *fid;
+                rc = 0;
+        }
         RETURN(rc);
 }
 
-/*Check whether it may create the cobj under the pobj*/
-static int mdd_may_create(const struct lu_env *env,
-                          struct mdd_object *pobj, struct mdd_object *cobj,
-                          int need_check)
+/* Check whether it may create the cobj under the pobj */
+static int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
+                          struct mdd_object *cobj, int need_check)
 {
         int rc = 0;
         ENTRY;
@@ -177,18 +196,13 @@ static int mdd_may_create(const struct lu_env *env,
         if (mdd_is_dead_obj(pobj))
                 RETURN(-ENOENT);
 
-        /*check pobj may create or not*/
         if (need_check)
-                rc = mdd_permission_internal_locked(env, pobj,
-                                             MAY_WRITE | MAY_EXEC);
+                rc = mdd_permission_internal_locked(env, pobj, NULL,
+                                                    MAY_WRITE | MAY_EXEC);
 
         RETURN(rc);
 }
 
-/*
- * It's inline, so penalty for filesystems that don't use sticky bit is
- * minimal.
- */
 static inline int mdd_is_sticky(const struct lu_env *env,
                                 struct mdd_object *pobj,
                                 struct mdd_object *cobj)
@@ -203,14 +217,11 @@ static inline int mdd_is_sticky(const struct lu_env *env,
         } else if (tmp_la->la_uid == uc->mu_fsuid) {
                 return 0;
         } else {
-                mdd_read_lock(env, pobj);
                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
-                mdd_read_unlock(env, pobj);
                 if (rc)
                         return rc;
-                else if (!(tmp_la->la_mode & S_ISVTX))
-                        return 0;
-                else if (tmp_la->la_uid == uc->mu_fsuid)
+                else if (!(tmp_la->la_mode & S_ISVTX) ||
+                         (tmp_la->la_uid == uc->mu_fsuid))
                         return 0;
                 else
                         return !mdd_capable(uc, CAP_FOWNER);
@@ -243,7 +254,7 @@ static int mdd_may_delete(const struct lu_env *env,
                         RETURN(-EBUSY);
 
         } else if (S_ISDIR(mdd_object_type(cobj))) {
-                        RETURN(-EISDIR);
+                RETURN(-EISDIR);
         }
 
         if (pobj) {
@@ -254,9 +265,8 @@ static int mdd_may_delete(const struct lu_env *env,
                         RETURN(-EPERM);
 
                 if (need_check)
-                        rc = mdd_permission_internal_locked(env, pobj,
-                                                            MAY_WRITE |
-                                                            MAY_EXEC);
+                        rc = mdd_permission_internal_locked(env, pobj, NULL,
+                                                     MAY_WRITE | MAY_EXEC);
         }
         RETURN(rc);
 }
@@ -267,76 +277,77 @@ int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
         int rc = 0;
         ENTRY;
 
+        if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
+                RETURN(-EPERM);
+
+        if (S_ISDIR(mdd_object_type(src_obj)))
+                RETURN(-EPERM);
+
+        LASSERT(src_obj != tgt_obj);
         if (tgt_obj) {
                 rc = mdd_may_create(env, tgt_obj, NULL, 1);
                 if (rc)
                         RETURN(rc);
         }
 
-        if (S_ISDIR(mdd_object_type(src_obj)))
-                RETURN(-EPERM);
+        RETURN(rc);
+}
 
-        if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
-                RETURN(-EPERM);
+const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
+                                   const struct lu_fid *fid)
+{
+        struct mdd_thread_info *info = mdd_env_info(env);
 
-        RETURN(rc);
+        fid_cpu_to_be(&info->mti_fid2, fid);
+        return (const struct dt_rec *)&info->mti_fid2;
 }
 
+
 /* insert new index, add reference if isdir, update times */
-static int __mdd_index_insert(const struct lu_env *env,
-                             struct mdd_object *pobj, const struct lu_fid *lf,
-                             const char *name, int isdir, struct thandle *th,
-                             struct lustre_capa *capa)
+static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
+                              const struct lu_fid *lf, const char *name, int is_dir,
+                              struct thandle *handle, struct lustre_capa *capa)
 {
         struct dt_object *next = mdd_object_child(pobj);
-        struct timeval start;
-        int rc;
+        struct timeval    start;
+        int               rc;
         ENTRY;
-        
-        mdd_lproc_time_start(mdo2mdd(&pobj->mod_obj), &start, 
-                             LPROC_MDD_INDEX_INSERT);
-#if 0
-        struct lu_attr   *la = &mdd_env_info(env)->mti_la;
-#endif
 
-        if (dt_try_as_dir(env, next))
+        mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
+                               LPROC_MDD_INDEX_INSERT);
+        if (dt_try_as_dir(env, next)) {
                 rc = next->do_index_ops->dio_insert(env, next,
-                                                    (struct dt_rec *)lf,
-                                                    (struct dt_key *)name,
-                                                    th, capa);
-        else
+                                                    __mdd_fid_rec(env, lf),
+                                                    (const struct dt_key *)name,
+                                                    handle, capa);
+        } else {
                 rc = -ENOTDIR;
+        }
 
         if (rc == 0) {
-                if (isdir) {
+                if (is_dir) {
                         mdd_write_lock(env, pobj);
-                        mdd_ref_add_internal(env, pobj, th);
-                        mdd_write_unlock(env, pobj);                
+                        mdd_ref_add_internal(env, pobj, handle);
+                        mdd_write_unlock(env, pobj);
                 }
-#if 0
-                la->la_valid = LA_MTIME|LA_CTIME;
-                la->la_atime = ma->ma_attr.la_atime;
-                la->la_ctime = ma->ma_attr.la_ctime;
-                rc = mdd_attr_set_internal(env, mdd_obj, la, handle, 0);
-#endif
         }
-        mdd_lproc_time_end(mdo2mdd(&pobj->mod_obj), &start,
-                           LPROC_MDD_INDEX_INSERT);
-        return rc;
+        mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
+                             LPROC_MDD_INDEX_INSERT);
+        RETURN(rc);
 }
 
-static int __mdd_index_delete(const struct lu_env *env,
-                              struct mdd_object *pobj, const char *name,
-                              int is_dir, struct thandle *handle,
+static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
+                              const char *name, int is_dir, struct thandle *handle,
                               struct lustre_capa *capa)
 {
         struct dt_object *next = mdd_object_child(pobj);
-        struct timeval start;
-        int rc;
+        struct timeval    start;
+        int               rc;
         ENTRY;
 
-        mdd_lproc_time_start(mdo2mdd(&pobj->mod_obj), &start, 
-                             LPROC_MDD_INDEX_DELETE);
+        mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
+                               LPROC_MDD_INDEX_DELETE);
+
         if (dt_try_as_dir(env, next)) {
                 rc = next->do_index_ops->dio_delete(env, next,
                                                     (struct dt_key *)name,
@@ -348,27 +359,29 @@ static int __mdd_index_delete(const struct lu_env *env,
                 }
         } else
                 rc = -ENOTDIR;
-        mdd_lproc_time_end(mdo2mdd(&pobj->mod_obj), &start, 
-                           LPROC_MDD_INDEX_DELETE);
+
+        mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
+                             LPROC_MDD_INDEX_DELETE);
         RETURN(rc);
 }
 
-static int __mdd_index_insert_only(const struct lu_env *env,
-                                   struct mdd_object *pobj,
-                                   const struct lu_fid *lf,
-                                   const char *name, struct thandle *th,
-                                   struct lustre_capa *capa)
+static int
+__mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
+                        const struct lu_fid *lf, const char *name,
+                        struct thandle *handle, struct lustre_capa *capa)
 {
-        int rc;
         struct dt_object *next = mdd_object_child(pobj);
+        int               rc;
         ENTRY;
 
-        if (dt_try_as_dir(env, next))
+        if (dt_try_as_dir(env, next)) {
                 rc = next->do_index_ops->dio_insert(env, next,
-                                         (struct dt_rec *)lf,
-                                         (struct dt_key *)name, th, capa);
-        else
+                                                    __mdd_fid_rec(env, lf),
+                                                    (const struct dt_key *)name,
+                                                    handle, capa);
+        } else {
                 rc = -ENOTDIR;
+        }
         RETURN(rc);
 }
 
@@ -376,12 +389,12 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
                     struct md_object *src_obj, const char *name,
                     struct md_attr *ma)
 {
+        struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
         struct mdd_device *mdd = mdo2mdd(src_obj);
-        struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
-        struct thandle *handle;
         struct dynlock_handle *dlh;
+        struct thandle *handle;
         int rc;
         ENTRY;
 
@@ -391,32 +404,43 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
                 RETURN(PTR_ERR(handle));
 
         dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
+        if (dlh == NULL)
+                GOTO(out_trans, rc = -ENOMEM);
         mdd_write_lock(env, mdd_sobj);
 
         rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
         if (rc)
-                GOTO(out, rc);
+                GOTO(out_unlock, rc);
 
         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
                                      name, handle,
                                      mdd_object_capa(env, mdd_tobj));
-        if (rc == 0)
-                mdd_ref_add_internal(env, mdd_sobj, handle);
-
-        *la_copy = ma->ma_attr;
-        la_copy->la_valid = LA_CTIME;
-        rc = mdd_attr_set_internal(env, mdd_sobj, la_copy, handle, 0);
         if (rc)
-                GOTO(out, rc);
+                GOTO(out_unlock, rc);
 
-        la_copy->la_valid = LA_CTIME | LA_MTIME;
-        rc = mdd_attr_set_internal_locked(env, mdd_tobj, la_copy, handle, 0);
+        mdd_ref_add_internal(env, mdd_sobj, handle);
 
-out:
+        *la = ma->ma_attr;
+        la->la_valid = LA_CTIME | LA_MTIME;
+        rc = mdd_attr_set_internal_locked(env, mdd_tobj, la, handle, 0);
+        if (rc)
+                GOTO(out_unlock, rc);
+
+        la->la_valid = LA_CTIME;
+        rc = mdd_attr_set_internal(env, mdd_sobj, la, handle, 0);
+        EXIT;
+out_unlock:
         mdd_write_unlock(env, mdd_sobj);
         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
+out_trans:
         mdd_trans_stop(env, mdd, rc, handle);
-        RETURN(rc);
+        return rc;
+}
+
+static inline void mdd_set_dead_obj(struct mdd_object *obj)
+{
+        if (obj)
+                obj->mod_flags |= DEAD_OBJ;
 }
 
 /* caller should take a lock before calling */
@@ -432,12 +456,18 @@ int mdd_finish_unlink(const struct lu_env *env,
                 /* add new orphan and the object
                  * will be deleted during the object_put() */
                 if (__mdd_orphan_add(env, obj, th) == 0)
-                        set_bit(LU_OBJECT_ORPHAN,
-                                &mdd2lu_obj(obj)->lo_header->loh_flags);
+                        obj->mod_flags |= ORPHAN_OBJ;
 
+                mdd_set_dead_obj(obj);
                 if (obj->mod_count == 0)
                         rc = mdd_object_kill(env, obj, ma);
-        }
+                else
+                        /* clear MA_LOV | MA_COOKIE, if we do not
+                         * unlink it in case we get it somewhere */
+                        ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
+        } else
+                ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
+
         RETURN(rc);
 }
 
@@ -462,7 +492,7 @@ static int mdd_dir_is_empty(const struct lu_env *env,
 
         obj = mdd_object_child(dir);
         iops = &obj->do_index_ops->dio_it;
-        it = iops->init(env, obj, 0);
+        it = iops->init(env, obj, 0, BYPASS_CAPA);
         if (it != NULL) {
                 result = iops->get(env, it, (const void *)"");
                 if (result > 0) {
@@ -508,28 +538,43 @@ int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
         RETURN(rc);
 }
 
-static int mdd_unlink(const struct lu_env *env,
-                      struct md_object *pobj, struct md_object *cobj,
-                      const char *name, struct md_attr *ma)
+static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
+                      struct md_object *cobj, const char *name,
+                      struct md_attr *ma)
 {
-        struct mdd_device *mdd = mdo2mdd(pobj);
+        struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
-        struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
-        struct thandle    *handle;
+        struct mdd_device *mdd = mdo2mdd(pobj);
         struct dynlock_handle *dlh;
+        struct thandle    *handle;
         int rc, is_dir;
         ENTRY;
 
-        rc = mdd_log_txn_param_build(env, mdd_cobj, ma, MDD_TXN_UNLINK_OP);
+        /*
+         * Check -ENOENT early here because we need to get object type
+         * to calculate credits before transaction start
+         */
+        if (!lu_object_exists(&cobj->mo_lu)) {
+                LU_OBJECT_DEBUG(D_ERROR, env, &cobj->mo_lu,
+                                "unlinking as `%s'", name);
+                RETURN(-ENOENT);
+        }
+
+        LASSERTF(lu_object_exists(&cobj->mo_lu) > 0, "FID is "DFID"\n",
+                 PFID(lu_object_fid(&cobj->mo_lu)));
+
+        rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
         if (rc)
                 RETURN(rc);
-        
+
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
+        if (dlh == NULL)
+                GOTO(out_trans, rc = -ENOMEM);
         mdd_write_lock(env, mdd_cobj);
 
         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
@@ -537,25 +582,29 @@ static int mdd_unlink(const struct lu_env *env,
                 GOTO(cleanup, rc);
 
         is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
+
+        current->debugging1 |= 0x1; /* XXX enable lvar_enoent_debug
+                                     * debugging */
         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
                                 mdd_object_capa(env, mdd_pobj));
+        current->debugging1 &= ~0x1;
         if (rc)
                 GOTO(cleanup, rc);
 
         mdd_ref_del_internal(env, mdd_cobj, handle);
-        *la_copy = ma->ma_attr;
         if (is_dir) {
                 /* unlink dot */
                 mdd_ref_del_internal(env, mdd_cobj, handle);
-        } else {
-                la_copy->la_valid = LA_CTIME;
-                rc = mdd_attr_set_internal(env, mdd_cobj, la_copy, handle, 0);
-                if (rc)
-                        GOTO(cleanup, rc);
         }
 
-        la_copy->la_valid = LA_CTIME | LA_MTIME;
-        rc = mdd_attr_set_internal_locked(env, mdd_pobj, la_copy, handle, 0);
+        *la = ma->ma_attr;
+        la->la_valid = LA_CTIME | LA_MTIME;
+        rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
+        if (rc)
+                GOTO(cleanup, rc);
+
+        la->la_valid = LA_CTIME;
+        rc = mdd_attr_set_internal(env, mdd_cobj, la, handle, 0);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -565,53 +614,44 @@ static int mdd_unlink(const struct lu_env *env,
                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
                                    strlen("unlinked"), "unlinked", 0,
                                    NULL, NULL);
+        EXIT;
 cleanup:
         mdd_write_unlock(env, mdd_cobj);
         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
+out_trans:
         mdd_trans_stop(env, mdd, rc, handle);
-        RETURN(rc);
+        return rc;
 }
 
-/*
- * Partial operation. Be aware, this is called with write lock taken, so we use
- * locksless version of __mdd_lookup() here.
- */
 static int mdd_ni_sanity_check(const struct lu_env *env,
                                struct md_object *pobj,
                                const char *name,
                                const struct lu_fid *fid)
 {
-        struct mdd_object *obj       = md2mdd_obj(pobj);
-#if 0
-        int rc;
-#endif
+        struct mdd_object *obj = md2mdd_obj(pobj);
         ENTRY;
 
         /* EEXIST check */
         if (mdd_is_dead_obj(obj))
                 RETURN(-ENOENT);
 
-         /* The exist of the name will be checked in _index_insert. */
-#if 0
-        rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
-        if (rc != -ENOENT)
-                RETURN(rc ? : -EEXIST);
-        else
-                RETURN(0);
-#endif
-        RETURN(mdd_permission_internal_locked(env, obj,
+        /* The exist of the name will be checked in _index_insert. */
+        RETURN(mdd_permission_internal_locked(env, obj, NULL,
                                               MAY_WRITE | MAY_EXEC));
 }
 
-static int mdd_name_insert(const struct lu_env *env,
-                           struct md_object *pobj,
+/*
+ * Partial operation.
+ */
+static int mdd_name_insert(const struct lu_env *env, struct md_object *pobj,
                            const char *name, const struct lu_fid *fid,
-                           int isdir)
+                           int is_dir)
 {
+        struct lu_attr   *la = &mdd_env_info(env)->mti_la;
         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
         struct mdd_device *mdd = mdo2mdd(pobj);
-        struct thandle *handle;
         struct dynlock_handle *dlh;
+        struct thandle *handle;
         int rc;
         ENTRY;
 
@@ -621,56 +661,57 @@ static int mdd_name_insert(const struct lu_env *env,
                 RETURN(PTR_ERR(handle));
 
         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
+        if (dlh == NULL)
+                GOTO(out_trans, rc = -ENOMEM);
         rc = mdd_ni_sanity_check(env, pobj, name, fid);
         if (rc)
                 GOTO(out_unlock, rc);
 
-        rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle,
-                                BYPASS_CAPA);
-
+        rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
+                                handle, BYPASS_CAPA);
+        if (rc == 0) {
+                la->la_ctime = la->la_atime = CURRENT_SECONDS;
+                la->la_valid = LA_ATIME | LA_CTIME;
+                rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
+        }
+        EXIT;
 out_unlock:
         mdd_pdo_write_unlock(env, mdd_obj, dlh);
+out_trans:
         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
-        RETURN(rc);
+        return rc;
 }
 
-/*
- * Be aware, this is called with write lock taken, so we use locksless version
- * of __mdd_lookup() here.
- */
 static int mdd_nr_sanity_check(const struct lu_env *env,
                                struct md_object *pobj,
                                const char *name)
 {
-        struct mdd_object *obj       = md2mdd_obj(pobj);
-#if 0
-        struct mdd_thread_info *info = mdd_env_info(env);
-        struct lu_fid     *fid       = &info->mti_fid;
-        int rc;
-#endif
+        struct mdd_object *obj = md2mdd_obj(pobj);
         ENTRY;
 
         /* EEXIST check */
-        if (mdd_is_dead_obj(obj))
+        if (mdd_is_dead_obj(obj)) {
+                CWARN("Dir "DFID" is dead?\n", PFID(mdo2fid(obj)));
                 RETURN(-ENOENT);
+        }
 
-         /* The exist of the name will be checked in _index_delete. */
-#if 0
-        rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
-        RETURN(rc);
-#endif
-        RETURN(mdd_permission_internal_locked(env, obj,
+        /* Name presense will be checked in _index_delete. */
+        RETURN(mdd_permission_internal_locked(env, obj, NULL,
                                               MAY_WRITE | MAY_EXEC));
 }
 
+/*
+ * Partial operation.
+ */
 static int mdd_name_remove(const struct lu_env *env,
                            struct md_object *pobj,
                            const char *name, int is_dir)
 {
-        struct mdd_device *mdd = mdo2mdd(pobj);
+        struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
-        struct thandle *handle;
+        struct mdd_device *mdd = mdo2mdd(pobj);
         struct dynlock_handle *dlh;
+        struct thandle *handle;
         int rc;
         ENTRY;
 
@@ -680,18 +721,28 @@ static int mdd_name_remove(const struct lu_env *env,
                 RETURN(PTR_ERR(handle));
 
         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
+        if (dlh == NULL)
+                GOTO(out_trans, rc = -ENOMEM);
         rc = mdd_nr_sanity_check(env, pobj, name);
         if (rc)
                 GOTO(out_unlock, rc);
 
-        rc = __mdd_index_delete(env, mdd_obj, name, is_dir, handle,
-                                BYPASS_CAPA);
+        rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
+                                handle, BYPASS_CAPA);
+        if (rc)
+                GOTO(out_unlock, rc);
 
+        la->la_ctime = la->la_mtime = CURRENT_SECONDS;
+        la->la_valid = LA_CTIME | LA_MTIME;
+        rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
+        EXIT;
 out_unlock:
         mdd_pdo_write_unlock(env, mdd_obj, dlh);
+out_trans:
         mdd_trans_stop(env, mdd, rc, handle);
-        RETURN(rc);
+        return rc;
 }
+
 static int mdd_rt_sanity_check(const struct lu_env *env,
                                struct mdd_object *tgt_pobj,
                                struct mdd_object *tobj,
@@ -723,11 +774,12 @@ static int mdd_rename_tgt(const struct lu_env *env,
                           const struct lu_fid *lf, const char *name,
                           struct md_attr *ma)
 {
-        struct mdd_device *mdd = mdo2mdd(pobj);
+        struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
-        struct thandle *handle;
+        struct mdd_device *mdd = mdo2mdd(pobj);
         struct dynlock_handle *dlh;
+        struct thandle *handle;
         int rc;
         ENTRY;
 
@@ -737,16 +789,20 @@ static int mdd_rename_tgt(const struct lu_env *env,
                 RETURN(PTR_ERR(handle));
 
         dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
+        if (dlh == NULL)
+                GOTO(out_trans, rc = -ENOMEM);
         if (mdd_tobj)
                 mdd_write_lock(env, mdd_tobj);
 
-        /*TODO rename sanity checking*/
+        /* XXX: Rename sanity checking. */
         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
         if (rc)
                 GOTO(cleanup, rc);
 
-        /* if rename_tgt is called then we should just re-insert name with
-         * correct fid, no need to dec/inc parent nlink if obj is dir */
+        /*
+         * If rename_tgt is called then we should just re-insert name with
+         * correct fid, no need to dec/inc parent nlink if obj is dir.
+         */
         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
         if (rc)
                 GOTO(cleanup, rc);
@@ -756,47 +812,49 @@ static int mdd_rename_tgt(const struct lu_env *env,
         if (rc)
                 GOTO(cleanup, rc);
 
-        if (tobj && lu_object_exists(&tobj->mo_lu))
+        *la = ma->ma_attr;
+        la->la_valid = LA_CTIME | LA_MTIME;
+        rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la, handle, 0);
+        if (rc)
+                GOTO(cleanup, rc);
+
+        if (tobj && lu_object_exists(&tobj->mo_lu)) {
                 mdd_ref_del_internal(env, mdd_tobj, handle);
+                la->la_valid = LA_CTIME;
+                rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
+        }
+        EXIT;
 cleanup:
         if (tobj)
                 mdd_write_unlock(env, mdd_tobj);
         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
+out_trans:
         mdd_trans_stop(env, mdd, rc, handle);
-        RETURN(rc);
+        return rc;
 }
 
 /*
- * The permission has been checked when obj created,
- * no need check again.
+ * The permission has been checked when obj created, no need check again.
  */
 static int mdd_cd_sanity_check(const struct lu_env *env,
                                struct mdd_object *obj)
 {
-        int rc = 0;
         ENTRY;
 
         /* EEXIST check */
         if (!obj || mdd_is_dead_obj(obj))
                 RETURN(-ENOENT);
 
-#if 0
-        mdd_read_lock(env, obj);
-        rc = mdd_permission_internal(env, obj, MAY_WRITE);
-        mdd_read_unlock(env, obj);
-#endif
-
-        RETURN(rc);
+        RETURN(0);
 
 }
 
-static int mdd_create_data(const struct lu_env *env,
-                           struct md_object *pobj, struct md_object *cobj,
-                           const struct md_create_spec *spec,
+static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
+                           struct md_object *cobj, const struct md_op_spec *spec,
                            struct md_attr *ma)
 {
         struct mdd_device *mdd = mdo2mdd(cobj);
-        struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */
+        struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
         struct mdd_object *son = md2mdd_obj(cobj);
         struct lu_attr    *attr = &ma->ma_attr;
         struct lov_mds_md *lmm = NULL;
@@ -810,20 +868,22 @@ static int mdd_create_data(const struct lu_env *env,
                 RETURN(rc);
 
         if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
-                        !(spec->sp_cr_flags & FMODE_WRITE))
+            !(spec->sp_cr_flags & FMODE_WRITE))
                 RETURN(0);
-        rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
-                            attr);
+
+        rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
+                            spec, attr);
         if (rc)
                 RETURN(rc);
 
         mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
-                RETURN(rc = PTR_ERR(handle));
+                GOTO(out_free, rc = PTR_ERR(handle));
 
         /*
          * XXX: Setting the lov ea is not locked but setting the attr is locked?
+         * Should this be fixed?
          */
 
         /* Replay creates has objects already */
@@ -839,9 +899,10 @@ static int mdd_create_data(const struct lu_env *env,
         if (rc == 0)
                rc = mdd_attr_get_internal_locked(env, son, ma);
 
+        mdd_trans_stop(env, mdd, rc, handle);
+out_free:
         /* Finish mdd_lov_create() stuff. */
         mdd_lov_create_finish(env, mdd, rc);
-        mdd_trans_stop(env, mdd, rc, handle);
         if (lmm)
                 OBD_FREE(lmm, lmm_size);
         RETURN(rc);
@@ -849,17 +910,17 @@ static int mdd_create_data(const struct lu_env *env,
 
 static int
 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
-             const char *name, const struct lu_fid* fid, int mask)
+             const char *name, struct lu_fid* fid, int mask)
 {
+        const struct dt_key *key = (const struct dt_key *)name;
         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
         struct dt_object    *dir = mdd_object_child(mdd_obj);
         struct dt_rec       *rec = (struct dt_rec *)fid;
-        const struct dt_key *key = (const struct dt_key *)name;
-        struct timeval      start;
+        struct timeval       start;
         int rc;
         ENTRY;
 
-        mdd_lproc_time_start(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
+        mdd_lprocfs_time_start(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
         if (mdd_is_dead_obj(mdd_obj))
                 RETURN(-ESTALE);
 
@@ -872,33 +933,32 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
                 LBUG();
         }
 
-#if 0
-        if (mask == MAY_EXEC)
-                rc = mdd_exec_permission_lite(env, mdd_obj);
-        else
-#endif
-        rc = mdd_permission_internal_locked(env, mdd_obj, mask);
+        rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask);
         if (rc)
                 RETURN(rc);
 
-        if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir))
+        if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir)) {
                 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
                                                    mdd_object_capa(env, mdd_obj));
-        else
+                if (rc == 0)
+                        fid_be_to_cpu(fid, fid);
+        } else
                 rc = -ENOTDIR;
 
-        mdd_lproc_time_end(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
+        mdd_lprocfs_time_end(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
         RETURN(rc);
 }
 
 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
-                          struct mdd_object *child, struct md_attr *ma, 
+                          struct mdd_object *child, struct md_attr *ma,
                           struct thandle *handle)
 {
         int rc;
         ENTRY;
 
-        /* update attributes for child.
+        /*
+         * Update attributes for child.
+         *
          * FIXME:
          *  (1) the valid bits should be converted between Lustre and Linux;
          *  (2) maybe, the child attributes should be set in OSD when creation.
@@ -909,7 +969,7 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
                 RETURN(rc);
 
         if (S_ISDIR(ma->ma_attr.la_mode)) {
-                /* add . and .. for newly created dir */
+                /* Add "." and ".." for newly created dir */
                 mdd_ref_add_internal(env, child, handle);
                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
                                              dot, handle, BYPASS_CAPA);
@@ -935,7 +995,9 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
 
 static int mdd_create_sanity_check(const struct lu_env *env,
                                    struct md_object *pobj,
-                                   const char *name, struct md_attr *ma)
+                                   const char *name,
+                                   struct md_attr *ma,
+                                   int lookup)
 {
         struct mdd_thread_info *info = mdd_env_info(env);
         struct lu_attr    *la        = &info->mti_la;
@@ -949,19 +1011,31 @@ static int mdd_create_sanity_check(const struct lu_env *env,
                 RETURN(-ENOENT);
 
         /*
-         * Check if the name already exist, though it will be checked
-         * in _index_insert also, for avoiding rolling back if exists
-         * _index_insert.
+         * In some cases this lookup is not needed - we know before if name
+         * exists or not because MDT performs lookup for it.
          */
-        rc = __mdd_lookup_locked(env, pobj, name, fid,
-                                 MAY_WRITE | MAY_EXEC);
-        if (rc != -ENOENT)
-                RETURN(rc ? : -EEXIST);
+        /* XXX disable that lookup temporary */
+        if (0 && lookup) {
+                /*
+                 * Check if the name already exist, though it will be checked in
+                 * _index_insert also, for avoiding rolling back if exists
+                 * _index_insert.
+                 */
+                rc = __mdd_lookup_locked(env, pobj, name, fid,
+                                         MAY_WRITE | MAY_EXEC);
+                if (rc != -ENOENT)
+                        RETURN(rc ? : -EEXIST);
+        } else {
+                /*
+                 * Check if has WRITE permission for the parent.
+                 */
+                rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE);
+                if (rc)
+                        RETURN(rc);
+        }
 
         /* sgid check */
-        mdd_read_lock(env, obj);
         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
-        mdd_read_unlock(env, obj);
         if (rc != 0)
                 RETURN(rc);
 
@@ -974,8 +1048,12 @@ static int mdd_create_sanity_check(const struct lu_env *env,
         }
 
         switch (ma->ma_attr.la_mode & S_IFMT) {
+        case S_IFDIR: {
+                struct mdd_device *mdd = mdo2mdd(pobj);
+                if (la->la_nlink >= mdd->mdd_dt_conf.ddp_max_nlink)
+                        RETURN(-EMLINK);
+        }
         case S_IFREG:
-        case S_IFDIR:
         case S_IFLNK:
         case S_IFCHR:
         case S_IFBLK:
@@ -996,13 +1074,13 @@ static int mdd_create_sanity_check(const struct lu_env *env,
 static int mdd_create(const struct lu_env *env,
                       struct md_object *pobj, const char *name,
                       struct md_object *child,
-                      struct md_create_spec *spec,
+                      struct md_op_spec *spec,
                       struct md_attr* ma)
 {
-        struct mdd_device *mdd = mdo2mdd(pobj);
+        struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
         struct mdd_object *son = md2mdd_obj(child);
-        struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
+        struct mdd_device *mdd = mdo2mdd(pobj);
         struct lu_attr    *attr = &ma->ma_attr;
         struct lov_mds_md *lmm = NULL;
         struct thandle    *handle;
@@ -1011,7 +1089,8 @@ static int mdd_create(const struct lu_env *env,
         struct timeval  start;
         ENTRY;
 
-        mdd_lproc_time_start(mdd, &start, LPROC_MDD_CREATE);
+        mdd_lprocfs_time_start(mdd, &start, LPROC_MDD_CREATE);
+
         /*
          * Two operations have to be performed:
          *
@@ -1048,13 +1127,15 @@ static int mdd_create(const struct lu_env *env,
          *     2. insert            (__mdd_index_insert(), lookup again)
          */
 
-        /* sanity checks before big job */
-        rc = mdd_create_sanity_check(env, pobj, name, ma);
+        /* Sanity checks before big job. */
+        rc = mdd_create_sanity_check(env, pobj, name, ma, spec->sp_cr_lookup);
         if (rc)
                 RETURN(rc);
 
-        /* no RPC inside the transaction, so OST objects should be created at
-         * first */
+        /*
+         * No RPC inside the transaction, so OST objects should be created at
+         * first.
+         */
         if (S_ISREG(attr->la_mode)) {
                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
                                     spec, attr);
@@ -1065,12 +1146,14 @@ static int mdd_create(const struct lu_env *env,
         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
-                RETURN(PTR_ERR(handle));
+                GOTO(out_free, rc = PTR_ERR(handle));
 
         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
+        if (dlh == NULL)
+                GOTO(out_trans, rc = -ENOMEM);
 
         /*
-         * XXX check that link can be added to the parent in mkdir case.
+         * XXX: Check that link can be added to the parent in mkdir case.
          */
 
         mdd_write_lock(env, son);
@@ -1095,7 +1178,7 @@ static int mdd_create(const struct lu_env *env,
 #endif
 
         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
-                                     son, ma, handle);
+                                   son, ma, handle);
         mdd_write_unlock(env, son);
         if (rc)
                 /*
@@ -1112,10 +1195,11 @@ static int mdd_create(const struct lu_env *env,
                 GOTO(cleanup, rc);
 
         inserted = 1;
-        /* replay creates has objects already */
+
+        /* Replay creates has objects already. */
         if (spec->u.sp_ea.no_lov_create) {
                 CDEBUG(D_INFO, "we already have lov ea\n");
-                LASSERT(lmm != NULL);
+                LASSERT(lmm == NULL);
                 lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
                 lmm_size = spec->u.sp_ea.eadatalen;
         }
@@ -1125,10 +1209,10 @@ static int mdd_create(const struct lu_env *env,
                 GOTO(cleanup, rc);
         }
         if (lmm && lmm_size > 0) {
-                /* set Lov here, do not get lmm again later */
-                memcpy(ma->ma_lmm, lmm, lmm_size); 
+                /* Set Lov here, do not get lmm again later */
+                memcpy(ma->ma_lmm, lmm, lmm_size);
                 ma->ma_lmm_size = lmm_size;
-                ma->ma_valid |= MA_LOV; 
+                ma->ma_valid |= MA_LOV;
         }
 
         if (S_ISLNK(attr->la_mode)) {
@@ -1141,20 +1225,22 @@ static int mdd_create(const struct lu_env *env,
                 buf = mdd_buf_get_const(env, target_name, sym_len);
                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
                                                 mdd_object_capa(env, son));
+
                 if (rc == sym_len)
                         rc = 0;
                 else
-                        rc = -EFAULT;
+                        GOTO(cleanup, rc = -EFAULT);
         }
 
-        *la_copy = ma->ma_attr;
-        la_copy->la_valid = LA_CTIME | LA_MTIME;
-        rc = mdd_attr_set_internal_locked(env, mdd_pobj, la_copy, handle, 0);
+        *la = ma->ma_attr;
+        la->la_valid = LA_CTIME | LA_MTIME;
+        rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
         if (rc)
                 GOTO(cleanup, rc);
 
-        /* return attr back */
+        /* Return attr back. */
         rc = mdd_attr_get_internal_locked(env, son, ma);
+        EXIT;
 cleanup:
         if (rc && created) {
                 int rc2 = 0;
@@ -1173,19 +1259,22 @@ cleanup:
                         mdd_write_unlock(env, son);
                 }
         }
-        /* finish mdd_lov_create() stuff */
-        mdd_lov_create_finish(env, mdd, rc);
-        if (lmm)
-                OBD_FREE(lmm, lmm_size);
+
         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
+out_trans:
         mdd_trans_stop(env, mdd, rc, handle);
-        mdd_lproc_time_end(mdd, &start, LPROC_MDD_CREATE);
-        RETURN(rc);
+out_free:
+        if (lmm && !spec->u.sp_ea.no_lov_create)
+                OBD_FREE(lmm, lmm_size);
+        /* Finish mdd_lov_create() stuff */
+        mdd_lov_create_finish(env, mdd, rc);
+        mdd_lprocfs_time_end(mdd, &start, LPROC_MDD_CREATE);
+        return rc;
 }
 
-/* 
+/*
  * Get locks on parents in proper order
- * RETURN: < 0 - error, rename_order if successful 
+ * RETURN: < 0 - error, rename_order if successful
  */
 enum rename_order {
         MDD_RN_SAME,
@@ -1193,7 +1282,8 @@ enum rename_order {
         MDD_RN_TGTSRC
 };
 
-static int mdd_rename_order(const struct lu_env *env, struct mdd_device *mdd,
+static int mdd_rename_order(const struct lu_env *env,
+                            struct mdd_device *mdd,
                             struct mdd_object *src_pobj,
                             struct mdd_object *tgt_pobj)
 {
@@ -1212,7 +1302,7 @@ static int mdd_rename_order(const struct lu_env *env, struct mdd_device *mdd,
         } else {
                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
                 if (rc == -EREMOTE)
-                        rc == 0;
+                        rc = 0;
 
                 if (rc == 1)
                         rc = MDD_RN_TGTSRC;
@@ -1237,7 +1327,7 @@ static int mdd_rename_sanity_check(const struct lu_env *env,
                 RETURN(-ENOENT);
 
         /* The sobj maybe on the remote, check parent permission only here */
-        rc = mdd_permission_internal_locked(env, src_pobj,
+        rc = mdd_permission_internal_locked(env, src_pobj, NULL,
                                             MAY_WRITE | MAY_EXEC);
         if (rc)
                 RETURN(rc);
@@ -1246,14 +1336,12 @@ static int mdd_rename_sanity_check(const struct lu_env *env,
                 rc = mdd_may_create(env, tgt_pobj, NULL,
                                     (src_pobj != tgt_pobj));
         } else {
-                mdd_read_lock(env, tobj);
                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
                                     (src_pobj != tgt_pobj));
                 if (rc == 0)
                         if (S_ISDIR(mdd_object_type(tobj))
                             && mdd_dir_is_empty(env, tobj))
                                 rc = -ENOTEMPTY;
-                mdd_read_unlock(env, tobj);
         }
 
         RETURN(rc);
@@ -1265,16 +1353,16 @@ static int mdd_rename(const struct lu_env *env,
                       struct md_object *tobj, const char *tname,
                       struct md_attr *ma)
 {
-        struct mdd_device *mdd = mdo2mdd(src_pobj);
+        struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
+        struct mdd_device *mdd = mdo2mdd(src_pobj);
         struct mdd_object *mdd_sobj = NULL;
         struct mdd_object *mdd_tobj = NULL;
-        struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
         struct dynlock_handle *sdlh, *tdlh;
         struct thandle *handle;
         int is_dir;
-        int rc, one_lock;
+        int rc;
         ENTRY;
 
         LASSERT(ma->ma_attr.la_mode & S_IFMT);
@@ -1296,14 +1384,14 @@ static int mdd_rename(const struct lu_env *env,
         if (rc < 0)
                 GOTO(cleanup_unlocked, rc);
 
-        /* get locks in determined order */
+        /* Get locks in determined order */
         if (rc == MDD_RN_SAME) {
                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
                 /* check hashes to determine do we need one lock or two */
                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
                 else
-                        tdlh = NULL;
+                        tdlh = sdlh;
         } else if (rc == MDD_RN_SRCTGT) {
                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
@@ -1311,7 +1399,9 @@ static int mdd_rename(const struct lu_env *env,
                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
         }
-        
+        if (sdlh == NULL || tdlh == NULL)
+                GOTO(cleanup, rc = -ENOMEM);
+
         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
                                      lf, is_dir, mdd_tobj);
         if (rc)
@@ -1336,25 +1426,26 @@ static int mdd_rename(const struct lu_env *env,
         if (rc)
                 GOTO(cleanup, rc);
 
+        *la = ma->ma_attr;
         mdd_sobj = mdd_object_find(env, mdd, lf);
-        *la_copy = ma->ma_attr;
-        la_copy->la_valid = LA_CTIME;
         if (mdd_sobj) {
-                /*XXX: how to update ctime for remote sobj? */
-                rc = mdd_attr_set_internal_locked(env, mdd_sobj, la_copy,
-                                                  handle, 1);
+                la->la_valid = LA_CTIME;
+
+                /* XXX: How to update ctime for remote sobj? */
+                rc = mdd_attr_set_internal_locked(env, mdd_sobj, la, handle, 1);
                 if (rc)
                         GOTO(cleanup, rc);
         }
         if (tobj && lu_object_exists(&tobj->mo_lu)) {
                 mdd_write_lock(env, mdd_tobj);
                 mdd_ref_del_internal(env, mdd_tobj, handle);
-                /* remove dot reference */
+
+                /* Remove dot reference. */
                 if (is_dir)
                         mdd_ref_del_internal(env, mdd_tobj, handle);
 
-                la_copy->la_valid = LA_CTIME;
-                rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0);
+                la->la_valid = LA_CTIME;
+                rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
                 if (rc)
                         GOTO(cleanup, rc);
 
@@ -1364,26 +1455,28 @@ static int mdd_rename(const struct lu_env *env,
                         GOTO(cleanup, rc);
         }
 
-        la_copy->la_valid = LA_CTIME | LA_MTIME;
-        rc = mdd_attr_set_internal_locked(env, mdd_spobj, la_copy, handle, 0);
+        la->la_valid = LA_CTIME | LA_MTIME;
+        rc = mdd_attr_set_internal_locked(env, mdd_spobj, la, handle, 0);
         if (rc)
                 GOTO(cleanup, rc);
 
         if (mdd_spobj != mdd_tpobj) {
-                la_copy->la_valid = LA_CTIME | LA_MTIME;
-                rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la_copy,
+                la->la_valid = LA_CTIME | LA_MTIME;
+                rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la,
                                                   handle, 0);
         }
 
+        EXIT;
 cleanup:
-        mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
-        if (tdlh)
+        if (likely(tdlh) && sdlh != tdlh)
                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
+        if (likely(sdlh))
+                mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
 cleanup_unlocked:
         mdd_trans_stop(env, mdd, rc, handle);
         if (mdd_sobj)
                 mdd_object_put(env, mdd_sobj);
-        RETURN(rc);
+        return rc;
 }
 
 struct md_dir_operations mdd_dir_ops = {