Whamcloud - gitweb
LU-15107 mdt: Exclusive create isn't replayed
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index c13fe07..d59e92d 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/mdt/mdt_open.c
  *
 #include "mdt_internal.h"
 #include <lustre_nodemap.h>
 
-static const struct portals_handle_ops mfd_open_handle_ops = {
-       .hop_free   = NULL,
-       .hop_type       = "mdt",
-};
+static const char mfd_open_handle_owner[] = "mdt";
 
 /* Create a new mdt_file_data struct, initialize it,
  * and insert it to global hash table */
@@ -59,10 +55,10 @@ struct mdt_file_data *mdt_mfd_new(const struct mdt_export_data *med)
        OBD_ALLOC_PTR(mfd);
        if (mfd != NULL) {
                refcount_set(&mfd->mfd_open_handle.h_ref, 1);
-               INIT_LIST_HEAD_RCU(&mfd->mfd_open_handle.h_link);
+               INIT_HLIST_NODE(&mfd->mfd_open_handle.h_link);
                mfd->mfd_owner = med;
                INIT_LIST_HEAD(&mfd->mfd_list);
-               class_handle_hash(&mfd->mfd_open_handle, &mfd_open_handle_ops);
+               class_handle_hash(&mfd->mfd_open_handle, mfd_open_handle_owner);
        }
 
        RETURN(mfd);
