Whamcloud - gitweb
LU-8150 mdt: Track open+create as mknod
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index ae97398..b305b57 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2014, Intel Corporation.
+ * Copyright (c) 2011, 2015, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -206,6 +206,19 @@ static void mdt_empty_transno(struct mdt_thread_info *info, int rc)
         if (lustre_msg_get_transno(req->rq_repmsg) != 0)
                 RETURN_EXIT;
 
+       if (tgt_is_multimodrpcs_client(req->rq_export)) {
+               struct thandle         *th;
+
+               /* generate an empty transaction to get a transno
+                * and reply data */
+               th = dt_trans_create(info->mti_env, mdt->mdt_bottom);
+               if (!IS_ERR(th)) {
+                       rc = dt_trans_start(info->mti_env, mdt->mdt_bottom, th);
+                       dt_trans_stop(info->mti_env, mdt->mdt_bottom, th);
+               }
+               RETURN_EXIT;
+       }
+
        spin_lock(&mdt->mdt_lut.lut_translock);
        if (rc != 0) {
                if (info->mti_transno != 0) {
@@ -524,22 +537,17 @@ static int mdt_finish_open(struct mdt_thread_info *info,
                RETURN(-ENOENT);
        }
 
-        if (exp_connect_rmtclient(exp)) {
-                void *buf = req_capsule_server_get(info->mti_pill, &RMF_ACL);
-
-                rc = mdt_pack_remote_perm(info, o, buf);
-                if (rc) {
-                       repbody->mbo_valid &= ~OBD_MD_FLRMTPERM;
-                       repbody->mbo_aclsize = 0;
-               } else {
-                       repbody->mbo_valid |= OBD_MD_FLRMTPERM;
-                       repbody->mbo_aclsize = sizeof(struct mdt_remote_perm);
-                }
-        }
 #ifdef CONFIG_FS_POSIX_ACL
-       else if (exp_connect_flags(exp) & OBD_CONNECT_ACL)
-               rc = mdt_pack_acl2body(info, repbody, o,
-                                      exp->exp_target_data.ted_nodemap);
+       if (exp_connect_flags(exp) & OBD_CONNECT_ACL) {
+               struct lu_nodemap *nodemap = nodemap_get_from_exp(exp);
+               if (IS_ERR(nodemap))
+                       RETURN(PTR_ERR(nodemap));
+
+               rc = mdt_pack_acl2body(info, repbody, o, nodemap);
+               nodemap_putref(nodemap);
+               if (rc)
+                       RETURN(rc);
+       }
 #endif
 
         /*
@@ -614,8 +622,6 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
         struct mdt_device       *mdt  = info->mti_mdt;
         struct req_capsule      *pill = info->mti_pill;
         struct ptlrpc_request   *req  = mdt_info_req(info);
-        struct tg_export_data   *ted  = &req->rq_export->exp_target_data;
-        struct lsd_client_data  *lcd  = ted->ted_lcd;
         struct md_attr          *ma   = &info->mti_attr;
         struct mdt_reint_record *rr   = &info->mti_rr;
        __u64                   flags = info->mti_spec.sp_cr_flags;
@@ -624,17 +630,18 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
         struct mdt_object       *child;
         struct mdt_body         *repbody;
         int                      rc;
-        ENTRY;
+       __u64                    opdata;
+       ENTRY;
 
         LASSERT(pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
         ldlm_rep = req_capsule_server_get(pill, &RMF_DLM_REP);
         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
 
        ma->ma_need = MA_INODE | MA_HSM;
-        ma->ma_valid = 0;
+       ma->ma_valid = 0;
 
-        mdt_req_from_lcd(req, lcd);
-        mdt_set_disposition(info, ldlm_rep, lcd->lcd_last_data);
+       opdata = mdt_req_from_lrd(req, info->mti_reply_data);
+       mdt_set_disposition(info, ldlm_rep, opdata);
 
         CDEBUG(D_INODE, "This is reconstruct open: disp="LPX64", result=%d\n",
                ldlm_rep->lock_policy_res1, req->rq_status);
@@ -767,13 +774,13 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                                struct mdt_lock_handle *lhc,
                                __u64 *ibits)
 {
-       struct md_attr  *ma = &info->mti_attr;
-       __u64            open_flags = info->mti_spec.sp_cr_flags;
-       ldlm_mode_t      lm = LCK_CR;
-       bool             acq_lease = !!(open_flags & MDS_OPEN_LEASE);
-       bool             try_layout = false;
-       bool             create_layout = false;
-       int              rc = 0;
+       struct md_attr *ma = &info->mti_attr;
+       __u64 open_flags = info->mti_spec.sp_cr_flags;
+       enum ldlm_mode lm = LCK_CR;
+       bool acq_lease = !!(open_flags & MDS_OPEN_LEASE);
+       bool try_layout = false;
+       bool create_layout = false;
+       int rc = 0;
        ENTRY;
 
        *ibits = 0;
@@ -857,15 +864,13 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                 * However this is a double-edged sword because changing
                 * permission will revoke huge # of LOOKUP locks. */
                *ibits |= MDS_INODELOCK_LAYOUT | MDS_INODELOCK_LOOKUP;
-               if (!mdt_object_lock_try(info, obj, lhc, *ibits,
-                                        MDT_CROSS_LOCK)) {
+               if (!mdt_object_lock_try(info, obj, lhc, *ibits)) {
                        *ibits &= ~(MDS_INODELOCK_LAYOUT|MDS_INODELOCK_LOOKUP);
                        if (*ibits != 0)
-                               rc = mdt_object_lock(info, obj, lhc, *ibits,
-                                               MDT_CROSS_LOCK);
+                               rc = mdt_object_lock(info, obj, lhc, *ibits);
                }
        } else if (*ibits != 0) {
-               rc = mdt_object_lock(info, obj, lhc, *ibits, MDT_CROSS_LOCK);
+               rc = mdt_object_lock(info, obj, lhc, *ibits);
        }
 
        CDEBUG(D_INODE, "Requested bits lock:"DFID ", ibits = "LPX64
@@ -892,8 +897,7 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                LASSERT(!try_layout);
                mdt_lock_handle_init(ll);
                mdt_lock_reg_init(ll, LCK_EX);
-               rc = mdt_object_lock(info, obj, ll, MDS_INODELOCK_LAYOUT,
-                                       MDT_LOCAL_LOCK);
+               rc = mdt_object_lock(info, obj, ll, MDS_INODELOCK_LAYOUT);
 
                OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LL_BLOCK, 2);
        }
@@ -1018,7 +1022,7 @@ static int mdt_open_by_fid_lock(struct mdt_thread_info *info,
         struct mdt_object       *parent= NULL;
         struct mdt_object       *o;
         int                      rc;
-       int                      object_locked = 0;
+       bool                     object_locked = false;
        __u64                    ibits = 0;
         ENTRY;
 
@@ -1078,7 +1082,7 @@ static int mdt_open_by_fid_lock(struct mdt_thread_info *info,
                GOTO(out, rc);
        } else if (rc > 0) {
                rc = mdt_object_open_lock(info, o, lhc, &ibits);
-               object_locked = 1;
+               object_locked = true;
                if (rc)
                        GOTO(out_unlock, rc);
        }
@@ -1136,7 +1140,7 @@ static int mdt_cross_open(struct mdt_thread_info *info,
                       mdt_obd_name(info->mti_mdt), PFID(fid), -EFAULT);
                LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
                                &o->mot_obj,
-                               "Object isn't on this server! FLD error?\n");
+                               "Object isn't on this server! FLD error?");
                 rc = -EFAULT;
        } else {
                if (mdt_object_exists(o)) {
@@ -1210,6 +1214,10 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
             info->mti_spec.u.sp_ea.eadata == NULL)
                 GOTO(out, result = err_serious(-EINVAL));
 
+       if (create_flags & FMODE_WRITE &&
+           exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY)
+               GOTO(out, result = -EROFS);
+
        CDEBUG(D_INODE, "I am going to open "DFID"/("DNAME"->"DFID") "
               "cr_flag="LPO64" mode=0%06o msg_flag=0x%x\n",
               PFID(rr->rr_fid1), PNAME(&rr->rr_name),
@@ -1264,15 +1272,21 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
        if (!lu_name_is_valid(&rr->rr_name))
                GOTO(out, result = -EPROTO);
 
+again:
         lh = &info->mti_lh[MDT_LH_PARENT];
        mdt_lock_pdo_init(lh,
                          (create_flags & MDS_OPEN_CREAT) ? LCK_PW : LCK_PR,
                          &rr->rr_name);
 
-        parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
-                                      MDS_INODELOCK_UPDATE);
-        if (IS_ERR(parent))
-                GOTO(out, result = PTR_ERR(parent));
+       parent = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
+       if (IS_ERR(parent))
+               GOTO(out, result = PTR_ERR(parent));
+
+       result = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
+       if (result != 0) {
+               mdt_object_put(info->mti_env, parent);
+               GOTO(out, result);
+       }
 
         /* get and check version of parent */
         result = mdt_version_get_check(info, parent, 0);
@@ -1292,16 +1306,45 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         if (result != 0 && result != -ENOENT && result != -ESTALE)
                 GOTO(out_parent, result);
 
-        if (result == -ENOENT || result == -ESTALE) {
-                mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
-                if (result == -ESTALE) {
-                        /*
-                         * -ESTALE means the parent is a dead(unlinked) dir, so
-                         * it should return -ENOENT to in accordance with the
-                         * original mds implementaion.
-                         */
-                        GOTO(out_parent, result = -ENOENT);
-                }
+       if (result == -ENOENT || result == -ESTALE) {
+               /* If the object is dead, let's check if the object
+                * is being migrated to a new object */
+               if (result == -ESTALE) {
+                       struct lu_buf lmv_buf;
+
+                       lmv_buf.lb_buf = info->mti_xattr_buf;
+                       lmv_buf.lb_len = sizeof(info->mti_xattr_buf);
+                       rc = mo_xattr_get(info->mti_env,
+                                         mdt_object_child(parent),
+                                         &lmv_buf, XATTR_NAME_LMV);
+                       if (rc > 0) {
+                               struct lmv_mds_md_v1 *lmv;
+
+                               lmv = lmv_buf.lb_buf;
+                               if (le32_to_cpu(lmv->lmv_hash_type) &
+                                               LMV_HASH_FLAG_MIGRATION) {
+                                       /* Get the new parent FID and retry */
+                                       mdt_object_unlock_put(info, parent,
+                                                             lh, 1);
+                                       mdt_lock_handle_init(lh);
+                                       fid_le_to_cpu(
+                                               (struct lu_fid *)rr->rr_fid1,
+                                               &lmv->lmv_stripe_fids[1]);
+                                       goto again;
+                               }
+                       }
+               }
+
+               mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
+               if (result == -ESTALE) {
+                       /*
+                        * -ESTALE means the parent is a dead(unlinked) dir, so
+                        * it should return -ENOENT to in accordance with the
+                        * original mds implementaion.
+                        */
+                       GOTO(out_parent, result = -ENOENT);
+               }
+
                 if (!(create_flags & MDS_OPEN_CREAT))
                         GOTO(out_parent, result);
                if (exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY)
@@ -1373,7 +1416,8 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                         if (result != 0)
                                 GOTO(out_child, result);
                 }
-                created = 1;
+               created = 1;
+               mdt_counter_incr(req, LPROC_MDT_MKNOD);
         } else {
                 /*
                  * The object is on remote node, return its FID for remote open.
@@ -1390,13 +1434,12 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                        if (rc < 0) {
                                GOTO(out_child, result = rc);
                        } else if (rc > 0) {
-                                mdt_lock_handle_init(lhc);
-                                mdt_lock_reg_init(lhc, LCK_PR);
+                               mdt_lock_handle_init(lhc);
+                               mdt_lock_reg_init(lhc, LCK_PR);
 
-                                rc = mdt_object_lock(info, child, lhc,
-                                                     MDS_INODELOCK_LOOKUP,
-                                                     MDT_CROSS_LOCK);
-                        }
+                               rc = mdt_object_lock(info, child, lhc,
+                                                    MDS_INODELOCK_LOOKUP);
+                       }
                        repbody->mbo_fid1 = *mdt_object_fid(child);
                        repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
                         if (rc != 0)
@@ -1599,7 +1642,7 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        lease_broken = ldlm_is_cancel(lease);
        unlock_res_and_lock(lease);
 
-       LDLM_DEBUG(lease, DFID " lease broken? %d\n",
+       LDLM_DEBUG(lease, DFID " lease broken? %d",
                   PFID(mdt_object_fid(o)), lease_broken);
 
        /* Cancel server side lease. Client side counterpart should
@@ -1696,7 +1739,7 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
 
        mdt_lock_reg_init(lh, LCK_EX);
        rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_LAYOUT |
-                            MDS_INODELOCK_XATTR, MDT_LOCAL_LOCK);
+                            MDS_INODELOCK_XATTR);
        if (rc != 0)
                GOTO(out_close, rc);
 
@@ -1722,17 +1765,19 @@ out_close:
                CERROR("%s: error closing volatile file "DFID": rc = %d\n",
                       mdt_obd_name(info->mti_mdt), PFID(&data->cd_fid), rc2);
        LU_OBJECT_DEBUG(D_HSM, info->mti_env, &orphan->mot_obj,
-                       "object closed\n");
+                       "object closed");
        mdt_object_put(info->mti_env, orphan);
 
 out_unlock:
        up_write(&o->mot_open_sem);
 
-       if (rc == 0) { /* already released */
+       /* already released */
+       if (rc == 0) {
                struct mdt_body *repbody;
+
                repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
                LASSERT(repbody != NULL);
-               repbody->mbo_valid |= OBD_MD_FLRELEASED;
+               repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
        }
 
 out_reprocess:
@@ -1745,8 +1790,143 @@ out_reprocess:
        return rc;
 }
 
-#define MFD_CLOSED(mode) ((mode) == MDS_FMODE_CLOSED)
+int mdt_close_swap_layouts(struct mdt_thread_info *info,
+                          struct mdt_object *o, struct md_attr *ma)
+{
+       struct mdt_lock_handle  *lh1 = &info->mti_lh[MDT_LH_NEW];
+       struct mdt_lock_handle  *lh2 = &info->mti_lh[MDT_LH_OLD];
+       struct close_data       *data;
+       struct ldlm_lock        *lease;
+       struct mdt_object       *o1 = o, *o2;
+       bool                     lease_broken;
+       bool                     swap_objects;
+       int                      rc;
+       ENTRY;
+
+       if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_RDONLY)
+               RETURN(-EROFS);
+
+       if (!S_ISREG(lu_object_attr(&o1->mot_obj)))
+               RETURN(-EINVAL);
+
+       data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
+       if (data == NULL)
+               RETURN(-EPROTO);
+
+       if (fid_is_zero(&data->cd_fid) || !fid_is_sane(&data->cd_fid))
+               RETURN(-EINVAL);
+
+       rc = lu_fid_cmp(&data->cd_fid, mdt_object_fid(o));
+       if (unlikely(rc == 0))
+               RETURN(-EINVAL);
+
+       /* Exchange o1 and o2, to enforce locking order */
+       swap_objects = (rc < 0);
+
+       lease = ldlm_handle2lock(&data->cd_handle);
+       if (lease == NULL)
+               RETURN(-ESTALE);
+
+       o2 = mdt_object_find(info->mti_env, info->mti_mdt, &data->cd_fid);
+       if (IS_ERR(o2))
+               GOTO(out_lease, rc = PTR_ERR(o2));
+
+       if (!S_ISREG(lu_object_attr(&o2->mot_obj))) {
+               swap_objects = false; /* not swapped yet */
+               GOTO(out_obj, rc = -EINVAL);
+       }
+
+       if (swap_objects)
+               swap(o1, o2);
+
+       rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
+                          MAY_WRITE);
+       if (rc < 0)
+               GOTO(out_obj, rc);
+
+       rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
+                          MAY_WRITE);
+       if (rc < 0)
+               GOTO(out_obj, rc);
+
+       /* try to hold open_sem so that nobody else can open the file */
+       if (!down_write_trylock(&o->mot_open_sem)) {
+               ldlm_lock_cancel(lease);
+               GOTO(out_obj, rc = -EBUSY);
+       }
+
+       /* Check if the lease open lease has already canceled */
+       lock_res_and_lock(lease);
+       lease_broken = ldlm_is_cancel(lease);
+       unlock_res_and_lock(lease);
+
+       LDLM_DEBUG(lease, DFID " lease broken? %d",
+                  PFID(mdt_object_fid(o)), lease_broken);
+
+       /* Cancel server side lease. Client side counterpart should
+        * have been cancelled. It's okay to cancel it now as we've
+        * held mot_open_sem. */
+       ldlm_lock_cancel(lease);
+
+       if (lease_broken)
+               GOTO(out_unlock_sem, rc = -ESTALE);
+
+       mdt_lock_reg_init(lh1, LCK_EX);
+       rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT |
+                            MDS_INODELOCK_XATTR);
+       if (rc < 0)
+               GOTO(out_unlock_sem, rc);
+
+       mdt_lock_reg_init(lh2, LCK_EX);
+       rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
+                            MDS_INODELOCK_XATTR);
+       if (rc < 0)
+               GOTO(out_unlock1, rc);
+
+       /* Swap layout with orphan object */
+       rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
+                            mdt_object_child(o2), 0);
+       if (rc < 0)
+               GOTO(out_unlock2, rc);
+
+       EXIT;
+
+out_unlock2:
+       /* Release exclusive LL */
+       mdt_object_unlock(info, o2, lh2, 1);
+
+out_unlock1:
+       mdt_object_unlock(info, o1, lh1, 1);
+
+out_unlock_sem:
+       up_write(&o->mot_open_sem);
+
+       /* already swapped */
+       if (rc == 0) {
+               struct mdt_body *repbody;
+
+               repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+               LASSERT(repbody != NULL);
+               repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
+       }
+
+out_obj:
+       mdt_object_put(info->mti_env, swap_objects ? o1 : o2);
+
+       ldlm_reprocess_all(lease->l_resource);
+
+out_lease:
+       LDLM_LOCK_PUT(lease);
 
