Whamcloud - gitweb
LU-13693 lfs: check early for MDS_OPEN_DIRECTORY
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index f01571b..5aa5314 100644 (file)
 #include "mdt_internal.h"
 #include <lustre_nodemap.h>
 
-/* we do nothing because we do not have refcount now */
-static void mdt_mfd_get(void *mfdp)
-{
-}
-
-static struct portals_handle_ops mfd_open_handle_ops = {
-       .hop_addref = mdt_mfd_get,
-       .hop_free   = NULL,
-};
+static const char mfd_open_handle_owner[] = "mdt";
 
 /* Create a new mdt_file_data struct, initialize it,
  * and insert it to global hash table */
@@ -63,10 +55,11 @@ struct mdt_file_data *mdt_mfd_new(const struct mdt_export_data *med)
 
        OBD_ALLOC_PTR(mfd);
        if (mfd != NULL) {
-               INIT_LIST_HEAD_RCU(&mfd->mfd_open_handle.h_link);
-               mfd->mfd_open_handle.h_owner = med;
+               refcount_set(&mfd->mfd_open_handle.h_ref, 1);
+               INIT_HLIST_NODE(&mfd->mfd_open_handle.h_link);
+               mfd->mfd_owner = med;
                INIT_LIST_HEAD(&mfd->mfd_list);
-               class_handle_hash(&mfd->mfd_open_handle, &mfd_open_handle_ops);
+               class_handle_hash(&mfd->mfd_open_handle, mfd_open_handle_owner);
        }
 
        RETURN(mfd);