@@ -82,7 +78,7 @@ struct mdt_file_data *mdt_open_handle2mfd(struct mdt_export_data *med,
        ENTRY;
 
        LASSERT(open_handle != NULL);
-       mfd = class_handle2object(open_handle->cookie, &mfd_open_handle_ops);
+       mfd = class_handle2object(open_handle->cookie, mfd_open_handle_owner);
        if (mfd)
                refcount_dec(&mfd->mfd_open_handle.h_ref);
 
@@ -104,7 +100,8 @@ void mdt_mfd_free(struct mdt_file_data *mfd)
 {
        LASSERT(refcount_read(&mfd->mfd_open_handle.h_ref) == 1);
        LASSERT(list_empty(&mfd->mfd_list));
-       OBD_FREE_RCU(mfd, sizeof *mfd, &mfd->mfd_open_handle);
+       OBD_FREE_PRE(mfd, sizeof(*mfd), "rcu");
+       kfree_rcu(mfd, mfd_open_handle.h_rcu);
 }
 
 static int mdt_create_data(struct mdt_thread_info *info,
@@ -309,7 +306,15 @@ static void mdt_prep_ma_buf_from_rep(struct mdt_thread_info *info,
                                     struct mdt_object *obj,
                                     struct md_attr *ma)
 {
-       LASSERT(ma->ma_lmv == NULL && ma->ma_lmm == NULL);
+       if (ma->ma_lmv || ma->ma_lmm) {
+               CDEBUG(D_INFO, DFID " %s already set.\n",
+                      PFID(mdt_object_fid(obj)),
+                      ma->ma_lmv ? (ma->ma_lmm ? "ma_lmv and ma_lmm"
+                                               : "ma_lmv")
+                                 : "ma_lmm");
+               return;
+       }
+
        if (S_ISDIR(obj->mot_header.loh_attr)) {
                ma->ma_lmv = req_capsule_server_get(info->mti_pill,
                                                    &RMF_MDT_MD);
@@ -775,6 +780,11 @@ static int mdt_open_by_fid(struct mdt_thread_info *info, struct ldlm_reply *rep)
                        mdt_set_disposition(info, rep, (DISP_IT_EXECD |
                                                        DISP_LOOKUP_EXECD |
                                                        DISP_LOOKUP_POS));
+                       if ((open_flags & MDS_OPEN_EXCL) &&
+                           (open_flags & MDS_OPEN_CREAT))
+                               mdt_set_disposition(info, rep,
+                                                   DISP_OPEN_CREATE);
+
                        mdt_prep_ma_buf_from_rep(info, o, ma);
                        rc = mdt_attr_get_complex(info, o, ma);
                        if (rc == 0)
@@ -803,8 +813,9 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
        bool try_layout = false;
        bool create_layout = false;
        int rc = 0;
-       int dom_stripes = LMM_NO_DOM;
-       bool dom_lock = false;
+       __u32 dom_stripe = 0;
+       unsigned int dom_only = 0;
+       unsigned int dom_lock = 0;
 
        ENTRY;
 
@@ -822,23 +833,23 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                    ma->ma_need & MA_LOV)
                        try_layout = true;
 
-               /* DoM files can have just MDT stripe or combined MDT + OST
-                * stripes.
-                * - In the first case the open for read/write will do IO to
-                *   the MDT stripe and it makes sense to take IO lock in
-                *   advance along with OPEN even if it is blocking lock.
-                * - In the second case it is just size of MDT stripe and it
-                *   is quite unlikely that client will write into it, though
-                *   it may read it. So IO lock will be taken optionally if it
-                *   is non-blocking one.
+               /* DoM files can take IO lock at OPEN when it makes sense,
+                * check if file has DoM stripe and ask for lock if client
+                * has no lock on that resource yet.
                 */
                if (ma->ma_valid & MA_LOV && ma->ma_lmm != NULL)
-                       dom_stripes = mdt_lmm_dom_entry(ma->ma_lmm);
-
-               if (dom_stripes == LMM_DOM_ONLY &&
-                   info->mti_mdt->mdt_opts.mo_dom_lock > 0 &&
+                       dom_stripe = mdt_lmm_dom_entry_check(ma->ma_lmm,
+                                                            &dom_only);
+               /* If only DOM stripe is being used then we can expect IO
+                * to it after OPEN and will return corresponding DOM ibit
+                * using default strategy from mdt_opts.mo_dom_lock.
+                * Otherwise trylock mode is used always and DOM ibit will
+                * be returned optionally.
+                */
+               if (dom_stripe &&
                    !mdt_dom_client_has_lock(info, mdt_object_fid(obj)))
-                       dom_lock = true;
+                       dom_lock = !dom_only ? TRYLOCK_DOM_ON_OPEN :
+                                  info->mti_mdt->mdt_opts.mo_dom_lock;
        }
 
        if (acq_lease) {
@@ -861,8 +872,8 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                        GOTO(out, rc = -EPROTO);
                }
 
-               /* XXX: only exclusive open is supported. */
-               lm = LCK_EX;
+               /* should conflict with new opens for write/execute */
+               lm = LCK_PW;
                *ibits = MDS_INODELOCK_OPEN;
 
                /* never grant LCK_EX layout lock to client */
@@ -893,17 +904,7 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                        lhc = &info->mti_lh[MDT_LH_LOCAL];
                } else if (dom_lock) {
                        lm = (open_flags & MDS_FMODE_WRITE) ? LCK_PW : LCK_PR;
-                       if (info->mti_mdt->mdt_opts.mo_dom_lock ==
-                           TRYLOCK_DOM_ON_OPEN) {
-                               trybits |= MDS_INODELOCK_DOM |
-                                          MDS_INODELOCK_LAYOUT;
-                       } else {
-                               /* mo_dom_lock == ALWAYS_DOM_LOCK_ON_OPEN */
-                               *ibits = MDS_INODELOCK_DOM;
-                               if (info->mti_mdt->mdt_opts.mo_dom_read_open) {
-                                       trybits |= MDS_INODELOCK_LAYOUT;
-                               }
-                       }
+                       trybits |= MDS_INODELOCK_DOM | MDS_INODELOCK_LAYOUT;
                }
 
                CDEBUG(D_INODE, "normal open:"DFID" lease count: %d, lm: %d\n",
@@ -990,8 +991,11 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                        PFID(mdt_object_fid(obj)),
                        atomic_read(&obj->mot_open_count), open_count);
 
-               if (atomic_read(&obj->mot_open_count) > open_count)
-                       GOTO(out, rc = -EBUSY);
+               if (atomic_read(&obj->mot_open_count) > open_count) {
+                       /* fail if anyone *else* has opened file for write */
+                       if (mdt_write_read(obj) > 1)
+                               GOTO(out, rc = -EBUSY);
+               }
        }
        GOTO(out, rc);
 
@@ -1220,7 +1224,13 @@ static int mdt_cross_open(struct mdt_thread_info *info,
                        if (rc != 0)
                                GOTO(out, rc);
 
-                       mdt_pack_secctx_in_reply(info, o);
+                       rc = mdt_pack_secctx_in_reply(info, o);
+                       if (unlikely(rc))
+                               GOTO(out, rc);
+
+                       rc = mdt_pack_encctx_in_reply(info, o);
+                       if (unlikely(rc))
+                               GOTO(out, rc);
 
                        rc = mdt_finish_open(info, NULL, o, open_flags, 0, rep);
                } else {
@@ -1299,7 +1309,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
        struct ptlrpc_request *req = mdt_info_req(info);
        struct mdt_object *parent;
        struct mdt_object *child;
-       struct mdt_lock_handle *lh;
+       struct mdt_lock_handle *lh = NULL;
        struct ldlm_reply *ldlm_rep;
        struct mdt_body *repbody;
        struct lu_fid *child_fid = &info->mti_tmp_fid1;
@@ -1310,13 +1320,14 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
        int result, rc;
        int created = 0;
        int object_locked = 0;
+       enum ldlm_mode lock_mode = LCK_PR;
        u32 msg_flags;
+       ktime_t kstart = ktime_get();
 
        ENTRY;
        OBD_FAIL_TIMEOUT_ORSET(OBD_FAIL_MDS_PAUSE_OPEN, OBD_FAIL_ONCE,
                               (obd_timeout + 1) / 4);
 
-       mdt_counter_incr(req, LPROC_MDT_OPEN);
        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
 
        ma->ma_need = MA_INODE;
@@ -1387,84 +1398,65 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
        if (result < 0)
                GOTO(out, result);
 
-again:
-       lh = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(lh, (open_flags & MDS_OPEN_CREAT) ? LCK_PW : LCK_PR,
-                         &rr->rr_name);
-
        parent = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
        if (IS_ERR(parent))
                GOTO(out, result = PTR_ERR(parent));
 
-       result = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
-       if (result != 0) {
+       /* get and check version of parent */
+       result = mdt_version_get_check(info, parent, 0);
+       if (result) {
                mdt_object_put(info->mti_env, parent);
                GOTO(out, result);
        }
 
-       /* get and check version of parent */
-       result = mdt_version_get_check(info, parent, 0);
-       if (result)
-               GOTO(out_parent, result);
-
+       OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
+again_pw:
        fid_zero(child_fid);
 
-       result = -ENOENT;
-       if ((open_flags & MDS_OPEN_VOLATILE) == 0)
+       if (open_flags & MDS_OPEN_VOLATILE) {
+               lh = NULL;
+               result = -ENOENT;
+       } else {
+               lh = &info->mti_lh[MDT_LH_PARENT];
+               mdt_lock_pdo_init(lh, lock_mode, &rr->rr_name);
+               result = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
+               if (result != 0) {
+                       mdt_object_put(info->mti_env, parent);
+                       GOTO(out, result);
+               }
+
                result = mdo_lookup(info->mti_env, mdt_object_child(parent),
                                    &rr->rr_name, child_fid, &info->mti_spec);
+       }
 
        LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
                 "looking for "DFID"/"DNAME", found FID = "DFID"\n",
                 PFID(mdt_object_fid(parent)), PNAME(&rr->rr_name),
                 PFID(child_fid));
 
-       if (result != 0 && result != -ENOENT && result != -ESTALE)
+       if (result != 0 && result != -ENOENT)
                GOTO(out_parent, result);
 
-       if (result == -ENOENT || result == -ESTALE) {
-               /* If the object is dead, let's check if the object
-                * is being migrated to a new object */
-               if (result == -ESTALE) {
-                       struct lu_buf lmv_buf;
-
-                       lmv_buf.lb_buf = info->mti_xattr_buf;
-                       lmv_buf.lb_len = sizeof(info->mti_xattr_buf);
-                       rc = mo_xattr_get(info->mti_env,
-                                         mdt_object_child(parent),
-                                         &lmv_buf, XATTR_NAME_LMV);
-                       if (rc > 0) {
-                               struct lmv_mds_md_v1 *lmv;
-
-                               lmv = lmv_buf.lb_buf;
-                               if (le32_to_cpu(lmv->lmv_hash_type) &
-                                               LMV_HASH_FLAG_MIGRATION) {
-                                       /* Get the new parent FID and retry */
-                                       mdt_object_unlock_put(info, parent,
-                                                             lh, 1);
-                                       mdt_lock_handle_init(lh);
-                                       fid_le_to_cpu(
-                                               (struct lu_fid *)rr->rr_fid1,
-                                               &lmv->lmv_stripe_fids[1]);
-                                       goto again;
-                               }
-                       }
-               }
+       OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
 
+       if (result == -ENOENT) {
                mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
-               if (result == -ESTALE) {
-                       /*
-                        * -ESTALE means the parent is a dead(unlinked) dir, so
-                        * it should return -ENOENT to in accordance with the
-                        * original mds implementaion.
-                        */
-                       GOTO(out_parent, result = -ENOENT);
-               }
-
                if (!(open_flags & MDS_OPEN_CREAT))
                        GOTO(out_parent, result);
                if (mdt_rdonly(req->rq_export))
                        GOTO(out_parent, result = -EROFS);
+
+               LASSERT(equi(lh == NULL, open_flags & MDS_OPEN_VOLATILE));
+
+               if (lh != NULL && lock_mode == LCK_PR) {
+                       /* first pass: get write lock and restart */
+                       mdt_object_unlock(info, parent, lh, 1);
+                       mdt_clear_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
+                       mdt_lock_handle_init(lh);
+                       lock_mode = LCK_PW;
+                       goto again_pw;
+               }
+
                *child_fid = *info->mti_rr.rr_fid2;
                LASSERTF(fid_is_sane(child_fid), "fid="DFID"\n",
                         PFID(child_fid));
@@ -1504,10 +1496,6 @@ again:
                /* Not found and with MDS_OPEN_CREAT: let's create it. */
                mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
 
-               /* Let lower layers know what is lock mode on directory. */
-               info->mti_spec.sp_cr_mode =
-                       mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
-
                /* Don't do lookup sanity check. We know name doesn't exist. */
                info->mti_spec.sp_cr_lookup = 0;
                info->mti_spec.sp_feat = &dt_directory_features;
@@ -1528,7 +1516,8 @@ again:
                                 GOTO(out_child, result);
                 }
                created = 1;
-               mdt_counter_incr(req, LPROC_MDT_MKNOD);
+               mdt_counter_incr(req, LPROC_MDT_MKNOD,
+                                ktime_us_delta(ktime_get(), kstart));
         } else {
                 /*
                  * The object is on remote node, return its FID for remote open.
@@ -1559,6 +1548,14 @@ again:
                                result = -MDT_EREMOTE_OPEN;
                         GOTO(out_child, result);
                } else if (mdt_object_exists(child)) {
+                       /* Check early for MDS_OPEN_DIRECTORY/O_DIRECTORY to
+                        * avoid opening regular files from lfs getstripe
+                        * since doing so breaks the leases used by lfs
+                        * mirror. See LU-13693. */
+                       if (open_flags & MDS_OPEN_DIRECTORY &&
+                           S_ISREG(lu_object_attr(&child->mot_obj)))
+                               GOTO(out_child, result = -ENOTDIR);
+
                        /* We have to get attr & LOV EA & HSM for this
                         * object. */
                        mdt_prep_ma_buf_from_rep(info, child, ma);
@@ -1575,7 +1572,16 @@ again:
                }
        }
 
-       mdt_pack_secctx_in_reply(info, child);
+       repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize;
+       repbody->mbo_valid |= OBD_MD_FLMODEASIZE;
+
+       rc = mdt_pack_secctx_in_reply(info, child);
+       if (unlikely(rc))
+               GOTO(out_child, result = rc);
+
+       rc = mdt_pack_encctx_in_reply(info, child);
+       if (unlikely(rc))
+               GOTO(out_child, result = rc);
 
        rc = mdt_check_resent_lock(info, child, lhc);
        if (rc < 0) {
@@ -1629,6 +1635,10 @@ again:
                        mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
                }
        }