+       if (ma != NULL) {
+               ma->ma_valid = 0;
+               ma->ma_need = 0;
+       }
+
+       return rc;
+}
+
+#define MFD_CLOSED(mode) ((mode) == MDS_FMODE_CLOSED)
 static int mdt_mfd_closed(struct mdt_file_data *mfd)
 {
         return ((mfd == NULL) || MFD_CLOSED(mfd->mfd_mode));
@@ -1767,12 +1947,23 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
                rc = mdt_hsm_release(info, o, ma);
                if (rc < 0) {
                        CDEBUG(D_HSM, "%s: File " DFID " release failed: %d\n",
-                               mdt_obd_name(info->mti_mdt),
-                               PFID(mdt_object_fid(o)), rc);
+                              mdt_obd_name(info->mti_mdt),
+                              PFID(mdt_object_fid(o)), rc);
                        /* continue to close even error occurred. */
                }
        }
 
+       if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SWAP) {
+               rc = mdt_close_swap_layouts(info, o, ma);
+               if (rc < 0) {
+                       CDEBUG(D_INODE,
+                              "%s: cannot swap layout of "DFID": rc=%d\n",
+                              mdt_obd_name(info->mti_mdt),
+                              PFID(mdt_object_fid(o)), rc);
+                       /* continue to close even if error occurred. */
+               }
+       }
+
        if (mode & FMODE_WRITE)
                mdt_write_put(o);
        else if (mode & MDS_FMODE_EXEC)
