Whamcloud - gitweb
LU-1866 lfsck: LFSCK for namespace consistency (3)
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
index d4a6811..5fb1cf8 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2011, 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -60,9 +60,6 @@ static struct lu_name lname_dotdot = {
 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
                         const struct lu_name *lname, struct lu_fid* fid,
                         int mask);
-static int mdd_declare_links_add(const struct lu_env *env,
-                                 struct mdd_object *mdd_obj,
-                                 struct thandle *handle);
 static inline int mdd_links_add(const struct lu_env *env,
                                struct mdd_object *mdd_obj,
                                const struct lu_fid *pfid,
@@ -110,8 +107,8 @@ int mdd_lookup(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
-                          struct lu_fid *fid)
+int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
+                  struct lu_fid *fid)
 {
         return __mdd_lookup_locked(env, &obj->mod_obj, &lname_dotdot, fid, 0);
 }
@@ -165,16 +162,17 @@ static int mdd_is_parent(const struct lu_env *env,
                         GOTO(out, rc = 1);
                 if (parent)
                         mdd_object_put(env, parent);
-                parent = mdd_object_find(env, mdd, pfid);
-
-                /* cross-ref parent */
-                if (parent == NULL) {
-                        if (pf != NULL)
-                                *pf = *pfid;
-                        GOTO(out, rc = -EREMOTE);
-                } else if (IS_ERR(parent))
-                        GOTO(out, rc = PTR_ERR(parent));
-                p1 = parent;
+
+               parent = mdd_object_find(env, mdd, pfid);
+               if (IS_ERR(parent)) {
+                       GOTO(out, rc = PTR_ERR(parent));
+               } else if (mdd_object_remote(parent)) {
+                       /*FIXME: Because of the restriction of rename in Phase I.
+                        * If the parent is remote, we just assumed lf is not the
+                        * parent of P1 for now */
+                       GOTO(out, rc = 0);
+               }
+               p1 = parent;
         }
         EXIT;
 out:
@@ -296,7 +294,7 @@ int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
         int rc = 0;
         ENTRY;
 
-        if (cobj && mdd_object_exists(cobj))
+       if (cobj && mdd_object_exists(cobj))
                 RETURN(-EEXIST);
 
         if (mdd_is_dead_obj(pobj))
@@ -369,7 +367,34 @@ static inline int mdd_is_sticky(const struct lu_env *env,
        if (tmp_la->la_uid == uc->uc_fsuid)
                return 0;
 
-       return !mdd_capable(uc, CFS_CAP_FOWNER);
+       return !md_capable(uc, CFS_CAP_FOWNER);
+}
+
+static int mdd_may_delete_entry(const struct lu_env *env,
+                               struct mdd_object *pobj, int check_perm)
+{
+       ENTRY;
+
+       LASSERT(pobj != NULL);
+       if (!mdd_object_exists(pobj))
+               RETURN(-ENOENT);
+
+       if (mdd_is_dead_obj(pobj))
+               RETURN(-ENOENT);
+
+       if (check_perm) {
+               int rc;
+               rc = mdd_permission_internal_locked(env, pobj, NULL,
+                                           MAY_WRITE | MAY_EXEC,
+                                           MOR_TGT_PARENT);
+               if (rc)
+                       RETURN(rc);
+       }
+
+       if (mdd_is_append(pobj))
+               RETURN(-EPERM);
+
+       RETURN(0);
 }
 
 /*
@@ -383,31 +408,21 @@ int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj,
         int rc = 0;
         ENTRY;
 
-        LASSERT(cobj);
+       if (pobj) {
+               rc = mdd_may_delete_entry(env, pobj, check_perm);
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
+       if (cobj == NULL)
+               RETURN(0);
+
         if (!mdd_object_exists(cobj))
                 RETURN(-ENOENT);
 
         if (mdd_is_dead_obj(cobj))
                 RETURN(-ESTALE);
 
-        if (pobj) {
-                if (!mdd_object_exists(pobj))
-                        RETURN(-ENOENT);
-
-                if (mdd_is_dead_obj(pobj))
-                        RETURN(-ENOENT);
-
-                if (check_perm) {
-                        rc = mdd_permission_internal_locked(env, pobj, NULL,
-                                                    MAY_WRITE | MAY_EXEC,
-                                                    MOR_TGT_PARENT);
-                        if (rc)
-                                RETURN(rc);
-                }
-
-                if (mdd_is_append(pobj))
-                        RETURN(-EPERM);
-        }
 
        if (mdd_is_sticky(env, pobj, cobj))
                 RETURN(-EPERM);
@@ -770,14 +785,10 @@ int mdd_changelog_ext_store(const struct lu_env *env, struct mdd_device *mdd,
  * \param tname - target name string
  * \param handle - transacion handle
  */