@@ -86,9 +79,12 @@ struct mdt_file_data *mdt_open_handle2mfd(struct mdt_export_data *med,
        ENTRY;
 
        LASSERT(open_handle != NULL);
-       mfd = class_handle2object(open_handle->cookie, med);
+       mfd = class_handle2object(open_handle->cookie, mfd_open_handle_owner);
+       if (mfd)
+               refcount_dec(&mfd->mfd_open_handle.h_ref);
+
        /* during dw/setattr replay the mfd can be found by old handle */
-       if (mfd == NULL && is_replay_or_resent) {
+       if ((!mfd || mfd->mfd_owner != med) && is_replay_or_resent) {
                list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
                        if (mfd->mfd_open_handle_old.cookie ==
                            open_handle->cookie)
@@ -103,8 +99,10 @@ struct mdt_file_data *mdt_open_handle2mfd(struct mdt_export_data *med,
 /* free mfd */
 void mdt_mfd_free(struct mdt_file_data *mfd)
 {
+       LASSERT(refcount_read(&mfd->mfd_open_handle.h_ref) == 1);
        LASSERT(list_empty(&mfd->mfd_list));
-       OBD_FREE_RCU(mfd, sizeof *mfd, &mfd->mfd_open_handle);
+       OBD_FREE_PRE(mfd, sizeof(*mfd), "rcu");
+       kfree_rcu(mfd, mfd_open_handle.h_rcu);
 }
 
 static int mdt_create_data(struct mdt_thread_info *info,
@@ -309,7 +307,15 @@ static void mdt_prep_ma_buf_from_rep(struct mdt_thread_info *info,
                                     struct mdt_object *obj,
                                     struct md_attr *ma)
 {
-       LASSERT(ma->ma_lmv == NULL && ma->ma_lmm == NULL);
+       if (ma->ma_lmv || ma->ma_lmm) {
+               CDEBUG(D_INFO, DFID " %s already set.\n",
+                      PFID(mdt_object_fid(obj)),
+                      ma->ma_lmv ? (ma->ma_lmm ? "ma_lmv and ma_lmm"
+                                               : "ma_lmv")
+                                 : "ma_lmm");
+               return;
+       }
+
        if (S_ISDIR(obj->mot_header.loh_attr)) {
                ma->ma_lmv = req_capsule_server_get(info->mti_pill,
                                                    &RMF_MDT_MD);
@@ -394,7 +400,8 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                RETURN(rc);
 
        rc = mo_open(info->mti_env, mdt_object_child(o),
-                    created ? open_flags | MDS_OPEN_CREATED : open_flags);
+                    created ? open_flags | MDS_OPEN_CREATED : open_flags,
+                    &info->mti_spec);
        if (rc != 0) {
                /* If we allow the client to chgrp (CFS_SETGRP_PERM), but the
                 * client does not know which suppgid should be sent to the MDS,
@@ -542,6 +549,11 @@ static int mdt_finish_open(struct mdt_thread_info *info,
                RETURN(-EOPNOTSUPP);
        }
 
+       /* Overstriped files can crash older clients */
+       if (isreg && !exp_connect_overstriping(exp) &&
+           mdt_lmm_is_overstriping(ma->ma_lmm))
+               RETURN(-EOPNOTSUPP);
+
        /* LU-2275, simulate broken behaviour (esp. prevalent in
         * pre-2.4 servers where a very strange reply is sent on error
         * that looks like it was actually almost successful and a
@@ -557,7 +569,7 @@ static int mdt_finish_open(struct mdt_thread_info *info,
                RETURN(-ENOENT);
        }
 
-#ifdef CONFIG_FS_POSIX_ACL
+#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
        if (exp_connect_flags(exp) & OBD_CONNECT_ACL) {
                struct lu_nodemap *nodemap = nodemap_get_from_exp(exp);
                if (IS_ERR(nodemap))
@@ -797,8 +809,9 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
        bool try_layout = false;
        bool create_layout = false;
        int rc = 0;
-       int dom_stripes = LMM_NO_DOM;
-       bool dom_lock = false;
+       __u32 dom_stripe = 0;
+       unsigned int dom_only = 0;
+       unsigned int dom_lock = 0;
 
        ENTRY;
 
@@ -816,23 +829,23 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                    ma->ma_need & MA_LOV)
                        try_layout = true;
 
-               /* DoM files can have just MDT stripe or combined MDT + OST
-                * stripes.
-                * - In the first case the open for read/write will do IO to
-                *   the MDT stripe and it makes sense to take IO lock in
-                *   advance along with OPEN even if it is blocking lock.
-                * - In the second case it is just size of MDT stripe and it
-                *   is quite unlikely that client will write into it, though
-                *   it may read it. So IO lock will be taken optionally if it
-                *   is non-blocking one.
+               /* DoM files can take IO lock at OPEN when it makes sense,
+                * check if file has DoM stripe and ask for lock if client
+                * no lock on that resource yet.
                 */
                if (ma->ma_valid & MA_LOV && ma->ma_lmm != NULL)
-                       dom_stripes = mdt_lmm_dom_entry(ma->ma_lmm);
-
-               if (dom_stripes == LMM_DOM_ONLY &&
-                   info->mti_mdt->mdt_opts.mo_dom_lock > 0 &&
+                       dom_stripe = mdt_lmm_dom_entry_check(ma->ma_lmm,
+                                                            &dom_only);
+               /* If only DOM stripe is being used then we can expect IO
+                * to it after OPEN and will return corresponding DOM ibit
+                * using default strategy from mdt_opts.mo_dom_lock.
+                * Otherwise trylock mode is used always and DOM ibit will
+                * be returned optionally.
+                */
+               if (dom_stripe &&
                    !mdt_dom_client_has_lock(info, mdt_object_fid(obj)))
-                       dom_lock = true;
+                       dom_lock = !dom_only ? TRYLOCK_DOM_ON_OPEN :
+                                  info->mti_mdt->mdt_opts.mo_dom_lock;
        }
 
        if (acq_lease) {
@@ -887,16 +900,14 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                        lhc = &info->mti_lh[MDT_LH_LOCAL];
                } else if (dom_lock) {
                        lm = (open_flags & MDS_FMODE_WRITE) ? LCK_PW : LCK_PR;
-                       if (info->mti_mdt->mdt_opts.mo_dom_lock ==
-                           TRYLOCK_DOM_ON_OPEN) {
+                       if (dom_lock == TRYLOCK_DOM_ON_OPEN) {
                                trybits |= MDS_INODELOCK_DOM |
                                           MDS_INODELOCK_LAYOUT;
                        } else {
-                               /* mo_dom_lock == ALWAYS_DOM_LOCK_ON_OPEN */
+                               /* dom_lock == ALWAYS_DOM_LOCK_ON_OPEN */
                                *ibits = MDS_INODELOCK_DOM;
-                               if (info->mti_mdt->mdt_opts.mo_dom_read_open) {
+                               if (info->mti_mdt->mdt_opts.mo_dom_read_open)
                                        trybits |= MDS_INODELOCK_LAYOUT;
-                               }
                        }
                }
 
@@ -1214,7 +1225,9 @@ static int mdt_cross_open(struct mdt_thread_info *info,
                        if (rc != 0)
                                GOTO(out, rc);
 
-                       mdt_pack_secctx_in_reply(info, o);
+                       rc = mdt_pack_secctx_in_reply(info, o);
+                       if (unlikely(rc))
+                               GOTO(out, rc);
 
                        rc = mdt_finish_open(info, NULL, o, open_flags, 0, rep);
                } else {
@@ -1305,12 +1318,12 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
        int created = 0;
        int object_locked = 0;
        u32 msg_flags;
+       ktime_t kstart = ktime_get();
 
        ENTRY;
        OBD_FAIL_TIMEOUT_ORSET(OBD_FAIL_MDS_PAUSE_OPEN, OBD_FAIL_ONCE,
                               (obd_timeout + 1) / 4);
 
-       mdt_counter_incr(req, LPROC_MDT_OPEN);
        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
 
        ma->ma_need = MA_INODE;
@@ -1356,15 +1369,11 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                 * via a regular replay. */
                if (!(open_flags & MDS_OPEN_CREAT)) {
                        DEBUG_REQ(D_ERROR, req,
-                                 "OPEN & CREAT not in open replay/by_fid.");
+                                 "OPEN & CREAT not in open replay/by_fid");
                        GOTO(out, result = -EFAULT);
                }
                CDEBUG(D_INFO, "No object(1), continue as regular open.\n");
-       } else if (open_flags & (MDS_OPEN_BY_FID | MDS_OPEN_LOCK)) {
-               /*
-                * MDS_OPEN_LOCK is checked for backward compatibility with 2.1
-                * client.
-                */
+       } else if (open_flags & MDS_OPEN_BY_FID) {
                result = mdt_open_by_fid_lock(info, ldlm_rep, lhc);
                if (result < 0)
                        CDEBUG(D_INFO, "no object for "DFID": %d\n",
@@ -1385,7 +1394,6 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
        if (result < 0)
                GOTO(out, result);
 
-again:
        lh = &info->mti_lh[MDT_LH_PARENT];
        mdt_lock_pdo_init(lh, (open_flags & MDS_OPEN_CREAT) ? LCK_PW : LCK_PR,
                          &rr->rr_name);
@@ -1407,56 +1415,21 @@ again:
 
        fid_zero(child_fid);
 
-       result = mdo_lookup(info->mti_env, mdt_object_child(parent),
-                           &rr->rr_name, child_fid, &info->mti_spec);
+       result = -ENOENT;
+       if ((open_flags & MDS_OPEN_VOLATILE) == 0)
+               result = mdo_lookup(info->mti_env, mdt_object_child(parent),
+                                   &rr->rr_name, child_fid, &info->mti_spec);
 
        LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
                 "looking for "DFID"/"DNAME", found FID = "DFID"\n",
                 PFID(mdt_object_fid(parent)), PNAME(&rr->rr_name),
                 PFID(child_fid));
 
-       if (result != 0 && result != -ENOENT && result != -ESTALE)
+       if (result != 0 && result != -ENOENT)
                GOTO(out_parent, result);
 
-       if (result == -ENOENT || result == -ESTALE) {
-               /* If the object is dead, let's check if the object
-                * is being migrated to a new object */
-               if (result == -ESTALE) {
-                       struct lu_buf lmv_buf;
-
-                       lmv_buf.lb_buf = info->mti_xattr_buf;
-                       lmv_buf.lb_len = sizeof(info->mti_xattr_buf);
-                       rc = mo_xattr_get(info->mti_env,
-                                         mdt_object_child(parent),
-                                         &lmv_buf, XATTR_NAME_LMV);
-                       if (rc > 0) {
-                               struct lmv_mds_md_v1 *lmv;
-
-                               lmv = lmv_buf.lb_buf;
-                               if (le32_to_cpu(lmv->lmv_hash_type) &
-                                               LMV_HASH_FLAG_MIGRATION) {
-                                       /* Get the new parent FID and retry */
-                                       mdt_object_unlock_put(info, parent,
-                                                             lh, 1);
-                                       mdt_lock_handle_init(lh);
-                                       fid_le_to_cpu(
-                                               (struct lu_fid *)rr->rr_fid1,
-                                               &lmv->lmv_stripe_fids[1]);
-                                       goto again;
-                               }
-                       }
-               }
-
+       if (result == -ENOENT) {
                mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
-               if (result == -ESTALE) {
-                       /*
-                        * -ESTALE means the parent is a dead(unlinked) dir, so
-                        * it should return -ENOENT to in accordance with the
-                        * original mds implementaion.
-                        */
-                       GOTO(out_parent, result = -ENOENT);
-               }
-
                if (!(open_flags & MDS_OPEN_CREAT))
                        GOTO(out_parent, result);
                if (mdt_rdonly(req->rq_export))
@@ -1486,7 +1459,8 @@ again:
 
        if (result == -ENOENT) {
                /* Create under OBF and .lustre is not permitted */
-               if (!fid_is_md_operative(rr->rr_fid1))
+               if (!fid_is_md_operative(rr->rr_fid1) &&
+                   (open_flags & MDS_OPEN_VOLATILE) == 0)
                        GOTO(out_child, result = -EPERM);
 
                /* save versions in reply */
@@ -1523,7 +1497,8 @@ again:
                                 GOTO(out_child, result);
                 }
                created = 1;
-               mdt_counter_incr(req, LPROC_MDT_MKNOD);
+               mdt_counter_incr(req, LPROC_MDT_MKNOD,
+                                ktime_us_delta(ktime_get(), kstart));
         } else {
                 /*
                  * The object is on remote node, return its FID for remote open.
@@ -1554,6 +1529,14 @@ again:
                                result = -MDT_EREMOTE_OPEN;
                         GOTO(out_child, result);
                } else if (mdt_object_exists(child)) {
+                       /* Check early for MDS_OPEN_DIRECTORY/O_DIRECTORY to
+                        * avoid opening regular files from lfs getstripe
+                        * since doing so breaks the leases used by lfs
+                        * mirror. See LU-13693. */
+                       if (open_flags & MDS_OPEN_DIRECTORY &&
+                           S_ISREG(lu_object_attr(&child->mot_obj)))
+                               GOTO(out_child, result = -ENOTDIR);
+
                        /* We have to get attr & LOV EA & HSM for this
                         * object. */
                        mdt_prep_ma_buf_from_rep(info, child, ma);
@@ -1570,7 +1553,12 @@ again:
                }
        }
 
-       mdt_pack_secctx_in_reply(info, child);
+       repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize;
+       repbody->mbo_valid |= OBD_MD_FLMODEASIZE;
+
+       rc = mdt_pack_secctx_in_reply(info, child);
+       if (unlikely(rc))
+               GOTO(out_child, result = rc);
 
        rc = mdt_check_resent_lock(info, child, lhc);
        if (rc < 0) {
@@ -1624,6 +1612,10 @@ again:
                        mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
                }
        }
+
+       mdt_counter_incr(req, LPROC_MDT_OPEN,
+                        ktime_us_delta(ktime_get(), kstart));
+
        EXIT;
 out_child_unlock:
        if (object_locked)
@@ -1688,7 +1680,7 @@ static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
 
        uc = lu_ucred(env);
        uc_cap_save = uc->uc_cap;
-       uc->uc_cap |= 1 << CFS_CAP_DAC_OVERRIDE;
+       uc->uc_cap |= BIT(CFS_CAP_DAC_OVERRIDE);
        rc = mdo_create(env, mdt_object_child(local_root), &lname,
                        mdt_object_child(obj), spec, attr);
        uc->uc_cap = uc_cap_save;
@@ -1698,7 +1690,7 @@ static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
                GOTO(out, rc);
        }
 
-       rc = mo_open(env, mdt_object_child(obj), MDS_OPEN_CREATED);
+       rc = mo_open(env, mdt_object_child(obj), MDS_OPEN_CREATED, spec);
        if (rc < 0)
                CERROR("%s: cannot open volatile file "DFID", orphan "
                       "file will be left in PENDING directory until "
@@ -1741,6 +1733,22 @@ static inline int mdt_hsm_set_released(struct lov_mds_md *lmm)
        return 0;
 }
 
+static inline int mdt_get_lmm_gen(struct lov_mds_md *lmm, __u32 *gen)
+{
+       struct lov_comp_md_v1 *comp_v1;
+
+       if (le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_COMP_V1)) {
+               comp_v1 = (struct lov_comp_md_v1 *)lmm;
+               *gen = le32_to_cpu(comp_v1->lcm_layout_gen);
+       } else if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1 ||
+                  le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3) {
+               *gen = le16_to_cpu(lmm->lmm_layout_gen);
+       } else {
+               return -EINVAL;
+       }
+       return 0;
+}
+
 static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
                           struct md_attr *ma)
 {
@@ -1800,19 +1808,66 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        if (rc != 0)
                GOTO(out_unlock, rc);
 
-       if (!mdt_hsm_release_allow(ma))
-               GOTO(out_unlock, rc = -EPERM);
-
-       /* already released? */
-       if (ma->ma_hsm.mh_flags & HS_RELEASED)
-               GOTO(out_unlock, rc = 0);
+       if (ma->ma_attr_flags & MDS_PCC_ATTACH) {
+               if (ma->ma_valid & MA_HSM) {
+                       if (ma->ma_hsm.mh_flags & HS_RELEASED)
+                               GOTO(out_unlock, rc = -EALREADY);
+
+                       if (ma->ma_hsm.mh_arch_id != data->cd_archive_id)
+                               CDEBUG(D_CACHE,
+                                      DFID" archive id diff: %llu:%u\n",
+                                      PFID(mdt_object_fid(o)),
+                                      ma->ma_hsm.mh_arch_id,
+                                      data->cd_archive_id);
+
+                       if (!(ma->ma_hsm.mh_flags & HS_DIRTY) &&
+                           ma->ma_hsm.mh_arch_ver == data->cd_data_version) {
+                               CDEBUG(D_CACHE,
+                                      DFID" data version matches: packed=%llu "
+                                      "and on-disk=%llu\n",
+                                      PFID(mdt_object_fid(o)),
+                                      data->cd_data_version,
+                                      ma->ma_hsm.mh_arch_ver);
+                               ma->ma_hsm.mh_flags = HS_ARCHIVED | HS_EXISTS;
+                       }
 
-       /* Compare on-disk and packed data_version */
-       if (data->cd_data_version != ma->ma_hsm.mh_arch_ver) {
-               CDEBUG(D_HSM, DFID" data_version mismatches: packed=%llu"
-                      " and on-disk=%llu\n", PFID(mdt_object_fid(o)),
-                      data->cd_data_version, ma->ma_hsm.mh_arch_ver);
-               GOTO(out_unlock, rc = -EPERM);
+                       if (ma->ma_hsm.mh_flags & HS_DIRTY)
+                               ma->ma_hsm.mh_flags = HS_ARCHIVED | HS_EXISTS;
+               } else {
+                       /* Set up HSM attribte for PCC archived object */
+                       BUILD_BUG_ON(sizeof(struct hsm_attrs) >
+                                    sizeof(info->mti_xattr_buf));
+                       buf = &info->mti_buf;
+                       buf->lb_buf = info->mti_xattr_buf;
+                       buf->lb_len = sizeof(struct hsm_attrs);
+                       memset(&ma->ma_hsm, 0, sizeof(ma->ma_hsm));
+                       ma->ma_hsm.mh_flags = HS_ARCHIVED | HS_EXISTS;
+                       ma->ma_hsm.mh_arch_id = data->cd_archive_id;
+                       ma->ma_hsm.mh_arch_ver = data->cd_data_version;
+                       lustre_hsm2buf(buf->lb_buf, &ma->ma_hsm);
+
+                       rc = mo_xattr_set(info->mti_env, mdt_object_child(o),
+                                         buf, XATTR_NAME_HSM, 0);
+                       if (rc)
+                               GOTO(out_unlock, rc);
+               }
+       } else {
+               if (!mdt_hsm_release_allow(ma))
+                       GOTO(out_unlock, rc = -EPERM);
+
+               /* already released? */
+               if (ma->ma_hsm.mh_flags & HS_RELEASED)
+                       GOTO(out_unlock, rc = 0);
+
+               /* Compare on-disk and packed data_version */
+               if (data->cd_data_version != ma->ma_hsm.mh_arch_ver) {
+                       CDEBUG(D_HSM, DFID" data_version mismatches: "
+                              "packed=%llu and on-disk=%llu\n",
+                              PFID(mdt_object_fid(o)),
+                              data->cd_data_version,
+                              ma->ma_hsm.mh_arch_ver);
+                       GOTO(out_unlock, rc = -EPERM);
+               }
        }
 
        ma->ma_valid = MA_INODE;
@@ -1880,7 +1935,7 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        }
 
        /* Set up HSM attribute for orphan object */
-       CLASSERT(sizeof(struct hsm_attrs) <= sizeof(info->mti_xattr_buf));
+       BUILD_BUG_ON(sizeof(struct hsm_attrs) > sizeof(info->mti_xattr_buf));
        buf = &info->mti_buf;
        buf->lb_buf = info->mti_xattr_buf;
        buf->lb_len = sizeof(struct hsm_attrs);
@@ -1908,6 +1963,12 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        rc = mo_swap_layouts(info->mti_env, mdt_object_child(o),
                             mdt_object_child(orphan),
                             SWAP_LAYOUTS_MDS_HSM);
+
+       if (!rc && ma->ma_attr_flags & MDS_PCC_ATTACH) {
+               ma->ma_need = MA_LOV;
+               rc = mdt_attr_get_complex(info, o, ma);
+       }
+
        EXIT;
 
 out_layout_lock:
@@ -1934,10 +1995,17 @@ out_unlock:
                repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
                LASSERT(repbody != NULL);
                repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
+               if (ma->ma_attr_flags & MDS_PCC_ATTACH) {
+                       LASSERT(ma->ma_valid & MA_LOV);
+                       rc = mdt_get_lmm_gen(ma->ma_lmm,
+                                            &repbody->mbo_layout_gen);
+                       if (!rc)
+                               repbody->mbo_valid |= OBD_MD_LAYOUT_VERSION;
+               }
        }
 
 out_reprocess:
-       ldlm_reprocess_all(lease->l_resource);
+       ldlm_reprocess_all(lease->l_resource, lease);
        LDLM_LOCK_PUT(lease);
 
        ma->ma_valid = 0;
@@ -2100,7 +2168,7 @@ out_unlock_sem:
 out_obj:
        mdt_object_put(info->mti_env, swap_objects ? o1 : o2);
 
-       ldlm_reprocess_all(lease->l_resource);
+       ldlm_reprocess_all(lease->l_resource, lease);
 
 out_lease:
        LDLM_LOCK_PUT(lease);
@@ -2116,13 +2184,15 @@ out_lease:
 static int mdt_close_resync_done(struct mdt_thread_info *info,
                                 struct mdt_object *o, struct md_attr *ma)
 {
-       struct close_data       *data;
-       struct ldlm_lock        *lease;
-       struct md_layout_change  layout = { 0 };
-       __u32                   *resync_ids = NULL;
-       size_t                   resync_count = 0;
-       bool                     lease_broken;
-       int                      rc;
+       struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_LOCAL];
+       struct close_data *data;
+       struct ldlm_lock *lease;
+       struct md_layout_change layout = { 0 };
+       __u32 *resync_ids = NULL;
+       size_t resync_count = 0;
+       bool lease_broken;
+       int rc;
+
        ENTRY;
 
        if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_RDONLY)
@@ -2176,7 +2246,7 @@ static int mdt_close_resync_done(struct mdt_thread_info *info,
                                           RCL_CLIENT))
                        GOTO(out_unlock, rc = -EPROTO);
 
-               OBD_ALLOC(resync_ids, resync_count * sizeof(__u32));
+               OBD_ALLOC_PTR_ARRAY(resync_ids, resync_count);
                if (!resync_ids)
                        GOTO(out_unlock, rc = -ENOMEM);
 
@@ -2195,10 +2265,12 @@ static int mdt_close_resync_done(struct mdt_thread_info *info,
                layout.mlc_som.lsa_size = ma->ma_attr.la_size;
                layout.mlc_som.lsa_blocks = ma->ma_attr.la_blocks;
        }
-       rc = mdt_layout_change(info, o, &layout);
+       rc = mdt_layout_change(info, o, lhc, &layout);
        if (rc)
                GOTO(out_unlock, rc);
 
+       mdt_object_unlock(info, o, lhc, 0);
+
        EXIT;
 
 out_unlock:
@@ -2214,10 +2286,10 @@ out_unlock:
        }
 
        if (resync_ids)
