Whamcloud - gitweb
LU-3544 fid: do open-by-fid by default
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index 7e96491..d35526e 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2011, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -58,44 +58,47 @@ static struct portals_handle_ops mfd_handle_ops = {
 
 /* Create a new mdt_file_data struct, initialize it,
  * and insert it to global hash table */
-struct mdt_file_data *mdt_mfd_new(void)
+struct mdt_file_data *mdt_mfd_new(const struct mdt_export_data *med)
 {
-        struct mdt_file_data *mfd;
-        ENTRY;
-
-        OBD_ALLOC_PTR(mfd);
-        if (mfd != NULL) {
-                CFS_INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
-                CFS_INIT_LIST_HEAD(&mfd->mfd_list);
+       struct mdt_file_data *mfd;
+       ENTRY;
+
+       OBD_ALLOC_PTR(mfd);
+       if (mfd != NULL) {
+               CFS_INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
+               mfd->mfd_handle.h_owner = med;
+               CFS_INIT_LIST_HEAD(&mfd->mfd_list);
                class_handle_hash(&mfd->mfd_handle, &mfd_handle_ops);
-        }
-        RETURN(mfd);
+       }
+
+       RETURN(mfd);
 }
 
 /*
  * Find the mfd pointed to by handle in global hash table.
  * In case of replay the handle is obsoleted
- * but mfd can be found in mfd list by that handle
+ * but mfd can be found in mfd list by that handle.
+ * Callers need to be holding med_open_lock.
  */
-struct mdt_file_data *mdt_handle2mfd(struct mdt_thread_info *info,
-                                     const struct lustre_handle *handle)
+struct mdt_file_data *mdt_handle2mfd(struct mdt_export_data *med,
+                                    const struct lustre_handle *handle,
+                                    bool is_replay_or_resent)
 {
-        struct ptlrpc_request *req = mdt_info_req(info);
-        struct mdt_file_data  *mfd;
-        ENTRY;
+       struct mdt_file_data   *mfd;
+       ENTRY;
+
+       LASSERT(handle != NULL);
+       mfd = class_handle2object(handle->cookie, med);
+       /* during dw/setattr replay the mfd can be found by old handle */
+       if (mfd == NULL && is_replay_or_resent) {
+               cfs_list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
+                       if (mfd->mfd_old_handle.cookie == handle->cookie)
+                               RETURN(mfd);
+               }
+               mfd = NULL;
+       }
 
-        LASSERT(handle != NULL);
-        mfd = class_handle2object(handle->cookie);
-        /* during dw/setattr replay the mfd can be found by old handle */
-        if (mfd == NULL && req_is_replay(req)) {
-                struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
-                cfs_list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
-                        if (mfd->mfd_old_handle.cookie == handle->cookie)
-                                RETURN (mfd);
-                }
-                mfd = NULL;
-        }
-        RETURN (mfd);
+       RETURN(mfd);
 }
 
 /* free mfd */
@@ -108,29 +111,33 @@ void mdt_mfd_free(struct mdt_file_data *mfd)
 static int mdt_create_data(struct mdt_thread_info *info,
                            struct mdt_object *p, struct mdt_object *o)
 {
-        struct md_op_spec     *spec = &info->mti_spec;
-        struct md_attr        *ma   = &info->mti_attr;
-        int                    rc   = 0;
-        ENTRY;
+       struct md_op_spec     *spec = &info->mti_spec;
+       struct md_attr        *ma   = &info->mti_attr;
+       int                    rc   = 0;
+       ENTRY;
 
-        if (!md_should_create(spec->sp_cr_flags))
-                RETURN(0);
+       if (!md_should_create(spec->sp_cr_flags))
+               RETURN(0);
 
-        ma->ma_need = MA_INODE | MA_LOV;
-        ma->ma_valid = 0;
+       ma->ma_need = MA_INODE | MA_LOV;
+       ma->ma_valid = 0;
        mutex_lock(&o->mot_lov_mutex);
-        if (!(o->mot_flags & MOF_LOV_CREATED)) {
-                rc = mdo_create_data(info->mti_env,
-                                     p ? mdt_object_child(p) : NULL,
-                                     mdt_object_child(o), spec, ma);
+       if (!(o->mot_flags & MOF_LOV_CREATED)) {
+               if (p != NULL && !fid_is_md_operative(mdt_object_fid(p)))
+                       GOTO(unlock, rc = -EPERM);
+
+               rc = mdo_create_data(info->mti_env,
+                                    p ? mdt_object_child(p) : NULL,
+                                    mdt_object_child(o), spec, ma);
                if (rc == 0)
                        rc = mdt_attr_get_complex(info, o, ma);
 
-                if (rc == 0 && ma->ma_valid & MA_LOV)
-                        o->mot_flags |= MOF_LOV_CREATED;
-        }
+               if (rc == 0 && ma->ma_valid & MA_LOV)
+                       o->mot_flags |= MOF_LOV_CREATED;
+       }
+unlock:
        mutex_unlock(&o->mot_lov_mutex);
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int mdt_ioepoch_opened(struct mdt_object *mo)
@@ -168,9 +175,9 @@ int mdt_ioepoch_open(struct mdt_thread_info *info, struct mdt_object *o,
         int rc = 0;
         ENTRY;
 
-        if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
-            !S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
-                RETURN(0);
+       if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
+           !S_ISREG(lu_object_attr(&o->mot_obj)))
+               RETURN(0);
 
        mutex_lock(&o->mot_ioepoch_mutex);
        if (mdt_ioepoch_opened(o)) {
@@ -217,18 +224,13 @@ int mdt_ioepoch_open(struct mdt_thread_info *info, struct mdt_object *o,
 /**
  * Update SOM on-disk attributes.
  * If enabling, write update inodes and lustre-ea with the proper IOEpoch,
- * mountid and attributes. If disabling, zero IOEpoch id in lustre-ea.
+ * mountid and attributes. If disabling, clean SOM xattr.
  * Call under ->mot_ioepoch_mutex.
  */
 static int mdt_som_attr_set(struct mdt_thread_info *info,
-                            struct mdt_object *obj, __u64 ioepoch, int enable)
+                           struct mdt_object *obj, __u64 ioepoch, bool enable)
 {
-       struct lustre_mdt_attrs *lma;
-       struct md_attr          *ma = &info->mti_attr;
-       struct lu_buf           *buf = &info->mti_buf;
        struct md_object        *next = mdt_object_child(obj);
-       struct mdt_device       *mdt = info->mti_mdt;
-       struct lu_attr          *la = &ma->ma_attr;
        int                      rc;
         ENTRY;
 
@@ -236,35 +238,35 @@ static int mdt_som_attr_set(struct mdt_thread_info *info,
                " on "DFID".\n", enable ? "update" : "disabling",
                ioepoch, PFID(mdt_object_fid(obj)));
 
-       lma = (struct lustre_mdt_attrs *) info->mti_xattr_buf;
-       CLASSERT(sizeof(info->mti_xattr_buf) >= sizeof(*lma));
-
-       buf->lb_buf = lma;
-       buf->lb_len = sizeof(info->mti_xattr_buf);
-       rc = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_LMA);
-       if (rc > 0) {
-               lustre_lma_swab(lma);
-       } else if (rc == -ENODATA) {
-               memset(lma, 0, sizeof(*lma));
+       if (enable) {
+               struct lu_buf           *buf = &info->mti_buf;
+               struct som_attrs        *attrs;
+               struct md_attr          *ma = &info->mti_attr;
+               struct lu_attr          *la = &ma->ma_attr;
+               struct obd_device       *obd = info->mti_mdt->mdt_lut.lut_obd;
+
+               attrs = (struct som_attrs *)info->mti_xattr_buf;
+               CLASSERT(sizeof(info->mti_xattr_buf) >= sizeof(*attrs));
+
+               /* pack SOM attributes */
+               memset(attrs, 0, sizeof(*attrs));
+               attrs->som_ioepoch = ioepoch;
+               attrs->som_mountid = obd->u.obt.obt_mount_count;
+               if ((la->la_valid & LA_SIZE) != 0)
+                       attrs->som_size = la->la_size;
+               if ((la->la_valid & LA_BLOCKS) != 0)
+                       attrs->som_blocks = la->la_blocks;
+               lustre_som_swab(attrs);
+
+               /* update SOM attributes */
+               buf->lb_buf = attrs;
+               buf->lb_len = sizeof(*attrs);
+               rc = mo_xattr_set(info->mti_env, next, buf, XATTR_NAME_SOM, 0);
        } else {
-               RETURN(rc);
+               /* delete SOM attributes */
+               rc = mo_xattr_del(info->mti_env, next, XATTR_NAME_SOM);
        }
 
-       /* Copy FID */
-       memcpy(&lma->lma_self_fid, mdt_object_fid(obj), sizeof(lma->lma_self_fid));
-
-       /* Copy SOM data */
-       lma->lma_ioepoch     = ioepoch;
-       lma->lma_som_size    = la->la_valid & LA_SIZE ? la->la_size : 0;
-       lma->lma_som_blocks  = la->la_valid & LA_BLOCKS ?  la->la_blocks : 0;
-       lma->lma_som_mountid = mdt->mdt_lut.lut_obd->u.obt.obt_mount_count;
-       if (enable)
-               lma->lma_compat |= LMAC_SOM;
-       else
-               lma->lma_compat &= ~LMAC_SOM;
-
-       rc = mo_xattr_set(info->mti_env, next, buf, XATTR_NAME_LMA, 0);
-
         RETURN(rc);
 }
 
@@ -416,7 +418,7 @@ static inline int mdt_ioepoch_close_reg(struct mdt_thread_info *info,
         if (ret == MDT_IOEPOCH_GETATTR && recovery) {
                 struct mdt_body *rep;
                 rep = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-                rep->valid |= OBD_MD_FLGETATTRLOCK;
+               rep->mbo_valid |= OBD_MD_FLGETATTRLOCK;
         }
 
         RETURN(rc ? : ret);
@@ -438,12 +440,12 @@ error_up:
  */
 static int mdt_ioepoch_close(struct mdt_thread_info *info, struct mdt_object *o)
 {
-        struct ptlrpc_request *req = mdt_info_req(info);
-        ENTRY;
+       struct ptlrpc_request *req = mdt_info_req(info);
+       ENTRY;
 
-        if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
-            !S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
-                RETURN(0);
+       if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
+           !S_ISREG(lu_object_attr(&o->mot_obj)))
+               RETURN(0);
 
         LASSERT(o->mot_ioepoch_count);
         LASSERT(info->mti_ioepoch == NULL ||
@@ -455,7 +457,7 @@ static int mdt_ioepoch_close(struct mdt_thread_info *info, struct mdt_object *o)
                 RETURN(mdt_ioepoch_close_on_eviction(info, o));
         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
                 RETURN(mdt_ioepoch_close_on_replay(info, o));
-        if (info->mti_ioepoch->flags & MF_EPOCH_CLOSE)
+       if (info->mti_ioepoch && (info->mti_ioepoch->flags & MF_EPOCH_CLOSE))
                 RETURN(mdt_ioepoch_close_reg(info, o));
         /* IO epoch is not closed. */
         RETURN(MDT_IOEPOCH_OPENED);
@@ -472,16 +474,16 @@ static int mdt_ioepoch_close(struct mdt_thread_info *info, struct mdt_object *o)
  */
 int mdt_som_au_close(struct mdt_thread_info *info, struct mdt_object *o)
 {
-        struct ptlrpc_request *req = mdt_info_req(info);
-        __u64 ioepoch = 0;
-        int act = MDT_SOM_ENABLE;
-        int rc = 0;
-        ENTRY;
+       struct ptlrpc_request   *req = mdt_info_req(info);
+       __u64                    ioepoch = 0;
+       int                      act = MDT_SOM_ENABLE;
+       int                      rc = 0;
+       ENTRY;
 
-        LASSERT(!req || info->mti_ioepoch);
-        if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
-            !S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
-                RETURN(0);
+       LASSERT(!req || info->mti_ioepoch);
+       if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
+           !S_ISREG(lu_object_attr(&o->mot_obj)))
+               RETURN(0);
 
         /* No size whereas MF_SOM_CHANGE is set means client failed to
          * obtain ost attributes, drop the SOM cache on disk if so. */
@@ -501,7 +503,8 @@ int mdt_som_au_close(struct mdt_thread_info *info, struct mdt_object *o)
                 ioepoch =  info->mti_ioepoch ?
                         info->mti_ioepoch->ioepoch : o->mot_ioepoch;
 
-                if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY))
+               if (req != NULL
+                   && !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY))
                         rc = mdt_som_attr_set(info, o, ioepoch, act);
                 mdt_object_som_enable(o, ioepoch);
         }