+
+       mdt_counter_incr(req, LPROC_MDT_OPEN,
+                        ktime_us_delta(ktime_get(), kstart));
+
        EXIT;
 out_child_unlock:
        if (object_locked)
@@ -1638,7 +1648,10 @@ out_child:
        if (result == 0)
                mdt_pack_size2body(info, child_fid, &lhc->mlh_reg_lh);
 out_parent:
-       mdt_object_unlock_put(info, parent, lh, result || !created);
+       if (lh != NULL)
+               mdt_object_unlock(info, parent, lh, result || !created);
+
+       mdt_object_put(info->mti_env, parent);
 out:
        if (result)
                lustre_msg_set_transno(req->rq_repmsg, 0);
@@ -1663,7 +1676,7 @@ static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
                .ln_namelen = sizeof("i_am_nobody") - 1,
        };
        struct lu_ucred *uc;
-       cfs_cap_t uc_cap_save;
+       kernel_cap_t uc_cap_save;
        int rc;
        ENTRY;
 
@@ -1681,7 +1694,6 @@ static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
 
        spec->sp_cr_lookup = 0;
        spec->sp_feat = &dt_directory_features;
-       spec->sp_cr_mode = MDL_MINMODE; /* no lock */
        spec->sp_cr_flags = MDS_OPEN_VOLATILE | fmode;
        if (attr->ma_valid & MA_LOV) {
                spec->u.sp_ea.eadata = attr->ma_lmm;
@@ -1693,7 +1705,7 @@ static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
 
        uc = lu_ucred(env);
        uc_cap_save = uc->uc_cap;
-       uc->uc_cap |= 1 << CFS_CAP_DAC_OVERRIDE;
+       cap_raise(uc->uc_cap, CAP_DAC_OVERRIDE);
        rc = mdo_create(env, mdt_object_child(local_root), &lname,
                        mdt_object_child(obj), spec, attr);
        uc->uc_cap = uc_cap_save;
@@ -1772,7 +1784,7 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        struct mdt_object      *orphan;
        struct md_attr         *orp_ma;
        struct lu_buf          *buf;
-       cfs_cap_t               cap;
+       kernel_cap_t cap;
        bool                    lease_broken;
        int                     rc;
        int                     rc2;
@@ -1965,7 +1977,7 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        /* The orphan has root ownership so we need to raise
         * CAP_FOWNER to set the HSM attributes. */
        cap = uc->uc_cap;
-       uc->uc_cap |= MD_CAP_TO_MASK(CFS_CAP_FOWNER);
+       cap_raise(uc->uc_cap, CAP_FOWNER);
        rc = mo_xattr_set(info->mti_env, mdt_object_child(orphan), buf,
                          XATTR_NAME_HSM, 0);
        uc->uc_cap = cap;
@@ -2018,7 +2030,8 @@ out_unlock:
        }
 
 out_reprocess:
-       ldlm_reprocess_all(lease->l_resource, lease);
+       ldlm_reprocess_all(lease->l_resource,
+                          lease->l_policy_data.l_inodebits.bits);
        LDLM_LOCK_PUT(lease);
 
        ma->ma_valid = 0;
@@ -2034,9 +2047,9 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
        struct mdt_lock_handle  *lh2 = &info->mti_lh[MDT_LH_OLD];
        struct close_data       *data;
        struct ldlm_lock        *lease;
-       struct mdt_object       *o1 = o, *o2;
+       struct mdt_object       *o1 = o, *o2 = NULL;
        bool                     lease_broken;
-       bool                     swap_objects;
+       bool                     swap_objects = false;
        int                      rc;
        ENTRY;
 
@@ -2054,37 +2067,52 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
                RETURN(-EINVAL);
 
        rc = lu_fid_cmp(&data->cd_fid, mdt_object_fid(o));
-       if (unlikely(rc == 0))
-               RETURN(-EINVAL);
+       if (rc == 0) {
+               /**
+                * Only MDS_CLOSE_LAYOUT_SPLIT uses the same fid to indicate
+                * mirror deletion, so zero cd_fid and keep o2 NULL.
+                */
+               if (!(ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT))
+                       RETURN(-EINVAL);
 
-       /* Exchange o1 and o2, to enforce locking order */
-       swap_objects = (rc < 0);
+               /* zero cd_fid so that o2 stays NULL */
+               fid_zero(&data->cd_fid);
+       } else if (rc < 0) {
+               /* Exchange o1 and o2, to enforce locking order */
+               swap_objects = true;
+       }
 
        lease = ldlm_handle2lock(&data->cd_handle);
        if (lease == NULL)
                RETURN(-ESTALE);
 