-static int mdd_changelog_ns_store(const struct lu_env  *env,
-                                 struct mdd_device    *mdd,
-                                 enum changelog_rec_type type,
-                                 unsigned flags,
-                                 struct mdd_object    *target,
-                                 struct mdd_object    *parent,
-                                 const struct lu_name *tname,
-                                 struct thandle *handle)
+int mdd_changelog_ns_store(const struct lu_env *env, struct mdd_device *mdd,
+                          enum changelog_rec_type type, unsigned flags,
+                          struct mdd_object *target, struct mdd_object *parent,
+                          const struct lu_name *tname, struct thandle *handle)
 {
        struct llog_changelog_rec *rec;
        struct lu_buf *buf;
@@ -878,7 +889,6 @@ static int mdd_changelog_ext_ns_store(const struct lu_env  *env,
        rec->cr.cr_namelen = tname->ln_namelen;
        memcpy(rec->cr.cr_name, tname->ln_name, tname->ln_namelen);
        if (sname) {
-               LASSERT(sfid != NULL);
                rec->cr.cr_name[tname->ln_namelen] = '\0';
                memcpy(rec->cr.cr_name + tname->ln_namelen + 1, sname->ln_name,
                        sname->ln_namelen);
@@ -971,17 +981,17 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
         if (rc)
                 GOTO(out_unlock, rc);
 
-        rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
-                                     name, handle,
-                                     mdd_object_capa(env, mdd_tobj));
-        if (rc)
-                GOTO(out_unlock, rc);
-
        rc = mdo_ref_add(env, mdd_sobj, handle);
+       if (rc)
+               GOTO(out_unlock, rc);
+
+
+       rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
+                                    name, handle,
+                                    mdd_object_capa(env, mdd_tobj));
        if (rc != 0) {
-               __mdd_index_delete_only(env, mdd_tobj, name, handle,
-                                       mdd_object_capa(env, mdd_tobj));
-                GOTO(out_unlock, rc);
+               mdo_ref_del(env, mdd_sobj, handle);
+               GOTO(out_unlock, rc);
        }
 
         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
@@ -1014,15 +1024,15 @@ out_pending:
 }
 
 int mdd_declare_finish_unlink(const struct lu_env *env,
-                              struct mdd_object *obj,
-                              struct md_attr *ma,
-                              struct thandle *handle)
+                             struct mdd_object *obj,
+                             struct md_attr *ma,
+                             struct thandle *handle)
 {
-        int rc;
+       int     rc;
 
-        rc = orph_declare_index_insert(env, obj, handle);
-        if (rc)
-                return rc;
+       rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle);
+       if (rc)
+               return rc;
 
        return mdo_declare_destroy(env, obj, handle);
 }
@@ -1078,11 +1088,25 @@ int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
         RETURN(rc);
 }
 
+static inline int mdd_declare_links_del(const struct lu_env *env,
+                                       struct mdd_object *c,
+                                       struct thandle *handle)
+{
+       int rc = 0;
+
+       /* For directory, the linkEA will be removed together with the object. */
+       if (!S_ISDIR(mdd_object_type(c)))
+               rc = mdd_declare_links_add(env, c, handle);
+
+       return rc;
+}
+
 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
                               struct mdd_object *p, struct mdd_object *c,
                               const struct lu_name *name, struct md_attr *ma,
                               struct thandle *handle)
 {
+       struct lu_attr     *la = &mdd_env_info(env)->mti_la_for_fix;
         int rc;
 
         rc = mdo_declare_index_delete(env, p, name->ln_name, handle);
@@ -1093,33 +1117,39 @@ static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
         if (rc)
                 return rc;
 
-        rc = mdo_declare_ref_del(env, c, handle);
-        if (rc)
-                return rc;
+       LASSERT(ma->ma_attr.la_valid & LA_CTIME);
+       la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
+       la->la_valid = LA_CTIME | LA_MTIME;
+       rc = mdo_declare_attr_set(env, p, la, handle);
+       if (rc)
+               return rc;
 
-        rc = mdo_declare_ref_del(env, c, handle);
-        if (rc)
-                return rc;
+       if (c != NULL) {
+               rc = mdo_declare_ref_del(env, c, handle);
+               if (rc)
+                       return rc;
 
-        rc = mdo_declare_attr_set(env, p, NULL, handle);
-        if (rc)
-                return rc;
+               rc = mdo_declare_ref_del(env, c, handle);
+               if (rc)
+                       return rc;
 
-        rc = mdo_declare_attr_set(env, c, NULL, handle);
-        if (rc)
-                return rc;
+               rc = mdo_declare_attr_set(env, c, NULL, handle);
+               if (rc)
+                       return rc;
 
-        rc = mdd_declare_finish_unlink(env, c, ma, handle);
-        if (rc)
-                return rc;
+               rc = mdd_declare_finish_unlink(env, c, ma, handle);
+               if (rc)
+                       return rc;
 
-        rc = mdd_declare_links_add(env, c, handle);
-        if (rc)
-                return rc;
+               rc = mdd_declare_links_del(env, c, handle);
+               if (rc != 0)
+                       return rc;
 
-        rc = mdd_declare_changelog_store(env, mdd, name, handle);
+               /* FIXME: need changelog for remove entry */
+               rc = mdd_declare_changelog_store(env, mdd, name, handle);
+       }
 
-        return rc;
+       return rc;
 }
 
 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