@@ -582,10 +585,10 @@ static void mdt_empty_transno(struct mdt_thread_info *info, int rc)
                        struct obd_export *exp = req->rq_export;
 
                        CERROR("%s: replay trans "LPU64" NID %s: rc = %d\n",
-                               mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name,
-                               info->mti_transno,
-                               libcfs_nid2str(exp->exp_connection->c_peer.nid),
-                               rc);
+                              mdt_obd_name(mdt), info->mti_transno,
+                              libcfs_nid2str(exp->exp_connection->c_peer.nid),
+                              rc);
+                       spin_unlock(&mdt->mdt_lut.lut_translock);
                        RETURN_EXIT;
                }
        } else if (info->mti_transno == 0) {
@@ -597,18 +600,18 @@ static void mdt_empty_transno(struct mdt_thread_info *info, int rc)
        }
        spin_unlock(&mdt->mdt_lut.lut_translock);
 
-        CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
-                        info->mti_transno,
-                        req->rq_export->exp_obd->obd_last_committed);
+       CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
+              info->mti_transno,
+              req->rq_export->exp_obd->obd_last_committed);
 
-        req->rq_transno = info->mti_transno;
-        lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);
+       req->rq_transno = info->mti_transno;
+       lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);
 
-        /* update lcd in memory only for resent cases */
-        ted = &req->rq_export->exp_target_data;
-        LASSERT(ted);
+       /* update lcd in memory only for resent cases */
+       ted = &req->rq_export->exp_target_data;
+       LASSERT(ted);
        mutex_lock(&ted->ted_lcd_lock);
-        lcd = ted->ted_lcd;
+       lcd = ted->ted_lcd;
        if (info->mti_transno < lcd->lcd_last_transno &&
            info->mti_transno != 0) {
                /* This should happen during replay. Do not update
@@ -617,8 +620,8 @@ static void mdt_empty_transno(struct mdt_thread_info *info, int rc)
                 * be checked correctly by xid */
                mutex_unlock(&ted->ted_lcd_lock);
                CDEBUG(D_HA, "%s: transno = "LPU64" < last_transno = "LPU64"\n",
-                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name,
-                       info->mti_transno, lcd->lcd_last_transno);
+                      mdt_obd_name(mdt), info->mti_transno,
+                      lcd->lcd_last_transno);
                RETURN_EXIT;
        }
 
@@ -649,18 +652,46 @@ static void mdt_empty_transno(struct mdt_thread_info *info, int rc)
         EXIT;
 }
 
-void mdt_mfd_set_mode(struct mdt_file_data *mfd, int mode)
+void mdt_mfd_set_mode(struct mdt_file_data *mfd, __u64 mode)
 {
-        LASSERT(mfd != NULL);
+       LASSERT(mfd != NULL);
 
-        CDEBUG(D_HA, "Change mfd %p mode 0x%x->0x%x\n",
-               mfd, (unsigned int)mfd->mfd_mode, (unsigned int)mode);
+       CDEBUG(D_HA, DFID " Change mfd mode "LPO64" -> "LPO64".\n",
+              PFID(mdt_object_fid(mfd->mfd_object)), mfd->mfd_mode, mode);
 
-        mfd->mfd_mode = mode;
+       mfd->mfd_mode = mode;
+}
+
+/**
+ * prep ma_lmm/ma_lmv for md_attr from reply
+ */
+void mdt_prep_ma_buf_from_rep(struct mdt_thread_info *info,
+                             struct mdt_object *obj,
+                             struct md_attr *ma)
+{
+       LASSERT(ma->ma_lmv == NULL && ma->ma_lmm == NULL);
+       if (S_ISDIR(obj->mot_header.loh_attr)) {
+               ma->ma_lmv = req_capsule_server_get(info->mti_pill,
+                                                   &RMF_MDT_MD);
+               ma->ma_lmv_size = req_capsule_get_size(info->mti_pill,
+                                                      &RMF_MDT_MD,
+                                                      RCL_SERVER);
+               if (ma->ma_lmv_size > 0)
+                       ma->ma_need |= MA_LMV;
+       } else {
+               ma->ma_lmm = req_capsule_server_get(info->mti_pill,
+                                                   &RMF_MDT_MD);
+               ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
+                                                      &RMF_MDT_MD,
+                                                      RCL_SERVER);
+               if (ma->ma_lmm_size > 0)
+                       ma->ma_need |= MA_LOV;
+       }
 }
 
 static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
