Whamcloud - gitweb
b=23289 no need to always start transaction for attr_set(atime) on close
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index 3c992d9..d581b82 100644 (file)
@@ -1,29 +1,43 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  linux/mdt/mdt_open.c
- *  Lustre Metadata Target (mdt) open/close file handling
+ * GPL HEADER START
  *
- *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
- *   Author: Huang Hua <huanghua@clusterfs.com>
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   This file is part of the Lustre file system, http://www.lustre.org
- *   Lustre is a trademark of Cluster File Systems, Inc.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   You may have signed or agreed to another license before downloading
- *   this software.  If so, you are bound by the terms and conditions
- *   of that agreement, and the following does not apply to you.  See the
- *   LICENSE file included with this distribution for more information.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   If you did not agree to a different license, then this copy of Lustre
- *   is open source software; you can redistribute it and/or modify it
- *   under the terms of version 2 of the GNU General Public License as
- *   published by the Free Software Foundation.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  *
- *   In either case, Lustre is distributed in the hope that it will be
- *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   license text for more details.
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/mdt/mdt_open.c
+ *
+ * Lustre Metadata Target (mdt) open/close file handling
+ *
+ * Author: Huang Hua <huanghua@clusterfs.com>
  */
 
 #ifndef EXPORT_SYMTAB
@@ -31,7 +45,7 @@
 #endif
 #define DEBUG_SUBSYSTEM S_MDS
 
-#include <linux/lustre_acl.h>
+#include <lustre_acl.h>
 #include <lustre_mds.h>
 #include "mdt_internal.h"
 
@@ -49,8 +63,8 @@ struct mdt_file_data *mdt_mfd_new(void)
 
         OBD_ALLOC_PTR(mfd);
         if (mfd != NULL) {
-                INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
-                INIT_LIST_HEAD(&mfd->mfd_list);
+                CFS_INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
+                CFS_INIT_LIST_HEAD(&mfd->mfd_list);
                 class_handle_hash(&mfd->mfd_handle, mdt_mfd_get);
         }
         RETURN(mfd);
