Whamcloud - gitweb
add per-thread debugging flags. Use then to control CDEBUG().
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
index 7c67f16..aef052a 100644 (file)
@@ -69,7 +69,7 @@ __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
 
 static int mdd_lookup(const struct lu_env *env,
                       struct md_object *pobj, const char *name,
-                      struct lu_fid* fid)
+                      struct lu_fid* fid, struct md_op_spec *spec)
 {
         int rc;
         ENTRY;
@@ -85,6 +85,16 @@ static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
 }
 
 /*
+ * For root fid use special function, whcih does not compare version component
+ * of fid. Vresion component is different for root fids on all MDTs.
+ */
+static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
+{
+        return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
+                fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
+}
+
+/*
  * return 1: if lf is the fid of the ancestor of p1;
  * return 0: if not;
  *
@@ -107,15 +117,15 @@ static int mdd_is_parent(const struct lu_env *env,
         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
         pfid = &mdd_env_info(env)->mti_fid;
 
-        /* Do not lookup ".." in root, they do not exist there. */
-        if (lu_fid_eq(mdo2fid(p1), &mdd->mdd_root_fid))
+        /* Check for root first. */
+        if (mdd_is_root(mdd, mdo2fid(p1)))
                 RETURN(0);
 
         for(;;) {
                 rc = mdd_parent_fid(env, p1, pfid);
                 if (rc)
                         GOTO(out, rc);
-                if (lu_fid_eq(pfid, &mdd->mdd_root_fid))
+                if (mdd_is_root(mdd, pfid))
                         GOTO(out, rc = 0);
                 if (lu_fid_eq(pfid, lf))
                         GOTO(out, rc = 1);
@@ -162,14 +172,20 @@ static int mdd_is_subdir(const struct lu_env *env,
                 RETURN(0);
 
         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
-
+        if (rc == 0) {
+                /* found root */
+                fid_zero(sfid);
+        } else if (rc == 1) {
+                /* found @fid is parent */
+                *sfid = *fid;
+                rc = 0;
+        }
         RETURN(rc);
 }
 
-/*Check whether it may create the cobj under the pobj*/
-static int mdd_may_create(const struct lu_env *env,
-                          struct mdd_object *pobj, struct mdd_object *cobj,
-                          int need_check)
+/* Check whether it may create the cobj under the pobj */
+static int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
+                          struct mdd_object *cobj, int need_check)
 {
         int rc = 0;
         ENTRY;
@@ -180,18 +196,13 @@ static int mdd_may_create(const struct lu_env *env,
         if (mdd_is_dead_obj(pobj))
                 RETURN(-ENOENT);
 
-        /*check pobj may create or not*/
         if (need_check)
-                rc = mdd_permission_internal_locked(env, pobj,
-                                             MAY_WRITE | MAY_EXEC);
+                rc = mdd_permission_internal_locked(env, pobj, NULL,
+                                                    MAY_WRITE | MAY_EXEC);
 
         RETURN(rc);
 }
 
-/*
- * It's inline, so penalty for filesystems that don't use sticky bit is
- * minimal.
- */
 static inline int mdd_is_sticky(const struct lu_env *env,
                                 struct mdd_object *pobj,
                                 struct mdd_object *cobj)
@@ -206,14 +217,11 @@ static inline int mdd_is_sticky(const struct lu_env *env,
         } else if (tmp_la->la_uid == uc->mu_fsuid) {
                 return 0;
         } else {
-                mdd_read_lock(env, pobj);
                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
-                mdd_read_unlock(env, pobj);
                 if (rc)
                         return rc;
-                else if (!(tmp_la->la_mode & S_ISVTX))
-                        return 0;
-                else if (tmp_la->la_uid == uc->mu_fsuid)
+                else if (!(tmp_la->la_mode & S_ISVTX) ||
+                         (tmp_la->la_uid == uc->mu_fsuid))
                         return 0;
                 else
                         return !mdd_capable(uc, CAP_FOWNER);
@@ -246,7 +254,7 @@ static int mdd_may_delete(const struct lu_env *env,
                         RETURN(-EBUSY);
 
         } else if (S_ISDIR(mdd_object_type(cobj))) {
-                        RETURN(-EISDIR);
+                RETURN(-EISDIR);
         }
 
         if (pobj) {
@@ -257,9 +265,8 @@ static int mdd_may_delete(const struct lu_env *env,
                         RETURN(-EPERM);
 
                 if (need_check)
-                        rc = mdd_permission_internal_locked(env, pobj,
-                                                            MAY_WRITE |
-                                                            MAY_EXEC);
+                        rc = mdd_permission_internal_locked(env, pobj, NULL,
+                                                     MAY_WRITE | MAY_EXEC);
         }
         RETURN(rc);
 }
@@ -270,22 +277,23 @@ int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
         int rc = 0;
         ENTRY;
 
+        if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
+                RETURN(-EPERM);
+
+        if (S_ISDIR(mdd_object_type(src_obj)))
+                RETURN(-EPERM);
+
+        LASSERT(src_obj != tgt_obj);
         if (tgt_obj) {
                 rc = mdd_may_create(env, tgt_obj, NULL, 1);
                 if (rc)
                         RETURN(rc);
         }
 
-        if (S_ISDIR(mdd_object_type(src_obj)))
-                RETURN(-EPERM);
-
-        if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
-                RETURN(-EPERM);
-
         RETURN(rc);
 }
 
-const struct dt_rec *index_fid_key(const struct lu_env *env,
+const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
                                    const struct lu_fid *fid)
 {
         struct mdd_thread_info *info = mdd_env_info(env);
@@ -296,60 +304,50 @@ const struct dt_rec *index_fid_key(const struct lu_env *env,
 
 
 /* insert new index, add reference if isdir, update times */
-static int __mdd_index_insert(const struct lu_env *env,
-                             struct mdd_object *pobj, const struct lu_fid *lf,
-                             const char *name, int isdir, struct thandle *th,
-                             struct lustre_capa *capa)
+static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
+                              const struct lu_fid *lf, const char *name, int is_dir,
+                              struct thandle *handle, struct lustre_capa *capa)
 {
         struct dt_object *next = mdd_object_child(pobj);
-        struct timeval start;
-        int rc;
+        struct timeval    start;
+        int               rc;
         ENTRY;
 
-        mdd_lproc_time_start(mdo2mdd(&pobj->mod_obj), &start,
-                             LPROC_MDD_INDEX_INSERT);
-#if 0
-        struct lu_attr   *la = &mdd_env_info(env)->mti_la;
-#endif
-
-        if (dt_try_as_dir(env, next))
+        mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
+                               LPROC_MDD_INDEX_INSERT);
+        if (dt_try_as_dir(env, next)) {
                 rc = next->do_index_ops->dio_insert(env, next,
-                                                    index_fid_key(env, lf),
+                                                    __mdd_fid_rec(env, lf),
                                                     (const struct dt_key *)name,
-                                                    th, capa);
-        else
+                                                    handle, capa);
+        } else {
                 rc = -ENOTDIR;
+        }
 
         if (rc == 0) {
-                if (isdir) {
+                if (is_dir) {
                         mdd_write_lock(env, pobj);
-                        mdd_ref_add_internal(env, pobj, th);
+                        mdd_ref_add_internal(env, pobj, handle);
                         mdd_write_unlock(env, pobj);
                 }
-#if 0
-                la->la_valid = LA_MTIME|LA_CTIME;
-                la->la_atime = ma->ma_attr.la_atime;
-                la->la_ctime = ma->ma_attr.la_ctime;
-                rc = mdd_attr_set_internal(env, mdd_obj, la, handle, 0);
-#endif
         }
-        mdd_lproc_time_end(mdo2mdd(&pobj->mod_obj), &start,
-                           LPROC_MDD_INDEX_INSERT);
-        return rc;
+        mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
+                             LPROC_MDD_INDEX_INSERT);
+        RETURN(rc);
 }
 