-                        struct mdt_object *o, __u64 flags, int created)
+                       struct mdt_object *o, __u64 flags, int created,
+                       struct ldlm_reply *rep)
 {
         struct ptlrpc_request   *req = mdt_info_req(info);
         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
@@ -675,7 +706,7 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
 
         isreg = S_ISREG(la->la_mode);
         isdir = S_ISDIR(la->la_mode);
-        if (isreg && !(ma->ma_valid & MA_LOV)) {
+       if (isreg && !(ma->ma_valid & MA_LOV) && !(flags & MDS_OPEN_RELEASE)) {
                 /*
                  * No EA, check whether it is will set regEA and dirEA since in
                  * above attr get, these size might be zero, so reset it, to
@@ -688,6 +719,9 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                 rc = mdt_create_data(info, p, o);
                 if (rc)
                         RETURN(rc);
+
+               if (exp_connect_flags(req->rq_export) & OBD_CONNECT_DISP_STRIPE)
+                       mdt_set_disposition(info, rep, DISP_OPEN_STRIPE);
         }
 
         CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n",
@@ -695,21 +729,28 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
 
         if (ma->ma_valid & MA_LOV) {
                 LASSERT(ma->ma_lmm_size != 0);
-                repbody->eadatasize = ma->ma_lmm_size;
-                if (isdir)
-                        repbody->valid |= OBD_MD_FLDIREA;
-                else
-                        repbody->valid |= OBD_MD_FLEASIZE;
-        }
+               repbody->mbo_eadatasize = ma->ma_lmm_size;
+               if (isdir)
+                       repbody->mbo_valid |= OBD_MD_FLDIREA;
+               else
+                       repbody->mbo_valid |= OBD_MD_FLEASIZE;
+       }
 
-        if (flags & FMODE_WRITE) {
-                rc = mdt_write_get(o);
-                if (rc == 0) {
-                        mdt_ioepoch_open(info, o, created);
-                        repbody->ioepoch = o->mot_ioepoch;
+       if (ma->ma_valid & MA_LMV) {
+               LASSERT(ma->ma_lmv_size != 0);
+               repbody->mbo_eadatasize = ma->ma_lmv_size;
+               LASSERT(isdir);
+               repbody->mbo_valid |= OBD_MD_FLDIREA | OBD_MD_MEA;
+       }
+
+       if (flags & FMODE_WRITE) {
+               rc = mdt_write_get(o);
+               if (rc == 0) {
+                       mdt_ioepoch_open(info, o, created);
+                       repbody->mbo_ioepoch = o->mot_ioepoch;
                 }
         } else if (flags & MDS_FMODE_EXEC) {
-                rc = mdt_write_deny(o);
+               rc = mdt_write_deny(o);
         }
         if (rc)
                 RETURN(rc);
@@ -719,87 +760,98 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
         if (rc)
                 GOTO(err_out, rc);
 
-        mfd = mdt_mfd_new();
-        if (mfd != NULL) {
-                /*
-                 * Keep a reference on this object for this open, and is
-                 * released by mdt_mfd_close().
-                 */
-                mdt_object_get(info->mti_env, o);
-
-                /*
-                 * @flags is always not zero. At least it should be FMODE_READ,
-                 * FMODE_WRITE or MDS_FMODE_EXEC.
-                 */
-                LASSERT(flags != 0);
-
-                /* Open handling. */
-                mdt_mfd_set_mode(mfd, flags);
-
-                mfd->mfd_object = o;
-                mfd->mfd_xid = req->rq_xid;
-
-                /* replay handle */
-                if (req_is_replay(req)) {
-                        struct mdt_file_data *old_mfd;
-                        /* Check wheather old cookie already exist in
-                         * the list, becasue when do recovery, client
-                         * might be disconnected from server, and
-                         * restart replay, so there maybe some orphan
-                         * mfd here, we should remove them */
-                        LASSERT(info->mti_rr.rr_handle != NULL);
-                        old_mfd = mdt_handle2mfd(info, info->mti_rr.rr_handle);
-                        if (old_mfd) {
-                                CDEBUG(D_HA, "del orph mfd %p fid=("DFID") "
-                                       "cookie=" LPX64"\n", mfd,
-                                       PFID(mdt_object_fid(mfd->mfd_object)),
-                                       info->mti_rr.rr_handle->cookie);
-                               spin_lock(&med->med_open_lock);
-                               class_handle_unhash(&old_mfd->mfd_handle);
-                               cfs_list_del_init(&old_mfd->mfd_list);
-                               spin_unlock(&med->med_open_lock);
-                                /* no attr update for that close */
-                                la->la_valid = 0;
-                                ma->ma_valid |= MA_FLAGS;
-                                ma->ma_attr_flags |= MDS_RECOV_OPEN;
-                                mdt_mfd_close(info, old_mfd);
-                                ma->ma_attr_flags &= ~MDS_RECOV_OPEN;
-                                ma->ma_valid &= ~MA_FLAGS;
-                        }
-                        CDEBUG(D_HA, "Store old cookie "LPX64" in new mfd\n",
-                               info->mti_rr.rr_handle->cookie);
-                        mfd->mfd_old_handle.cookie =
-                                                info->mti_rr.rr_handle->cookie;
-                }
-                repbody->handle.cookie = mfd->mfd_handle.h_cookie;
-
-                if (req->rq_export->exp_disconnected) {
-                       spin_lock(&med->med_open_lock);
-                       class_handle_unhash(&mfd->mfd_handle);
-                       cfs_list_del_init(&mfd->mfd_list);
+       mfd = mdt_mfd_new(med);
+       if (mfd == NULL)
+               GOTO(err_out, rc = -ENOMEM);
+
+       /*
+        * Keep a reference on this object for this open, and is
+        * released by mdt_mfd_close().
+        */
+       mdt_object_get(info->mti_env, o);
+       mfd->mfd_object = o;
+       mfd->mfd_xid = req->rq_xid;
+
+       /*
+        * @flags is always not zero. At least it should be FMODE_READ,
+        * FMODE_WRITE or MDS_FMODE_EXEC.
+        */
+       LASSERT(flags != 0);
+
+       /* Open handling. */
+       mdt_mfd_set_mode(mfd, flags);
+
+       atomic_inc(&o->mot_open_count);
+       if (flags & MDS_OPEN_LEASE)
+               atomic_inc(&o->mot_lease_count);
+
+       /* replay handle */
+       if (req_is_replay(req)) {
+               struct mdt_file_data *old_mfd;
+               /* Check wheather old cookie already exist in
+                * the list, becasue when do recovery, client
+                * might be disconnected from server, and
+                * restart replay, so there maybe some orphan
+                * mfd here, we should remove them */
+               LASSERT(info->mti_rr.rr_handle != NULL);
+               spin_lock(&med->med_open_lock);
+               old_mfd = mdt_handle2mfd(med, info->mti_rr.rr_handle, true);
+               if (old_mfd != NULL) {
+                       CDEBUG(D_HA, "delete orphan mfd = %p, fid = "DFID", "
+                              "cookie = "LPX64"\n", mfd,
+                              PFID(mdt_object_fid(mfd->mfd_object)),
+                              info->mti_rr.rr_handle->cookie);
+                       class_handle_unhash(&old_mfd->mfd_handle);
+                       cfs_list_del_init(&old_mfd->mfd_list);
                        spin_unlock(&med->med_open_lock);
-                       mdt_mfd_close(info, mfd);
+                       /* no attr update for that close */
+                       la->la_valid = 0;
+                       ma->ma_valid |= MA_FLAGS;
+                       ma->ma_attr_flags |= MDS_RECOV_OPEN;
+                       mdt_mfd_close(info, old_mfd);
+                       ma->ma_attr_flags &= ~MDS_RECOV_OPEN;
+                       ma->ma_valid &= ~MA_FLAGS;
                } else {
-                       spin_lock(&med->med_open_lock);
-                       cfs_list_add(&mfd->mfd_list, &med->med_open_head);
                        spin_unlock(&med->med_open_lock);
-                }
+                       CDEBUG(D_HA, "orphan mfd not found, fid = "DFID", "
+                              "cookie = "LPX64"\n",
+                              PFID(mdt_object_fid(mfd->mfd_object)),
+                              info->mti_rr.rr_handle->cookie);
+               }
 
-                mdt_empty_transno(info, rc);
-        } else {
-                GOTO(err_out, rc = -ENOMEM);
-        }
+               CDEBUG(D_HA, "Store old cookie "LPX64" in new mfd\n",
+                      info->mti_rr.rr_handle->cookie);
+
+               mfd->mfd_old_handle.cookie = info->mti_rr.rr_handle->cookie;
+       }
+
+       repbody->mbo_handle.cookie = mfd->mfd_handle.h_cookie;
+
+       if (req->rq_export->exp_disconnected) {
+               spin_lock(&med->med_open_lock);
+               class_handle_unhash(&mfd->mfd_handle);
+               cfs_list_del_init(&mfd->mfd_list);
+               spin_unlock(&med->med_open_lock);
+               mdt_mfd_close(info, mfd);
+       } else {
+               spin_lock(&med->med_open_lock);
+               cfs_list_add(&mfd->mfd_list, &med->med_open_head);
+               spin_unlock(&med->med_open_lock);
+       }
+
+       mdt_empty_transno(info, rc);
 
         RETURN(rc);
 
 err_out:
-        if (flags & FMODE_WRITE)
-                        /* XXX We also need to close io epoch here.
-                         * See LU-1220 - green */
-                mdt_write_put(o);
-        else if (flags & FMODE_EXEC)
-                mdt_write_allow(o);
-        return rc;
+       if (flags & FMODE_WRITE)
+               /* XXX We also need to close io epoch here.
+                * See LU-1220 - green */
+               mdt_write_put(o);
+       else if (flags & MDS_FMODE_EXEC)
+               mdt_write_allow(o);
+
+       return rc;
 }
 
 int mdt_finish_open(struct mdt_thread_info *info,
@@ -832,9 +884,13 @@ int mdt_finish_open(struct mdt_thread_info *info,
         * that looks like it was actually almost succesful and a failure at the
         * same time */
        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_NEGATIVE_POSITIVE)) {
-               mdt_set_disposition(info, rep, DISP_OPEN_LOCK | \
-                                   DISP_OPEN_OPEN | DISP_LOOKUP_NEG | \
-                                   DISP_LOOKUP_POS);
+               mdt_set_disposition(info, rep, DISP_OPEN_OPEN |
+                                              DISP_LOOKUP_NEG |
+                                              DISP_LOOKUP_POS);
+
+               if (flags & MDS_OPEN_LOCK)
+                       mdt_set_disposition(info, rep, DISP_OPEN_LOCK);
+
                RETURN(-ENOENT);
        }
 
@@ -843,15 +899,15 @@ int mdt_finish_open(struct mdt_thread_info *info,
 
                 rc = mdt_pack_remote_perm(info, o, buf);
                 if (rc) {
-                        repbody->valid &= ~OBD_MD_FLRMTPERM;
-                        repbody->aclsize = 0;
-                } else {
-                        repbody->valid |= OBD_MD_FLRMTPERM;
-                        repbody->aclsize = sizeof(struct mdt_remote_perm);
+                       repbody->mbo_valid &= ~OBD_MD_FLRMTPERM;
+                       repbody->mbo_aclsize = 0;
+               } else {
+                       repbody->mbo_valid |= OBD_MD_FLRMTPERM;
+                       repbody->mbo_aclsize = sizeof(struct mdt_remote_perm);
                 }
         }
 #ifdef CONFIG_FS_POSIX_ACL
-        else if (exp->exp_connect_flags & OBD_CONNECT_ACL) {
+       else if (exp_connect_flags(exp) & OBD_CONNECT_ACL) {
                 const struct lu_env *env = info->mti_env;
                 struct md_object *next = mdt_object_child(o);
                 struct lu_buf *buf = &info->mti_buf;
@@ -864,8 +920,8 @@ int mdt_finish_open(struct mdt_thread_info *info,
                                           XATTR_NAME_ACL_ACCESS);
                         if (rc < 0) {
                                 if (rc == -ENODATA) {
-                                        repbody->aclsize = 0;
-                                        repbody->valid |= OBD_MD_FLACL;
+                                       repbody->mbo_aclsize = 0;
+                                       repbody->mbo_valid |= OBD_MD_FLACL;
                                         rc = 0;
                                 } else if (rc == -EOPNOTSUPP) {
                                         rc = 0;
@@ -873,16 +929,16 @@ int mdt_finish_open(struct mdt_thread_info *info,
                                         CERROR("got acl size: %d\n", rc);
                                 }
                         } else {
-                                repbody->aclsize = rc;
-                                repbody->valid |= OBD_MD_FLACL;
+                               repbody->mbo_aclsize = rc;
+                               repbody->mbo_valid |= OBD_MD_FLACL;
                                 rc = 0;
                         }
                 }
         }
 #endif
 
-        if (info->mti_mdt->mdt_opts.mo_mds_capa &&
-            exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) {
+       if (info->mti_mdt->mdt_lut.lut_mds_capa &&
+           exp_connect_flags(exp) & OBD_CONNECT_MDS_CAPA) {
                 struct lustre_capa *capa;
 
                 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA1);
@@ -891,11 +947,11 @@ int mdt_finish_open(struct mdt_thread_info *info,
                 rc = mo_capa_get(info->mti_env, mdt_object_child(o), capa, 0);
                 if (rc)
                         RETURN(rc);
-                repbody->valid |= OBD_MD_FLMDSCAPA;
+               repbody->mbo_valid |= OBD_MD_FLMDSCAPA;
         }
-        if (info->mti_mdt->mdt_opts.mo_oss_capa &&
-            exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA &&
-            S_ISREG(lu_object_attr(&o->mot_obj.mo_lu))) {
+       if (info->mti_mdt->mdt_lut.lut_oss_capa &&
+           exp_connect_flags(exp) & OBD_CONNECT_OSS_CAPA &&
+           S_ISREG(lu_object_attr(&o->mot_obj))) {
                 struct lustre_capa *capa;
 
                 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA2);
@@ -904,20 +960,18 @@ int mdt_finish_open(struct mdt_thread_info *info,
                 rc = mo_capa_get(info->mti_env, mdt_object_child(o), capa, 0);
                 if (rc)
                         RETURN(rc);
-                repbody->valid |= OBD_MD_FLOSSCAPA;
+               repbody->mbo_valid |= OBD_MD_FLOSSCAPA;
         }
 
         /*
          * If we are following a symlink, don't open; and do not return open
          * handle for special nodes as client required.
          */
-        if (islnk || (!isreg && !isdir &&
-            (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))) {
-                lustre_msg_set_transno(req->rq_repmsg, 0);
-                RETURN(0);
-        }
-
-        mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
+       if (islnk || (!isreg && !isdir &&
+           (exp_connect_flags(req->rq_export) & OBD_CONNECT_NODEVOH))) {
+               lustre_msg_set_transno(req->rq_repmsg, 0);
+               RETURN(0);
+       }
 
         /*
          * We need to return the existing object's fid back, so it is done here,
@@ -952,22 +1006,26 @@ int mdt_finish_open(struct mdt_thread_info *info,
                spin_unlock(&med->med_open_lock);
 
                 if (mfd != NULL) {
-                        repbody->handle.cookie = mfd->mfd_handle.h_cookie;
-                        /*set repbody->ea_size for resent case*/
-                        if (ma->ma_valid & MA_LOV) {
-                                LASSERT(ma->ma_lmm_size != 0);
-                                repbody->eadatasize = ma->ma_lmm_size;
-                                if (isdir)
-                                        repbody->valid |= OBD_MD_FLDIREA;
-                                else
-                                        repbody->valid |= OBD_MD_FLEASIZE;
+                       repbody->mbo_handle.cookie = mfd->mfd_handle.h_cookie;
+                       /* set repbody->ea_size for resent case */
+                       if (ma->ma_valid & MA_LOV) {
+                               LASSERT(ma->ma_lmm_size != 0);
+                               repbody->mbo_eadatasize = ma->ma_lmm_size;
+                               if (isdir)
+                                       repbody->mbo_valid |= OBD_MD_FLDIREA;
+                               else
+                                       repbody->mbo_valid |= OBD_MD_FLEASIZE;
                         }
-                        RETURN(0);
-                }
-        }
+                       mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
+                       RETURN(0);
+               }
+       }
 
-        rc = mdt_mfd_open(info, p, o, flags, created);
-        RETURN(rc);
+       rc = mdt_mfd_open(info, p, o, flags, created, rep);
+       if (!rc)
+               mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
+
+       RETURN(rc);
 }
 
 extern void mdt_req_from_lcd(struct ptlrpc_request *req,
@@ -984,7 +1042,7 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
         struct lsd_client_data  *lcd  = ted->ted_lcd;
         struct md_attr          *ma   = &info->mti_attr;
         struct mdt_reint_record *rr   = &info->mti_rr;
-        __u32                   flags = info->mti_spec.sp_cr_flags;
+       __u64                   flags = info->mti_spec.sp_cr_flags;
         struct ldlm_reply       *ldlm_rep;
         struct mdt_object       *parent;
         struct mdt_object       *child;
@@ -996,13 +1054,7 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
         ldlm_rep = req_capsule_server_get(pill, &RMF_DLM_REP);
         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
 
-        ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
-        ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD,
-                                               RCL_SERVER);
-        ma->ma_need = MA_INODE;
-        if (ma->ma_lmm_size > 0)
-                ma->ma_need |= MA_LOV;
-
+       ma->ma_need = MA_INODE | MA_HSM;
         ma->ma_valid = 0;
 
         mdt_req_from_lcd(req, lcd);
@@ -1033,37 +1085,49 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
                         mdt_export_evict(exp);
                         RETURN_EXIT;
                 }