-       o2 = mdt_object_find(info->mti_env, info->mti_mdt, &data->cd_fid);
-       if (IS_ERR(o2))
-               GOTO(out_lease, rc = PTR_ERR(o2));
+       if (!fid_is_zero(&data->cd_fid)) {
+               o2 = mdt_object_find(info->mti_env, info->mti_mdt,
+                                    &data->cd_fid);
+               if (IS_ERR(o2))
+                       GOTO(out_lease, rc = PTR_ERR(o2));
 
-       if (!S_ISREG(lu_object_attr(&o2->mot_obj))) {
-               swap_objects = false; /* not swapped yet */
-               GOTO(out_obj, rc = -EINVAL);
-       }
+               if (!mdt_object_exists(o2))
+                       GOTO(out_obj, rc = -ENOENT);
 
-       if (swap_objects)
-               swap(o1, o2);
+               if (!S_ISREG(lu_object_attr(&o2->mot_obj)))
+                       GOTO(out_obj, rc = -EINVAL);
+
+               if (swap_objects)
+                       swap(o1, o2);
+       }
 
        rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
                           MAY_WRITE);
        if (rc < 0)
                GOTO(out_obj, rc);
 
-       rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
-                          MAY_WRITE);
-       if (rc < 0)
-               GOTO(out_obj, rc);
+       if (o2) {
+               rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2),
+                                  NULL, MAY_WRITE);
+               if (rc < 0)
+                       GOTO(out_obj, rc);
+       }
 
        /* try to hold open_sem so that nobody else can open the file */
        if (!down_write_trylock(&o->mot_open_sem)) {
@@ -2114,11 +2142,13 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
        if (rc < 0)
                GOTO(out_unlock_sem, rc);
 
-       mdt_lock_reg_init(lh2, LCK_EX);
-       rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
-                            MDS_INODELOCK_XATTR);
-       if (rc < 0)
-               GOTO(out_unlock1, rc);
+       if (o2) {
+               mdt_lock_reg_init(lh2, LCK_EX);
+               rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
+                                    MDS_INODELOCK_XATTR);
+               if (rc < 0)
+                       GOTO(out_unlock1, rc);
+       }
 
        /* Swap layout with orphan object */
        if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SWAP) {
@@ -2129,9 +2159,26 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
                struct lu_buf *buf = &info->mti_buf;
                struct md_rejig_data mrd;
 
-               mrd.mrd_obj = mdt_object_child(o == o1 ? o2 : o1);
-               if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT)
+               if (o2) {
+                       mrd.mrd_obj = mdt_object_child(o == o1 ? o2 : o1);
+               } else {
+                       if (!(ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT)) {
+                               /* paranoid check again */
+                               CERROR(DFID
+                                 ":only mirror split support NULL o2 object\n",
+                                       PFID(mdt_object_fid(o)));
+                               GOTO(out_unlock1, rc = -EINVAL);
+                       }
+
+                       /* set NULL mrd_obj for deleting mirror objects */
+                       mrd.mrd_obj = NULL;
+               }
+
+               if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT) {
                        mrd.mrd_mirror_id = data->cd_mirror_id;
+                       /* set a small enough block count in the SoM */
+                       ma->ma_attr.la_blocks >>= 1;
+               }
 
                buf->lb_len = sizeof(mrd);
                buf->lb_buf = &mrd;