@@ -1811,13 +2002,48 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
        RETURN(rc);
 }
 
+int mdt_close_internal(struct mdt_thread_info *info, struct ptlrpc_request *req,
+                      struct mdt_body *repbody)
+{
+       struct mdt_export_data *med;
+       struct mdt_file_data   *mfd;
+       struct mdt_object      *o;
+       struct md_attr         *ma = &info->mti_attr;
+       int                     ret = 0;
+       int                     rc = 0;
+       ENTRY;
+
+       med = &req->rq_export->exp_mdt_data;
+       spin_lock(&med->med_open_lock);
+       mfd = mdt_handle2mfd(med, &info->mti_close_handle, req_is_replay(req));
+       if (mdt_mfd_closed(mfd)) {
+               spin_unlock(&med->med_open_lock);
+               CDEBUG(D_INODE, "no handle for file close: fid = "DFID
+                      ": cookie = "LPX64"\n", PFID(info->mti_rr.rr_fid1),
+                      info->mti_close_handle.cookie);
+               /** not serious error since bug 3633 */
+               rc = -ESTALE;
+       } else {
+               class_handle_unhash(&mfd->mfd_handle);
+               list_del_init(&mfd->mfd_list);
+               spin_unlock(&med->med_open_lock);
+
+               /* Do not lose object before last unlink. */
+               o = mfd->mfd_object;
+               mdt_object_get(info->mti_env, o);
+               ret = mdt_mfd_close(info, mfd);
+               if (repbody != NULL)
+                       rc = mdt_handle_last_unlink(info, o, ma);
+               mdt_object_put(info->mti_env, o);
+       }
+
+       RETURN(rc ? rc : ret);
+}
+
 int mdt_close(struct tgt_session_info *tsi)
 {
        struct mdt_thread_info  *info = tsi2mdt_info(tsi);
        struct ptlrpc_request   *req = tgt_ses_req(tsi);
-        struct mdt_export_data *med;
-        struct mdt_file_data   *mfd;
-        struct mdt_object      *o;
         struct md_attr         *ma = &info->mti_attr;
         struct mdt_body        *repbody = NULL;
         int rc, ret = 0;
@@ -1860,30 +2086,10 @@ int mdt_close(struct tgt_session_info *tsi)
                 rc = err_serious(rc);
         }
 