-                child = mdt_object_find(env, mdt, rr->rr_fid2);
-                if (IS_ERR(child)) {
-                        rc = PTR_ERR(child);
-                        LCONSOLE_WARN("Child "DFID" lookup error %d."
-                                      " Evicting client %s with export %s.\n",
-                                      PFID(mdt_object_fid(child)), rc,
-                                      obd_uuid2str(&exp->exp_client_uuid),
-                                      obd_export_nid2str(exp));
-                        mdt_object_put(env, parent);
-                        mdt_export_evict(exp);
-                        RETURN_EXIT;
-                }
-                rc = mdt_object_exists(child);
-                if (rc > 0) {
-
-                        mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA);
-                       rc = mdt_attr_get_complex(info, child, ma);
-                        if (rc == 0)
-                              rc = mdt_finish_open(info, parent, child,
-                                                   flags, 1, ldlm_rep);
-                } else if (rc < 0) {
-                        /* the child object was created on remote server */
-                        repbody->fid1 = *rr->rr_fid2;
-                        repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
-                        rc = 0;
-                } else if (rc == 0) {
-                        /* the child does not exist, we should do regular open */
-                        mdt_object_put(env, parent);
-                        mdt_object_put(env, child);
-                        GOTO(regular_open, 0);
-                }
+
+               child = mdt_object_find(env, mdt, rr->rr_fid2);
+               if (IS_ERR(child)) {
+                       rc = PTR_ERR(child);
+                       LCONSOLE_WARN("cannot lookup child "DFID": rc = %d; "
+                                     "evicting client %s with export %s\n",
+                                     PFID(rr->rr_fid2), rc,
+                                     obd_uuid2str(&exp->exp_client_uuid),
+                                     obd_export_nid2str(exp));
+                       mdt_object_put(env, parent);
+                       mdt_export_evict(exp);
+                       RETURN_EXIT;
+               }
+
+               if (unlikely(mdt_object_remote(child))) {
+                       /* the child object was created on remote server */
+                       if (!mdt_is_dne_client(exp)) {
+                               /* Return -EIO for old client */
+                               mdt_object_put(env, parent);
+                               mdt_object_put(env, child);
+                               GOTO(out, rc = -EIO);
+                       }
+                       repbody->mbo_fid1 = *rr->rr_fid2;
+                       repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
+                       rc = 0;
+               } else {
+                       if (mdt_object_exists(child)) {
+                               mdt_set_capainfo(info, 1, rr->rr_fid2,
+                                                BYPASS_CAPA);
+                               mdt_prep_ma_buf_from_rep(info, child, ma);
+                               rc = mdt_attr_get_complex(info, child, ma);
+                               if (rc == 0)
+                                       rc = mdt_finish_open(info, parent,
+                                                            child, flags,
+                                                            1, ldlm_rep);
+                       } else {
+                               /* the child does not exist, we should do
+                                * regular open */
+                               mdt_object_put(env, parent);
+                               mdt_object_put(env, child);
+                               GOTO(regular_open, 0);
+                       }
+               }
                 mdt_object_put(env, parent);
                 mdt_object_put(env, child);
                 GOTO(out, rc);
@@ -1080,10 +1144,9 @@ out:
         LASSERT(ergo(rc < 0, lustre_msg_get_transno(req->rq_repmsg) == 0));
 }
 