-static int __mdd_index_delete(const struct lu_env *env,
-                              struct mdd_object *pobj, const char *name,
-                              int is_dir, struct thandle *handle,
+static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
+                              const char *name, int is_dir, struct thandle *handle,
                               struct lustre_capa *capa)
 {
         struct dt_object *next = mdd_object_child(pobj);
-        struct timeval start;
-        int rc;
+        struct timeval    start;
+        int               rc;
         ENTRY;
 
-        mdd_lproc_time_start(mdo2mdd(&pobj->mod_obj), &start,
-                             LPROC_MDD_INDEX_DELETE);
+        mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
+                               LPROC_MDD_INDEX_DELETE);
+
         if (dt_try_as_dir(env, next)) {
                 rc = next->do_index_ops->dio_delete(env, next,
                                                     (struct dt_key *)name,
@@ -361,27 +359,29 @@ static int __mdd_index_delete(const struct lu_env *env,
                 }
         } else
                 rc = -ENOTDIR;
-        mdd_lproc_time_end(mdo2mdd(&pobj->mod_obj), &start,
-                           LPROC_MDD_INDEX_DELETE);
+
+        mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
+                             LPROC_MDD_INDEX_DELETE);
         RETURN(rc);
 }
 
-static int __mdd_index_insert_only(const struct lu_env *env,
-                                   struct mdd_object *pobj,
-                                   const struct lu_fid *lf,
-                                   const char *name, struct thandle *th,
-                                   struct lustre_capa *capa)
+static int
+__mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
+                        const struct lu_fid *lf, const char *name,
+                        struct thandle *handle, struct lustre_capa *capa)
 {
-        int rc;
         struct dt_object *next = mdd_object_child(pobj);
+        int               rc;
         ENTRY;
 
-        if (dt_try_as_dir(env, next))
+        if (dt_try_as_dir(env, next)) {
                 rc = next->do_index_ops->dio_insert(env, next,
-                                         index_fid_key(env, lf),
-                                         (const struct dt_key *)name, th, capa);
-        else
+                                                    __mdd_fid_rec(env, lf),
+                                                    (const struct dt_key *)name,
+                                                    handle, capa);
+        } else {
                 rc = -ENOTDIR;
+        }
         RETURN(rc);
 }
 