@@ -1130,74 +1160,96 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
        struct lu_attr     *cattr = &mdd_env_info(env)->mti_cattr;
         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
-        struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
+       struct mdd_object *mdd_cobj = NULL;
         struct mdd_device *mdd = mdo2mdd(pobj);
         struct dynlock_handle *dlh;
         struct thandle    *handle;
-       int rc, is_dir;
+       int rc, is_dir = 0;
         ENTRY;
 
-        if (mdd_object_exists(mdd_cobj) <= 0)
-                RETURN(-ENOENT);
+       /* cobj == NULL means only delete name entry */
+       if (likely(cobj != NULL)) {
+               mdd_cobj = md2mdd_obj(cobj);
+               if (mdd_object_exists(mdd_cobj) == 0)
+                       RETURN(-ENOENT);
+               /* currently it is assume, it could only delete
+                * name entry of remote directory */
+               is_dir = 1;
+       }
 
-        handle = mdd_trans_create(env, mdd);
+       handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
-        rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
-                                lname, ma, handle);
-        if (rc)
-                GOTO(stop, rc);
+       rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
+                               lname, ma, handle);
+       if (rc)
+               GOTO(stop, rc);
 
-        rc = mdd_trans_start(env, mdd, handle);
-        if (rc)
-                GOTO(stop, rc);
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop, rc);
 
-        dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
-        if (dlh == NULL)
+       dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
+       if (dlh == NULL)
                GOTO(stop, rc = -ENOMEM);
-        mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
 
-       /* fetch cattr */
-       rc = mdd_la_get(env, mdd_cobj, cattr, mdd_object_capa(env, mdd_cobj));
-        if (rc)
-                GOTO(cleanup, rc);
+       if (likely(mdd_cobj != NULL)) {
+               mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
+
+               /* fetch cattr */
+               rc = mdd_la_get(env, mdd_cobj, cattr,
+                               mdd_object_capa(env, mdd_cobj));
+               if (rc)
+                       GOTO(cleanup, rc);
 
-       is_dir = S_ISDIR(cattr->la_mode);
+               is_dir = S_ISDIR(cattr->la_mode);
 
-       rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, cattr);
-        if (rc)
-                GOTO(cleanup, rc);
+       }
 
-       rc = mdo_ref_del(env, mdd_cobj, handle);
-       if (rc != 0) {
-               __mdd_index_insert_only(env, mdd_pobj, mdo2fid(mdd_cobj),
-                                       name, handle,
-                                       mdd_object_capa(env, mdd_pobj));
+       rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, cattr);
+       if (rc)
                GOTO(cleanup, rc);
-       }
 
        rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
                                mdd_object_capa(env, mdd_pobj));
        if (rc)
                GOTO(cleanup, rc);
 
-        if (is_dir)
-                /* unlink dot */
-                mdo_ref_del(env, mdd_cobj, handle);
+       if (likely(mdd_cobj != NULL)) {
+               rc = mdo_ref_del(env, mdd_cobj, handle);
+               if (rc != 0) {
+                       __mdd_index_insert_only(env, mdd_pobj,
+                                               mdo2fid(mdd_cobj),
+                                               name, handle,
+                                               mdd_object_capa(env, mdd_pobj));
+                       GOTO(cleanup, rc);
+               }
 
-       /* fetch updated nlink */
-       rc = mdd_la_get(env, mdd_cobj, cattr, mdd_object_capa(env, mdd_cobj));
-       if (rc)
-               GOTO(cleanup, rc);
+               if (is_dir)
+                       /* unlink dot */
+                       mdo_ref_del(env, mdd_cobj, handle);
 
-        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
-        la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
+               /* fetch updated nlink */
+               rc = mdd_la_get(env, mdd_cobj, cattr,
+                               mdd_object_capa(env, mdd_cobj));
+               if (rc)
+                       GOTO(cleanup, rc);
+       }
 