-int mdt_open_by_fid(struct mdt_thread_info* info,
-                    struct ldlm_reply *rep)
+int mdt_open_by_fid(struct mdt_thread_info *info, struct ldlm_reply *rep)
 {
-        __u32                    flags = info->mti_spec.sp_cr_flags;
+       __u64                    flags = info->mti_spec.sp_cr_flags;
         struct mdt_reint_record *rr = &info->mti_rr;
         struct md_attr          *ma = &info->mti_attr;
         struct mdt_object       *o;
@@ -1094,42 +1157,292 @@ int mdt_open_by_fid(struct mdt_thread_info* info,
         if (IS_ERR(o))
                 RETURN(rc = PTR_ERR(o));
 
-        rc = mdt_object_exists(o);
-        if (rc > 0) {
-                mdt_set_disposition(info, rep, (DISP_IT_EXECD |
-                                                DISP_LOOKUP_EXECD |
-                                                DISP_LOOKUP_POS));
-
-               rc = mdt_attr_get_complex(info, o, ma);
-                if (rc == 0)
-                        rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
-        } else if (rc == 0) {
-                rc = -ENOENT;
-        } else  {
+       if (unlikely(mdt_object_remote(o))) {
                 /* the child object was created on remote server */
                 struct mdt_body *repbody;
+
+               mdt_set_disposition(info, rep, (DISP_IT_EXECD |
+                                               DISP_LOOKUP_EXECD |
+                                               DISP_LOOKUP_POS));
                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-                repbody->fid1 = *rr->rr_fid2;
-                repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
+               repbody->mbo_fid1 = *rr->rr_fid2;
+               repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
                 rc = 0;
-        }
+       } else {
+               if (mdt_object_exists(o)) {
+                       mdt_set_disposition(info, rep, (DISP_IT_EXECD |
+                                                       DISP_LOOKUP_EXECD |
+                                                       DISP_LOOKUP_POS));
+                       mdt_prep_ma_buf_from_rep(info, o, ma);
+                       rc = mdt_attr_get_complex(info, o, ma);
+                       if (rc == 0)
+                               rc = mdt_finish_open(info, NULL, o, flags, 0,
+                                                    rep);
+               } else {
+                       rc = -ENOENT;
+               }
+       }
 
         mdt_object_put(info->mti_env, o);
         RETURN(rc);
 }
 
+/* lock object for open */
+static int mdt_object_open_lock(struct mdt_thread_info *info,
+                               struct mdt_object *obj,
+                               struct mdt_lock_handle *lhc,
+                               __u64 *ibits)
+{
+       struct md_attr  *ma = &info->mti_attr;
+       __u64            open_flags = info->mti_spec.sp_cr_flags;
+       ldlm_mode_t      lm = LCK_CR;
+       bool             acq_lease = !!(open_flags & MDS_OPEN_LEASE);
+       bool             try_layout = false;
+       bool             create_layout = false;
+       int              rc = 0;
+       ENTRY;
+
+       *ibits = 0;
+       mdt_lock_handle_init(lhc);
+
+       if (req_is_replay(mdt_info_req(info)))
+               RETURN(0);
+
+       if (S_ISREG(lu_object_attr(&obj->mot_obj))) {
+               if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV) &&
+                   md_should_create(open_flags))
+                       create_layout = true;
+               if (exp_connect_layout(info->mti_exp) && !create_layout &&
+                   ma->ma_need & MA_LOV)
+                       try_layout = true;
+       }
+
+       if (acq_lease) {
+               /* lease open, acquire write mode of open sem */
+               down_write(&obj->mot_open_sem);
+
+               /* Lease exists and ask for new lease */
+               if (atomic_read(&obj->mot_lease_count) > 0) {
+                       /* only exclusive open is supported, so lease
+                        * are conflicted to each other */
+                       GOTO(out, rc = -EBUSY);
+               }
+
+               /* Lease must be with open lock */
+               if (!(open_flags & MDS_OPEN_LOCK)) {
+                       CERROR("Request lease for file:"DFID ", but open lock "
+                               "is missed, open_flags = "LPO64".\n",
+                               PFID(mdt_object_fid(obj)), open_flags);
+                       GOTO(out, rc = -EPROTO);
+               }
+
+               /* XXX: only exclusive open is supported. */
+               lm = LCK_EX;
+               *ibits = MDS_INODELOCK_OPEN;
+
+               /* never grant LCK_EX layout lock to client */
+               try_layout = false;
+       } else { /* normal open */
+               /* normal open holds read mode of open sem */
+               down_read(&obj->mot_open_sem);
+
+               if (open_flags & FMODE_WRITE)
+                       lm = LCK_CW;
+               else if (open_flags & MDS_FMODE_EXEC)
+                       lm = LCK_PR;
+               else
+                       lm = LCK_CR;
+
+               if (open_flags & MDS_OPEN_LOCK) {
+                       *ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN;
+               } else if (open_flags & (FMODE_WRITE | MDS_FMODE_EXEC) ||
+                          atomic_read(&obj->mot_lease_count) > 0) {
+                       /* We need to flush conflicting locks or revoke a lease.
+                        * In either case there is no need to acquire a layout
+                        * lock since it won't be returned to the client. */
+                       try_layout = false;
+                       *ibits = MDS_INODELOCK_OPEN;
+                       lhc = &info->mti_lh[MDT_LH_LOCAL];
+               }
+
+               CDEBUG(D_INODE, "normal open FID = "DFID", open_count = %d, "
+                      "lm = %d\n",
+                      PFID(mdt_object_fid(obj)),
+                      atomic_read(&obj->mot_open_count), lm);
+       }
+
+       mdt_lock_reg_init(lhc, lm);
+
+       /* one problem to return layout lock on open is that it may result
+        * in too many layout locks cached on the client side. */
+       if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_OPEN) && try_layout) {
+               /* return lookup lock to validate inode at the client side,
+                * this is pretty important otherwise mdt will return layout
+                * lock for each open.
+                * However this is a double-edged sword because changing
+                * permission will revoke huge # of LOOKUP locks. */
+               *ibits |= MDS_INODELOCK_LAYOUT | MDS_INODELOCK_LOOKUP;
+               if (!mdt_object_lock_try(info, obj, lhc, *ibits,
+                                        MDT_CROSS_LOCK)) {
+                       *ibits &= ~(MDS_INODELOCK_LAYOUT|MDS_INODELOCK_LOOKUP);
+                       if (*ibits != 0)
+                               rc = mdt_object_lock(info, obj, lhc, *ibits,
+                                               MDT_CROSS_LOCK);
+               }
+       } else if (*ibits != 0) {
+               rc = mdt_object_lock(info, obj, lhc, *ibits, MDT_CROSS_LOCK);
+       }
+
+       CDEBUG(D_INODE, "Requested bits lock:"DFID ", ibits = "LPX64
+               ", open_flags = "LPO64", try_layout = %d, rc = %d\n",
+               PFID(mdt_object_fid(obj)), *ibits, open_flags, try_layout, rc);
+
+       /* will change layout, revoke layout locks by enqueuing EX lock. */
+       if (rc == 0 && create_layout) {
+               struct mdt_lock_handle *ll = &info->mti_lh[MDT_LH_LAYOUT];
+
+               CDEBUG(D_INODE, "Will create layout, get EX layout lock:"DFID
+                       ", open_flags = "LPO64"\n",
+                       PFID(mdt_object_fid(obj)), open_flags);
+
+               /* We cannot enqueue another lock for the same resource we
+                * already have a lock for, due to mechanics of waiting list
+                * iterating in ldlm, see LU-3601.
+                * As such we'll drop the open lock we just got above here,
+                * it's ok not to have this open lock as it's main purpose is to
+                * flush unused cached client open handles. */
+               if (lustre_handle_is_used(&lhc->mlh_reg_lh))
+                       mdt_object_unlock(info, obj, lhc, 1);
+
+               LASSERT(!try_layout);
+               mdt_lock_handle_init(ll);
+               mdt_lock_reg_init(ll, LCK_EX);
+               rc = mdt_object_lock(info, obj, ll, MDS_INODELOCK_LAYOUT,
+                                       MDT_LOCAL_LOCK);
+
+               OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LL_BLOCK, 2);
+       }
+
+       /* Check if there is any other open handles after acquiring
+        * open lock. At this point, caching open handles have been revoked
+        * by open lock.
+        * XXX: Now only exclusive open is supported. Need to check the
+        * type of open for generic lease support. */
+       if (rc == 0 && acq_lease) {
+               struct ptlrpc_request *req = mdt_info_req(info);
+               struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
+               struct mdt_file_data *mfd;
+               bool is_replay_or_resent;
+               int open_count = 0;
+
+               /* For lease: application can open a file and then apply lease,
+                * @handle contains original open handle in that case.
+                * In recovery, open REQ will be replayed and the lease REQ may
+                * be resent that means the open handle is already stale, so we
+                * need to fix it up here by finding new handle. */
+               is_replay_or_resent = req_is_replay(req) ||
+                       lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT;
+
+               /* if the request is _not_ a replay request, rr_handle
+                * may be used to hold an openhandle which is issuing the
+                * lease request, so that this openhandle doesn't count. */
+               mfd = mdt_handle2mfd(med, info->mti_rr.rr_handle,
+                                    is_replay_or_resent);
+               if (mfd != NULL)
+                       ++open_count;
+
+               CDEBUG(D_INODE, "acq_lease "DFID": openers: %d, want: %d\n",
+                       PFID(mdt_object_fid(obj)),
+                       atomic_read(&obj->mot_open_count), open_count);
+
+               if (atomic_read(&obj->mot_open_count) > open_count)
+                       GOTO(out, rc = -EBUSY);
+       }
+       GOTO(out, rc);
+
+out:
+       RETURN(rc);
+}
+
+static void mdt_object_open_unlock(struct mdt_thread_info *info,
+                                  struct mdt_object *obj,
+                                  struct mdt_lock_handle *lhc,
+                                  __u64 ibits, int rc)
+{
+       __u64 open_flags = info->mti_spec.sp_cr_flags;
+       struct mdt_lock_handle *ll = &info->mti_lh[MDT_LH_LOCAL];
+       ENTRY;
+
+       if (req_is_replay(mdt_info_req(info)))
+               RETURN_EXIT;
+
+       /* Release local lock - the lock put in MDT_LH_LOCAL will never
+        * return to client side. */
+       if (lustre_handle_is_used(&ll->mlh_reg_lh))
+               mdt_object_unlock(info, obj, ll, 1);
+
+       ll = &info->mti_lh[MDT_LH_LAYOUT];
+       /* Release local layout lock, layout was created */
+       if (lustre_handle_is_used(&ll->mlh_reg_lh)) {
+               LASSERT(!(ibits & MDS_INODELOCK_LAYOUT));
+               mdt_object_unlock(info, obj, ll, 1);
+       }
+
+       if (open_flags & MDS_OPEN_LEASE)
+               up_write(&obj->mot_open_sem);
+       else
+               up_read(&obj->mot_open_sem);
+
+       /* Cross-ref case, the lock should be returned to the client */
+       if (ibits == 0 || rc == -EREMOTE)
+               RETURN_EXIT;
+
+       if (!(open_flags & MDS_OPEN_LOCK) && !(ibits & MDS_INODELOCK_LAYOUT)) {
+               /* for the open request, the lock will only return to client
+                * if open or layout lock is granted. */
+               rc = 1;
+       }
+
+       if (rc != 0 || !lustre_handle_is_used(&lhc->mlh_reg_lh)) {
+               struct ldlm_reply       *ldlm_rep;
+
+               ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
+               mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
+               if (lustre_handle_is_used(&lhc->mlh_reg_lh))
+                       mdt_object_unlock(info, obj, lhc, 1);
+       }
+       RETURN_EXIT;
+}
+
+/**
+ * Check release is permitted for the current HSM flags.
+ */
+static bool mdt_hsm_release_allow(const struct md_attr *ma)
+{
+       if (!(ma->ma_valid & MA_HSM))
+               return false;
+
+       if (ma->ma_hsm.mh_flags & (HS_DIRTY|HS_NORELEASE|HS_LOST))
+               return false;
+
+       if (!(ma->ma_hsm.mh_flags & HS_ARCHIVED))
+               return false;
+
+       return true;
+}
+
 int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
                         struct mdt_lock_handle *lhc)
 {
         const struct lu_env     *env   = info->mti_env;
         struct mdt_device       *mdt   = info->mti_mdt;
-        __u32                    flags = info->mti_spec.sp_cr_flags;
+        __u64                    flags = info->mti_spec.sp_cr_flags;
         struct mdt_reint_record *rr    = &info->mti_rr;
         struct md_attr          *ma    = &info->mti_attr;
         struct mdt_object       *parent= NULL;
         struct mdt_object       *o;
         int                      rc;
-        ldlm_mode_t              lm;
+       __u64                    ibits = 0;
         ENTRY;
 
        if (md_should_create(flags) && !(flags & MDS_OPEN_HAS_EA)) {
@@ -1151,37 +1464,41 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
         if (IS_ERR(o))
                 RETURN(rc = PTR_ERR(o));
 
-        rc = mdt_object_exists(o);
-        if (rc == 0) {
-                mdt_set_disposition(info, rep, (DISP_LOOKUP_EXECD |
-                                    DISP_LOOKUP_NEG));
-                GOTO(out, rc = -ENOENT);
-        } else if (rc < 0) {
-                CERROR("NFS remote open shouldn't happen.\n");
-                GOTO(out, rc);
-        }
-        mdt_set_disposition(info, rep, (DISP_IT_EXECD |
-                                        DISP_LOOKUP_EXECD |
-                                        DISP_LOOKUP_POS));
-
-        if (flags & FMODE_WRITE)
-                lm = LCK_CW;
-        else if (flags & MDS_FMODE_EXEC)
-                lm = LCK_PR;
-        else
-                lm = LCK_CR;
+       if (mdt_object_remote(o)) {
+               CDEBUG(D_INFO, "%s: "DFID" is on remote MDT.\n",
+                      mdt_obd_name(info->mti_mdt),
+                      PFID(rr->rr_fid2));
+               GOTO(out, rc = -EREMOTE);
+       } else if (!mdt_object_exists(o)) {
+               mdt_set_disposition(info, rep,
+                                   DISP_IT_EXECD |
+                                   DISP_LOOKUP_EXECD |
+                                   DISP_LOOKUP_NEG);
+               GOTO(out, rc = -ENOENT);
+       }
 
-        mdt_lock_handle_init(lhc);
-        mdt_lock_reg_init(lhc, lm);
-        rc = mdt_object_lock(info, o, lhc,
-                             MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN,
-                             MDT_CROSS_LOCK);
-        if (rc)
-                GOTO(out, rc);
+       mdt_set_disposition(info, rep, (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
 
+       mdt_prep_ma_buf_from_rep(info, o, ma);
+       if (flags & MDS_OPEN_RELEASE)
+               ma->ma_need |= MA_HSM;
        rc = mdt_attr_get_complex(info, o, ma);
-        if (rc)
-                GOTO(out, rc);
+       if (rc)
+               GOTO(out, rc);
+
+       /* If a release request, check file flags are fine and ask for an
+        * exclusive open access. */
+       if (flags & MDS_OPEN_RELEASE && !mdt_hsm_release_allow(ma))
+               GOTO(out, rc = -EPERM);
+
+       rc = mdt_check_resent_lock(info, o, lhc);
+       if (rc < 0) {
+               GOTO(out, rc);
+       } else if (rc > 0) {
+               rc = mdt_object_open_lock(info, o, lhc, &ibits);
+               if (rc)
+                       GOTO(out_unlock, rc);
+       }
 
         if (ma->ma_valid & MA_PFID) {
                 parent = mdt_object_find(env, mdt, &ma->ma_pfid);
@@ -1194,31 +1511,30 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
                 }
         }
 
-        if (flags & MDS_OPEN_LOCK)
-                mdt_set_disposition(info, rep, DISP_OPEN_LOCK);
         rc = mdt_finish_open(info, parent, o, flags, 0, rep);
+       if (!rc) {
+               mdt_set_disposition(info, rep, DISP_LOOKUP_POS);
+               if (flags & MDS_OPEN_LOCK)
+                       mdt_set_disposition(info, rep, DISP_OPEN_LOCK);
+               if (flags & MDS_OPEN_LEASE)
+                       mdt_set_disposition(info, rep, DISP_OPEN_LEASE);
+       }
+       GOTO(out_unlock, rc);
 
-        if (!(flags & MDS_OPEN_LOCK) || rc)
-                mdt_object_unlock(info, o, lhc, 1);
-
-        GOTO(out, rc);
+out_unlock:
+       mdt_object_open_unlock(info, o, lhc, ibits, rc);
 out:
-        mdt_object_put(env, o);
-        if (parent != NULL)
-                mdt_object_put(env, parent);
-        return rc;
-}
-
-int mdt_pin(struct mdt_thread_info* info)
-{
-        ENTRY;
-        RETURN(err_serious(-EOPNOTSUPP));
+       mdt_object_put(env, o);
+       if (parent != NULL)
+               mdt_object_put(env, parent);
+       return rc;
 }
 
 /* Cross-ref request. Currently it can only be a pure open (w/o create) */
-int mdt_cross_open(struct mdt_thread_info* info,
-                   const struct lu_fid *fid,
-                   struct ldlm_reply *rep, __u32 flags)
+static int mdt_cross_open(struct mdt_thread_info *info,
+                         const struct lu_fid *parent_fid,
+                         const struct lu_fid *fid,
+                         struct ldlm_reply *rep, __u32 flags)
 {
         struct md_attr    *ma = &info->mti_attr;
         struct mdt_object *o;
@@ -1229,35 +1545,47 @@ int mdt_cross_open(struct mdt_thread_info* info,
         if (IS_ERR(o))
                 RETURN(rc = PTR_ERR(o));
 
-        rc = mdt_object_exists(o);
-        if (rc > 0) {
-                /* Do permission check for cross-open. */
-                rc = mo_permission(info->mti_env, NULL, mdt_object_child(o),
-                                   NULL, flags | MDS_OPEN_CROSS);
-                if (rc)
-                        goto out;
-
-                mdt_set_capainfo(info, 0, fid, BYPASS_CAPA);
-               rc = mdt_attr_get_complex(info, o, ma);
-                if (rc == 0)
-                        rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
-        } else if (rc == 0) {
-                /*
-                 * Something is wrong here. lookup was positive but there is
-                 * no object!
-                 */
-                CERROR("Cross-ref object doesn't exist!\n");
+       if (mdt_object_remote(o)) {
+               /* Something is wrong here, the object is on another MDS! */
+               CERROR("%s: "DFID" isn't on this server!: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), PFID(fid), -EFAULT);
+               LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
+                               &o->mot_obj,
+                               "Object isn't on this server! FLD error?\n");
                 rc = -EFAULT;
-        } else  {
-                /* Something is wrong here, the object is on another MDS! */
-                CERROR("The object isn't on this server! FLD error?\n");
-                LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
-                                &o->mot_obj.mo_lu,
-                                "Object isn't on this server! FLD error?\n");
+       } else {
+               if (mdt_object_exists(o)) {
+                       /* Do permission check for cross-open. */
+                       rc = mo_permission(info->mti_env, NULL,
+                                          mdt_object_child(o),
+                                          NULL, flags | MDS_OPEN_CROSS);
+                       if (rc)
+                               goto out;
+
+                       mdt_prep_ma_buf_from_rep(info, o, ma);
+                       mdt_set_capainfo(info, 0, fid, BYPASS_CAPA);
+                       rc = mdt_attr_get_complex(info, o, ma);
+                       if (rc != 0)
+                               GOTO(out, rc);
 
-                rc = -EFAULT;
-        }
+                       /* Do not create lov object if the fid is opened
+                        * under OBF */
+                       if (S_ISREG(ma->ma_attr.la_mode) &&
+                           !(ma->ma_valid & MA_LOV) && (flags & FMODE_WRITE) &&
+                           fid_is_obf(parent_fid))
+                               GOTO(out, rc = -EPERM);
 
+                       rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
+               } else {
+                       /*
+                        * Something is wrong here. lookup was positive but
+                        * there is no object!
+                        */
+                       CERROR("%s: "DFID" doesn't exist!: rc = %d\n",
+                             mdt_obd_name(info->mti_mdt), PFID(fid), -EFAULT);
+                       rc = -EFAULT;
+               }
+        }
 out:
         mdt_object_put(info->mti_env, o);
         RETURN(rc);
@@ -1275,8 +1603,8 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         struct lu_fid           *child_fid = &info->mti_tmp_fid1;
         struct md_attr          *ma = &info->mti_attr;
         __u64                    create_flags = info->mti_spec.sp_cr_flags;
+       __u64                    ibits = 0;
         struct mdt_reint_record *rr = &info->mti_rr;
-        struct lu_name          *lname;
         int                      result, rc;
         int                      created = 0;
         __u32                    msg_flags;
@@ -1288,13 +1616,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
        mdt_counter_incr(req, LPROC_MDT_OPEN);
         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
 
-        ma->ma_lmm = req_capsule_server_get(info->mti_pill, &RMF_MDT_MD);
-        ma->ma_lmm_size = req_capsule_get_size(info->mti_pill, &RMF_MDT_MD,
-                                               RCL_SERVER);
         ma->ma_need = MA_INODE;
-        if (ma->ma_lmm_size > 0)
-                ma->ma_need |= MA_LOV;
-
         ma->ma_valid = 0;
 
         LASSERT(info->mti_pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
@@ -1310,13 +1632,21 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
             info->mti_spec.u.sp_ea.eadata == NULL)
                 GOTO(out, result = err_serious(-EINVAL));
 
-        CDEBUG(D_INODE, "I am going to open "DFID"/(%s->"DFID") "
-               "cr_flag="LPO64" mode=0%06o msg_flag=0x%x\n",
-               PFID(rr->rr_fid1), rr->rr_name,
-               PFID(rr->rr_fid2), create_flags,
-               ma->ma_attr.la_mode, msg_flags);
-
-       if (req_is_replay(req) ||
+       CDEBUG(D_INODE, "I am going to open "DFID"/("DNAME"->"DFID") "
+              "cr_flag="LPO64" mode=0%06o msg_flag=0x%x\n",
+              PFID(rr->rr_fid1), PNAME(&rr->rr_name),
+              PFID(rr->rr_fid2), create_flags,
+              ma->ma_attr.la_mode, msg_flags);
+
+       if (info->mti_cross_ref) {
+               /* This is cross-ref open */
+               mdt_set_disposition(info, ldlm_rep,
+                           (DISP_IT_EXECD | DISP_LOOKUP_EXECD |
+                            DISP_LOOKUP_POS));
+               result = mdt_cross_open(info, rr->rr_fid2, rr->rr_fid1,
+                                       ldlm_rep, create_flags);
+               GOTO(out, result);
+       } else if (req_is_replay(req) ||
            (req->rq_export->exp_libclient && create_flags & MDS_OPEN_HAS_EA)) {
                /* This is a replay request or from liblustre with ea. */
                result = mdt_open_by_fid(info, ldlm_rep);
@@ -1335,15 +1665,16 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                        GOTO(out, result = -EFAULT);
                }
                CDEBUG(D_INFO, "No object(1), continue as regular open.\n");
-       } else if ((rr->rr_namelen == 0 && !info->mti_cross_ref &&
-                   create_flags & MDS_OPEN_LOCK) ||
-                  (create_flags & MDS_OPEN_BY_FID)) {
+       } else if (create_flags & (MDS_OPEN_BY_FID | MDS_OPEN_LOCK)) {
+               /*
+                * MDS_OPEN_LOCK is checked for backward compatibility with 2.1
+                * client.
+                */
                result = mdt_open_by_fid_lock(info, ldlm_rep, lhc);
-               if (result != -ENOENT && !(create_flags & MDS_OPEN_CREAT))
-                       GOTO(out, result);
-               if (unlikely(rr->rr_namelen == 0))
-                       GOTO(out, result = -EINVAL);
-               CDEBUG(D_INFO, "No object(2), continue as regular open.\n");
+               if (result < 0)
+                       CDEBUG(D_INFO, "no object for "DFID": %d\n",
+                              PFID(rr->rr_fid2), result);
+               GOTO(out, result);
        }
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK))
@@ -1352,17 +1683,13 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         mdt_set_disposition(info, ldlm_rep,
                             (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
 
-        if (info->mti_cross_ref) {
-                /* This is cross-ref open */
-                mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
-                result = mdt_cross_open(info, rr->rr_fid1, ldlm_rep,
-                                        create_flags);
-                GOTO(out, result);
-        }
+       if (!lu_name_is_valid(&rr->rr_name))
+               GOTO(out, result = -EPROTO);
 
         lh = &info->mti_lh[MDT_LH_PARENT];
-        mdt_lock_pdo_init(lh, (create_flags & MDS_OPEN_CREAT) ?
-                          LCK_PW : LCK_PR, rr->rr_name, rr->rr_namelen);
+       mdt_lock_pdo_init(lh,
+                         (create_flags & MDS_OPEN_CREAT) ? LCK_PW : LCK_PR,
+                         &rr->rr_name);
 
         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
                                       MDS_INODELOCK_UPDATE);
@@ -1376,12 +1703,13 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
 
         fid_zero(child_fid);
 
-        lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
-        result = mdo_lookup(info->mti_env, mdt_object_child(parent),
-                            lname, child_fid, &info->mti_spec);
-        LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
-                 "looking for "DFID"/%s, result fid="DFID"\n",
-                 PFID(mdt_object_fid(parent)), rr->rr_name, PFID(child_fid));
+       result = mdo_lookup(info->mti_env, mdt_object_child(parent),
+                           &rr->rr_name, child_fid, &info->mti_spec);
+
+       LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
+                "looking for "DFID"/"DNAME", found FID = "DFID"\n",
+                PFID(mdt_object_fid(parent)), PNAME(&rr->rr_name),
+                PFID(child_fid));
 
         if (result != 0 && result != -ENOENT && result != -ESTALE)
                 GOTO(out_parent, result);