@@ -71,10 +85,9 @@ struct mdt_file_data *mdt_handle2mfd(struct mdt_thread_info *info,
         LASSERT(handle != NULL);
         mfd = class_handle2object(handle->cookie);
         /* during dw/setattr replay the mfd can be found by old handle */
-        if (mfd == NULL &&
-            lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+        if (mfd == NULL && req_is_replay(req)) {
                 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
-                list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
+                cfs_list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
                         if (mfd->mfd_old_handle.cookie == handle->cookie)
                                 RETURN (mfd);
                 }
@@ -86,7 +99,7 @@ struct mdt_file_data *mdt_handle2mfd(struct mdt_thread_info *info,
 /* free mfd */
 void mdt_mfd_free(struct mdt_file_data *mfd)
 {
-        LASSERT(list_empty(&mfd->mfd_list));
+        LASSERT(cfs_list_empty(&mfd->mfd_list));
         OBD_FREE_RCU(mfd, sizeof *mfd, &mfd->mfd_handle);
 }
 
@@ -109,63 +122,76 @@ static int mdt_create_data(struct mdt_thread_info *info,
         RETURN(rc);
 }
 
-static int mdt_epoch_opened(struct mdt_object *mo)
+static int mdt_ioepoch_opened(struct mdt_object *mo)
 {
-        return mo->mot_epochcount;
+        return mo->mot_ioepoch_count;
 }
 
-int mdt_sizeonmds_enabled(struct mdt_object *mo)
+int mdt_object_is_som_enabled(struct mdt_object *mo)
 {
         return !mo->mot_ioepoch;
 }
 
-/* Re-enable Size-on-MDS. */
-void mdt_sizeonmds_enable(struct mdt_thread_info *info,
-                          struct mdt_object *mo)
+/**
+ * Re-enable Size-on-MDS.
+ * Call under ->mot_ioepoch_sem.
+ */
+static void mdt_object_som_enable(struct mdt_object *mo, __u64 ioepoch)
 {
-       spin_lock(&info->mti_mdt->mdt_ioepoch_lock);
-       if (info->mti_epoch->ioepoch == mo->mot_ioepoch) {
-                LASSERT(!mdt_epoch_opened(mo)); 
+        if (ioepoch == mo->mot_ioepoch) {
+                LASSERT(!mdt_ioepoch_opened(mo));
                 mo->mot_ioepoch = 0;
                 mo->mot_flags = 0;
-       }
-       spin_unlock(&info->mti_mdt->mdt_ioepoch_lock);
+        }
 }
 
-/* Open the epoch. Epoch open is allowed if @writecount is not negative.
- * The epoch and writecount handling is performed under the mdt_ioepoch_lock. */
-int mdt_epoch_open(struct mdt_thread_info *info, struct mdt_object *o)
+/**
+ * Open the IOEpoch. It is allowed if @writecount is not negative.
+ * The epoch and writecount handling is performed under the mot_ioepoch_sem.
+ */
+int mdt_ioepoch_open(struct mdt_thread_info *info, struct mdt_object *o,
+                     int created)
 {
         struct mdt_device *mdt = info->mti_mdt;
         int cancel = 0;
         int rc = 0;
         ENTRY;
 
-        if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) || 
+        if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
             !S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
                 RETURN(0);
 
-        spin_lock(&mdt->mdt_ioepoch_lock);
-        if (mdt_epoch_opened(o)) {
+        cfs_down(&o->mot_ioepoch_sem);
+        if (mdt_ioepoch_opened(o)) {
                 /* Epoch continues even if there is no writers yet. */
                 CDEBUG(D_INODE, "continue epoch "LPU64" for "DFID"\n",
                        o->mot_ioepoch, PFID(mdt_object_fid(o)));
         } else {
-                if (info->mti_replayepoch > mdt->mdt_ioepoch)
+                /* XXX: ->mdt_ioepoch is not initialized at the mount */
+                cfs_spin_lock(&mdt->mdt_ioepoch_lock);
+                if (mdt->mdt_ioepoch < info->mti_replayepoch)
                         mdt->mdt_ioepoch = info->mti_replayepoch;
+
+                if (info->mti_replayepoch)
+                        o->mot_ioepoch = info->mti_replayepoch;
+                else if (++mdt->mdt_ioepoch == IOEPOCH_INVAL)
+                        o->mot_ioepoch = ++mdt->mdt_ioepoch;
                 else
-                        mdt->mdt_ioepoch++;
-                o->mot_ioepoch = info->mti_replayepoch ?
-                        info->mti_replayepoch : mdt->mdt_ioepoch;
+                        o->mot_ioepoch = mdt->mdt_ioepoch;
+
+                cfs_spin_unlock(&mdt->mdt_ioepoch_lock);
+
                 CDEBUG(D_INODE, "starting epoch "LPU64" for "DFID"\n",
-                       mdt->mdt_ioepoch, PFID(mdt_object_fid(o)));
+                       o->mot_ioepoch, PFID(mdt_object_fid(o)));
+                if (created)
+                        o->mot_flags |= MOF_SOM_CREATED;
                 cancel = 1;
         }
-        o->mot_epochcount++;
-        spin_unlock(&mdt->mdt_ioepoch_lock);
+        o->mot_ioepoch_count++;
+        cfs_up(&o->mot_ioepoch_sem);
 
-        /* Cancel Size-on-MDS attributes on clients if not truncate.
-         * In the later case, mdt_reint_setattr will do it. */
+        /* Cancel Size-on-MDS attributes cached on clients for the open case.
+         * In the truncate case, see mdt_reint_setattr(). */
         if (cancel && (info->mti_rr.rr_fid1 != NULL)) {
                 struct mdt_lock_handle  *lh = &info->mti_lh[MDT_LH_CHILD];
                 mdt_lock_reg_init(lh, LCK_EX);
@@ -177,148 +203,343 @@ int mdt_epoch_open(struct mdt_thread_info *info, struct mdt_object *o)
         RETURN(rc);
 }
 
-/* Update the on-disk attributes if needed and re-enable Size-on-MDS caching. */
-static int mdt_sizeonmds_update(struct mdt_thread_info *info,
-                                struct mdt_object *o)
+/**
+ * Update SOM on-disk attributes.
+ * If enabling, write update inodes and lustre-ea with the proper IOEpoch,
+ * mountid and attributes. If disabling, zero IOEpoch id in lustre-ea.
+ * Call under ->mot_ioepoch_sem.
+ */
+static int mdt_som_attr_set(struct mdt_thread_info *info,
+                            struct mdt_object *obj, __u64 ioepoch, int enable)
 {
+        struct md_attr *ma = &info->mti_attr;
+        int rc;
         ENTRY;
 
-        CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
-               o->mot_ioepoch, PFID(mdt_object_fid(o)), o->mot_epochcount);
-
-        if (info->mti_attr.ma_attr.la_valid & LA_SIZE) {
-                /* Do Size-on-MDS attribute update.
-                 * Size-on-MDS is re-enabled inside. */
-                /* XXX: since we have opened the file, it is unnecessary
-                 * to check permission when close it. Between the "open"
-                 * and "close", maybe someone has changed the file mode
-                 * or flags, or the file created mode do not permit wirte,
-                 * and so on. Just set MDS_PERM_BYPASS for all the cases. */
-                info->mti_attr.ma_attr_flags |= MDS_PERM_BYPASS;
-                info->mti_attr.ma_attr.la_valid &= LA_SIZE | LA_BLOCKS |
-                                                LA_ATIME | LA_MTIME | LA_CTIME;
-                RETURN(mdt_attr_set(info, o, 0));
-        } else
-                mdt_sizeonmds_enable(info, o);
-        RETURN(0);
+        CDEBUG(D_INODE, "Size-on-MDS attribute %s for epoch "LPU64
+               " on "DFID".\n", enable ? "update" : "disabling",
+               ioepoch, PFID(mdt_object_fid(obj)));
+
+        ma->ma_valid |= MA_SOM;
+        ma->ma_som = &info->mti_u.som.data;
+        if (enable) {
+                struct mdt_device *mdt = info->mti_mdt;
+                struct lu_attr *la = &ma->ma_attr;
+
+                ma->ma_som->msd_ioepoch = ioepoch;
+                ma->ma_som->msd_size = la->la_valid & LA_SIZE ? la->la_size : 0;
+                ma->ma_som->msd_blocks = la->la_valid & LA_BLOCKS ?
+                                         la->la_blocks : 0;
+                ma->ma_som->msd_mountid = mdt->mdt_lut.lut_obd->u.obt.obt_mount_count;
+                ma->ma_attr.la_valid &= LA_ATIME | LA_MTIME | LA_CTIME;
+        } else {
+                ma->ma_som->msd_ioepoch = IOEPOCH_INVAL;
+                ma->ma_attr.la_valid &= LA_ATIME;
+        }
+
+        /* Since we have opened the file, it is unnecessary
+         * to check permission when close it. Between the "open"
+         * and "close", maybe someone has changed the file mode
+         * or flags, or the file created mode do not permit wirte,
+         * and so on. Just set MDS_PERM_BYPASS for all the cases. */
+        ma->ma_attr_flags |= MDS_PERM_BYPASS | MDS_SOM;
+
+        rc = mdt_attr_set(info, obj, ma, 0);
+        RETURN(rc);
+}
+
+/** Perform the eviction specific actions on ioepoch close. */
+static inline int mdt_ioepoch_close_on_eviction(struct mdt_thread_info *info,
+                                                struct mdt_object *o)
+{
+        int rc = 0;
+
+        cfs_down(&o->mot_ioepoch_sem);
+        CDEBUG(D_INODE, "Eviction. Closing IOepoch "LPU64" on "DFID". "
+               "Count %d\n", o->mot_ioepoch, PFID(mdt_object_fid(o)),
+               o->mot_ioepoch_count);
+        o->mot_ioepoch_count--;
+
+        /* If eviction occured set MOF_SOM_RECOV,
+         * if no other epoch holders, disable SOM on disk. */
+        o->mot_flags |= MOF_SOM_CHANGE | MOF_SOM_RECOV;
+        if (!mdt_ioepoch_opened(o)) {
+                rc = mdt_som_attr_set(info, o, o->mot_ioepoch, MDT_SOM_DISABLE);
+                mdt_object_som_enable(o, o->mot_ioepoch);
+        }
+        cfs_up(&o->mot_ioepoch_sem);
+        RETURN(rc);
 }
 
-/* Epoch closes.
- * Returns 1 if epoch does not close.
- * Returns 0 if epoch closes.
- * Returns -EAGAIN if epoch closes but an Size-on-MDS Update is still needed
- * from the client. */
-static int mdt_epoch_close(struct mdt_thread_info *info, struct mdt_object *o)
+/**
+ * Perform the replay specific actions on ioepoch close.
+ * Skip SOM attribute update if obtained and just forget about the inode state
+ * for the last ioepoch holder. The SOM cache is invalidated on MDS failure.
+ */
+static inline int mdt_ioepoch_close_on_replay(struct mdt_thread_info *info,
+                                              struct mdt_object *o)
 {
-        int eviction = (mdt_info_req(info) == NULL ? 1 : 0);
-        struct lu_attr *la = &info->mti_attr.ma_attr;
-        int achange = 0;
-        int opened;
-        int rc = 1;
+        int rc = MDT_IOEPOCH_CLOSED;
         ENTRY;
 
-        if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
-            !S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
-                RETURN(0);
+        cfs_down(&o->mot_ioepoch_sem);
+        CDEBUG(D_INODE, "Replay. Closing epoch "LPU64" on "DFID". Count %d\n",
+               o->mot_ioepoch, PFID(mdt_object_fid(o)), o->mot_ioepoch_count);
+        o->mot_ioepoch_count--;
 
-        spin_lock(&info->mti_mdt->mdt_ioepoch_lock);
+        /* Get an info from the replayed request if client is supposed
+         * to send an Attibute Update, reconstruct @rc if so */
+        if (info->mti_ioepoch->flags & MF_SOM_AU)
+                rc = MDT_IOEPOCH_GETATTR;
 
-        /* Epoch closes only if client tells about it or eviction occures. */
-        if (eviction || (info->mti_epoch->flags & MF_EPOCH_CLOSE)) {
-                LASSERT(o->mot_epochcount);
-                o->mot_epochcount--;
+        if (!mdt_ioepoch_opened(o))
+                mdt_object_som_enable(o, info->mti_ioepoch->ioepoch);
+        cfs_up(&o->mot_ioepoch_sem);
 
-                CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
-                       o->mot_ioepoch, PFID(mdt_object_fid(o)),
-                       o->mot_epochcount);
+        RETURN(rc);
+}
 
-                if (!eviction)
-                        achange = (info->mti_epoch->flags & MF_SOM_CHANGE);
+/**
+ * Regular file IOepoch close.
+ * Closes the ioepoch, checks the object state, apply obtained attributes and
+ * re-enable SOM on the object, if possible. Also checks if the recovery is
+ * needed and packs OBD_MD_FLGETATTRLOCK flag into the reply to force the client
+ * to obtain SOM attributes under the server-side OST locks.
+ *
+ * Return value:
+ * MDT_IOEPOCH_CLOSED if ioepoch is closed.
+ * MDT_IOEPOCH_GETATTR if ioepoch is closed but another SOM update is needed.
+ */
+static inline int mdt_ioepoch_close_reg(struct mdt_thread_info *info,
+                                        struct mdt_object *o)
+{
+        struct md_attr *tmp_ma;
+        struct lu_attr *la;
+        int achange, opened;
+        int recovery = 0;
+        int rc = 0, ret = MDT_IOEPOCH_CLOSED;
+        ENTRY;
 
-                rc = 0;
-                if (!eviction && !mdt_epoch_opened(o)) {
-                        /* Epoch ends. Is an Size-on-MDS update needed? */
-                        if (o->mot_flags & MF_SOM_CHANGE) {
-                                /* Some previous writer changed the attribute.
-                                 * Do not believe to the current Size-on-MDS
-                                 * update, re-ask client. */
-                                rc = -EAGAIN;
-                        } else if (!(la->la_valid & LA_SIZE) && achange) {
-                                /* Attributes were changed by the last writer
-                                 * only but no Size-on-MDS update is received.*/
-                                rc = -EAGAIN;
-                        }
+        la = &info->mti_attr.ma_attr;
+        achange = (info->mti_ioepoch->flags & MF_SOM_CHANGE);
+
+        cfs_down(&o->mot_ioepoch_sem);
+        o->mot_ioepoch_count--;
+
+        tmp_ma = &info->mti_u.som.attr;
+        tmp_ma->ma_lmm = info->mti_attr.ma_lmm;
+        tmp_ma->ma_lmm_size = info->mti_attr.ma_lmm_size;
+        tmp_ma->ma_som = &info->mti_u.som.data;
+        tmp_ma->ma_need = MA_INODE | MA_LOV | MA_SOM;
+        tmp_ma->ma_valid = 0;
+        rc = mo_attr_get(info->mti_env, mdt_object_child(o), tmp_ma);
+        if (rc)
+                GOTO(error_up, rc);
+
+        /* Check the on-disk SOM state. */
+        if (o->mot_flags & MOF_SOM_RECOV)
+                recovery = 1;
+        else if (!(o->mot_flags & MOF_SOM_CREATED) &&
+                 !(tmp_ma->ma_valid & MA_SOM))
+                recovery = 1;
+
+        CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
+               o->mot_ioepoch, PFID(mdt_object_fid(o)), o->mot_ioepoch_count);
+
+        opened = mdt_ioepoch_opened(o);
+        /**
+         * If IOEpoch is not opened, check if a Size-on-MDS update is needed.
+         * Skip the check for file with no LOV  or for unlink files.
+         */
+        if (!opened && tmp_ma->ma_valid & MA_LOV &&
+            !(tmp_ma->ma_valid & MA_INODE && tmp_ma->ma_attr.la_nlink == 0)) {
+                if (recovery)
+                        /* If some previous writer was evicted, re-ask the
+                         * client for attributes. Even if attributes are
+                         * provided, we cannot believe in them.
+                         * Another use case is that there is no SOM cache on
+                         * disk -- first access with SOM or there was an MDS
+                         * failure. */
+                        ret = MDT_IOEPOCH_GETATTR;
+                else if (o->mot_flags & MOF_SOM_CHANGE)
+                        /* Some previous writer changed the attribute.
+                         * Do not believe to the current Size-on-MDS
+                         * update, re-ask client. */
+                        ret = MDT_IOEPOCH_GETATTR;
+                else if (!(la->la_valid & LA_SIZE) && achange)
+                        /* Attributes were changed by the last writer
+                         * only but no Size-on-MDS update is received.*/
+                        ret = MDT_IOEPOCH_GETATTR;
+        }
+
+        if (achange || ret == MDT_IOEPOCH_GETATTR)
+                o->mot_flags |= MOF_SOM_CHANGE;
+
+        /* If epoch ends and relable SOM attributes are obtained, update them.
+         * Create SOM ea for new files even if there is no attributes obtained
+         * (0-length file). */
+        if (ret == MDT_IOEPOCH_CLOSED && !opened) {
+                if (achange || o->mot_flags & MOF_SOM_CREATED) {
+                        LASSERT(achange || !(la->la_valid & LA_SIZE));
+                        rc = mdt_som_attr_set(info, o, o->mot_ioepoch,
+                                              MDT_SOM_ENABLE);
+                        /* Avoid the following setattrs of these attributes,
+                         * e.g. for atime update. */
+                        info->mti_attr.ma_valid = 0;
                 }
+                mdt_object_som_enable(o, o->mot_ioepoch);
+        }
 
-                if (achange || eviction)
-                        o->mot_flags |= MF_SOM_CHANGE;
+        cfs_up(&o->mot_ioepoch_sem);
+        /* If recovery is needed, tell the client to perform GETATTR under
+         * the lock. */
+        if (ret == MDT_IOEPOCH_GETATTR && recovery) {
+                struct mdt_body *rep;
+                rep = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+                rep->valid |= OBD_MD_FLGETATTRLOCK;
         }
 
-        opened = mdt_epoch_opened(o);
-        spin_unlock(&info->mti_mdt->mdt_ioepoch_lock);
+        RETURN(rc ? : ret);
+
+error_up:
+        cfs_up(&o->mot_ioepoch_sem);
+        return rc;
+}
 
-        /* If eviction occurred, do nothing. */
-        if ((rc == 0) && !opened && !eviction) {
-                /* Epoch ends and wanted Size-on-MDS update is obtained. */
-                rc = mdt_sizeonmds_update(info, o);
-                /* Avoid the following setattrs of these attributes, e.g.
-                 * for atime update. */
-                info->mti_attr.ma_valid = 0;
+/**
+ * Close IOEpoch (opened file or FMODE_EPOCH state). It happens if:
+ * - a client closes the IOEpoch;
+ * - a client eviction occured.
+ * Return values:
+ * MDT_IOEPOCH_OPENED if the client does not close IOEpoch.
+ * MDT_IOEPOCH_CLOSED if the client closes IOEpoch.
+ * MDT_IOEPOCH_GETATTR if the client closes IOEpoch but another SOM attribute
+ * update is needed.
+ */
+static int mdt_ioepoch_close(struct mdt_thread_info *info, struct mdt_object *o)
+{
+        struct ptlrpc_request *req = mdt_info_req(info);
+        ENTRY;
+
+        if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
+            !S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
+                RETURN(0);
+
+        LASSERT(o->mot_ioepoch_count);
+        LASSERT(info->mti_ioepoch == NULL ||
+                info->mti_ioepoch->ioepoch == o->mot_ioepoch);
+
+        /* IOEpoch is closed only if client tells about it or eviction occures.
+         * In the replay case, always close the epoch. */
+        if (req == NULL)
+                RETURN(mdt_ioepoch_close_on_eviction(info, o));
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+                RETURN(mdt_ioepoch_close_on_replay(info, o));
+        if (info->mti_ioepoch->flags & MF_EPOCH_CLOSE)
+                RETURN(mdt_ioepoch_close_reg(info, o));
+        /* IO epoch is not closed. */
+        RETURN(MDT_IOEPOCH_OPENED);
+}
+
+/**
+ * Close FMODE_SOM state, when IOEpoch is already closed and we are waiting for
+ * attribute update. It happens if:
+ * - SOM Attribute Update is obtained;
+ * - the client failed to obtain it and informs MDS about it;
+ * - a client eviction occured.
+ * Apply obtained attributes for the 1st case, wipe out the on-disk SOM
+ * cache otherwise.
+ */
+int mdt_som_au_close(struct mdt_thread_info *info, struct mdt_object *o)
+{
+        struct ptlrpc_request *req = mdt_info_req(info);
+        __u64 ioepoch = 0;
+        int act = MDT_SOM_ENABLE;
+        int rc = 0;
+        ENTRY;
+
+        LASSERT(!req || info->mti_ioepoch);
+        if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
+            !S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
+                RETURN(0);
+
+        /* No size whereas MF_SOM_CHANGE is set means client failed to
+         * obtain ost attributes, drop the SOM cache on disk if so. */
+        if (!req ||
+            (info->mti_ioepoch &&
+             info->mti_ioepoch->flags & MF_SOM_CHANGE &&
+             !(info->mti_attr.ma_attr.la_valid & LA_SIZE)))
+                act = MDT_SOM_DISABLE;
+
+        cfs_down(&o->mot_ioepoch_sem);
+        /* Mark the object it is the recovery state if we failed to obtain
+         * SOM attributes. */
+        if (act == MDT_SOM_DISABLE)
+                o->mot_flags |= MOF_SOM_RECOV;
+
+        if (!mdt_ioepoch_opened(o)) {
+                ioepoch =  info->mti_ioepoch ?
+                        info->mti_ioepoch->ioepoch : o->mot_ioepoch;
+
+                if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY))
+                        rc = mdt_som_attr_set(info, o, ioepoch, act);
+                mdt_object_som_enable(o, ioepoch);
         }
+        cfs_up(&o->mot_ioepoch_sem);
         RETURN(rc);
 }
 
-int mdt_write_read(struct mdt_device *mdt, struct mdt_object *o)
+int mdt_write_read(struct mdt_object *o)
 {
         int rc = 0;
         ENTRY;
-        spin_lock(&mdt->mdt_ioepoch_lock);
+        cfs_down(&o->mot_ioepoch_sem);
         rc = o->mot_writecount;
-        spin_unlock(&mdt->mdt_ioepoch_lock);
+        cfs_up(&o->mot_ioepoch_sem);
         RETURN(rc);
 }
 
-int mdt_write_get(struct mdt_device *mdt, struct mdt_object *o)
+int mdt_write_get(struct mdt_object *o)
 {
         int rc = 0;
         ENTRY;
-        spin_lock(&mdt->mdt_ioepoch_lock);
+        cfs_down(&o->mot_ioepoch_sem);
         if (o->mot_writecount < 0)
                 rc = -ETXTBSY;
         else
                 o->mot_writecount++;
-        spin_unlock(&mdt->mdt_ioepoch_lock);
+        cfs_up(&o->mot_ioepoch_sem);
         RETURN(rc);
 }
 
-static void mdt_write_put(struct mdt_device *mdt, struct mdt_object *o)
+void mdt_write_put(struct mdt_object *o)
 {
         ENTRY;
-        spin_lock(&mdt->mdt_ioepoch_lock);
+        cfs_down(&o->mot_ioepoch_sem);
         o->mot_writecount--;
-        spin_unlock(&mdt->mdt_ioepoch_lock);
+        cfs_up(&o->mot_ioepoch_sem);
         EXIT;
 }
 
-static int mdt_write_deny(struct mdt_device *mdt, struct mdt_object *o)
+static int mdt_write_deny(struct mdt_object *o)
 {
         int rc = 0;
         ENTRY;
-        spin_lock(&mdt->mdt_ioepoch_lock);
+        cfs_down(&o->mot_ioepoch_sem);
         if (o->mot_writecount > 0)
                 rc = -ETXTBSY;
         else
                 o->mot_writecount--;
-        spin_unlock(&mdt->mdt_ioepoch_lock);
+        cfs_up(&o->mot_ioepoch_sem);
         RETURN(rc);
 }
 
-static void mdt_write_allow(struct mdt_device *mdt, struct mdt_object *o)
+static void mdt_write_allow(struct mdt_object *o)
 {
         ENTRY;
-        spin_lock(&mdt->mdt_ioepoch_lock);
+        cfs_down(&o->mot_ioepoch_sem);
         o->mot_writecount++;
-        spin_unlock(&mdt->mdt_ioepoch_lock);
+        cfs_up(&o->mot_ioepoch_sem);
         EXIT;
 }
 
@@ -329,29 +550,28 @@ static void mdt_empty_transno(struct mdt_thread_info* info)
         struct ptlrpc_request *req = mdt_info_req(info);
 
         ENTRY;
-        /* transaction is occured already */
+        /* transaction has occurred already */
         if (lustre_msg_get_transno(req->rq_repmsg) != 0) {
                 EXIT;
                 return;
         }
 
-        spin_lock(&mdt->mdt_transno_lock);
+        cfs_spin_lock(&mdt->mdt_lut.lut_translock);
         if (info->mti_transno == 0) {
-                info->mti_transno = ++ mdt->mdt_last_transno;
+                info->mti_transno = ++ mdt->mdt_lut.lut_last_transno;
         } else {
                 /* should be replay */
-                if (info->mti_transno > mdt->mdt_last_transno)
-                        mdt->mdt_last_transno = info->mti_transno;
+                if (info->mti_transno > mdt->mdt_lut.lut_last_transno)
+                        mdt->mdt_lut.lut_last_transno = info->mti_transno;
         }
-        spin_unlock(&mdt->mdt_transno_lock);
+        cfs_spin_unlock(&mdt->mdt_lut.lut_translock);
 
-        CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
+        CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
                         info->mti_transno,
                         req->rq_export->exp_obd->obd_last_committed);
 
         req->rq_transno = info->mti_transno;
         lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);