-        med = &req->rq_export->exp_mdt_data;
-       spin_lock(&med->med_open_lock);
-       mfd = mdt_handle2mfd(med, &info->mti_close_handle, req_is_replay(req));
-       if (mdt_mfd_closed(mfd)) {
-               spin_unlock(&med->med_open_lock);
-               CDEBUG(D_INODE, "no handle for file close: fid = "DFID
-                      ": cookie = "LPX64"\n", PFID(info->mti_rr.rr_fid1),
-                      info->mti_close_handle.cookie);
-               /** not serious error since bug 3633 */
-               rc = -ESTALE;
-       } else {
-               class_handle_unhash(&mfd->mfd_handle);
-               list_del_init(&mfd->mfd_list);
-               spin_unlock(&med->med_open_lock);
+       rc = mdt_close_internal(info, req, repbody);
+       if (rc != -ESTALE)
+               mdt_empty_transno(info, rc);
 
-                /* Do not lose object before last unlink. */
-                o = mfd->mfd_object;
-                mdt_object_get(info->mti_env, o);
-                ret = mdt_mfd_close(info, mfd);
-                if (repbody != NULL)
-                        rc = mdt_handle_last_unlink(info, o, ma);
-                mdt_empty_transno(info, rc);
-                mdt_object_put(info->mti_env, o);
-        }
         if (repbody != NULL) {
                 mdt_client_compatibility(info);
                 rc = mdt_fix_reply(info);