@@ -1398,6 +1726,8 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                 }
                 if (!(create_flags & MDS_OPEN_CREAT))
                         GOTO(out_parent, result);
+               if (exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY)
+                       GOTO(out_parent, result = -EROFS);
                 *child_fid = *info->mti_rr.rr_fid2;
                 LASSERTF(fid_is_sane(child_fid), "fid="DFID"\n",
                          PFID(child_fid));
@@ -1423,15 +1753,16 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
 
         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
         if (result == -ENOENT) {
-                if (mdt_object_obf(parent))
-                        GOTO(out_child, result = -EPERM);
+               /* Create under OBF and .lustre is not permitted */
+               if (!fid_is_md_operative(rr->rr_fid1))
+                       GOTO(out_child, result = -EPERM);
 
-                /* save versions in reply */
-                mdt_version_get_save(info, parent, 0);
-                mdt_version_get_save(info, child, 1);
+               /* save versions in reply */
+               mdt_version_get_save(info, parent, 0);
+               mdt_version_get_save(info, child, 1);
 
-                /* version of child will be changed */
-                info->mti_mos = child;
+               /* version of child will be changed */
+               tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
 
                 /* Not found and with MDS_OPEN_CREAT: let's create it. */
                 mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
@@ -1447,17 +1778,17 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                 info->mti_spec.sp_cr_lookup = 0;
                 info->mti_spec.sp_feat = &dt_directory_features;
 
-                result = mdo_create(info->mti_env,
-                                    mdt_object_child(parent),
-                                    lname,
-                                    mdt_object_child(child),
-                                    &info->mti_spec,
-                                    &info->mti_attr);
+               result = mdo_create(info->mti_env,
+                                   mdt_object_child(parent),
+                                   &rr->rr_name,
+                                   mdt_object_child(child),
+                                   &info->mti_spec,
+                                   &info->mti_attr);
                 if (result == -ERESTART) {
                         mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
                         GOTO(out_child, result);
                 } else {
-
+                       mdt_prep_ma_buf_from_rep(info, child, ma);
                        /* XXX: we should call this once, see few lines below */
                        if (result == 0)
                                result = mdt_attr_get_complex(info, child, ma);
@@ -1467,12 +1798,10 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                 }
                 created = 1;
         } else {
-                /* We have to get attr & lov ea for this object */
-               result = mdt_attr_get_complex(info, child, ma);
                 /*
                  * The object is on remote node, return its FID for remote open.
                  */
-                if (result == -EREMOTE) {
+               if (mdt_object_remote(child)) {
                         /*
                          * Check if this lock already was sent to client and
                          * this is resent case. For resent case do not take lock
@@ -1480,22 +1809,10 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                          */
                         LASSERT(lhc != NULL);
 
-                        if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
-                                struct ldlm_lock *lock;
-
-                                LASSERT(msg_flags & MSG_RESENT);
-
-                                lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
-                                if (!lock) {
-                                        CERROR("Invalid lock handle "LPX64"\n",
-                                               lhc->mlh_reg_lh.cookie);
-                                        LBUG();
-                                }
-                                LASSERT(fid_res_name_eq(mdt_object_fid(child),
-                                                        &lock->l_resource->lr_name));
-                                LDLM_LOCK_PUT(lock);
-                                rc = 0;
-                        } else {
+                       rc = mdt_check_resent_lock(info, child, lhc);
+                       if (rc < 0) {
+                               GOTO(out_child, result = rc);
+                       } else if (rc > 0) {
                                 mdt_lock_handle_init(lhc);
                                 mdt_lock_reg_init(lhc, LCK_PR);
 
@@ -1503,71 +1820,349 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                                                      MDS_INODELOCK_LOOKUP,
                                                      MDT_CROSS_LOCK);
                         }
-                        repbody->fid1 = *mdt_object_fid(child);
-                        repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
+                       repbody->mbo_fid1 = *mdt_object_fid(child);
+                       repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
                         if (rc != 0)
                                 result = rc;
+                       else
+                               result = -EREMOTE;
                         GOTO(out_child, result);
-                }
+               } else if (mdt_object_exists(child)) {
+                       /* We have to get attr & LOV EA & HSM for this
+                        * object. */
+                       mdt_prep_ma_buf_from_rep(info, child, ma);
+                       ma->ma_need |= MA_HSM;
+                       result = mdt_attr_get_complex(info, child, ma);
+               } else {
+                       /* Object does not exist. Likely FS corruption. */
+                       CERROR("%s: name '"DNAME"' present, but FID "
+                              DFID" is invalid\n", mdt_obd_name(info->mti_mdt),
+                              PNAME(&rr->rr_name), PFID(child_fid));
+                       GOTO(out_child, result = -EIO);
+               }
         }
 