-        lustre_msg_set_last_xid(req->rq_repmsg, req->rq_xid);
         EXIT;
 }
 
@@ -366,7 +586,7 @@ void mdt_mfd_set_mode(struct mdt_file_data *mfd, int mode)
 }
 
 static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
-                        struct mdt_object *o, int flags, int created)
+                        struct mdt_object *o, __u64 flags, int created)
 {
         struct ptlrpc_request   *req = mdt_info_req(info);
         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
@@ -381,7 +601,7 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
 
         isreg = S_ISREG(la->la_mode);
         isdir = S_ISDIR(la->la_mode);
-        if ((isreg && !(ma->ma_valid & MA_LOV))) {
+        if (isreg && !(ma->ma_valid & MA_LOV)) {
                 /*
                  * No EA, check whether it is will set regEA and dirEA since in
                  * above attr get, these size might be zero, so reset it, to
@@ -409,13 +629,13 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
         }
 
         if (flags & FMODE_WRITE) {
-                rc = mdt_write_get(info->mti_mdt, o);
+                rc = mdt_write_get(o);
                 if (rc == 0) {
-                        mdt_epoch_open(info, o);
+                        mdt_ioepoch_open(info, o, created);
                         repbody->ioepoch = o->mot_ioepoch;
                 }
         } else if (flags & MDS_FMODE_EXEC) {
-                rc = mdt_write_deny(info->mti_mdt, o);
+                rc = mdt_write_deny(o);
         }
         if (rc)
                 RETURN(rc);
@@ -446,34 +666,45 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                 mfd->mfd_xid = req->rq_xid;
 
                 /* replay handle */
-                if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+                if (req_is_replay(req)) {
                         struct mdt_file_data *old_mfd;
                         /* Check wheather old cookie already exist in
                          * the list, becasue when do recovery, client
                          * might be disconnected from server, and
-                         * restart replay, so there maybe some orphan 
+                         * restart replay, so there maybe some orphan
                          * mfd here, we should remove them */
                         LASSERT(info->mti_rr.rr_handle != NULL);
                         old_mfd = mdt_handle2mfd(info, info->mti_rr.rr_handle);
                         if (old_mfd) {
-                                CDEBUG(D_HA, "del orph mfd %p cookie" LPX64"\n",
-                                       mfd, info->mti_rr.rr_handle->cookie);
-                                spin_lock(&med->med_open_lock);
+                                CDEBUG(D_HA, "del orph mfd %p fid=("DFID") "
+                                       "cookie=" LPX64"\n", mfd,
+                                       PFID(mdt_object_fid(mfd->mfd_object)),
+                                       info->mti_rr.rr_handle->cookie);
+                                cfs_spin_lock(&med->med_open_lock);
                                 class_handle_unhash(&old_mfd->mfd_handle);
-                                list_del_init(&old_mfd->mfd_list);
-                                spin_unlock(&med->med_open_lock);
-                                mdt_mfd_free(old_mfd);
+                                cfs_list_del_init(&old_mfd->mfd_list);
+                                cfs_spin_unlock(&med->med_open_lock);
+                                mdt_mfd_close(info, old_mfd);
                         }
                         CDEBUG(D_HA, "Store old cookie "LPX64" in new mfd\n",
                                info->mti_rr.rr_handle->cookie);
                         mfd->mfd_old_handle.cookie =
                                                 info->mti_rr.rr_handle->cookie;
                 }
-                spin_lock(&med->med_open_lock);
-                list_add(&mfd->mfd_list, &med->med_open_head);
-                spin_unlock(&med->med_open_lock);
-
                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
+
+                if (req->rq_export->exp_disconnected) {
+                        cfs_spin_lock(&med->med_open_lock);
+                        class_handle_unhash(&mfd->mfd_handle);
+                        cfs_list_del_init(&mfd->mfd_list);
+                        cfs_spin_unlock(&med->med_open_lock);
+                        mdt_mfd_close(info, mfd);
+                } else {
+                        cfs_spin_lock(&med->med_open_lock);
+                        cfs_list_add(&mfd->mfd_list, &med->med_open_head);
+                        cfs_spin_unlock(&med->med_open_lock);
+                }
+
                 mdt_empty_transno(info);
         } else
                 rc = -ENOMEM;
@@ -484,18 +715,18 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
 
 static int mdt_finish_open(struct mdt_thread_info *info,
                            struct mdt_object *p, struct mdt_object *o,
-                           int flags, int created, struct ldlm_reply *rep)
+                           __u64 flags, int created, struct ldlm_reply *rep)
 {
         struct ptlrpc_request   *req = mdt_info_req(info);
+        struct obd_export       *exp = req->rq_export;
         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
-        struct mdt_device       *mdt = info->mti_mdt;
         struct md_attr          *ma  = &info->mti_attr;
         struct lu_attr          *la  = &ma->ma_attr;
         struct mdt_file_data    *mfd;
         struct mdt_body         *repbody;
         int                      rc = 0;
         int                      isreg, isdir, islnk;
-        struct list_head        *t;
+        cfs_list_t              *t;
         ENTRY;
 
         LASSERT(ma->ma_valid & MA_INODE);
@@ -507,7 +738,7 @@ static int mdt_finish_open(struct mdt_thread_info *info,
         islnk = S_ISLNK(la->la_mode);
         mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
 
-        if (med->med_rmtclient) {
+        if (exp_connect_rmtclient(exp)) {
                 void *buf = req_capsule_server_get(info->mti_pill, &RMF_ACL);
 
                 rc = mdt_pack_remote_perm(info, o, buf);
@@ -520,7 +751,7 @@ static int mdt_finish_open(struct mdt_thread_info *info,
                 }
         }
 #ifdef CONFIG_FS_POSIX_ACL
-        else if (req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) {
+        else if (exp->exp_connect_flags & OBD_CONNECT_ACL) {
                 const struct lu_env *env = info->mti_env;
                 struct md_object *next = mdt_object_child(o);
                 struct lu_buf *buf = &info->mti_buf;
@@ -550,26 +781,26 @@ static int mdt_finish_open(struct mdt_thread_info *info,
         }
 #endif
 
-        if (mdt->mdt_opts.mo_mds_capa) {
+        if (info->mti_mdt->mdt_opts.mo_mds_capa &&
+            exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) {
                 struct lustre_capa *capa;
 
                 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA1);
                 LASSERT(capa);
                 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
-                capa->lc_uid = 0;
                 rc = mo_capa_get(info->mti_env, mdt_object_child(o), capa, 0);
                 if (rc)
                         RETURN(rc);
                 repbody->valid |= OBD_MD_FLMDSCAPA;
         }
-        if (mdt->mdt_opts.mo_oss_capa &&
+        if (info->mti_mdt->mdt_opts.mo_oss_capa &&
+            exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA &&
             S_ISREG(lu_object_attr(&o->mot_obj.mo_lu))) {
                 struct lustre_capa *capa;
 
                 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA2);
                 LASSERT(capa);
                 capa->lc_opc = CAPA_OPC_OSS_DEFAULT | capa_open_opc(flags);
-                capa->lc_uid = 0;
                 rc = mo_capa_get(info->mti_env, mdt_object_child(o), capa, 0);
                 if (rc)
                         RETURN(rc);
@@ -611,15 +842,15 @@ static int mdt_finish_open(struct mdt_thread_info *info,
 
         mfd = NULL;
         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
-                spin_lock(&med->med_open_lock);
-                list_for_each(t, &med->med_open_head) {
-                        mfd = list_entry(t, struct mdt_file_data, mfd_list);
+                cfs_spin_lock(&med->med_open_lock);
+                cfs_list_for_each(t, &med->med_open_head) {
+                        mfd = cfs_list_entry(t, struct mdt_file_data, mfd_list);
                         if (mfd->mfd_xid == req->rq_xid) {
                                 break;
                         }
                         mfd = NULL;
                 }
-                spin_unlock(&med->med_open_lock);
+                cfs_spin_unlock(&med->med_open_lock);
 
                 if (mfd != NULL) {
                         repbody->handle.cookie = mfd->mfd_handle.h_cookie;
@@ -640,8 +871,8 @@ static int mdt_finish_open(struct mdt_thread_info *info,
         RETURN(rc);
 }
 
-extern void mdt_req_from_mcd(struct ptlrpc_request *req,
-                             struct mdt_client_data *mcd);
+extern void mdt_req_from_lcd(struct ptlrpc_request *req,
+                             struct lsd_client_data *lcd);
 
 void mdt_reconstruct_open(struct mdt_thread_info *info,
                           struct mdt_lock_handle *lhc)
@@ -650,8 +881,8 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
         struct mdt_device       *mdt  = info->mti_mdt;
         struct req_capsule      *pill = info->mti_pill;
         struct ptlrpc_request   *req  = mdt_info_req(info);
-        struct mdt_export_data  *med  = &req->rq_export->exp_mdt_data;
-        struct mdt_client_data  *mcd  = med->med_mcd;
+        struct tg_export_data   *ted  = &req->rq_export->exp_target_data;
+        struct lsd_client_data  *lcd  = ted->ted_lcd;
         struct md_attr          *ma   = &info->mti_attr;
         struct mdt_reint_record *rr   = &info->mti_rr;
         __u32                   flags = info->mti_spec.sp_cr_flags;
@@ -669,11 +900,14 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
         ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
         ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD,
                                                RCL_SERVER);
-        ma->ma_need = MA_INODE | MA_LOV;
+        ma->ma_need = MA_INODE;
+        if (ma->ma_lmm_size > 0)
+                ma->ma_need |= MA_LOV;
+
         ma->ma_valid = 0;
 
-        mdt_req_from_mcd(req, med->med_mcd);
-        mdt_set_disposition(info, ldlm_rep, mcd->mcd_last_data);
+        mdt_req_from_lcd(req, lcd);
+        mdt_set_disposition(info, ldlm_rep, lcd->lcd_last_data);
 
         CERROR("This is reconstruct open: disp="LPX64", result=%d\n",
                 ldlm_rep->lock_policy_res1, req->rq_status);
@@ -684,16 +918,36 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
                 GOTO(out, rc = req->rq_status);
 
         if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE)) {
+                struct obd_export *exp = req->rq_export;
                 /*
                  * We failed after creation, but we do not know in which step
                  * we failed. So try to check the child object.
                  */
                 parent = mdt_object_find(env, mdt, rr->rr_fid1);
-                LASSERT(!IS_ERR(parent));
-
+                if (IS_ERR(parent)) {
+                        rc = PTR_ERR(parent);
+                        LCONSOLE_WARN("Parent "DFID" lookup error %d."
+                                      " Evicting client %s with export %s.\n",
+                                      PFID(mdt_object_fid(parent)), rc,
+                                      obd_uuid2str(&exp->exp_client_uuid),
+                                      obd_export_nid2str(exp));
+                        mdt_export_evict(exp);
+                        EXIT;
+                        return;
+                }
                 child = mdt_object_find(env, mdt, rr->rr_fid2);
-                LASSERT(!IS_ERR(child));
-
+                if (IS_ERR(child)) {
+                        rc = PTR_ERR(child);
+                        LCONSOLE_WARN("Child "DFID" lookup error %d."
+                                      " Evicting client %s with export %s.\n",
+                                      PFID(mdt_object_fid(child)), rc,
+                                      obd_uuid2str(&exp->exp_client_uuid),
+                                      obd_export_nid2str(exp));
+                        mdt_object_put(env, parent);
+                        mdt_export_evict(exp);
+                        EXIT;
+                        return;
+                }
                 rc = mdt_object_exists(child);
                 if (rc > 0) {
                         struct md_object *next;
@@ -770,6 +1024,68 @@ static int mdt_open_by_fid(struct mdt_thread_info* info,
         RETURN(rc);
 }
 
+static int mdt_open_anon_by_fid(struct mdt_thread_info* info,
+                                struct ldlm_reply *rep, 
+                                struct mdt_lock_handle *lhc)
+{
+        __u32                    flags = info->mti_spec.sp_cr_flags;
+        struct mdt_reint_record *rr = &info->mti_rr;
+        struct md_attr          *ma = &info->mti_attr;
+        struct mdt_object       *o;
+        int                      rc;
+        ldlm_mode_t              lm;
+        ENTRY;
+
+        o = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
+        if (IS_ERR(o))
+                RETURN(rc = PTR_ERR(o));
+
+        rc = mdt_object_exists(o);
+        if (rc == 0) {
+                mdt_set_disposition(info, rep, (DISP_LOOKUP_EXECD |
+                                    DISP_LOOKUP_NEG));
+                GOTO(out, rc = -ENOENT);
+        } else if (rc < 0) {
+                CERROR("NFS remote open shouldn't happen.\n");
+                GOTO(out, rc);
+        }
+
+        mdt_set_disposition(info, rep, (DISP_IT_EXECD |
+                                        DISP_LOOKUP_EXECD |
+                                        DISP_LOOKUP_POS));
+
+        if (flags & FMODE_WRITE)
+                lm = LCK_CW;
+        else if (flags & MDS_FMODE_EXEC)
+                lm = LCK_PR;
+        else
+                lm = LCK_CR;
+
+        mdt_lock_handle_init(lhc);
+        mdt_lock_reg_init(lhc, lm);
+        rc = mdt_object_lock(info, o, lhc,
+                             MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN,
+                             MDT_CROSS_LOCK);
+        if (rc)
+                GOTO(out, rc);
+
+        rc = mo_attr_get(info->mti_env, mdt_object_child(o), ma);
+        if (rc)
+                GOTO(out, rc);
+
+        if (flags & MDS_OPEN_LOCK)
+                mdt_set_disposition(info, rep, DISP_OPEN_LOCK);
+        rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
+
+        if (!(flags & MDS_OPEN_LOCK) || rc)
+                mdt_object_unlock(info, o, lhc, 1);
+
+        GOTO(out, rc);
+out:
+        mdt_object_put(info->mti_env, o);
+        return rc;
+}
+
 int mdt_pin(struct mdt_thread_info* info)
 {
         ENTRY;
@@ -835,40 +1151,49 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         struct mdt_body         *repbody;
         struct lu_fid           *child_fid = &info->mti_tmp_fid1;
         struct md_attr          *ma = &info->mti_attr;
-        __u32                    create_flags = info->mti_spec.sp_cr_flags;
+        __u64                    create_flags = info->mti_spec.sp_cr_flags;
         struct mdt_reint_record *rr = &info->mti_rr;
         struct lu_name          *lname;
-        int                      result;
+        int                      result, rc;
         int                      created = 0;
+        __u32                    msg_flags;
         ENTRY;
 
         OBD_FAIL_TIMEOUT_ORSET(OBD_FAIL_MDS_PAUSE_OPEN, OBD_FAIL_ONCE,
                                (obd_timeout + 1) / 4);
 
+        mdt_counter_incr(req->rq_export, LPROC_MDT_OPEN);
         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
 
         ma->ma_lmm = req_capsule_server_get(info->mti_pill, &RMF_MDT_MD);
         ma->ma_lmm_size = req_capsule_get_size(info->mti_pill, &RMF_MDT_MD,
                                                RCL_SERVER);
-        ma->ma_need = MA_INODE | MA_LOV;
+        ma->ma_need = MA_INODE;
+        if (ma->ma_lmm_size > 0)
+                ma->ma_need |= MA_LOV;
+
         ma->ma_valid = 0;
 
         LASSERT(info->mti_pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
         ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
 
-        /* TODO: JOIN file */
-        if (create_flags & MDS_OPEN_JOIN_FILE) {
-                CERROR("JOIN file will be supported soon\n");
+        if (unlikely(create_flags & MDS_OPEN_JOIN_FILE)) {
+                CERROR("file join is not supported anymore.\n");
                 GOTO(out, result = err_serious(-EOPNOTSUPP));
         }
+        msg_flags = lustre_msg_get_flags(req->rq_reqmsg);
+
+        if ((create_flags & (MDS_OPEN_HAS_EA | MDS_OPEN_HAS_OBJS)) &&
+            info->mti_spec.u.sp_ea.eadata == NULL)
+                GOTO(out, result = err_serious(-EINVAL));
 
         CDEBUG(D_INODE, "I am going to open "DFID"/(%s->"DFID") "
-               "cr_flag=0%o mode=0%06o msg_flag=0x%x\n",
+               "cr_flag="LPO64" mode=0%06o msg_flag=0x%x\n",
                PFID(rr->rr_fid1), rr->rr_name,
                PFID(rr->rr_fid2), create_flags,
-               ma->ma_attr.la_mode, lustre_msg_get_flags(req->rq_reqmsg));
+               ma->ma_attr.la_mode, msg_flags);
 
-        if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) ||
+        if (req_is_replay(req) ||
             (req->rq_export->exp_libclient && create_flags&MDS_OPEN_HAS_EA)) {
                 /* This is a replay request or from liblustre with ea. */
                 result = mdt_open_by_fid(info, ldlm_rep);
@@ -884,11 +1209,16 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                  * via a regular replay.
                  */
                 if (!(create_flags & MDS_OPEN_CREAT)) {
-                        DEBUG_REQ(D_ERROR, req,"OPEN & CREAT not in open replay.");
+                        DEBUG_REQ(D_ERROR, req,
+                                  "OPEN & CREAT not in open replay.");
                         GOTO(out, result = -EFAULT);
                 }
                 CDEBUG(D_INFO, "Open replay did find object, continue as "
                        "regular open\n");
+        } else if (rr->rr_namelen == 0 && !info->mti_cross_ref &&
+                   create_flags & MDS_OPEN_LOCK) {
+                result = mdt_open_anon_by_fid(info, ldlm_rep, lhc);
+                GOTO(out, result);
         }
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK))
@@ -914,10 +1244,14 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         if (IS_ERR(parent))
                 GOTO(out, result = PTR_ERR(parent));
 
+        /* get and check version of parent */
+        result = mdt_version_get_check(info, parent, 0);
+        if (result)
+                GOTO(out_parent, result);
+
         fid_zero(child_fid);
 
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
-
         result = mdo_lookup(info->mti_env, mdt_object_child(parent),
                             lname, child_fid, &info->mti_spec);
         LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
@@ -954,8 +1288,20 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         if (IS_ERR(child))
                 GOTO(out_parent, result = PTR_ERR(child));
 
+        /** check version of child  */
+        rc = mdt_version_get_check(info, child, 1);
+        if (rc)
+                GOTO(out_child, result = rc);
+
         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
         if (result == -ENOENT) {
+                /* save versions in reply */
+                mdt_version_get_save(info, parent, 0);
+                mdt_version_get_save(info, child, 1);
+
+                /* version of child will be changed */
+                info->mti_mos = child;
+
                 /* Not found and with MDS_OPEN_CREAT: let's create it. */
                 mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
 
@@ -968,6 +1314,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                  * not exist.
                  */
                 info->mti_spec.sp_cr_lookup = 0;
+                info->mti_spec.sp_feat = &dt_directory_features;
 
                 result = mdo_create(info->mti_env,
                                     mdt_object_child(parent),
@@ -991,8 +1338,6 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                  * The object is on remote node, return its FID for remote open.
                  */
                 if (result == -EREMOTE) {
-                        int rc;
-
                         /*
                          * Check if this lock already was sent to client and
                          * this is resent case. For resent case do not take lock
@@ -1003,8 +1348,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                         if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
                                 struct ldlm_lock *lock;
 
-                                LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
-                                        MSG_RESENT);
+                                LASSERT(msg_flags & MSG_RESENT);
 
                                 lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
                                 if (!lock) {
@@ -1032,37 +1376,68 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                 }
         }
 
+        LASSERT(!lustre_handle_is_used(&lhc->mlh_reg_lh));
+
+        /* get openlock if this is not replay and if a client requested it */
+        if (!req_is_replay(req) && create_flags & MDS_OPEN_LOCK) {
+                ldlm_mode_t lm;
+
+                if (create_flags & FMODE_WRITE)
+                        lm = LCK_CW;
+                else if (create_flags & MDS_FMODE_EXEC)
+                        lm = LCK_PR;
+                else
+                        lm = LCK_CR;
+                mdt_lock_handle_init(lhc);
+                mdt_lock_reg_init(lhc, lm);
+                rc = mdt_object_lock(info, child, lhc,
+                                     MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN,
+                                     MDT_CROSS_LOCK);
+                if (rc) {
+                        result = rc;
+                        GOTO(out_child, result);
+                } else {
+                        result = -EREMOTE;
+                        mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
+                }
+        }
+
         /* Try to open it now. */
-        result = mdt_finish_open(info, parent, child, create_flags,
-                                 created, ldlm_rep);
-
-        if (result != 0 && created) {
-                int rc2;
-                ma->ma_need = 0;
-                ma->ma_valid = 0;
-                ma->ma_cookie_size = 0;
-                info->mti_no_need_trans = 1;
-                rc2 = mdo_unlink(info->mti_env,
-                                 mdt_object_child(parent),
-                                 mdt_object_child(child),
-                                 lname,
-                                 &info->mti_attr);
-                if (rc2 != 0)
-                        CERROR("Error in cleanup of open\n");
+        rc = mdt_finish_open(info, parent, child, create_flags,
+                             created, ldlm_rep);
+        if (rc) {
+                result = rc;
+                if (lustre_handle_is_used(&lhc->mlh_reg_lh))
+                        /* openlock was acquired and mdt_finish_open failed -
+                           drop the openlock */
+                        mdt_object_unlock(info, child, lhc, 1);
+                if (created) {
+                        ma->ma_need = 0;
+                        ma->ma_valid = 0;
+                        ma->ma_cookie_size = 0;
+                        info->mti_no_need_trans = 1;
+                        rc = mdo_unlink(info->mti_env,
+                                        mdt_object_child(parent),
+                                        mdt_object_child(child),
+                                        lname,
+                                        &info->mti_attr);
+                        if (rc != 0)
+                                CERROR("Error in cleanup of open\n");
+                }
         }
         EXIT;
 out_child:
         mdt_object_put(info->mti_env, child);
 out_parent:
-        mdt_object_unlock_put(info, parent, lh, result);
+        mdt_object_unlock_put(info, parent, lh, result || !created);
 out:
-        if (result)
+        if (result && result != -EREMOTE)
                 lustre_msg_set_transno(req->rq_repmsg, 0);
         return result;
 }
 
 #define MFD_CLOSED(mode) (((mode) & ~(FMODE_EPOCH | FMODE_SOM | \
-                                      FMODE_EPOCHLCK)) == FMODE_CLOSED)
+                                      FMODE_TRUNC)) == FMODE_CLOSED)
 
 static int mdt_mfd_closed(struct mdt_file_data *mfd)
 {
@@ -1074,25 +1449,29 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
         struct mdt_object *o = mfd->mfd_object;
         struct md_object *next = mdt_object_child(o);
         struct md_attr *ma = &info->mti_attr;
-        int rc = 0, ret = 0;
+        int ret = MDT_IOEPOCH_CLOSED;
+        int rc = 0;
         int mode;
         ENTRY;
 
         mode = mfd->mfd_mode;
 
-        if ((mode & FMODE_WRITE) || (mode & FMODE_EPOCHLCK)) {
-                mdt_write_put(info->mti_mdt, o);
-                ret = mdt_epoch_close(info, o);
+        if ((mode & FMODE_WRITE) || (mode & FMODE_TRUNC)) {
+                mdt_write_put(o);
+                ret = mdt_ioepoch_close(info, o);
         } else if (mode & MDS_FMODE_EXEC) {
-                mdt_write_allow(info->mti_mdt, o);
+                mdt_write_allow(o);
         } else if (mode & FMODE_EPOCH) {
-                ret = mdt_epoch_close(info, o);
-        } 
-        
+                ret = mdt_ioepoch_close(info, o);
+        } else if (mode & FMODE_SOM) {
+                ret = mdt_som_au_close(info, o);
+        }
+
         /* Update atime on close only. */
         if ((mode & MDS_FMODE_EXEC || mode & FMODE_READ || mode & FMODE_WRITE)
             && (ma->ma_valid & MA_INODE) && (ma->ma_attr.la_valid & LA_ATIME)) {
                 /* Set the atime only. */
+                ma->ma_valid = MA_INODE;
                 ma->ma_attr.la_valid = LA_ATIME;
                 rc = mo_attr_set(info->mti_env, next, ma);
         }
@@ -1102,33 +1481,27 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
 
         if (!MFD_CLOSED(mode))
                 rc = mo_close(info->mti_env, next, ma);
-        else if (ret == -EAGAIN)
-                rc = mo_attr_get(info->mti_env, next, ma);
 
-        /* If the object is unlinked, do not try to re-enable SIZEONMDS */
-        if ((ret == -EAGAIN) && (ma->ma_valid & MA_INODE) &&
-            (ma->ma_attr.la_nlink == 0)) {
-                ret = 0;
-        }
-
-        if ((ret == -EAGAIN) || (ret == 1)) {
+        if (ret == MDT_IOEPOCH_GETATTR || ret == MDT_IOEPOCH_OPENED) {
                 struct mdt_export_data *med;
 
-                /* The epoch has not closed or Size-on-MDS update is needed.
+                /* The IOepoch is still opened or SOM update is needed.
                  * Put mfd back into the list. */
                 LASSERT(mdt_conn_flags(info) & OBD_CONNECT_SOM);
-                mdt_mfd_set_mode(mfd, (ret == 1 ? FMODE_EPOCH : FMODE_SOM));
+                mdt_mfd_set_mode(mfd, ret == MDT_IOEPOCH_OPENED ?
+                                      FMODE_EPOCH : FMODE_SOM);
 
                 LASSERT(mdt_info_req(info));
                 med = &mdt_info_req(info)->rq_export->exp_mdt_data;
-                spin_lock(&med->med_open_lock);
-                list_add(&mfd->mfd_list, &med->med_open_head);
+                cfs_spin_lock(&med->med_open_lock);
+                cfs_list_add(&mfd->mfd_list, &med->med_open_head);
                 class_handle_hash_back(&mfd->mfd_handle);
-                spin_unlock(&med->med_open_lock);
+                cfs_spin_unlock(&med->med_open_lock);
 
-                if (ret == 1) {
+                if (ret == MDT_IOEPOCH_OPENED) {
                         ret = 0;
                 } else {
+                        ret = -EAGAIN;
                         CDEBUG(D_INODE, "Size-on-MDS attribute update is "
                                "needed on "DFID"\n", PFID(mdt_object_fid(o)));
                 }
@@ -1151,20 +1524,24 @@ int mdt_close(struct mdt_thread_info *info)
         int rc, ret = 0;
         ENTRY;
 
+        mdt_counter_incr(req->rq_export, LPROC_MDT_CLOSE);
         /* Close may come with the Size-on-MDS update. Unpack it. */
         rc = mdt_close_unpack(info);
         if (rc)
                 RETURN(err_serious(rc));
 
-        LASSERT(info->mti_epoch);
+        LASSERT(info->mti_ioepoch);
 
         req_capsule_set_size(info->mti_pill, &RMF_MDT_MD, RCL_SERVER,
                              info->mti_mdt->mdt_max_mdsize);
         req_capsule_set_size(info->mti_pill, &RMF_LOGCOOKIES, RCL_SERVER,
                              info->mti_mdt->mdt_max_cookiesize);
         rc = req_capsule_server_pack(info->mti_pill);
-        if (mdt_check_resent(info, mdt_reconstruct_generic, NULL))
+        if (mdt_check_resent(info, mdt_reconstruct_generic, NULL)) {
+                if (rc == 0)
+                        mdt_shrink_reply(info);
                 RETURN(lustre_msg_get_status(req->rq_repmsg));
+        }
 
         /* Continue to close handle even if we can not pack reply */
         if (rc == 0) {
@@ -1183,22 +1560,24 @@ int mdt_close(struct mdt_thread_info *info)
                 ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
                 repbody->eadatasize = 0;
                 repbody->aclsize = 0;
-        } else
+        } else {
                 rc = err_serious(rc);
+        }
 
         med = &req->rq_export->exp_mdt_data;
-        spin_lock(&med->med_open_lock);
-        mfd = mdt_handle2mfd(info, &info->mti_epoch->handle);
+        cfs_spin_lock(&med->med_open_lock);
+        mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
         if (mdt_mfd_closed(mfd)) {
-                spin_unlock(&med->med_open_lock);
+                cfs_spin_unlock(&med->med_open_lock);
                 CDEBUG(D_INODE, "no handle for file close: fid = "DFID
                        ": cookie = "LPX64"\n", PFID(info->mti_rr.rr_fid1),
-                       info->mti_epoch->handle.cookie);
-                rc = err_serious(-ESTALE);
+                       info->mti_ioepoch->handle.cookie);
+                /** not serious error since bug 3633 */
+                rc = -ESTALE;
         } else {
                 class_handle_unhash(&mfd->mfd_handle);
-                list_del_init(&mfd->mfd_list);
-                spin_unlock(&med->med_open_lock);
+                cfs_list_del_init(&mfd->mfd_list);
+                cfs_spin_unlock(&med->med_open_lock);
 
                 /* Do not lose object before last unlink. */
                 o = mfd->mfd_object;
@@ -1215,11 +1594,23 @@ int mdt_close(struct mdt_thread_info *info)
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK))
                 RETURN(err_serious(-ENOMEM));
 
+        if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_CLOSE_NET_REP,
+                                 OBD_FAIL_MDS_CLOSE_NET_REP))
+                info->mti_fail_id = OBD_FAIL_MDS_CLOSE_NET_REP;
         RETURN(rc ? rc : ret);
 }
 
+/**
+ * DONE_WRITING rpc handler.
+ *
+ * As mfd is not kept after replayed CLOSE (see mdt_ioepoch_close_on_replay()),
+ * only those DONE_WRITING rpc will be replayed which really wrote smth on disk,
+ * and got a trasid. Waiting for such DONE_WRITING is not reliable, so just
+ * skip attributes and reconstruct the reply here.
+ */
 int mdt_done_writing(struct mdt_thread_info *info)
 {
+        struct ptlrpc_request   *req = mdt_info_req(info);
         struct mdt_body         *repbody = NULL;
         struct mdt_export_data  *med;
         struct mdt_file_data    *mfd;
@@ -1241,30 +1632,45 @@ int mdt_done_writing(struct mdt_thread_info *info)
                 RETURN(err_serious(rc));
 
         if (mdt_check_resent(info, mdt_reconstruct_generic, NULL))
-                RETURN(lustre_msg_get_status(mdt_info_req(info)->rq_repmsg));
+                RETURN(lustre_msg_get_status(req->rq_repmsg));
 
         med = &info->mti_exp->exp_mdt_data;
-        spin_lock(&med->med_open_lock);
-        mfd = mdt_handle2mfd(info, &info->mti_epoch->handle);
+        cfs_spin_lock(&med->med_open_lock);
+        mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
         if (mfd == NULL) {
-                spin_unlock(&med->med_open_lock);
+                cfs_spin_unlock(&med->med_open_lock);
                 CDEBUG(D_INODE, "no handle for done write: fid = "DFID
-                       ": cookie = "LPX64"\n", PFID(info->mti_rr.rr_fid1),
-                       info->mti_epoch->handle.cookie);
+                       ": cookie = "LPX64" ioepoch = "LPU64"\n",
+                       PFID(info->mti_rr.rr_fid1),
+                       info->mti_ioepoch->handle.cookie,
+                       info->mti_ioepoch->ioepoch);
+                /* If this is a replay, reconstruct the transno. */
+                if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+                        mdt_empty_transno(info);
+                        RETURN(info->mti_ioepoch->flags & MF_SOM_AU ?
+                               -EAGAIN : 0);
+                }
                 RETURN(-ESTALE);
-        } 
+        }
+
         LASSERT(mfd->mfd_mode == FMODE_EPOCH ||
-                mfd->mfd_mode == FMODE_EPOCHLCK);
+                mfd->mfd_mode == FMODE_TRUNC);
         class_handle_unhash(&mfd->mfd_handle);
-        list_del_init(&mfd->mfd_list);
-        spin_unlock(&med->med_open_lock);
+        cfs_list_del_init(&mfd->mfd_list);
+        cfs_spin_unlock(&med->med_open_lock);
 
         /* Set EPOCH CLOSE flag if not set by client. */
-        info->mti_epoch->flags |= MF_EPOCH_CLOSE;
+        info->mti_ioepoch->flags |= MF_EPOCH_CLOSE;
         info->mti_attr.ma_valid = 0;
+
+        info->mti_attr.ma_lmm_size = info->mti_mdt->mdt_max_mdsize;
+        OBD_ALLOC(info->mti_attr.ma_lmm, info->mti_mdt->mdt_max_mdsize);
+        if (info->mti_attr.ma_lmm == NULL)
+                RETURN(-ENOMEM);
+
         rc = mdt_mfd_close(info, mfd);
+
+        OBD_FREE(info->mti_attr.ma_lmm, info->mti_mdt->mdt_max_mdsize);
         mdt_empty_transno(info);
         RETURN(rc);
 }
-