@@ -389,12 +389,12 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
                     struct md_object *src_obj, const char *name,
                     struct md_attr *ma)
 {
+        struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
         struct mdd_device *mdd = mdo2mdd(src_obj);
-        struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
-        struct thandle *handle;
         struct dynlock_handle *dlh;
+        struct thandle *handle;
         int rc;
         ENTRY;
 
@@ -410,29 +410,37 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
 
         rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
         if (rc)
-                GOTO(out, rc);
+                GOTO(out_unlock, rc);
 
         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
                                      name, handle,
                                      mdd_object_capa(env, mdd_tobj));
-        if (rc == 0)
-                mdd_ref_add_internal(env, mdd_sobj, handle);
-
-        *la_copy = ma->ma_attr;
-        la_copy->la_valid = LA_CTIME;
-        rc = mdd_attr_set_internal(env, mdd_sobj, la_copy, handle, 0);
         if (rc)
-                GOTO(out, rc);
+                GOTO(out_unlock, rc);
 
-        la_copy->la_valid = LA_CTIME | LA_MTIME;
-        rc = mdd_attr_set_internal_locked(env, mdd_tobj, la_copy, handle, 0);
+        mdd_ref_add_internal(env, mdd_sobj, handle);
 
-out:
+        *la = ma->ma_attr;
+        la->la_valid = LA_CTIME | LA_MTIME;
+        rc = mdd_attr_set_internal_locked(env, mdd_tobj, la, handle, 0);
+        if (rc)
+                GOTO(out_unlock, rc);
+
+        la->la_valid = LA_CTIME;
+        rc = mdd_attr_set_internal(env, mdd_sobj, la, handle, 0);
+        EXIT;
+out_unlock:
         mdd_write_unlock(env, mdd_sobj);
         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
 out_trans:
         mdd_trans_stop(env, mdd, rc, handle);
-        RETURN(rc);
+        return rc;
+}
+
+static inline void mdd_set_dead_obj(struct mdd_object *obj)
+{
+        if (obj)
+                obj->mod_flags |= DEAD_OBJ;
 }
 
 /* caller should take a lock before calling */
@@ -448,16 +456,16 @@ int mdd_finish_unlink(const struct lu_env *env,
                 /* add new orphan and the object
                  * will be deleted during the object_put() */
                 if (__mdd_orphan_add(env, obj, th) == 0)
-                        set_bit(LU_OBJECT_ORPHAN,
-                                &mdd2lu_obj(obj)->lo_header->loh_flags);
+                        obj->mod_flags |= ORPHAN_OBJ;
 
+                mdd_set_dead_obj(obj);
                 if (obj->mod_count == 0)
                         rc = mdd_object_kill(env, obj, ma);
                 else
-                        /* clear MA_LOV | MA_COOKIE, if we do not 
+                        /* clear MA_LOV | MA_COOKIE, if we do not
                          * unlink it in case we get it somewhere */
                         ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
-        } else 
+        } else
                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
 
         RETURN(rc);