-        LASSERT(!lustre_handle_is_used(&lhc->mlh_reg_lh));
-
-        /* get openlock if this is not replay and if a client requested it */
-        if (!req_is_replay(req) && create_flags & MDS_OPEN_LOCK) {
-                ldlm_mode_t lm;
-
-                if (create_flags & FMODE_WRITE)
-                        lm = LCK_CW;
-                else if (create_flags & MDS_FMODE_EXEC)
-                        lm = LCK_PR;
-                else
-                        lm = LCK_CR;
-                mdt_lock_handle_init(lhc);
-                mdt_lock_reg_init(lhc, lm);
-                rc = mdt_object_lock(info, child, lhc,
-                                     MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN,
-                                     MDT_CROSS_LOCK);
-                if (rc) {
-                        result = rc;
-                        GOTO(out_child, result);
-                } else {
-                        result = -EREMOTE;
-                        mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
-                }
-        }
+       rc = mdt_check_resent_lock(info, child, lhc);
+       if (rc < 0) {
+               GOTO(out_child, result = rc);
+       } else if (rc == 0) {
+               /* the open lock might already be gotten in
+                * ldlm_handle_enqueue() */
+               LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
+               if (create_flags & MDS_OPEN_LOCK)
+                       mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
+       } else {
+               /* get openlock if this isn't replay and client requested it */
+               if (!req_is_replay(req)) {
+                       rc = mdt_object_open_lock(info, child, lhc, &ibits);
+                       if (rc != 0)
+                               GOTO(out_child_unlock, result = rc);
+                       else if (create_flags & MDS_OPEN_LOCK)
+                               mdt_set_disposition(info, ldlm_rep,
+                                                   DISP_OPEN_LOCK);
+               }
+       }
+       /* Try to open it now. */
+       rc = mdt_finish_open(info, parent, child, create_flags,
+                            created, ldlm_rep);
+       if (rc) {
+               result = rc;
+               /* openlock will be released if mdt_finish_open failed */
+               mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
+
+               if (created && create_flags & MDS_OPEN_VOLATILE) {
+                       CERROR("%s: cannot open volatile file "DFID", orphan "
+                              "file will be left in PENDING directory until "
+                              "next reboot, rc = %d\n", mdt_obd_name(mdt),
+                              PFID(mdt_object_fid(child)), rc);
+                       GOTO(out_child_unlock, result);
+               }
 
-        /* Try to open it now. */
-        rc = mdt_finish_open(info, parent, child, create_flags,
-                             created, ldlm_rep);
-        if (rc) {
-                result = rc;
-                if (lustre_handle_is_used(&lhc->mlh_reg_lh))
-                        /* openlock was acquired and mdt_finish_open failed -
-                           drop the openlock */
-                        mdt_object_unlock(info, child, lhc, 1);
-                if (created) {
-                        ma->ma_need = 0;
-                        ma->ma_valid = 0;
-                        ma->ma_cookie_size = 0;
-                        rc = mdo_unlink(info->mti_env,
-                                        mdt_object_child(parent),
-                                        mdt_object_child(child),
-                                        lname,
-                                        &info->mti_attr);
-                        if (rc != 0)
-                                CERROR("Error in cleanup of open\n");
-                }
-        }
-        EXIT;
+               if (created) {
+                       ma->ma_need = 0;
+                       ma->ma_valid = 0;
+                       rc = mdo_unlink(info->mti_env,
+                                       mdt_object_child(parent),
+                                       mdt_object_child(child),
+                                       &rr->rr_name,
+                                       &info->mti_attr, 0);
+                       if (rc != 0)
+                               CERROR("%s: "DFID" cleanup of open: rc = %d\n",
+                                      mdt_obd_name(info->mti_mdt),
+                                      PFID(mdt_object_fid(child)), rc);
+                       mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
+               }
+       }
+       EXIT;
+out_child_unlock:
+       mdt_object_open_unlock(info, child, lhc, ibits, result);
 out_child:
-        mdt_object_put(info->mti_env, child);
+       mdt_object_put(info->mti_env, child);
 out_parent:
-        mdt_object_unlock_put(info, parent, lh, result || !created);
+       mdt_object_unlock_put(info, parent, lh, result || !created);
+out:
+       if (result)
+               lustre_msg_set_transno(req->rq_repmsg, 0);
+       return result;
+}
+
+/**
+ * Create an orphan object use local root.
+ */
+static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
+                                         struct mdt_device *mdt,
+                                         const struct lu_fid *fid,
+                                         struct md_attr *attr, fmode_t fmode)
+{
+       const struct lu_env *env = info->mti_env;
+       struct md_op_spec *spec = &info->mti_spec;
+       struct lu_fid *local_root_fid = &info->mti_tmp_fid1;
+       struct mdt_object *obj = NULL;
+       struct mdt_object *local_root;
+       static const struct lu_name lname = {
+               .ln_name = "i_am_nobody",
+               .ln_namelen = sizeof("i_am_nobody") - 1,
+       };
+       struct lu_ucred *uc;
+       cfs_cap_t uc_cap_save;
+       int rc;
+       ENTRY;
+
+       rc = dt_root_get(env, mdt->mdt_bottom, local_root_fid);
+       if (rc != 0)
+               RETURN(ERR_PTR(rc));
+
+       local_root = mdt_object_find(env, mdt, local_root_fid);
+       if (IS_ERR(local_root))
+               RETURN(local_root);
+
+       obj = mdt_object_new(env, mdt, fid);
+       if (IS_ERR(obj))
+               GOTO(out, rc = PTR_ERR(obj));
+
+       spec->sp_cr_lookup = 0;
+       spec->sp_feat = &dt_directory_features;
+       spec->sp_cr_mode = MDL_MINMODE; /* no lock */
+       spec->sp_cr_flags = MDS_OPEN_VOLATILE | fmode;
+       if (attr->ma_valid & MA_LOV) {
+               spec->u.sp_ea.eadata = attr->ma_lmm;
+               spec->u.sp_ea.eadatalen = attr->ma_lmm_size;
+               spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
+       } else {
+               spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE;
+       }
+
+       uc = lu_ucred(env);
+       uc_cap_save = uc->uc_cap;
+       uc->uc_cap |= 1 << CFS_CAP_DAC_OVERRIDE;
+       rc = mdo_create(env, mdt_object_child(local_root), &lname,
+                       mdt_object_child(obj), spec, attr);
+       uc->uc_cap = uc_cap_save;
+       if (rc < 0) {
+               CERROR("%s: cannot create volatile file "DFID": rc = %d\n",
+                      mdt_obd_name(mdt), PFID(fid), rc);
+               GOTO(out, rc);
+       }
+
+       rc = mo_open(env, mdt_object_child(obj), MDS_OPEN_CREATED);
+       if (rc < 0)
+               CERROR("%s: cannot open volatile file "DFID", orphan "
+                      "file will be left in PENDING directory until "
+                      "next reboot, rc = %d\n", mdt_obd_name(mdt),
+                      PFID(fid), rc);
+       GOTO(out, rc);
+
 out:
-        if (result && result != -EREMOTE)
-                lustre_msg_set_transno(req->rq_repmsg, 0);
-        return result;
+       if (rc < 0) {
+               if (!IS_ERR(obj))
+                       mdt_object_put(env, obj);
+               obj = ERR_PTR(rc);
+       }
+       mdt_object_put(env, local_root);
+       return obj;
+}
+
+static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
+                          struct md_attr *ma)
+{
+       struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LAYOUT];
+       struct close_data      *data;
+       struct ldlm_lock       *lease;
+       struct mdt_object      *orphan;
+       struct md_attr         *orp_ma;
+       struct lu_buf          *buf;
+       bool                    lease_broken;
+       int                     rc;
+       int                     rc2;
+       ENTRY;
+
+       if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_RDONLY)
+               RETURN(-EROFS);
+
+       data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
+       if (data == NULL)
+               RETURN(-EPROTO);
+
+       lease = ldlm_handle2lock(&data->cd_handle);
+       if (lease == NULL)
+               RETURN(-ESTALE);
+
+       /* try to hold open_sem so that nobody else can open the file */
+       if (!down_write_trylock(&o->mot_open_sem)) {
+               ldlm_lock_cancel(lease);
+               GOTO(out_reprocess, rc = -EBUSY);
+       }
+
+       /* Check if the lease open lease has already canceled */
+       lock_res_and_lock(lease);
+       lease_broken = ldlm_is_cancel(lease);
+       unlock_res_and_lock(lease);
+
+       LDLM_DEBUG(lease, DFID " lease broken? %d\n",
+                  PFID(mdt_object_fid(o)), lease_broken);
+
+       /* Cancel server side lease. Client side counterpart should
+        * have been cancelled. It's okay to cancel it now as we've
+        * held mot_open_sem. */
+       ldlm_lock_cancel(lease);
+
+       if (lease_broken) /* don't perform release task */
+               GOTO(out_unlock, rc = -ESTALE);
+
+       if (fid_is_zero(&data->cd_fid) || !fid_is_sane(&data->cd_fid))
+               GOTO(out_unlock, rc = -EINVAL);
+
+       /* ma_need was set before but it seems fine to change it in order to
+        * avoid modifying the one from RPC */
+       ma->ma_need = MA_HSM;
+       rc = mdt_attr_get_complex(info, o, ma);
+       if (rc != 0)
+               GOTO(out_unlock, rc);
+
+       if (!mdt_hsm_release_allow(ma))
+               GOTO(out_unlock, rc = -EPERM);
+
+       /* already released? */
+       if (ma->ma_hsm.mh_flags & HS_RELEASED)
+               GOTO(out_unlock, rc = 0);
+
+       /* Compare on-disk and packed data_version */
+       if (data->cd_data_version != ma->ma_hsm.mh_arch_ver) {
+               CDEBUG(D_HSM, DFID" data_version mismatches: packed="LPU64
+                      " and on-disk="LPU64"\n", PFID(mdt_object_fid(o)),
+                      data->cd_data_version, ma->ma_hsm.mh_arch_ver);
+               GOTO(out_unlock, rc = -EPERM);
+       }
+
+       ma->ma_valid = MA_INODE;
+       ma->ma_attr.la_valid &= LA_ATIME | LA_MTIME | LA_CTIME | LA_SIZE;
+       rc = mo_attr_set(info->mti_env, mdt_object_child(o), ma);
+       if (rc < 0)
+               GOTO(out_unlock, rc);
+
+       ma->ma_need = MA_INODE | MA_LOV;
+       rc = mdt_attr_get_complex(info, o, ma);
+       if (rc < 0)
+               GOTO(out_unlock, rc);
+
+       if (!(ma->ma_valid & MA_LOV)) {
+               /* Even empty file are released */
+               memset(ma->ma_lmm, 0, sizeof(*ma->ma_lmm));
+               ma->ma_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1_DEF);
+               ma->ma_lmm->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
+               ma->ma_lmm->lmm_stripe_size = cpu_to_le32(LOV_MIN_STRIPE_SIZE);
+               ma->ma_lmm_size = sizeof(*ma->ma_lmm);
+       } else {
+               /* Magic must be LOV_MAGIC_Vx_DEF otherwise LOD will interpret
+                * ma_lmm as lov_user_md, then it will be confused by union of
+                * layout_gen and stripe_offset. */
+               if (le32_to_cpu(ma->ma_lmm->lmm_magic) == LOV_MAGIC_V1)
+                       ma->ma_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1_DEF);
+               else if (le32_to_cpu(ma->ma_lmm->lmm_magic) == LOV_MAGIC_V3)
+                       ma->ma_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V3_DEF);
+               else
+                       GOTO(out_unlock, rc = -EINVAL);
+       }
+
+       /* Set file as released */
+       ma->ma_lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_RELEASED);
+
+       /* Hopefully it's not used in this call path */
+       orp_ma = &info->mti_u.som.attr;
+       orp_ma->ma_attr.la_mode = S_IFREG | S_IWUSR;
+       orp_ma->ma_attr.la_uid = ma->ma_attr.la_uid;
+       orp_ma->ma_attr.la_gid = ma->ma_attr.la_gid;
+       orp_ma->ma_attr.la_valid = LA_MODE | LA_UID | LA_GID;
+       orp_ma->ma_lmm = ma->ma_lmm;
+       orp_ma->ma_lmm_size = ma->ma_lmm_size;
+       orp_ma->ma_valid = MA_INODE | MA_LOV;
+       orphan = mdt_orphan_open(info, info->mti_mdt, &data->cd_fid, orp_ma,
+                                FMODE_WRITE);
+       if (IS_ERR(orphan)) {
+               CERROR("%s: cannot open orphan file "DFID": rc = %ld\n",
+                      mdt_obd_name(info->mti_mdt), PFID(&data->cd_fid),
+                      PTR_ERR(orphan));
+               GOTO(out_unlock, rc = PTR_ERR(orphan));
+       }
+
+       /* Set up HSM attribute for orphan object */
+       CLASSERT(sizeof(struct hsm_attrs) <= sizeof(info->mti_xattr_buf));
+       buf = &info->mti_buf;
+       buf->lb_buf = info->mti_xattr_buf;
+       buf->lb_len = sizeof(struct hsm_attrs);
+       ma->ma_hsm.mh_flags |= HS_RELEASED;
+       lustre_hsm2buf(buf->lb_buf, &ma->ma_hsm);
+       ma->ma_hsm.mh_flags &= ~HS_RELEASED;
+
+       mdt_lock_reg_init(lh, LCK_EX);
+       rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_LAYOUT |
+                            MDS_INODELOCK_XATTR, MDT_LOCAL_LOCK);
+       if (rc != 0)
+               GOTO(out_close, rc);
+
+       rc = mo_xattr_set(info->mti_env, mdt_object_child(orphan), buf,
+                         XATTR_NAME_HSM, 0);
+
+       if (rc == 0)
+               /* Swap layout with orphan object */
+               rc = mo_swap_layouts(info->mti_env, mdt_object_child(o),
+                                    mdt_object_child(orphan),
+                                    SWAP_LAYOUTS_MDS_HSM);
+
+       /* Release exclusive LL */
+       mdt_object_unlock(info, o, lh, 1);
+
+       EXIT;
+
+out_close:
+       /* Close orphan object anyway */
+       rc2 = mo_close(info->mti_env, mdt_object_child(orphan), orp_ma,
+                      FMODE_WRITE);
+       if (rc2 < 0)
+               CERROR("%s: error closing volatile file "DFID": rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), PFID(&data->cd_fid), rc2);
+       LU_OBJECT_DEBUG(D_HSM, info->mti_env, &orphan->mot_obj,
+                       "object closed\n");
+       mdt_object_put(info->mti_env, orphan);
+
+out_unlock:
+       up_write(&o->mot_open_sem);
+
+       if (rc == 0) { /* already released */
+               struct mdt_body *repbody;
+               repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+               LASSERT(repbody != NULL);
+               repbody->mbo_valid |= OBD_MD_FLRELEASED;
+       }
+
+out_reprocess:
+       ldlm_reprocess_all(lease->l_resource);
+       LDLM_LOCK_PUT(lease);
+
+       ma->ma_valid = 0;
+       ma->ma_need = 0;
+
+       return rc;
 }
 
 #define MFD_CLOSED(mode) (((mode) & ~(MDS_FMODE_EPOCH | MDS_FMODE_SOM | \