@@ -2139,11 +2186,18 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
                                  XATTR_LUSTRE_LOV,
                                  ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT ?
                                  LU_XATTR_SPLIT : LU_XATTR_MERGE);
-               if (rc == 0 && ma->ma_attr.la_valid & (LA_SIZE | LA_BLOCKS)) {
+               if (rc == 0 && ma->ma_attr.la_valid & (LA_SIZE | LA_BLOCKS |
+                                                      LA_LSIZE | LA_LBLOCKS)) {
                        int rc2;
+                       enum lustre_som_flags lsf;
+
+                       if (ma->ma_attr.la_valid & (LA_SIZE | LA_BLOCKS))
+                               lsf = SOM_FL_STRICT;
+                       else
+                               lsf = SOM_FL_LAZY;
 
                        mutex_lock(&o->mot_som_mutex);
-                       rc2 = mdt_set_som(info, o, SOM_FL_STRICT,
+                       rc2 = mdt_set_som(info, o, lsf,
                                          ma->ma_attr.la_size,
                                          ma->ma_attr.la_blocks);
                        mutex_unlock(&o->mot_som_mutex);
@@ -2161,7 +2215,8 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
 
 out_unlock2:
        /* Release exclusive LL */
-       mdt_object_unlock(info, o2, lh2, 1);
+       if (o2)
+               mdt_object_unlock(info, o2, lh2, 1);
 
 out_unlock1:
        mdt_object_unlock(info, o1, lh1, 1);
@@ -2179,9 +2234,15 @@ out_unlock_sem:
        }
 
 out_obj:
-       mdt_object_put(info->mti_env, swap_objects ? o1 : o2);
+       if (o1 != o)
+               /* the 2nd object has been used, and swapped to o1 */
+               mdt_object_put(info->mti_env, o1);
+       else if (o2)
+               /* the 2nd object has been used, and not swapped */
+               mdt_object_put(info->mti_env, o2);
 
-       ldlm_reprocess_all(lease->l_resource, lease);
+       ldlm_reprocess_all(lease->l_resource,
+                          lease->l_policy_data.l_inodebits.bits);
 
 out_lease:
        LDLM_LOCK_PUT(lease);
@@ -2218,7 +2279,7 @@ static int mdt_close_resync_done(struct mdt_thread_info *info,
        if (data == NULL)
                RETURN(-EPROTO);
 
-       if (ptlrpc_req_need_swab(mdt_info_req(info)))
+       if (req_capsule_req_need_swab(info->mti_pill))
                lustre_swab_close_data_resync_done(&data->cd_resync);
 
        if (!fid_is_zero(&data->cd_fid))
@@ -2259,7 +2320,7 @@ static int mdt_close_resync_done(struct mdt_thread_info *info,
                                           RCL_CLIENT))
                        GOTO(out_unlock, rc = -EPROTO);
 
-               OBD_ALLOC(resync_ids, resync_count * sizeof(__u32));
+               OBD_ALLOC_PTR_ARRAY(resync_ids, resync_count);
                if (!resync_ids)
                        GOTO(out_unlock, rc = -ENOMEM);
 
@@ -2299,10 +2360,11 @@ out_unlock:
        }
 
        if (resync_ids)
-               OBD_FREE(resync_ids, resync_count * sizeof(__u32));
+               OBD_FREE_PTR_ARRAY(resync_ids, resync_count);
 
 out_reprocess:
-       ldlm_reprocess_all(lease->l_resource, lease);
+       ldlm_reprocess_all(lease->l_resource,
+                          lease->l_policy_data.l_inodebits.bits);
        LDLM_LOCK_PUT(lease);
 
        ma->ma_valid = 0;
@@ -2398,6 +2460,13 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
                ma->ma_valid = MA_INODE;
                ma->ma_attr_flags |= MDS_CLOSE_UPDATE_TIMES;
                ma->ma_attr.la_valid &= (LA_ATIME | LA_MTIME | LA_CTIME);
+
+               if (ma->ma_attr.la_valid & LA_MTIME) {
+                       rc = mdt_attr_get_pfid(info, o, &ma->ma_pfid);
+                       if (!rc)
+                               ma->ma_valid |= MA_PFID;
+               }
+
                rc = mo_attr_set(info->mti_env, next, ma);
        }
 
@@ -2466,10 +2535,10 @@ int mdt_close(struct tgt_session_info *tsi)
        struct ptlrpc_request   *req = tgt_ses_req(tsi);
         struct md_attr         *ma = &info->mti_attr;
         struct mdt_body        *repbody = NULL;
+       ktime_t                 kstart = ktime_get();
         int rc, ret = 0;
         ENTRY;
 
-       mdt_counter_incr(req, LPROC_MDT_CLOSE);
        /* Close may come with the Size-on-MDS update. Unpack it. */
        rc = mdt_close_unpack(info);
        if (rc)
@@ -2524,5 +2593,8 @@ int mdt_close(struct tgt_session_info *tsi)
                tsi->tsi_reply_fail_id = OBD_FAIL_MDS_CLOSE_NET_REP;
 out:
        mdt_thread_info_fini(info);
+       if (rc == 0)
+               mdt_counter_incr(req, LPROC_MDT_CLOSE,
+                                ktime_us_delta(ktime_get(), kstart));
        RETURN(rc ? rc : ret);
 }