@@ -484,7 +492,7 @@ static int mdd_dir_is_empty(const struct lu_env *env,
 
         obj = mdd_object_child(dir);
         iops = &obj->do_index_ops->dio_it;
-        it = iops->init(env, obj, 0);
+        it = iops->init(env, obj, 0, BYPASS_CAPA);
         if (it != NULL) {
                 result = iops->get(env, it, (const void *)"");
                 if (result > 0) {
@@ -530,20 +538,33 @@ int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
         RETURN(rc);
 }
 
-static int mdd_unlink(const struct lu_env *env,
-                      struct md_object *pobj, struct md_object *cobj,
-                      const char *name, struct md_attr *ma)
+static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
+                      struct md_object *cobj, const char *name,
+                      struct md_attr *ma)
 {
-        struct mdd_device *mdd = mdo2mdd(pobj);
+        struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
-        struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
-        struct thandle    *handle;
+        struct mdd_device *mdd = mdo2mdd(pobj);
         struct dynlock_handle *dlh;
+        struct thandle    *handle;
         int rc, is_dir;
         ENTRY;
 
-        rc = mdd_log_txn_param_build(env, mdd_cobj, ma, MDD_TXN_UNLINK_OP);
+        /*
+         * Check -ENOENT early here because we need to get object type
+         * to calculate credits before transaction start
+         */
+        if (!lu_object_exists(&cobj->mo_lu)) {
+                LU_OBJECT_DEBUG(D_ERROR, env, &cobj->mo_lu,
+                                "unlinking as `%s'", name);
+                RETURN(-ENOENT);
+        }
+
+        LASSERTF(lu_object_exists(&cobj->mo_lu) > 0, "FID is "DFID"\n",
+                 PFID(lu_object_fid(&cobj->mo_lu)));
+
+        rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
         if (rc)
                 RETURN(rc);
 
@@ -561,25 +582,29 @@ static int mdd_unlink(const struct lu_env *env,
                 GOTO(cleanup, rc);
 
         is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
+
+        current->debugging1 |= 0x1; /* XXX enable lvar_enoent_debug
+                                     * debugging */
         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
                                 mdd_object_capa(env, mdd_pobj));
+        current->debugging1 &= ~0x1;
         if (rc)
                 GOTO(cleanup, rc);
 
         mdd_ref_del_internal(env, mdd_cobj, handle);
-        *la_copy = ma->ma_attr;
         if (is_dir) {
                 /* unlink dot */
                 mdd_ref_del_internal(env, mdd_cobj, handle);
-        } else {
-                la_copy->la_valid = LA_CTIME;
-                rc = mdd_attr_set_internal(env, mdd_cobj, la_copy, handle, 0);
-                if (rc)
-                        GOTO(cleanup, rc);
         }
 
-        la_copy->la_valid = LA_CTIME | LA_MTIME;
-        rc = mdd_attr_set_internal_locked(env, mdd_pobj, la_copy, handle, 0);
+        *la = ma->ma_attr;
+        la->la_valid = LA_CTIME | LA_MTIME;
+        rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
+        if (rc)
+                GOTO(cleanup, rc);
+
+        la->la_valid = LA_CTIME;
+        rc = mdd_attr_set_internal(env, mdd_cobj, la, handle, 0);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -589,54 +614,44 @@ static int mdd_unlink(const struct lu_env *env,
                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
                                    strlen("unlinked"), "unlinked", 0,
                                    NULL, NULL);
+        EXIT;
 cleanup:
         mdd_write_unlock(env, mdd_cobj);
         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
 out_trans:
         mdd_trans_stop(env, mdd, rc, handle);
-        RETURN(rc);
+        return rc;
 }
 
-/*
- * Partial operation. Be aware, this is called with write lock taken, so we use
- * locksless version of __mdd_lookup() here.
- */
 static int mdd_ni_sanity_check(const struct lu_env *env,
                                struct md_object *pobj,
                                const char *name,
                                const struct lu_fid *fid)
 {
-        struct mdd_object *obj       = md2mdd_obj(pobj);
-#if 0
-        int rc;
-#endif
+        struct mdd_object *obj = md2mdd_obj(pobj);
         ENTRY;
 
         /* EEXIST check */
         if (mdd_is_dead_obj(obj))
                 RETURN(-ENOENT);
 
-         /* The exist of the name will be checked in _index_insert. */
-#if 0
-        rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
-        if (rc != -ENOENT)
-                RETURN(rc ? : -EEXIST);
-        else
-                RETURN(0);
-#endif
-        RETURN(mdd_permission_internal_locked(env, obj,
+        /* The exist of the name will be checked in _index_insert. */
+        RETURN(mdd_permission_internal_locked(env, obj, NULL,
                                               MAY_WRITE | MAY_EXEC));
 }
 
-static int mdd_name_insert(const struct lu_env *env,
-                           struct md_object *pobj,
+/*
+ * Partial operation.
+ */
+static int mdd_name_insert(const struct lu_env *env, struct md_object *pobj,
                            const char *name, const struct lu_fid *fid,
-                           int isdir)
+                           int is_dir)
 {
+        struct lu_attr   *la = &mdd_env_info(env)->mti_la;
         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
         struct mdd_device *mdd = mdo2mdd(pobj);
-        struct thandle *handle;
         struct dynlock_handle *dlh;
+        struct thandle *handle;
         int rc;
         ENTRY;
 
@@ -652,53 +667,51 @@ static int mdd_name_insert(const struct lu_env *env,
         if (rc)
                 GOTO(out_unlock, rc);
 
-        rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle,
-                                BYPASS_CAPA);
-
+        rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
+                                handle, BYPASS_CAPA);
+        if (rc == 0) {
+                la->la_ctime = la->la_atime = CURRENT_SECONDS;
+                la->la_valid = LA_ATIME | LA_CTIME;
+                rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
+        }
+        EXIT;
 out_unlock:
         mdd_pdo_write_unlock(env, mdd_obj, dlh);
 out_trans:
         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
-        RETURN(rc);
+        return rc;
 }
 
-/*
- * Be aware, this is called with write lock taken, so we use locksless version
- * of __mdd_lookup() here.
- */
 static int mdd_nr_sanity_check(const struct lu_env *env,
                                struct md_object *pobj,
                                const char *name)
 {
-        struct mdd_object *obj       = md2mdd_obj(pobj);
-#if 0
-        struct mdd_thread_info *info = mdd_env_info(env);
-        struct lu_fid     *fid       = &info->mti_fid;
-        int rc;
-#endif
+        struct mdd_object *obj = md2mdd_obj(pobj);
         ENTRY;
 
         /* EEXIST check */
-        if (mdd_is_dead_obj(obj))
+        if (mdd_is_dead_obj(obj)) {
+                CWARN("Dir "DFID" is dead?\n", PFID(mdo2fid(obj)));
                 RETURN(-ENOENT);
+        }
 
-         /* The exist of the name will be checked in _index_delete. */
-#if 0
-        rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
-        RETURN(rc);
-#endif
-        RETURN(mdd_permission_internal_locked(env, obj,
+        /* Name presense will be checked in _index_delete. */
+        RETURN(mdd_permission_internal_locked(env, obj, NULL,
                                               MAY_WRITE | MAY_EXEC));
 }
 
+/*
+ * Partial operation.
+ */
 static int mdd_name_remove(const struct lu_env *env,
                            struct md_object *pobj,
                            const char *name, int is_dir)
 {
-        struct mdd_device *mdd = mdo2mdd(pobj);
+        struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
-        struct thandle *handle;
+        struct mdd_device *mdd = mdo2mdd(pobj);
         struct dynlock_handle *dlh;
+        struct thandle *handle;
         int rc;
         ENTRY;
 
@@ -714,15 +727,22 @@ static int mdd_name_remove(const struct lu_env *env,
         if (rc)
                 GOTO(out_unlock, rc);
 
-        rc = __mdd_index_delete(env, mdd_obj, name, is_dir, handle,
-                                BYPASS_CAPA);
+        rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
+                                handle, BYPASS_CAPA);
+        if (rc)
+                GOTO(out_unlock, rc);
 
+        la->la_ctime = la->la_mtime = CURRENT_SECONDS;
+        la->la_valid = LA_CTIME | LA_MTIME;
+        rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
+        EXIT;
 out_unlock:
         mdd_pdo_write_unlock(env, mdd_obj, dlh);
 out_trans:
         mdd_trans_stop(env, mdd, rc, handle);
-        RETURN(rc);
+        return rc;
 }
+
 static int mdd_rt_sanity_check(const struct lu_env *env,
                                struct mdd_object *tgt_pobj,
                                struct mdd_object *tobj,
@@ -754,11 +774,12 @@ static int mdd_rename_tgt(const struct lu_env *env,
                           const struct lu_fid *lf, const char *name,
                           struct md_attr *ma)
 {
-        struct mdd_device *mdd = mdo2mdd(pobj);
+        struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
-        struct thandle *handle;
+        struct mdd_device *mdd = mdo2mdd(pobj);
         struct dynlock_handle *dlh;
+        struct thandle *handle;
         int rc;
         ENTRY;
 
@@ -773,13 +794,15 @@ static int mdd_rename_tgt(const struct lu_env *env,
         if (mdd_tobj)
                 mdd_write_lock(env, mdd_tobj);
 
-        /*TODO rename sanity checking*/
+        /* XXX: Rename sanity checking. */
         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
         if (rc)
                 GOTO(cleanup, rc);
 
-        /* if rename_tgt is called then we should just re-insert name with
-         * correct fid, no need to dec/inc parent nlink if obj is dir */
+        /*
+         * If rename_tgt is called then we should just re-insert name with
+         * correct fid, no need to dec/inc parent nlink if obj is dir.
+         */
         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
         if (rc)
                 GOTO(cleanup, rc);
@@ -789,48 +812,49 @@ static int mdd_rename_tgt(const struct lu_env *env,
         if (rc)
                 GOTO(cleanup, rc);
 
-        if (tobj && lu_object_exists(&tobj->mo_lu))
+        *la = ma->ma_attr;
+        la->la_valid = LA_CTIME | LA_MTIME;
+        rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la, handle, 0);
+        if (rc)
+                GOTO(cleanup, rc);
+
+        if (tobj && lu_object_exists(&tobj->mo_lu)) {
                 mdd_ref_del_internal(env, mdd_tobj, handle);
+                la->la_valid = LA_CTIME;
+                rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
+        }
+        EXIT;
 cleanup:
         if (tobj)
                 mdd_write_unlock(env, mdd_tobj);
         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
 out_trans:
         mdd_trans_stop(env, mdd, rc, handle);
-        RETURN(rc);
+        return rc;
 }
 
 /*
- * The permission has been checked when obj created,
- * no need check again.
+ * The permission has been checked when obj created, no need check again.
  */
 static int mdd_cd_sanity_check(const struct lu_env *env,
                                struct mdd_object *obj)
 {
-        int rc = 0;
         ENTRY;
 
         /* EEXIST check */
         if (!obj || mdd_is_dead_obj(obj))
                 RETURN(-ENOENT);
 
-#if 0
-        mdd_read_lock(env, obj);
-        rc = mdd_permission_internal(env, obj, MAY_WRITE);
-        mdd_read_unlock(env, obj);
-#endif
-
-        RETURN(rc);
+        RETURN(0);
 
 }
 
-static int mdd_create_data(const struct lu_env *env,
-                           struct md_object *pobj, struct md_object *cobj,
-                           const struct md_create_spec *spec,
+static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
+                           struct md_object *cobj, const struct md_op_spec *spec,
                            struct md_attr *ma)
 {
         struct mdd_device *mdd = mdo2mdd(cobj);
-        struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */
+        struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
         struct mdd_object *son = md2mdd_obj(cobj);
         struct lu_attr    *attr = &ma->ma_attr;
         struct lov_mds_md *lmm = NULL;
@@ -844,20 +868,22 @@ static int mdd_create_data(const struct lu_env *env,
                 RETURN(rc);
 
         if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
-                        !(spec->sp_cr_flags & FMODE_WRITE))
+            !(spec->sp_cr_flags & FMODE_WRITE))
                 RETURN(0);
-        rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
-                            attr);
+
+        rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
+                            spec, attr);
         if (rc)
                 RETURN(rc);
 
         mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
-                RETURN(rc = PTR_ERR(handle));
+                GOTO(out_free, rc = PTR_ERR(handle));
 
         /*
          * XXX: Setting the lov ea is not locked but setting the attr is locked?
+         * Should this be fixed?
          */
 
         /* Replay creates has objects already */
@@ -873,9 +899,10 @@ static int mdd_create_data(const struct lu_env *env,
         if (rc == 0)
                rc = mdd_attr_get_internal_locked(env, son, ma);
 
+        mdd_trans_stop(env, mdd, rc, handle);
+out_free:
         /* Finish mdd_lov_create() stuff. */
         mdd_lov_create_finish(env, mdd, rc);
-        mdd_trans_stop(env, mdd, rc, handle);
         if (lmm)
                 OBD_FREE(lmm, lmm_size);
         RETURN(rc);
@@ -885,15 +912,15 @@ static int
 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
              const char *name, struct lu_fid* fid, int mask)
 {
+        const struct dt_key *key = (const struct dt_key *)name;
         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
         struct dt_object    *dir = mdd_object_child(mdd_obj);
         struct dt_rec       *rec = (struct dt_rec *)fid;
-        const struct dt_key *key = (const struct dt_key *)name;
-        struct timeval      start;
+        struct timeval       start;
         int rc;
         ENTRY;
 
-        mdd_lproc_time_start(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
+        mdd_lprocfs_time_start(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
         if (mdd_is_dead_obj(mdd_obj))
                 RETURN(-ESTALE);
 
@@ -906,12 +933,7 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
                 LBUG();
         }
 
-#if 0
-        if (mask == MAY_EXEC)
-                rc = mdd_exec_permission_lite(env, mdd_obj);
-        else
-#endif
-        rc = mdd_permission_internal_locked(env, mdd_obj, mask);
+        rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask);
         if (rc)
                 RETURN(rc);
 
@@ -923,7 +945,7 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
         } else
                 rc = -ENOTDIR;
 
-        mdd_lproc_time_end(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
+        mdd_lprocfs_time_end(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
         RETURN(rc);
 }
 
@@ -934,7 +956,9 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
         int rc;
         ENTRY;
 
-        /* update attributes for child.
+        /*
+         * Update attributes for child.
+         *
          * FIXME:
          *  (1) the valid bits should be converted between Lustre and Linux;
          *  (2) maybe, the child attributes should be set in OSD when creation.
@@ -945,7 +969,7 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
                 RETURN(rc);
 
         if (S_ISDIR(ma->ma_attr.la_mode)) {
-                /* add . and .. for newly created dir */
+                /* Add "." and ".." for newly created dir */
                 mdd_ref_add_internal(env, child, handle);
                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
                                              dot, handle, BYPASS_CAPA);
@@ -971,7 +995,9 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
 
 static int mdd_create_sanity_check(const struct lu_env *env,
                                    struct md_object *pobj,
-                                   const char *name, struct md_attr *ma)
+                                   const char *name,
+                                   struct md_attr *ma,
+                                   int lookup)
 {
         struct mdd_thread_info *info = mdd_env_info(env);
         struct lu_attr    *la        = &info->mti_la;
@@ -985,19 +1011,31 @@ static int mdd_create_sanity_check(const struct lu_env *env,
                 RETURN(-ENOENT);
 
         /*
-         * Check if the name already exist, though it will be checked
-         * in _index_insert also, for avoiding rolling back if exists
-         * _index_insert.
+         * In some cases this lookup is not needed - we know before if name
+         * exists or not because MDT performs lookup for it.
          */
-        rc = __mdd_lookup_locked(env, pobj, name, fid,
-                                 MAY_WRITE | MAY_EXEC);
-        if (rc != -ENOENT)
-                RETURN(rc ? : -EEXIST);
+        /* XXX disable that lookup temporary */
+        if (0 && lookup) {
+                /*
+                 * Check if the name already exist, though it will be checked in
+                 * _index_insert also, for avoiding rolling back if exists
+                 * _index_insert.
+                 */
+                rc = __mdd_lookup_locked(env, pobj, name, fid,
+                                         MAY_WRITE | MAY_EXEC);
+                if (rc != -ENOENT)
+                        RETURN(rc ? : -EEXIST);
+        } else {
+                /*
+                 * Check if has WRITE permission for the parent.
+                 */
+                rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE);
+                if (rc)
+                        RETURN(rc);
+        }
 
         /* sgid check */
-        mdd_read_lock(env, obj);
         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
-        mdd_read_unlock(env, obj);
         if (rc != 0)
                 RETURN(rc);
 
@@ -1010,8 +1048,12 @@ static int mdd_create_sanity_check(const struct lu_env *env,
         }
 
         switch (ma->ma_attr.la_mode & S_IFMT) {
+        case S_IFDIR: {
+                struct mdd_device *mdd = mdo2mdd(pobj);
+                if (la->la_nlink >= mdd->mdd_dt_conf.ddp_max_nlink)
+                        RETURN(-EMLINK);
+        }
         case S_IFREG:
-        case S_IFDIR:
         case S_IFLNK:
         case S_IFCHR:
         case S_IFBLK:
@@ -1032,13 +1074,13 @@ static int mdd_create_sanity_check(const struct lu_env *env,
 static int mdd_create(const struct lu_env *env,
                       struct md_object *pobj, const char *name,
                       struct md_object *child,
-                      struct md_create_spec *spec,
+                      struct md_op_spec *spec,
                       struct md_attr* ma)
 {
-        struct mdd_device *mdd = mdo2mdd(pobj);
+        struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
         struct mdd_object *son = md2mdd_obj(child);
-        struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
+        struct mdd_device *mdd = mdo2mdd(pobj);
         struct lu_attr    *attr = &ma->ma_attr;
         struct lov_mds_md *lmm = NULL;
         struct thandle    *handle;
@@ -1047,7 +1089,8 @@ static int mdd_create(const struct lu_env *env,
         struct timeval  start;
         ENTRY;
 
-        mdd_lproc_time_start(mdd, &start, LPROC_MDD_CREATE);
+        mdd_lprocfs_time_start(mdd, &start, LPROC_MDD_CREATE);
+
         /*
          * Two operations have to be performed:
          *
@@ -1084,13 +1127,15 @@ static int mdd_create(const struct lu_env *env,
          *     2. insert            (__mdd_index_insert(), lookup again)
          */
 
-        /* sanity checks before big job */
-        rc = mdd_create_sanity_check(env, pobj, name, ma);
+        /* Sanity checks before big job. */
+        rc = mdd_create_sanity_check(env, pobj, name, ma, spec->sp_cr_lookup);
         if (rc)
                 RETURN(rc);
 
-        /* no RPC inside the transaction, so OST objects should be created at
-         * first */
+        /*
+         * No RPC inside the transaction, so OST objects should be created at
+         * first.
+         */
         if (S_ISREG(attr->la_mode)) {
                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
                                     spec, attr);
@@ -1101,14 +1146,14 @@ static int mdd_create(const struct lu_env *env,
         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
         handle = mdd_trans_start(env, mdd);
         if (IS_ERR(handle))
-                RETURN(PTR_ERR(handle));
+                GOTO(out_free, rc = PTR_ERR(handle));
 
         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
         if (dlh == NULL)
                 GOTO(out_trans, rc = -ENOMEM);
 
         /*
-         * XXX check that link can be added to the parent in mkdir case.
+         * XXX: Check that link can be added to the parent in mkdir case.
          */
 
         mdd_write_lock(env, son);
@@ -1150,7 +1195,8 @@ static int mdd_create(const struct lu_env *env,
                 GOTO(cleanup, rc);
 
         inserted = 1;
-        /* replay creates has objects already */
+
+        /* Replay creates has objects already. */
         if (spec->u.sp_ea.no_lov_create) {
                 CDEBUG(D_INFO, "we already have lov ea\n");
                 LASSERT(lmm == NULL);
@@ -1163,7 +1209,7 @@ static int mdd_create(const struct lu_env *env,
                 GOTO(cleanup, rc);
         }
         if (lmm && lmm_size > 0) {
-                /* set Lov here, do not get lmm again later */
+                /* Set Lov here, do not get lmm again later */
                 memcpy(ma->ma_lmm, lmm, lmm_size);
                 ma->ma_lmm_size = lmm_size;
                 ma->ma_valid |= MA_LOV;
@@ -1179,20 +1225,22 @@ static int mdd_create(const struct lu_env *env,
                 buf = mdd_buf_get_const(env, target_name, sym_len);
                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
                                                 mdd_object_capa(env, son));
+
                 if (rc == sym_len)
                         rc = 0;
                 else
-                        rc = -EFAULT;
+                        GOTO(cleanup, rc = -EFAULT);
         }
 
-        *la_copy = ma->ma_attr;
-        la_copy->la_valid = LA_CTIME | LA_MTIME;
-        rc = mdd_attr_set_internal_locked(env, mdd_pobj, la_copy, handle, 0);
+        *la = ma->ma_attr;
+        la->la_valid = LA_CTIME | LA_MTIME;
+        rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
         if (rc)
                 GOTO(cleanup, rc);
 
-        /* return attr back */
+        /* Return attr back. */
         rc = mdd_attr_get_internal_locked(env, son, ma);
+        EXIT;
 cleanup:
         if (rc && created) {
                 int rc2 = 0;
@@ -1211,15 +1259,17 @@ cleanup:
                         mdd_write_unlock(env, son);
                 }
         }
-        /* finish mdd_lov_create() stuff */
-        mdd_lov_create_finish(env, mdd, rc);
-        if (lmm && !spec->u.sp_ea.no_lov_create)
-                OBD_FREE(lmm, lmm_size);
+
         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
 out_trans:
         mdd_trans_stop(env, mdd, rc, handle);
-        mdd_lproc_time_end(mdd, &start, LPROC_MDD_CREATE);
-        RETURN(rc);
+out_free:
+        if (lmm && !spec->u.sp_ea.no_lov_create)
+                OBD_FREE(lmm, lmm_size);
+        /* Finish mdd_lov_create() stuff */
+        mdd_lov_create_finish(env, mdd, rc);
+        mdd_lprocfs_time_end(mdd, &start, LPROC_MDD_CREATE);
+        return rc;
 }
 
 /*
@@ -1232,7 +1282,8 @@ enum rename_order {
         MDD_RN_TGTSRC
 };
 
-static int mdd_rename_order(const struct lu_env *env, struct mdd_device *mdd,
+static int mdd_rename_order(const struct lu_env *env,
+                            struct mdd_device *mdd,
                             struct mdd_object *src_pobj,
                             struct mdd_object *tgt_pobj)
 {
@@ -1276,7 +1327,7 @@ static int mdd_rename_sanity_check(const struct lu_env *env,
                 RETURN(-ENOENT);
 
         /* The sobj maybe on the remote, check parent permission only here */
-        rc = mdd_permission_internal_locked(env, src_pobj,
+        rc = mdd_permission_internal_locked(env, src_pobj, NULL,
                                             MAY_WRITE | MAY_EXEC);
         if (rc)
                 RETURN(rc);
@@ -1285,14 +1336,12 @@ static int mdd_rename_sanity_check(const struct lu_env *env,
                 rc = mdd_may_create(env, tgt_pobj, NULL,
                                     (src_pobj != tgt_pobj));
         } else {
-                mdd_read_lock(env, tobj);
                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
                                     (src_pobj != tgt_pobj));
                 if (rc == 0)
                         if (S_ISDIR(mdd_object_type(tobj))
                             && mdd_dir_is_empty(env, tobj))
                                 rc = -ENOTEMPTY;
-                mdd_read_unlock(env, tobj);
         }
 
         RETURN(rc);
@@ -1304,12 +1353,12 @@ static int mdd_rename(const struct lu_env *env,
                       struct md_object *tobj, const char *tname,
                       struct md_attr *ma)
 {
-        struct mdd_device *mdd = mdo2mdd(src_pobj);
+        struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
+        struct mdd_device *mdd = mdo2mdd(src_pobj);
         struct mdd_object *mdd_sobj = NULL;
         struct mdd_object *mdd_tobj = NULL;
-        struct lu_attr    *la_copy = &mdd_env_info(env)->mti_la_for_fix;
         struct dynlock_handle *sdlh, *tdlh;
         struct thandle *handle;
         int is_dir;
@@ -1335,7 +1384,7 @@ static int mdd_rename(const struct lu_env *env,
         if (rc < 0)
                 GOTO(cleanup_unlocked, rc);
 
-        /* get locks in determined order */
+        /* Get locks in determined order */
         if (rc == MDD_RN_SAME) {
                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
                 /* check hashes to determine do we need one lock or two */
@@ -1377,25 +1426,26 @@ static int mdd_rename(const struct lu_env *env,
         if (rc)
                 GOTO(cleanup, rc);
 
+        *la = ma->ma_attr;
         mdd_sobj = mdd_object_find(env, mdd, lf);
-        *la_copy = ma->ma_attr;
-        la_copy->la_valid = LA_CTIME;
         if (mdd_sobj) {
-                /*XXX: how to update ctime for remote sobj? */
-                rc = mdd_attr_set_internal_locked(env, mdd_sobj, la_copy,
-                                                  handle, 1);
+                la->la_valid = LA_CTIME;
+
+                /* XXX: How to update ctime for remote sobj? */
+                rc = mdd_attr_set_internal_locked(env, mdd_sobj, la, handle, 1);
                 if (rc)
                         GOTO(cleanup, rc);
         }
         if (tobj && lu_object_exists(&tobj->mo_lu)) {
                 mdd_write_lock(env, mdd_tobj);
                 mdd_ref_del_internal(env, mdd_tobj, handle);
-                /* remove dot reference */
+
+                /* Remove dot reference. */
                 if (is_dir)
                         mdd_ref_del_internal(env, mdd_tobj, handle);
 
-                la_copy->la_valid = LA_CTIME;
-                rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0);
+                la->la_valid = LA_CTIME;
+                rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
                 if (rc)
                         GOTO(cleanup, rc);
 
@@ -1405,17 +1455,18 @@ static int mdd_rename(const struct lu_env *env,
                         GOTO(cleanup, rc);
         }
 
-        la_copy->la_valid = LA_CTIME | LA_MTIME;
-        rc = mdd_attr_set_internal_locked(env, mdd_spobj, la_copy, handle, 0);
+        la->la_valid = LA_CTIME | LA_MTIME;
+        rc = mdd_attr_set_internal_locked(env, mdd_spobj, la, handle, 0);
         if (rc)
                 GOTO(cleanup, rc);
 
         if (mdd_spobj != mdd_tpobj) {
-                la_copy->la_valid = LA_CTIME | LA_MTIME;
-                rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la_copy,
+                la->la_valid = LA_CTIME | LA_MTIME;
+                rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la,
                                                   handle, 0);
         }
 
+        EXIT;
 cleanup:
         if (likely(tdlh) && sdlh != tdlh)
                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
@@ -1425,7 +1476,7 @@ cleanup_unlocked:
         mdd_trans_stop(env, mdd, rc, handle);
         if (mdd_sobj)
                 mdd_object_put(env, mdd_sobj);
-        RETURN(rc);
+        return rc;
 }
 
 struct md_dir_operations mdd_dir_ops = {