@@ -1585,11 +2180,21 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
         struct md_attr *ma = &info->mti_attr;
         int ret = MDT_IOEPOCH_CLOSED;
         int rc = 0;
-        int mode;
+       __u64 mode;
         ENTRY;
 
         mode = mfd->mfd_mode;
 
+       if (ma->ma_attr_flags & MDS_HSM_RELEASE) {
+               rc = mdt_hsm_release(info, o, ma);
+               if (rc < 0) {
+                       CDEBUG(D_HSM, "%s: File " DFID " release failed: %d\n",
+                               mdt_obd_name(info->mti_mdt),
+                               PFID(mdt_object_fid(o)), rc);
+                       /* continue to close even error occurred. */
+               }
+       }
+
         if ((mode & FMODE_WRITE) || (mode & MDS_FMODE_TRUNC)) {
                 mdt_write_put(o);
                 ret = mdt_ioepoch_close(info, o);
@@ -1610,6 +2215,10 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
                 rc = mo_attr_set(info->mti_env, next, ma);
         }
 
+       /* If file data is modified, add the dirty flag. */
+       if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
+               rc = mdt_add_dirty_flag(info, o, ma);
+
         ma->ma_need |= MA_INODE;
         ma->ma_valid &= ~MA_INODE;
 
@@ -1640,42 +2249,54 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
                                "needed on "DFID"\n", PFID(mdt_object_fid(o)));
                 }
         } else {
-                mdt_mfd_free(mfd);
-                mdt_object_put(info->mti_env, o);
-        }
+               /* adjust open and lease count */
+               if (mode & MDS_OPEN_LEASE) {
+                       LASSERT(atomic_read(&o->mot_lease_count) > 0);
+                       atomic_dec(&o->mot_lease_count);
+               }
+               LASSERT(atomic_read(&o->mot_open_count) > 0);
+               atomic_dec(&o->mot_open_count);
+
+               mdt_mfd_free(mfd);
+               mdt_object_put(info->mti_env, o);
+       }
 
-        RETURN(rc ? rc : ret);
+       RETURN(rc ? rc : ret);
 }
 
-int mdt_close(struct mdt_thread_info *info)
+int mdt_close(struct tgt_session_info *tsi)
 {
+       struct mdt_thread_info  *info = tsi2mdt_info(tsi);
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
         struct mdt_export_data *med;
         struct mdt_file_data   *mfd;
         struct mdt_object      *o;
         struct md_attr         *ma = &info->mti_attr;
         struct mdt_body        *repbody = NULL;
-        struct ptlrpc_request  *req = mdt_info_req(info);
         int rc, ret = 0;
         ENTRY;
 
        mdt_counter_incr(req, LPROC_MDT_CLOSE);
-        /* Close may come with the Size-on-MDS update. Unpack it. */
-        rc = mdt_close_unpack(info);
-        if (rc)
-                RETURN(err_serious(rc));
+       /* Close may come with the Size-on-MDS update. Unpack it. */
+       rc = mdt_close_unpack(info);
+       if (rc)
+               GOTO(out, rc = err_serious(rc));
 
         LASSERT(info->mti_ioepoch);
 
+       /* These fields are no longer used and are left for compatibility.
+        * size is always zero */
         req_capsule_set_size(info->mti_pill, &RMF_MDT_MD, RCL_SERVER,
-                             info->mti_mdt->mdt_max_mdsize);
+                            0);
         req_capsule_set_size(info->mti_pill, &RMF_LOGCOOKIES, RCL_SERVER,
-                             info->mti_mdt->mdt_max_cookiesize);
+                            0);
         rc = req_capsule_server_pack(info->mti_pill);
         if (mdt_check_resent(info, mdt_reconstruct_generic, NULL)) {
                 mdt_client_compatibility(info);
                 if (rc == 0)
                         mdt_fix_reply(info);
-                RETURN(lustre_msg_get_status(req->rq_repmsg));
+               mdt_exit_ucred(info);
+               GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg));
         }
 
         /* Continue to close handle even if we can not pack reply */
@@ -1687,21 +2308,17 @@ int mdt_close(struct mdt_thread_info *info)
                 ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
                                                        &RMF_MDT_MD,
                                                        RCL_SERVER);
-                ma->ma_cookie = req_capsule_server_get(info->mti_pill,
-                                                       &RMF_LOGCOOKIES);
-                ma->ma_cookie_size = req_capsule_get_size(info->mti_pill,
-                                                          &RMF_LOGCOOKIES,
-                                                          RCL_SERVER);
                 ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
-                repbody->eadatasize = 0;
-                repbody->aclsize = 0;
+               repbody->mbo_eadatasize = 0;
+               repbody->mbo_aclsize = 0;
         } else {
                 rc = err_serious(rc);
         }
 
         med = &req->rq_export->exp_mdt_data;
        spin_lock(&med->med_open_lock);
-       mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
+       mfd = mdt_handle2mfd(med, &info->mti_ioepoch->handle,
+                            req_is_replay(req));
        if (mdt_mfd_closed(mfd)) {
                spin_unlock(&med->med_open_lock);
                CDEBUG(D_INODE, "no handle for file close: fid = "DFID
@@ -1728,13 +2345,16 @@ int mdt_close(struct mdt_thread_info *info)
                 rc = mdt_fix_reply(info);
         }
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK))
-                RETURN(err_serious(-ENOMEM));
+       mdt_exit_ucred(info);
+       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK))
+               GOTO(out, rc = err_serious(-ENOMEM));
 
-        if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_CLOSE_NET_REP,
-                                 OBD_FAIL_MDS_CLOSE_NET_REP))
-                info->mti_fail_id = OBD_FAIL_MDS_CLOSE_NET_REP;
-        RETURN(rc ? rc : ret);
+       if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_CLOSE_NET_REP,
+                                OBD_FAIL_MDS_CLOSE_NET_REP))
+               tsi->tsi_reply_fail_id = OBD_FAIL_MDS_CLOSE_NET_REP;
+out:
+       mdt_thread_info_fini(info);
+       RETURN(rc ? rc : ret);
 }
 
 /**
@@ -1745,35 +2365,38 @@ int mdt_close(struct mdt_thread_info *info)
  * and got a trasid. Waiting for such DONE_WRITING is not reliable, so just
  * skip attributes and reconstruct the reply here.
  */
-int mdt_done_writing(struct mdt_thread_info *info)
+int mdt_done_writing(struct tgt_session_info *tsi)
 {
-        struct ptlrpc_request   *req = mdt_info_req(info);
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
+       struct mdt_thread_info  *info = tsi2mdt_info(tsi);
         struct mdt_body         *repbody = NULL;
         struct mdt_export_data  *med;
         struct mdt_file_data    *mfd;
         int rc;
         ENTRY;
 
-        rc = req_capsule_server_pack(info->mti_pill);
-        if (rc)
-                RETURN(err_serious(rc));
+       rc = req_capsule_server_pack(tsi->tsi_pill);
+       if (rc)
+               GOTO(out, rc = err_serious(rc));
 
-        repbody = req_capsule_server_get(info->mti_pill,
-                                         &RMF_MDT_BODY);
-        repbody->eadatasize = 0;
-        repbody->aclsize = 0;
+       repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_MDT_BODY);
+       repbody->mbo_eadatasize = 0;
+       repbody->mbo_aclsize = 0;
 
-        /* Done Writing may come with the Size-on-MDS update. Unpack it. */
-        rc = mdt_close_unpack(info);
-        if (rc)
-                RETURN(err_serious(rc));
+       /* Done Writing may come with the Size-on-MDS update. Unpack it. */
+       rc = mdt_close_unpack(info);
+       if (rc)
+               GOTO(out, rc = err_serious(rc));
 
-        if (mdt_check_resent(info, mdt_reconstruct_generic, NULL))
-                RETURN(lustre_msg_get_status(req->rq_repmsg));
+       if (mdt_check_resent(info, mdt_reconstruct_generic, NULL)) {
+               mdt_exit_ucred(info);
+               GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg));
+       }
 
-        med = &info->mti_exp->exp_mdt_data;
+       med = &info->mti_exp->exp_mdt_data;
        spin_lock(&med->med_open_lock);
-       mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
+       mfd = mdt_handle2mfd(med, &info->mti_ioepoch->handle,
+                            req_is_replay(req));
        if (mfd == NULL) {
                spin_unlock(&med->med_open_lock);
                 CDEBUG(D_INODE, "no handle for done write: fid = "DFID
@@ -1786,9 +2409,9 @@ int mdt_done_writing(struct mdt_thread_info *info)
                         rc = info->mti_ioepoch->flags & MF_SOM_AU ?
                              -EAGAIN : 0;
                         mdt_empty_transno(info, rc);
-                        RETURN(rc);
-                }
-                RETURN(-ESTALE);
+               } else
+                       rc = -ESTALE;
+               GOTO(error_ucred, rc);
         }
 
         LASSERT(mfd->mfd_mode == MDS_FMODE_EPOCH ||
@@ -1803,12 +2426,16 @@ int mdt_done_writing(struct mdt_thread_info *info)
 
         info->mti_attr.ma_lmm_size = info->mti_mdt->mdt_max_mdsize;
         OBD_ALLOC_LARGE(info->mti_attr.ma_lmm, info->mti_mdt->mdt_max_mdsize);
-        if (info->mti_attr.ma_lmm == NULL)
-                RETURN(-ENOMEM);
+       if (info->mti_attr.ma_lmm == NULL)
+               GOTO(error_ucred, rc = -ENOMEM);
 
         rc = mdt_mfd_close(info, mfd);
 
         OBD_FREE_LARGE(info->mti_attr.ma_lmm, info->mti_mdt->mdt_max_mdsize);
         mdt_empty_transno(info, rc);
-        RETURN(rc);
+error_ucred:
+       mdt_exit_ucred(info);
+out:
+       mdt_thread_info_fini(info);
+       RETURN(rc);
 }