-        la->la_valid = LA_CTIME | LA_MTIME;
+       LASSERT(ma->ma_attr.la_valid & LA_CTIME);
+       la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
+
+       la->la_valid = LA_CTIME | LA_MTIME;
        rc = mdd_attr_check_set_internal(env, mdd_pobj, la, handle, 0);
-        if (rc)
-                GOTO(cleanup, rc);
+       if (rc)
+               GOTO(cleanup, rc);
+
+       /* Enough for only unlink the entry */
+       if (unlikely(mdd_cobj == NULL)) {
+               mdd_pdo_write_unlock(env, mdd_pobj, dlh);
+               GOTO(stop, rc);
+       }
 
        if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
                 /* update ctime of an unlinked file only if it is still
@@ -1316,21 +1368,12 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
               spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
               spec->sp_cr_flags, spec->no_create);
 
-       if (spec->no_create) {
-               /* replay case */
+       if (spec->no_create || spec->sp_cr_flags & MDS_OPEN_HAS_EA) {
+               /* replay case or lfs setstripe */
                buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
                                        spec->u.sp_ea.eadatalen);
-       } else  if (!(spec->sp_cr_flags & MDS_OPEN_HAS_OBJS)) {
-               if (spec->sp_cr_flags & MDS_OPEN_HAS_EA) {
-                       /* lfs setstripe */
-                       buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
-                                               spec->u.sp_ea.eadatalen);
-               } else {
-                       buf = &LU_BUF_NULL;
-               }
        } else {
-               /* MDS_OPEN_HAS_OBJS is not used anymore ? */
-               LBUG();
+               buf = &LU_BUF_NULL;
        }
 
        rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
@@ -1366,14 +1409,12 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
         if (unlikely(mdd_is_dead_obj(mdd_obj)))
                 RETURN(-ESTALE);
 
-        rc = mdd_object_exists(mdd_obj);
-        if (unlikely(rc == 0))
-                RETURN(-ESTALE);
-        else if (unlikely(rc < 0)) {
-                CERROR("Object "DFID" locates on remote server\n",
-                        PFID(mdo2fid(mdd_obj)));
-                RETURN(-EINVAL);
-        }
+       if (mdd_object_remote(mdd_obj)) {
+               CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
+                      mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
+       } else if (!mdd_object_exists(mdd_obj)) {
+               RETURN(-ESTALE);
+       }
 
         /* The common filename length check. */
         if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
@@ -1400,25 +1441,40 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
         RETURN(rc);
 }
 
-int mdd_declare_object_initialize(const struct lu_env *env,
-                                 struct mdd_object *child,
-                                 struct lu_attr *attr,
-                                 struct thandle *handle)
+static int mdd_declare_object_initialize(const struct lu_env *env,
+                                        struct mdd_object *parent,
+                                        struct mdd_object *child,
+                                        struct lu_attr *attr,
+                                        struct thandle *handle)
 {
         int rc;
+       ENTRY;
 
+       /*
+        * inode mode has been set in creation time, and it's based on umask,
+        * la_mode and acl, don't set here again! (which will go wrong
+        * because below function doesn't consider umask).
+        * I'd suggest set all object attributes in creation time, see above.
+        */
+       LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
+       attr->la_valid &= ~(LA_MODE | LA_TYPE);
        rc = mdo_declare_attr_set(env, child, attr, handle);
+       attr->la_valid |= LA_MODE | LA_TYPE;
        if (rc == 0 && S_ISDIR(attr->la_mode)) {
                rc = mdo_declare_index_insert(env, child, mdo2fid(child),
                                              dot, handle);
                 if (rc == 0)
                         rc = mdo_declare_ref_add(env, child, handle);
+
+               rc = mdo_declare_index_insert(env, child, mdo2fid(parent),
+                                             dotdot, handle);
         }
 
-        if (rc == 0)
+       if (rc == 0 && (fid_is_norm(mdo2fid(child)) ||
+                       fid_is_dot_lustre(mdo2fid(child))))
                 mdd_declare_links_add(env, child, handle);
 
-        return rc;
+       RETURN(rc);
 }
 
 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
@@ -1443,10 +1499,13 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
         * because below function doesn't consider umask).
         * I'd suggest set all object attributes in creation time, see above.
         */