-               OBD_FREE(resync_ids, resync_count * sizeof(__u32));
+               OBD_FREE_PTR_ARRAY(resync_ids, resync_count);
 
 out_reprocess:
-       ldlm_reprocess_all(lease->l_resource);
+       ldlm_reprocess_all(lease->l_resource, lease);
        LDLM_LOCK_PUT(lease);
 
        ma->ma_valid = 0;
@@ -2242,7 +2314,6 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
        int rc = 0;
        u64 open_flags;
        u64 intent;
-       bool discard = false;
 
        ENTRY;
 
@@ -2250,8 +2321,10 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
        intent = ma->ma_attr_flags & MDS_CLOSE_INTENT;
        *ofid = *mdt_object_fid(o);
 
-       CDEBUG(D_INODE, "%s: close file "DFID" with intent: %llx\n",
-              mdt_obd_name(info->mti_mdt), PFID(ofid), intent);
+       /* the below message is checked in replay-single.sh test_46 */
+       CDEBUG(D_INODE, "%s: %sclosing file handle "DFID" with intent: %llx\n",
+              mdt_obd_name(info->mti_mdt),
+              ma->ma_valid & MA_FORCE_LOG ? "force " : "", PFID(ofid), intent);
 
        switch (intent) {
        case MDS_HSM_RELEASE: {
@@ -2303,15 +2376,17 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
        else if (open_flags & MDS_FMODE_EXEC)
                mdt_write_allow(o);
 
-        /* Update atime on close only. */
+       /* Update atime|mtime|ctime on close. */
        if ((open_flags & MDS_FMODE_EXEC || open_flags & MDS_FMODE_READ ||
             open_flags & MDS_FMODE_WRITE) && (ma->ma_valid & MA_INODE) &&
-           (ma->ma_attr.la_valid & LA_ATIME)) {
-                /* Set the atime only. */
-                ma->ma_valid = MA_INODE;
-                ma->ma_attr.la_valid = LA_ATIME;
-                rc = mo_attr_set(info->mti_env, next, ma);
-        }
+           (ma->ma_attr.la_valid & LA_ATIME ||
+            ma->ma_attr.la_valid & LA_MTIME ||
+            ma->ma_attr.la_valid & LA_CTIME)) {
+               ma->ma_valid = MA_INODE;
+               ma->ma_attr_flags |= MDS_CLOSE_UPDATE_TIMES;
+               ma->ma_attr.la_valid &= (LA_ATIME | LA_MTIME | LA_CTIME);
+               rc = mo_attr_set(info->mti_env, next, ma);
+       }
 
        /* If file data is modified, add the dirty flag. */
        if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
@@ -2326,7 +2401,8 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
 
        if (!MFD_CLOSED(open_flags)) {
                rc = mo_close(info->mti_env, next, ma, open_flags);
-               discard = mdt_dom_check_for_discard(info, o);
+               if (mdt_dom_check_for_discard(info, o))
+                       mdt_dom_discard_data(info, o);
        }
 
        /* adjust open and lease count */
@@ -2338,9 +2414,6 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
        mdt_mfd_free(mfd);
        mdt_object_put(info->mti_env, o);
 
-       if (discard)
-               mdt_dom_discard_data(info, ofid);
-
        RETURN(rc);
 }
 
@@ -2380,10 +2453,10 @@ int mdt_close(struct tgt_session_info *tsi)
        struct ptlrpc_request   *req = tgt_ses_req(tsi);
         struct md_attr         *ma = &info->mti_attr;
         struct mdt_body        *repbody = NULL;
+       ktime_t                 kstart = ktime_get();
         int rc, ret = 0;
         ENTRY;
 
-       mdt_counter_incr(req, LPROC_MDT_CLOSE);
        /* Close may come with the Size-on-MDS update. Unpack it. */
        rc = mdt_close_unpack(info);
        if (rc)
@@ -2438,5 +2511,8 @@ int mdt_close(struct tgt_session_info *tsi)
                tsi->tsi_reply_fail_id = OBD_FAIL_MDS_CLOSE_NET_REP;
 out:
        mdt_thread_info_fini(info);
+       if (rc == 0)
+               mdt_counter_incr(req, LPROC_MDT_CLOSE,
+                                ktime_us_delta(ktime_get(), kstart));
        RETURN(rc ? rc : ret);
 }