-       attr->la_valid &= ~LA_MODE;
+       LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
+       attr->la_valid &= ~(LA_MODE | LA_TYPE);
        rc = mdd_attr_set_internal(env, child, attr, handle, 0);
-        if (rc != 0)
-                RETURN(rc);
+       /* arguments are supposed to stay the same */
+       attr->la_valid |= LA_MODE | LA_TYPE;
+       if (rc != 0)
+               RETURN(rc);
 
        if (S_ISDIR(attr->la_mode)) {
                 /* Add "." and ".." for newly created dir */
@@ -1460,10 +1519,12 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
                 if (rc != 0)
                         mdo_ref_del(env, child, handle);
         }
-        if (rc == 0)
-                mdd_links_add(env, child, pfid, lname, handle, 1);
 
-        RETURN(rc);
+       if (rc == 0 && (fid_is_norm(mdo2fid(child)) ||
+                       fid_is_dot_lustre(mdo2fid(child))))
+               mdd_links_add(env, child, pfid, lname, handle, 1);
+
+       RETURN(rc);
 }
 
 /* has not lock on pobj yet */
@@ -1478,7 +1539,6 @@ static int mdd_create_sanity_check(const struct lu_env *env,
         struct lu_fid     *fid       = &info->mti_fid;
         struct mdd_object *obj       = md2mdd_obj(pobj);
         struct mdd_device *m         = mdo2mdd(pobj);
-        int lookup                   = spec->sp_cr_lookup;
         int rc;
         ENTRY;
 
@@ -1486,12 +1546,12 @@ static int mdd_create_sanity_check(const struct lu_env *env,
         if (mdd_is_dead_obj(obj))
                 RETURN(-ENOENT);
 
-        /*
+       /*
          * In some cases this lookup is not needed - we know before if name
          * exists or not because MDT performs lookup for it.
          * name length check is done in lookup.
          */
-        if (lookup) {
+       if (spec->sp_cr_lookup) {
                 /*
                  * Check if the name already exist, though it will be checked in
                  * _index_insert also, for avoiding rolling back if exists
@@ -1554,8 +1614,7 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
                              struct thandle *handle,
                              const struct md_op_spec *spec)
 {
-       struct mdd_thread_info *info = mdd_env_info(env);
-        int            rc = 0;
+       int rc;
 
        rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec);
         if (rc)
@@ -1575,7 +1634,7 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
                                GOTO(out, rc);
                }
 
-               rc = mdo_declare_attr_set(env, c, &info->mti_pattr, handle);
+               rc = mdo_declare_attr_set(env, c, attr, handle);
                if (rc)
                        GOTO(out, rc);
 
@@ -1592,14 +1651,17 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
                        GOTO(out, rc);
         }
 
-       rc = mdd_declare_object_initialize(env, c, attr, handle);
-        if (rc)
-                GOTO(out, rc);
+       rc = mdd_declare_object_initialize(env, p, c, attr, handle);
+       if (rc)
+               GOTO(out, rc);
 
-        rc = mdo_declare_index_insert(env, p, mdo2fid(c),
-                                      name->ln_name, handle);
-        if (rc)
-                GOTO(out, rc);
+       if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
+               rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
+       else
+               rc = mdo_declare_index_insert(env, p, mdo2fid(c),
+                                             name->ln_name, handle);
+       if (rc)
+               GOTO(out, rc);
 
        /* replay case, create LOV EA from client data */
        if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
@@ -1621,9 +1683,11 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
                         GOTO(out, rc);
         }
 
-       rc = mdo_declare_attr_set(env, p, attr, handle);
-        if (rc)
-                return rc;
+       if (!(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
+               rc = mdo_declare_attr_set(env, p, attr, handle);
+               if (rc)
+                       return rc;
+       }
 
         rc = mdd_declare_changelog_store(env, mdd, name, handle);
         if (rc)
@@ -1633,7 +1697,6 @@ out:
         return rc;
 }
 
-
 /*
  * Create object and insert it into namespace.
  */
@@ -1641,19 +1704,19 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
                      const struct lu_name *lname, struct md_object *child,
                      struct md_op_spec *spec, struct md_attr* ma)
 {
-        struct mdd_thread_info *info = mdd_env_info(env);
-        struct lu_attr         *la = &info->mti_la_for_fix;
-        struct mdd_object      *mdd_pobj = md2mdd_obj(pobj);
-        struct mdd_object      *son = md2mdd_obj(child);
-        struct mdd_device      *mdd = mdo2mdd(pobj);
-        struct lu_attr         *attr = &ma->ma_attr;
-        struct thandle         *handle;
-       struct lu_attr         *pattr = &info->mti_pattr;
-        struct dynlock_handle  *dlh;
-        const char             *name = lname->ln_name;
-       int rc, created = 0, initialized = 0, inserted = 0;
-        int got_def_acl = 0;
-        ENTRY;
+       struct mdd_thread_info  *info = mdd_env_info(env);
+       struct lu_attr          *la = &info->mti_la_for_fix;
+       struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
+       struct mdd_object       *son = md2mdd_obj(child);
+       struct mdd_device       *mdd = mdo2mdd(pobj);
+       struct lu_attr          *attr = &ma->ma_attr;
+       struct thandle          *handle;
+       struct lu_attr          *pattr = &info->mti_pattr;
+       struct dynlock_handle   *dlh;
+       const char              *name = lname->ln_name;
+       int                      rc, created = 0, initialized = 0, inserted = 0;
+       int                      got_def_acl = 0;
+       ENTRY;
 
         /*
          * Two operations have to be performed:
@@ -1733,21 +1796,21 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
         if (rc)
                 GOTO(out_stop, rc);
 
-        dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
-        if (dlh == NULL)
-                GOTO(out_trans, rc = -ENOMEM);
+       dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
+       if (dlh == NULL)
+               GOTO(out_trans, rc = -ENOMEM);
 
-        mdd_write_lock(env, son, MOR_TGT_CHILD);
-       rc = mdd_object_create_internal(env, mdd_pobj, son, attr, handle, spec);
-        if (rc) {
-                mdd_write_unlock(env, son);
-                GOTO(cleanup, rc);
-        }
+       mdd_write_lock(env, son, MOR_TGT_CHILD);
+       rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec);
+       if (rc) {
+               mdd_write_unlock(env, son);
+               GOTO(cleanup, rc);
+       }
 
-        created = 1;
+       created = 1;
 
 #ifdef CONFIG_FS_POSIX_ACL
-        if (got_def_acl) {
+       if (got_def_acl) {
                struct lu_buf *acl_buf;
 
                acl_buf = mdd_buf_get(env, info->mti_xattr_buf, got_def_acl);
@@ -1756,10 +1819,10 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
                        mdd_write_unlock(env, son);
                        GOTO(cleanup, rc);
                }
-        }
+       }
 #endif
 
-        rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname,
+       rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname,
                                   son, attr, handle, spec);
 
        /*
@@ -1768,8 +1831,8 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
         *      MDT calls this xattr_set(LOV) in a different transaction.
         *      probably this way we code can be made better.
         */
-       if (rc == 0 &&
-                       (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA))) {
+       if (rc == 0 && (spec->no_create ||
+                       (spec->sp_cr_flags & MDS_OPEN_HAS_EA))) {
                const struct lu_buf *buf;
 
                buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
@@ -1777,23 +1840,30 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
                rc = mdo_xattr_set(env, son, buf, XATTR_NAME_LOV, 0, handle,
                                BYPASS_CAPA);
        }
-        mdd_write_unlock(env, son);
-        if (rc)
-                /*
-                 * Object has no links, so it will be destroyed when last
-                 * reference is released. (XXX not now.)
-                 */
-                GOTO(cleanup, rc);
 
-        initialized = 1;
+       if (rc == 0 && spec->sp_cr_flags & MDS_OPEN_VOLATILE)
+               rc = __mdd_orphan_add(env, son, handle);
 
-        rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
-                                name, S_ISDIR(attr->la_mode), handle,
-                                mdd_object_capa(env, mdd_pobj));
-        if (rc)
-                GOTO(cleanup, rc);
+       mdd_write_unlock(env, son);
+
+       if (rc != 0)
+               /*
+                * Object has no links, so it will be destroyed when last
+                * reference is released. (XXX not now.)
+                */
+               GOTO(cleanup, rc);
+
+       initialized = 1;
+
+       if (!(spec->sp_cr_flags & MDS_OPEN_VOLATILE))
+               rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
+                                       name, S_ISDIR(attr->la_mode), handle,
+                                       mdd_object_capa(env, mdd_pobj));
 
-        inserted = 1;
+       if (rc != 0)
+               GOTO(cleanup, rc);
+
+       inserted = 1;
 
         if (S_ISLNK(attr->la_mode)) {
                struct lu_ucred  *uc = lu_ucred_assert(env);
@@ -1815,11 +1885,16 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
                         GOTO(cleanup, rc = -EFAULT);
         }
 
+       /* volatile file creation does not update parent directory times */
+       if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
+               GOTO(cleanup, rc = 0);
+
+       /* update parent directory mtime/ctime */
        *la = *attr;
-        la->la_valid = LA_CTIME | LA_MTIME;
+       la->la_valid = LA_CTIME | LA_MTIME;
        rc = mdd_attr_check_set_internal(env, mdd_pobj, la, handle, 0);
-        if (rc)
-                GOTO(cleanup, rc);
+       if (rc)
+               GOTO(cleanup, rc);
 
         EXIT;
 cleanup:
@@ -1827,9 +1902,12 @@ cleanup:
                int rc2;
 
                if (inserted != 0) {
-                       rc2 = __mdd_index_delete(env, mdd_pobj, name,
-                                                S_ISDIR(attr->la_mode),
-                                                handle, BYPASS_CAPA);
+                       if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
+                               rc2 = __mdd_orphan_del(env, son, handle);
+                       else
+                               rc2 = __mdd_index_delete(env, mdd_pobj, name,
+                                                        S_ISDIR(attr->la_mode),
+                                                        handle, BYPASS_CAPA);
                        if (rc2 != 0)
                                goto out_stop;
                }
@@ -1857,7 +1935,7 @@ cleanup:
 
         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
 out_trans:
-        if (rc == 0)
+       if (rc == 0 && fid_is_client_mdt_visible(mdo2fid(son)))
                rc = mdd_changelog_ns_store(env, mdd,
                        S_ISDIR(attr->la_mode) ? CL_MKDIR :
                        S_ISREG(attr->la_mode) ? CL_CREATE :
@@ -2371,30 +2449,12 @@ cleanup_unlocked:
 
 stop:
         mdd_trans_stop(env, mdd, rc, handle);
-        if (mdd_sobj)
-                mdd_object_put(env, mdd_sobj);
 out_pending:
-        return rc;
+       mdd_object_put(env, mdd_sobj);
+       return rc;
 }
 
-/**
- * The data that link search is done on.
- */
-struct mdd_link_data {
-       /**
-        * Buffer to keep link EA body.
-        */
-       struct lu_buf           *ml_buf;
-       /**
-        * The matched header, entry and its lenght in the EA
-        */
-       struct link_ea_header   *ml_leh;
-       struct link_ea_entry    *ml_lee;
-       int                      ml_reclen;
-};
-
-static int mdd_links_new(const struct lu_env *env,
-                        struct mdd_link_data *ldata)
+int mdd_links_new(const struct lu_env *env, struct mdd_link_data *ldata)
 {
        ldata->ml_buf = mdd_buf_alloc(env, CFS_PAGE_SIZE);
        if (ldata->ml_buf->lb_buf == NULL)
@@ -2412,8 +2472,7 @@ static int mdd_links_new(const struct lu_env *env,
  *
  * \retval 0 or error
  */
-int mdd_links_read(const struct lu_env *env,
-                  struct mdd_object *mdd_obj,
+int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
                   struct mdd_link_data *ldata)
 {
        struct lustre_capa *capa;
@@ -2479,10 +2538,8 @@ struct lu_buf *mdd_links_get(const struct lu_env *env,
        return rc ? ERR_PTR(rc) : ldata.ml_buf;
 }
 
-static int mdd_links_write(const struct lu_env *env,
-                          struct mdd_object *mdd_obj,
-                          struct mdd_link_data *ldata,
-                          struct thandle *handle)
+int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
+                   struct mdd_link_data *ldata, struct thandle *handle)
 {
        const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ml_buf->lb_buf,
                                                     ldata->ml_leh->leh_len);
@@ -2502,6 +2559,8 @@ static int mdd_lee_pack(struct link_ea_entry *lee, const struct lu_name *lname,
         int             reclen;
 
         fid_cpu_to_be(&tmpfid, pfid);
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_CRASH))
+               tmpfid.f_ver = ~0;
         memcpy(&lee->lee_parent_fid, &tmpfid, sizeof(tmpfid));
         memcpy(lee->lee_name, lname->ln_name, lname->ln_namelen);
         reclen = sizeof(struct link_ea_entry) + lname->ln_namelen;
@@ -2521,9 +2580,9 @@ void mdd_lee_unpack(const struct link_ea_entry *lee, int *reclen,
         lname->ln_namelen = *reclen - sizeof(struct link_ea_entry);
 }
 
-static int mdd_declare_links_add(const struct lu_env *env,
-                                 struct mdd_object *mdd_obj,
-                                 struct thandle *handle)
+int mdd_declare_links_add(const struct lu_env *env,
+                         struct mdd_object *mdd_obj,
+                         struct thandle *handle)
 {
         int rc;
 
@@ -2535,27 +2594,15 @@ static int mdd_declare_links_add(const struct lu_env *env,
         return rc;
 }
 
-/* For pathologic linkers, we don't want to spend lots of time scanning the
- * link ea.  Limit ourseleves to something reasonable; links not in the EA
- * can be looked up via (slower) parent lookup.
- */
-#define LINKEA_MAX_COUNT 128
-
 /** Add a record to the end of link ea buf */
-static int mdd_links_add_buf(const struct lu_env *env,
-                            struct mdd_link_data *ldata,
-                            const struct lu_name *lname,
-                            const struct lu_fid *pfid)
+int mdd_links_add_buf(const struct lu_env *env, struct mdd_link_data *ldata,
+                     const struct lu_name *lname, const struct lu_fid *pfid)
 {
        LASSERT(ldata->ml_leh != NULL);
 
        if (lname == NULL || pfid == NULL)
                return -EINVAL;
 
-       /* Make sure our buf is big enough for the new one */
-       if (ldata->ml_leh->leh_reccount > LINKEA_MAX_COUNT)
-               return -EOVERFLOW;
-
        ldata->ml_reclen = lname->ln_namelen + sizeof(struct link_ea_entry);
        if (ldata->ml_leh->leh_len + ldata->ml_reclen >
            ldata->ml_buf->lb_len) {
@@ -2575,9 +2622,8 @@ static int mdd_links_add_buf(const struct lu_env *env,
 }
 
 /** Del the current record from the link ea buf */
-static void mdd_links_del_buf(const struct lu_env *env,
-                             struct mdd_link_data *ldata,
-                             const struct lu_name *lname)
+void mdd_links_del_buf(const struct lu_env *env, struct mdd_link_data *ldata,
+                      const struct lu_name *lname)
 {
        LASSERT(ldata->ml_leh != NULL);
 
@@ -2603,11 +2649,9 @@ static void mdd_links_del_buf(const struct lu_env *env,
  * \retval -ENOENT link does not exist
  * \retval -ve on error
  */
-static int mdd_links_find(const struct lu_env  *env,
-                         struct mdd_object    *mdd_obj,
-                         struct mdd_link_data *ldata,
-                         const struct lu_name *lname,
-                         const struct lu_fid  *pfid)
+int mdd_links_find(const struct lu_env *env, struct mdd_object *mdd_obj,
+                  struct mdd_link_data *ldata, const struct lu_name *lname,
+                  const struct lu_fid  *pfid)
 {
        struct lu_name *tmpname = &mdd_env_info(env)->mti_name2;
        struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
@@ -2666,6 +2710,14 @@ static int __mdd_links_add(const struct lu_env *env,
                        return -EEXIST;
        }
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
+               struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
+
+               *tfid = *pfid;
+               tfid->f_ver = ~0;
+               mdd_links_add_buf(env, ldata, lname, tfid);
+       }
+
        return mdd_links_add_buf(env, ldata, lname, pfid);
 }
 
@@ -2706,6 +2758,9 @@ static int mdd_links_rename(const struct lu_env *env,
        int rc = 0;
        ENTRY;
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
+               return 0;
+
        LASSERT(oldpfid != NULL || newpfid != NULL);
 
        if (mdd_obj->mod_flags & DEAD_OBJ)
@@ -2745,16 +2800,22 @@ out:
        if (rc == 0)
                rc = rc2;
        if (rc) {
+               int error = 1;
+               if (rc == -EOVERFLOW || rc == - ENOENT)
+                       error = 0;
                if (oldpfid == NULL)
-                       CERROR("link_ea add '%.*s' failed %d "DFID"\n",
+                       CDEBUG(error ? D_ERROR : D_OTHER,
+                              "link_ea add '%.*s' failed %d "DFID"\n",
                               newlname->ln_namelen, newlname->ln_name,
                               rc, PFID(mdd_object_fid(mdd_obj)));
                else if (newpfid == NULL)
-                       CERROR("link_ea del '%.*s' failed %d "DFID"\n",
+                       CDEBUG(error ? D_ERROR : D_OTHER,
+                              "link_ea del '%.*s' failed %d "DFID"\n",
                               oldlname->ln_namelen, oldlname->ln_name,
                               rc, PFID(mdd_object_fid(mdd_obj)));
                else
-                       CERROR("link_ea rename '%.*s'->'%.*s' failed %d "
+                       CDEBUG(error ? D_ERROR : D_OTHER,
+                              "link_ea rename '%.*s'->'%.*s' failed %d "
                               DFID"\n",
                               oldlname->ln_namelen, oldlname->ln_name,
                               newlname->ln_namelen, newlname->ln_name,