Whamcloud - gitweb
LU-8359 ldlm: Wrong evict during failover
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index e997be8..e281117 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2010, 2015, Intel Corporation.
+ * Copyright (c) 2010, 2016, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include <linux/module.h>
+#include <linux/pagemap.h>
 
 #include <dt_object.h>
 #include <lustre_acl.h>
 #include <lustre_export.h>
-#include <lustre_ioctl.h>
+#include <uapi/linux/lustre_ioctl.h>
 #include <lustre_lfsck.h>
 #include <lustre_log.h>
 #include <lustre_net.h>
@@ -64,6 +61,7 @@
 #include <lustre_swab.h>
 #include <obd.h>
 #include <obd_support.h>
+#include <lustre_barrier.h>
 
 #include <llog_swab.h>
 
@@ -177,8 +175,8 @@ void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
        lh->mlh_type = MDT_PDO_LOCK;
 
        if (lu_name_is_valid(lname)) {
-               lh->mlh_pdo_hash = full_name_hash(lname->ln_name,
-                                                 lname->ln_namelen);
+               lh->mlh_pdo_hash = ll_full_name_hash(NULL, lname->ln_name,
+                                                    lname->ln_namelen);
                /* XXX Workaround for LU-2856
                 *
                 * Zero is a valid return value of full_name_hash, but
@@ -356,8 +354,10 @@ static int mdt_get_root(struct tgt_session_info *tsi)
        struct mdt_thread_info  *info = tsi2mdt_info(tsi);
        struct mdt_device       *mdt = info->mti_mdt;
        struct mdt_body         *repbody;
-       char                    *fileset;
+       char                    *fileset = NULL, *buffer = NULL;
        int                      rc;
+       struct obd_export       *exp = info->mti_exp;
+       char                    *nodemap_fileset;
 
        ENTRY;
 
@@ -373,7 +373,29 @@ static int mdt_get_root(struct tgt_session_info *tsi)
                fileset = req_capsule_client_get(info->mti_pill, &RMF_NAME);
                if (fileset == NULL)
                        GOTO(out, rc = err_serious(-EFAULT));
+       }
+
+       nodemap_fileset = nodemap_get_fileset(exp->exp_target_data.ted_nodemap);
+       if (nodemap_fileset && nodemap_fileset[0]) {
+               CDEBUG(D_INFO, "nodemap fileset is %s\n", nodemap_fileset);
+               if (fileset) {
+                       /* consider fileset from client as a sub-fileset
+                        * of the nodemap one */
+                       OBD_ALLOC(buffer, PATH_MAX + 1);
+                       if (buffer == NULL)
+                               GOTO(out, rc = err_serious(-ENOMEM));
+                       if (snprintf(buffer, PATH_MAX + 1, "%s/%s",
+                                    nodemap_fileset, fileset) >= PATH_MAX + 1)
+                               GOTO(out, rc = err_serious(-EINVAL));
+                       fileset = buffer;
+               } else {
+                       /* enforce fileset as specified in the nodemap */
+                       fileset = nodemap_fileset;
+               }
+       }
 
+       if (fileset) {
+               CDEBUG(D_INFO, "Getting fileset %s\n", fileset);
                rc = mdt_lookup_fileset(info, fileset, &repbody->mbo_fid1);
                if (rc < 0)
                        GOTO(out, rc = err_serious(rc));
@@ -385,6 +407,8 @@ static int mdt_get_root(struct tgt_session_info *tsi)
        EXIT;
 out:
        mdt_thread_info_fini(info);
+       if (buffer)
+               OBD_FREE(buffer, PATH_MAX+1);
        return rc;
 }
 
@@ -424,15 +448,15 @@ static int mdt_statfs(struct tgt_session_info *tsi)
                rc = next->md_ops->mdo_statfs(info->mti_env, next, osfs);
                if (rc)
                        GOTO(out, rc);
-               spin_lock(&info->mti_mdt->mdt_osfs_lock);
+               spin_lock(&info->mti_mdt->mdt_lock);
                info->mti_mdt->mdt_osfs = *osfs;
                info->mti_mdt->mdt_osfs_age = cfs_time_current_64();
-               spin_unlock(&info->mti_mdt->mdt_osfs_lock);
+               spin_unlock(&info->mti_mdt->mdt_lock);
        } else {
                /** use cached statfs data */
-               spin_lock(&info->mti_mdt->mdt_osfs_lock);
+               spin_lock(&info->mti_mdt->mdt_lock);
                *osfs = info->mti_mdt->mdt_osfs;
-               spin_unlock(&info->mti_mdt->mdt_osfs_lock);
+               spin_unlock(&info->mti_mdt->mdt_lock);
        }
 
        if (rc == 0)
@@ -459,14 +483,18 @@ int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
        const struct lu_env     *env = info->mti_env;
        struct md_object        *next = mdt_object_child(o);
        struct lu_buf           *buf = &info->mti_buf;
+       struct mdt_device       *mdt = info->mti_mdt;
        int rc;
 
+       ENTRY;
+
        buf->lb_buf = req_capsule_server_get(info->mti_pill, &RMF_ACL);
        buf->lb_len = req_capsule_get_size(info->mti_pill, &RMF_ACL,
                                           RCL_SERVER);
        if (buf->lb_len == 0)
-               return 0;
+               RETURN(0);
 
+again:
        rc = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_ACCESS);
        if (rc < 0) {
                if (rc == -ENODATA) {
@@ -476,17 +504,49 @@ int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
                } else if (rc == -EOPNOTSUPP) {
                        rc = 0;
                } else {
+                       if (rc == -ERANGE &&
+                           exp_connect_large_acl(info->mti_exp) &&
+                           buf->lb_buf != info->mti_big_acl) {
+                               if (info->mti_big_acl == NULL) {
+                                       OBD_ALLOC_LARGE(info->mti_big_acl,
+                                                       mdt->mdt_max_ea_size);
+                                       if (info->mti_big_acl == NULL) {
+                                               CERROR("%s: unable to grow "
+                                                      DFID" ACL buffer\n",
+                                                      mdt_obd_name(mdt),
+                                                      PFID(mdt_object_fid(o)));
+                                               RETURN(-ENOMEM);
+                                       }
+
+                                       info->mti_big_aclsize =
+                                                       mdt->mdt_max_ea_size;
+                               }
+
+                               CDEBUG(D_INODE, "%s: grow the "DFID
+                                      " ACL buffer to size %d\n",
+                                      mdt_obd_name(mdt),
+                                      PFID(mdt_object_fid(o)),
+                                      mdt->mdt_max_ea_size);
+
+                               buf->lb_buf = info->mti_big_acl;
+                               buf->lb_len = info->mti_big_aclsize;
+
+                               goto again;
+                       }
+
                        CERROR("%s: unable to read "DFID" ACL: rc = %d\n",
-                              mdt_obd_name(info->mti_mdt),
-                              PFID(mdt_object_fid(o)), rc);
+                              mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc);
                }
        } else {
+               if (buf->lb_buf == info->mti_big_acl)
+                       info->mti_big_acl_used = 1;
+
                rc = nodemap_map_acl(nodemap, buf->lb_buf,
                                     rc, NODEMAP_FS_TO_CLIENT);
                /* if all ACLs mapped out, rc is still >= 0 */
                if (rc < 0) {
                        CERROR("%s: nodemap_map_acl unable to parse "DFID
-                              " ACL: rc = %d\n", mdt_obd_name(info->mti_mdt),
+                              " ACL: rc = %d\n", mdt_obd_name(mdt),
                               PFID(mdt_object_fid(o)), rc);
                } else {
                        repbody->mbo_aclsize = rc;
@@ -494,16 +554,41 @@ int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
                        rc = 0;
                }
        }
-       return rc;
+
+       RETURN(rc);
 }
 #endif
 
+/* XXX Look into layout in MDT layer. */
+static inline bool mdt_hsm_is_released(struct lov_mds_md *lmm)
+{
+       struct lov_comp_md_v1   *comp_v1;
+       struct lov_mds_md       *v1;
+       int                      i;
+
+       if (lmm->lmm_magic == LOV_MAGIC_COMP_V1) {
+               comp_v1 = (struct lov_comp_md_v1 *)lmm;
+
+               for (i = 0; i < comp_v1->lcm_entry_count; i++) {
+                       v1 = (struct lov_mds_md *)((char *)comp_v1 +
+                               comp_v1->lcm_entries[i].lcme_offset);
+                       /* We don't support partial release for now */
+                       if (!(v1->lmm_pattern & LOV_PATTERN_F_RELEASED))
+                               return false;
+               }
+               return true;
+       } else {
+               return (lmm->lmm_pattern & LOV_PATTERN_F_RELEASED) ?
+                       true : false;
+       }
+}
+
 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                         const struct lu_attr *attr, const struct lu_fid *fid)
 {
-       struct md_attr          *ma = &info->mti_attr;
-       struct obd_export       *exp = info->mti_exp;
-       struct lu_nodemap       *nodemap = exp->exp_target_data.ted_nodemap;
+       struct md_attr *ma = &info->mti_attr;
+       struct obd_export *exp = info->mti_exp;
+       struct lu_nodemap *nodemap = NULL;
 
        LASSERT(ma->ma_valid & MA_INODE);
 
@@ -527,6 +612,11 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                b->mbo_nlink = attr->la_nlink;
                b->mbo_valid |= OBD_MD_FLNLINK;
        }
+       if (attr->la_valid & (LA_UID|LA_GID)) {
+               nodemap = nodemap_get_from_exp(exp);
+               if (IS_ERR(nodemap))
+                       goto out;
+       }
        if (attr->la_valid & LA_UID) {
                b->mbo_uid = nodemap_map_id(nodemap, NODEMAP_UID,
                                            NODEMAP_FS_TO_CLIENT,
@@ -539,6 +629,13 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                                            attr->la_gid);
                b->mbo_valid |= OBD_MD_FLGID;
        }
+
+       if (attr->la_valid & LA_PROJID) {
+               /* TODO, nodemap for project id */
+               b->mbo_projid = attr->la_projid;
+               b->mbo_valid |= OBD_MD_FLPROJID;
+       }
+
        b->mbo_mode = attr->la_mode;
        if (attr->la_valid & LA_MODE)
                b->mbo_valid |= OBD_MD_FLMODE;
@@ -548,13 +645,10 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
        if (fid != NULL) {
                b->mbo_fid1 = *fid;
                b->mbo_valid |= OBD_MD_FLID;
-               CDEBUG(D_INODE, DFID": nlink=%d, mode=%o, valid="LPX64"\n",
+               CDEBUG(D_INODE, DFID": nlink=%d, mode=%o, valid=%#llx\n",
                       PFID(fid), b->mbo_nlink, b->mbo_mode, b->mbo_valid);
        }
 
-       if (info != NULL)
-               mdt_body_reverse_idmap(info, b);
-
        if (!(attr->la_valid & LA_TYPE))
                return;
 
@@ -573,7 +667,7 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                 * b=22272 */
                b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL &&
-                  ma->ma_lmm->lmm_pattern & LOV_PATTERN_F_RELEASED) {
+                  mdt_hsm_is_released(ma->ma_lmm)) {
                /* A released file stores its size on MDS. */
                /* But return 1 block for released file, unless tools like tar
                 * will consider it fully sparse. (LU-3864)
@@ -588,6 +682,10 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
        if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE))
                CDEBUG(D_VFSTRACE, DFID": returning size %llu\n",
                       PFID(fid), (unsigned long long)b->mbo_size);
+
+out:
+       if (!IS_ERR_OR_NULL(nodemap))
+               nodemap_putref(nodemap);
 }
 
 static inline int mdt_body_has_lov(const struct lu_attr *la,
@@ -897,7 +995,7 @@ int mdt_attr_get_complex(struct mdt_thread_info *info,
 #endif
 out:
        ma->ma_need = need;
-       CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
+       CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = %#llx ma_lmm=%p\n",
               rc, ma->ma_valid, ma->ma_lmm);
        RETURN(rc);
 }
@@ -915,9 +1013,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
        struct mdt_body         *repbody;
        struct lu_buf           *buffer = &info->mti_buf;
        struct obd_export       *exp = info->mti_exp;
-       struct lu_nodemap       *nodemap = exp->exp_target_data.ted_nodemap;
        int                      rc;
-       int                      is_root;
        ENTRY;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
@@ -984,7 +1080,8 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
 
        rc = mdt_attr_get_complex(info, o, ma);
        if (unlikely(rc)) {
-               CERROR("%s: getattr error for "DFID": rc = %d\n",
+               CDEBUG(rc == -ENOENT ? D_OTHER : D_ERROR,
+                      "%s: getattr error for "DFID": rc = %d\n",
                       mdt_obd_name(info->mti_mdt),
                       PFID(mdt_object_fid(o)), rc);
                RETURN(rc);
@@ -998,32 +1095,6 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        repbody->mbo_t_state = MS_RESTORE;
        }
 
-       is_root = lu_fid_eq(mdt_object_fid(o), &info->mti_mdt->mdt_md_root_fid);
-
-       /* the Lustre protocol supposes to return default striping
-        * on the user-visible root if explicitly requested */
-       if ((ma->ma_valid & MA_LOV) == 0 && S_ISDIR(la->la_mode) &&
-           (ma->ma_need & MA_LOV_DEF && is_root) && ma->ma_need & MA_LOV) {
-               struct lu_fid      rootfid;
-               struct mdt_object *root;
-               struct mdt_device *mdt = info->mti_mdt;
-
-               rc = dt_root_get(env, mdt->mdt_bottom, &rootfid);
-               if (rc)
-                       RETURN(rc);
-               root = mdt_object_find(env, mdt, &rootfid);
-               if (IS_ERR(root))
-                       RETURN(PTR_ERR(root));
-               rc = mdt_stripe_get(info, root, ma, XATTR_NAME_LOV);
-               mdt_object_put(info->mti_env, root);
-               if (unlikely(rc)) {
-                       CERROR("%s: getattr error for "DFID": rc = %d\n",
-                              mdt_obd_name(info->mti_mdt),
-                              PFID(mdt_object_fid(o)), rc);
-                       RETURN(rc);
-               }
-       }
-
         if (likely(ma->ma_valid & MA_INODE))
                 mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
         else
@@ -1054,6 +1125,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        if (!mdt_is_striped_client(req->rq_export))
                                RETURN(-ENOTSUPP);
                        LASSERT(S_ISDIR(la->la_mode));
+                       mdt_dump_lmv(D_INFO, ma->ma_lmv);
                        repbody->mbo_eadatasize = ma->ma_lmv_size;
                        repbody->mbo_valid |= (OBD_MD_FLDIREA |
                                               OBD_MD_DEFAULT_MEA);
@@ -1071,7 +1143,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                               PFID(mdt_object_fid(o)), rc);
                        rc = -EFAULT;
                } else {
-                       int print_limit = min_t(int, PAGE_CACHE_SIZE - 128, rc);
+                       int print_limit = min_t(int, PAGE_SIZE - 128, rc);
 
                        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO))
                                rc -= 2;
@@ -1105,25 +1177,16 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                       repbody->mbo_max_mdsize);
        }
 
-       if (exp_connect_rmtclient(info->mti_exp) &&
-           reqbody->mbo_valid & OBD_MD_FLRMTPERM) {
-               void *buf = req_capsule_server_get(pill, &RMF_ACL);
-
-               /* mdt_getattr_lock only */
-               rc = mdt_pack_remote_perm(info, o, buf);
-               if (rc) {
-                       repbody->mbo_valid &= ~OBD_MD_FLRMTPERM;
-                       repbody->mbo_aclsize = 0;
-                       RETURN(rc);
-               } else {
-                       repbody->mbo_valid |= OBD_MD_FLRMTPERM;
-                       repbody->mbo_aclsize = sizeof(struct mdt_remote_perm);
-               }
-       }
 #ifdef CONFIG_FS_POSIX_ACL
-       else if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) &&
-                (reqbody->mbo_valid & OBD_MD_FLACL))
+       if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) &&
+                (reqbody->mbo_valid & OBD_MD_FLACL)) {
+               struct lu_nodemap *nodemap = nodemap_get_from_exp(exp);
+               if (IS_ERR(nodemap))
+                       RETURN(PTR_ERR(nodemap));
+
                rc = mdt_pack_acl2body(info, repbody, o, nodemap);
+               nodemap_putref(nodemap);
+       }
 #endif
 
 out:
@@ -1179,6 +1242,12 @@ static int mdt_getattr(struct tgt_session_info *tsi)
 
        req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, rc);
 
+       /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
+        * by default. If the target object has more ACL entries, then
+        * enlarge the buffer when necessary. */
+       req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
+                            LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
+
        rc = req_capsule_server_pack(pill);
        if (unlikely(rc != 0))
                GOTO(out, rc = err_serious(rc));
@@ -1188,18 +1257,13 @@ static int mdt_getattr(struct tgt_session_info *tsi)
        repbody->mbo_eadatasize = 0;
        repbody->mbo_aclsize = 0;
 
-       if (reqbody->mbo_valid & OBD_MD_FLRMTPERM)
-               rc = mdt_init_ucred(info, reqbody);
-       else
-               rc = mdt_check_ucred(info);
+       rc = mdt_check_ucred(info);
        if (unlikely(rc))
                GOTO(out_shrink, rc);
 
        info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
 
        rc = mdt_getattr_internal(info, obj, 0);
-       if (reqbody->mbo_valid & OBD_MD_FLRMTPERM)
-                mdt_exit_ucred(info);
         EXIT;
 out_shrink:
         mdt_client_compatibility(info);
@@ -1212,6 +1276,60 @@ out:
 }
 
 /**
+ * Handler of layout intent RPC requiring the layout modification
+ *
+ * \param[in] info     thread environment
+ * \param[in] obj      object
+ * \param[in] layout   layout intent
+ * \param[in] buf      buffer containing client's lovea, could be empty
+ *
+ * \retval 0   on success
+ * \retval < 0 error code
+ */
+static int mdt_layout_change(struct mdt_thread_info *info,
+                            struct mdt_object *obj,
+                            struct layout_intent *layout,
+                            const struct lu_buf *buf)
+{
+       struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
+       int rc;
+       ENTRY;
+
+       CDEBUG(D_INFO, "got layout change request from client: "
+              "opc:%u flags:%#x extent[%#llx,%#llx)\n",
+              layout->li_opc, layout->li_flags,
+              layout->li_start, layout->li_end);
+       if (layout->li_start >= layout->li_end) {
+               CERROR("Recieved an invalid layout change range [%llu, %llu) "
+                      "for "DFID"\n", layout->li_start, layout->li_end,
+                      PFID(mdt_object_fid(obj)));
+               RETURN(-EINVAL);
+       }
+
+       if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
+               GOTO(out, rc = -EINVAL);
+
+       rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL,
+                          MAY_WRITE);
+       if (rc)
+               GOTO(out, rc);
+
+       /* take layout lock to prepare layout change */
+       mdt_lock_reg_init(lh, LCK_EX);
+       rc = mdt_object_lock(info, obj, lh,
+                            MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
+       if (rc)
+               GOTO(out, rc);
+
+       rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout,
+                             buf);
+
+       mdt_object_unlock(info, obj, lh, 1);
+out:
+       RETURN(rc);
+}
+
+/**
  * Exchange MOF_LOV_CREATED flags between two objects after a
  * layout swap. No assumption is made on whether o1 or o2 have
  * created objects or not.
@@ -1221,17 +1339,13 @@ out:
  */
 static void mdt_swap_lov_flag(struct mdt_object *o1, struct mdt_object *o2)
 {
-       __u64   o1_flags;
+       unsigned int o1_lov_created = o1->mot_lov_created;
 
        mutex_lock(&o1->mot_lov_mutex);
        mutex_lock(&o2->mot_lov_mutex);
 
-       o1_flags = o1->mot_flags;
-       o1->mot_flags = (o1->mot_flags & ~MOF_LOV_CREATED) |
-                       (o2->mot_flags & MOF_LOV_CREATED);
-
-       o2->mot_flags = (o2->mot_flags & ~MOF_LOV_CREATED) |
-                       (o1_flags & MOF_LOV_CREATED);
+       o1->mot_lov_created = o2->mot_lov_created;
+       o2->mot_lov_created = o1_lov_created;
 
        mutex_unlock(&o2->mot_lov_mutex);
        mutex_unlock(&o1->mot_lov_mutex);
@@ -1759,7 +1873,7 @@ static int mdt_readpage(struct tgt_session_info *tsi)
          */
        rdpg->rp_hash = reqbody->mbo_size;
        if (rdpg->rp_hash != reqbody->mbo_size) {
-               CERROR("Invalid hash: "LPX64" != "LPX64"\n",
+               CERROR("Invalid hash: %#llx != %#llx\n",
                       rdpg->rp_hash, reqbody->mbo_size);
                RETURN(-EFAULT);
        }
@@ -1769,14 +1883,14 @@ static int mdt_readpage(struct tgt_session_info *tsi)
                rdpg->rp_attrs |= LUDA_64BITHASH;
        rdpg->rp_count  = min_t(unsigned int, reqbody->mbo_nlink,
                                exp_max_brw_size(tsi->tsi_exp));
-       rdpg->rp_npages = (rdpg->rp_count + PAGE_CACHE_SIZE - 1) >>
-                         PAGE_CACHE_SHIFT;
+       rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >>
+                         PAGE_SHIFT;
         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
         if (rdpg->rp_pages == NULL)
                 RETURN(-ENOMEM);
 
         for (i = 0; i < rdpg->rp_npages; ++i) {
-               rdpg->rp_pages[i] = alloc_page(GFP_IOFS);
+               rdpg->rp_pages[i] = alloc_page(GFP_NOFS);
                 if (rdpg->rp_pages[i] == NULL)
                         GOTO(free_rdpg, rc = -ENOMEM);
         }
@@ -1803,6 +1917,26 @@ free_rdpg:
        return rc;
 }
 
+static int mdt_fix_attr_ucred(struct mdt_thread_info *info, __u32 op)
+{
+       struct lu_ucred *uc = mdt_ucred_check(info);
+       struct lu_attr *attr = &info->mti_attr.ma_attr;
+
+       if (uc == NULL)
+               return -EINVAL;
+
+       if (op != REINT_SETATTR) {
+               if ((attr->la_valid & LA_UID) && (attr->la_uid != -1))
+                       attr->la_uid = uc->uc_fsuid;
+               /* for S_ISGID, inherit gid from his parent, such work will be
+                * done in cmm/mdd layer, here set all cases as uc->uc_fsgid. */
+               if ((attr->la_valid & LA_GID) && (attr->la_gid != -1))
+                       attr->la_gid = uc->uc_fsgid;
+       }
+
+       return 0;
+}
+
 static int mdt_reint_internal(struct mdt_thread_info *info,
                               struct mdt_lock_handle *lhc,
                               __u32 op)
@@ -1828,6 +1962,13 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
         if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
                req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0);
 
+       /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
+        * by default. If the target object has more ACL entries, then
+        * enlarge the buffer when necessary. */
+       if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
+               req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
+                                    LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
+
         rc = req_capsule_server_pack(pill);
         if (rc != 0) {
                 CERROR("Can't pack response, rc %d\n", rc);
@@ -2021,7 +2162,7 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        int                      id, rc;
        struct mdt_device       *mdt = mdt_exp2dev(exp);
        struct lu_device        *qmt = mdt->mdt_qmt_dev;
-       struct lu_nodemap       *nodemap = exp->exp_target_data.ted_nodemap;
+       struct lu_nodemap       *nodemap;
        ENTRY;
 
        oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
@@ -2032,65 +2173,61 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        if (rc)
                RETURN(err_serious(rc));
 
+       nodemap = nodemap_get_from_exp(exp);
+       if (IS_ERR(nodemap))
+               RETURN(PTR_ERR(nodemap));
+
        switch (oqctl->qc_cmd) {
                /* master quotactl */
        case Q_SETINFO:
        case Q_SETQUOTA:
                if (!nodemap_can_setquota(nodemap))
-                       RETURN(-EPERM);
+                       GOTO(out_nodemap, rc = -EPERM);
        case Q_GETINFO:
        case Q_GETQUOTA:
                if (qmt == NULL)
-                       RETURN(-EOPNOTSUPP);
+                       GOTO(out_nodemap, rc = -EOPNOTSUPP);
                /* slave quotactl */
        case Q_GETOINFO:
        case Q_GETOQUOTA:
                break;
        default:
                CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd);
-               RETURN(-EFAULT);
+               GOTO(out_nodemap, rc = -EFAULT);
        }
 
-       /* map uid/gid for remote client */
        id = oqctl->qc_id;
-       if (exp_connect_rmtclient(exp)) {
-               struct lustre_idmap_table *idmap;
-
-               idmap = exp->exp_mdt_data.med_idmap;
-
-               if (unlikely(oqctl->qc_cmd != Q_GETQUOTA &&
-                            oqctl->qc_cmd != Q_GETINFO))
-                       RETURN(-EPERM);
-
-               if (oqctl->qc_type == USRQUOTA)
-                       id = lustre_idmap_lookup_uid(NULL, idmap, 0,
-                                                    oqctl->qc_id);
-               else if (oqctl->qc_type == GRPQUOTA)
-                       id = lustre_idmap_lookup_gid(NULL, idmap, 0,
-                                                    oqctl->qc_id);
-               else
-                       RETURN(-EINVAL);
-
-               if (id == CFS_IDMAP_NOTFOUND) {
-                       CDEBUG(D_QUOTA, "no mapping for id %u\n", oqctl->qc_id);
-                       RETURN(-EACCES);
-               }
-       }
-
-       if (oqctl->qc_type == USRQUOTA)
+       switch (oqctl->qc_type) {
+       case USRQUOTA:
                id = nodemap_map_id(nodemap, NODEMAP_UID,
                                    NODEMAP_CLIENT_TO_FS, id);
-       else if (oqctl->qc_type == GRPQUOTA)
-               id = nodemap_map_id(nodemap, NODEMAP_UID,
+               break;
+       case GRPQUOTA:
+               id = nodemap_map_id(nodemap, NODEMAP_GID,
                                    NODEMAP_CLIENT_TO_FS, id);
-
+               break;
+       case PRJQUOTA:
+               /* todo: check/map project id */
+               id = oqctl->qc_id;
+               break;
+       default:
+               GOTO(out_nodemap, rc = -EOPNOTSUPP);
+       }
        repoqc = req_capsule_server_get(pill, &RMF_OBD_QUOTACTL);
        if (repoqc == NULL)
-               RETURN(err_serious(-EFAULT));
+               GOTO(out_nodemap, rc = err_serious(-EFAULT));
+
+       if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA)
+               barrier_exit(tsi->tsi_tgt->lut_bottom);
 
        if (oqctl->qc_id != id)
                swap(oqctl->qc_id, id);
 
+       if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA) {
+               if (unlikely(!barrier_entry(tsi->tsi_tgt->lut_bottom)))
+                       RETURN(-EINPROGRESS);
+       }
+
        switch (oqctl->qc_cmd) {
 
        case Q_GETINFO:
@@ -2110,14 +2247,20 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
 
        default:
                CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd);
-               RETURN(-EFAULT);
+               GOTO(out_nodemap, rc = -EFAULT);
        }
 
        if (oqctl->qc_id != id)
                swap(oqctl->qc_id, id);
 
        *repoqc = *oqctl;
-       RETURN(rc);
+
+       EXIT;
+
+out_nodemap:
+       nodemap_putref(nodemap);
+
+       return rc;
 }
 
 /** clone llog ctxt from child (mdd)
@@ -2169,21 +2312,9 @@ static int mdt_llog_ctxt_unclone(const struct lu_env *env,
  */
 static int mdt_sec_ctx_handle(struct tgt_session_info *tsi)
 {
-       int rc;
-
-       rc = mdt_handle_idmap(tsi);
-       if (unlikely(rc)) {
-               struct ptlrpc_request   *req = tgt_ses_req(tsi);
-               __u32                    opc;
-
-               opc = lustre_msg_get_opc(req->rq_reqmsg);
-               if (opc == SEC_CTX_INIT || opc == SEC_CTX_INIT_CONT)
-                       sptlrpc_svc_ctx_invalidate(req);
-       }
-
        CFS_FAIL_TIMEOUT(OBD_FAIL_SEC_CTX_HDL_PAUSE, cfs_fail_val);
 
-       return rc;
+       return 0;
 }
 
 /*
@@ -2306,49 +2437,66 @@ static inline int mdt_is_lock_sync(struct ldlm_lock *lock)
  * \see ldlm_blocking_ast_nocheck
  */
 int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                     void *data, int flag)
+                    void *data, int flag)
 {
-        struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
-        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
-        int rc;
-        ENTRY;
+       struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+       bool commit_async = false;
+       int rc;
+       ENTRY;
 
-        if (flag == LDLM_CB_CANCELING)
-                RETURN(0);
+       if (flag == LDLM_CB_CANCELING)
+               RETURN(0);
 
-        lock_res_and_lock(lock);
-        if (lock->l_blocking_ast != mdt_blocking_ast) {
-                unlock_res_and_lock(lock);
-                RETURN(0);
-        }
+       lock_res_and_lock(lock);
+       if (lock->l_blocking_ast != mdt_blocking_ast) {
+               unlock_res_and_lock(lock);
+               RETURN(0);
+       }
+       /* There is no lock conflict if l_blocking_lock == NULL,
+        * it indicates a blocking ast sent from ldlm_lock_decref_internal
+        * when the last reference to a local lock was released */
        if (lock->l_req_mode & (LCK_PW | LCK_EX) &&
            lock->l_blocking_lock != NULL) {
-               if (mdt_cos_is_enabled(mdt) &&
-                   lock->l_client_cookie !=
-                   lock->l_blocking_lock->l_client_cookie)
-                       mdt_set_lock_sync(lock);
-               else if (mdt_slc_is_enabled(mdt) &&
-                        ldlm_is_cos_incompat(lock->l_blocking_lock))
+               if (mdt_cos_is_enabled(mdt)) {
+                       if (lock->l_client_cookie !=
+                           lock->l_blocking_lock->l_client_cookie)
+                               mdt_set_lock_sync(lock);
+               else if (mdt_slc_is_enabled(mdt) &&
+                          ldlm_is_cos_incompat(lock->l_blocking_lock)) {
                        mdt_set_lock_sync(lock);
+                       /*
+                        * we may do extra commit here, but there is a small
+                        * window to miss a commit: lock was unlocked (saved),
+                        * then a conflict lock queued and we come here, but
+                        * REP-ACK not received, so lock was not converted to
+                        * COS mode yet.
+                        * Fortunately this window is quite small, so the
+                        * extra commit should be rare (not to say distributed
+                        * operation is rare too).
+                        */
+                       commit_async = true;
+               }
+       } else if (lock->l_req_mode == LCK_COS &&
+                  lock->l_blocking_lock != NULL) {
+               commit_async = true;
        }
-        rc = ldlm_blocking_ast_nocheck(lock);
 
-        /* There is no lock conflict if l_blocking_lock == NULL,
-         * it indicates a blocking ast sent from ldlm_lock_decref_internal
-         * when the last reference to a local lock was released */
-        if (lock->l_req_mode == LCK_COS && lock->l_blocking_lock != NULL) {
-                struct lu_env env;
+       rc = ldlm_blocking_ast_nocheck(lock);
+
+       if (commit_async) {
+               struct lu_env env;
 
                rc = lu_env_init(&env, LCT_LOCAL);
                if (unlikely(rc != 0))
                        CWARN("%s: lu_env initialization failed, cannot "
                              "start asynchronous commit: rc = %d\n",
                              obd->obd_name, rc);
-                else
-                        mdt_device_commit_async(&env, mdt);
-                lu_env_fini(&env);
-        }
-        RETURN(rc);
+               else
+                       mdt_device_commit_async(&env, mdt);
+               lu_env_fini(&env);
+       }
+       RETURN(rc);
 }
 
 /*
@@ -2366,12 +2514,13 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
 {
-       struct lustre_handle lockh;
-       int               rc;
+       int rc = 0;
        ENTRY;
 
        switch (flag) {
-       case LDLM_CB_BLOCKING:
+       case LDLM_CB_BLOCKING: {
+               struct lustre_handle lockh;
+
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh,
                        ldlm_is_atomic_cb(lock) ? 0 : LCF_ASYNC);
@@ -2380,17 +2529,47 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                        RETURN(rc);
                }
                break;
-       case LDLM_CB_CANCELING:
-               LDLM_DEBUG(lock, "Revoke remote lock");
+       }
+       case LDLM_CB_CANCELING: {
+               struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
+               struct mdt_device *mdt =
+                               mdt_dev(obd->obd_lu_dev->ld_site->ls_top_dev);
+
+               LDLM_DEBUG(lock, "Revoke remote lock\n");
+
                /* discard slc lock here so that it can be cleaned anytime,
                 * especially for cleanup_resource() */
-               tgt_discard_slc_lock(lock);
+               tgt_discard_slc_lock(&mdt->mdt_lut, lock);
+
+               /* once we cache lock, l_ast_data is set to mdt_object */
+               if (lock->l_ast_data != NULL) {
+                       struct mdt_object *mo = lock->l_ast_data;
+                       struct lu_env env;
+
+                       rc = lu_env_init(&env, LCT_MD_THREAD);
+                       if (unlikely(rc != 0)) {
+                               CWARN("%s: lu_env initialization failed, object"
+                                     "%p "DFID" is leaked!\n",
+                                     obd->obd_name, mo,
+                                     PFID(mdt_object_fid(mo)));
+                               RETURN(rc);
+                       }
+
+                       if (lock->l_policy_data.l_inodebits.bits &
+                           (MDS_INODELOCK_XATTR | MDS_INODELOCK_UPDATE)) {
+                               rc = mo_invalidate(&env, mdt_object_child(mo));
+                               mo->mot_cache_attr = 0;
+                       }
+                       mdt_object_put(&env, mo);
+                       lu_env_fini(&env);
+               }
                break;
+       }
        default:
                LBUG();
        }
 
-       RETURN(0);
+       RETURN(rc);
 }
 
 int mdt_check_resent_lock(struct mdt_thread_info *info,
@@ -2408,7 +2587,7 @@ int mdt_check_resent_lock(struct mdt_thread_info *info,
                        /* Lock is pinned by ldlm_handle_enqueue0() as it is
                         * a resend case, however, it could be already destroyed
                         * due to client eviction or a raced cancel RPC. */
-                       LDLM_DEBUG_NOLOCK("Invalid lock handle "LPX64,
+                       LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx",
                                          lhc->mlh_reg_lh.cookie);
                        RETURN(-ESTALE);
                }
@@ -2430,7 +2609,8 @@ int mdt_check_resent_lock(struct mdt_thread_info *info,
 
 int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o,
                           const struct lu_fid *fid, struct lustre_handle *lh,
-                          enum ldlm_mode mode, __u64 ibits, bool nonblock)
+                          enum ldlm_mode mode, __u64 ibits, bool nonblock,
+                          bool cache)
 {
        struct ldlm_enqueue_info *einfo = &mti->mti_einfo;
        union ldlm_policy_data *policy = &mti->mti_policy;
@@ -2451,12 +2631,24 @@ int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o,
        einfo->ei_res_id = res_id;
        if (nonblock)
                einfo->ei_nonblock = 1;
+       if (cache) {
+               /*
+                * if we cache lock, couple lock with mdt_object, so that object
+                * can be easily found in lock ASTs.
+                */
+               mdt_object_get(mti->mti_env, o);
+               einfo->ei_cbdata = o;
+       }
 
        memset(policy, 0, sizeof(*policy));
        policy->l_inodebits.bits = ibits;
 
        rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo,
                            policy);
+       if (rc < 0 && cache) {
+               mdt_object_put(mti->mti_env, o);
+               einfo->ei_cbdata = NULL;
+       }
        RETURN(rc);
 }
 
@@ -2605,7 +2797,8 @@ mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o,
                rc = mdt_remote_object_lock(info, o, mdt_object_fid(o),
                                            &lh->mlh_rreg_lh,
                                            lh->mlh_rreg_mode,
-                                           MDS_INODELOCK_UPDATE, nonblock);
+                                           MDS_INODELOCK_UPDATE, nonblock,
+                                           false);
                if (rc != ELDLM_OK) {
                        if (local_lh != NULL)
                                mdt_object_unlock(info, o, local_lh, rc);
@@ -2685,25 +2878,24 @@ static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
                        struct mdt_device *mdt = info->mti_mdt;
                        struct ldlm_lock *lock = ldlm_handle2lock(h);
                        struct ptlrpc_request *req = mdt_info_req(info);
-                       int cos;
-
-                       cos = (mdt_cos_is_enabled(mdt) ||
-                              mdt_slc_is_enabled(mdt));
+                       bool cos = mdt_cos_is_enabled(mdt);
+                       bool convert_lock = !cos && mdt_slc_is_enabled(mdt);
 
-                       LASSERTF(lock != NULL, "no lock for cookie "LPX64"\n",
+                       LASSERTF(lock != NULL, "no lock for cookie %#llx\n",
                                 h->cookie);
 
                        /* there is no request if mdt_object_unlock() is called
                         * from mdt_export_cleanup()->mdt_add_dirty_flag() */
                        if (likely(req != NULL)) {
-                               CDEBUG(D_HA, "request = %p reply state = %p"
-                                      " transno = "LPD64"\n", req,
-                                      req->rq_reply_state, req->rq_transno);
+                               LDLM_DEBUG(lock, "save lock request %p reply "
+                                       "state %p transno %lld\n", req,
+                                       req->rq_reply_state, req->rq_transno);
                                if (cos) {
                                        ldlm_lock_downgrade(lock, LCK_COS);
                                        mode = LCK_COS;
                                }
-                               ptlrpc_save_lock(req, h, mode, cos);
+                               ptlrpc_save_lock(req, h, mode, cos,
+                                                convert_lock);
                        } else {
                                mdt_fid_unlock(h, mode);
                        }
@@ -2733,21 +2925,29 @@ static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
  * \param decref force immediate lock releasing
  */
 static void mdt_save_remote_lock(struct mdt_thread_info *info,
-                                struct lustre_handle *h, enum ldlm_mode mode,
-                                int decref)
+                                struct mdt_object *o, struct lustre_handle *h,
+                                enum ldlm_mode mode, int decref)
 {
        ENTRY;
 
        if (lustre_handle_is_used(h)) {
+               struct ldlm_lock *lock = ldlm_handle2lock(h);
+
+               if (o != NULL &&
+                   (lock->l_policy_data.l_inodebits.bits &
+                    (MDS_INODELOCK_XATTR | MDS_INODELOCK_UPDATE)))
+                       mo_invalidate(info->mti_env, mdt_object_child(o));
+
                if (decref || !info->mti_has_trans ||
                    !(mode & (LCK_PW | LCK_EX))) {
                        ldlm_lock_decref_and_cancel(h, mode);
+                       LDLM_LOCK_PUT(lock);
                } else {
-                       struct ldlm_lock *lock = ldlm_handle2lock(h);
                        struct ptlrpc_request *req = mdt_info_req(info);
 
                        LASSERT(req != NULL);
-                       tgt_save_slc_lock(lock, req->rq_transno);
+                       tgt_save_slc_lock(&info->mti_mdt->mdt_lut, lock,
+                                         req->rq_transno);
                        ldlm_lock_decref(h, mode);
                }
                h->cookie = 0ull;
@@ -2767,6 +2967,8 @@ static void mdt_save_remote_lock(struct mdt_thread_info *info,
  * \param o mdt object
  * \param lh mdt lock handle referencing regular and PDO locks
  * \param decref force immediate lock releasing
+ *
+ * XXX o is not used and may be NULL, see hsm_cdt_request_completed().
  */
 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
                       struct mdt_lock_handle *lh, int decref)
@@ -2775,7 +2977,8 @@ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
 
        mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
        mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
-       mdt_save_remote_lock(info, &lh->mlh_rreg_lh, lh->mlh_rreg_mode, decref);
+       mdt_save_remote_lock(info, o, &lh->mlh_rreg_lh, lh->mlh_rreg_mode,
+                            decref);
 
        EXIT;
 }
@@ -2880,6 +3083,13 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
                        req_capsule_set_size(pill, &RMF_LOGCOOKIES,
                                             RCL_SERVER, 0);
 
+               /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
+                * by default. If the target object has more ACL entries, then
+                * enlarge the buffer when necessary. */
+               if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
+                       req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
+                                            LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
+
                 rc = req_capsule_server_pack(pill);
         }
         RETURN(rc);
@@ -2936,6 +3146,7 @@ void mdt_thread_info_init(struct ptlrpc_request *req,
         info->mti_cross_ref = 0;
         info->mti_opdata = 0;
        info->mti_big_lmm_used = 0;
+       info->mti_big_acl_used = 0;
 
         info->mti_spec.no_create = 0;
        info->mti_spec.sp_rm_entry = 0;
@@ -2985,11 +3196,6 @@ struct mdt_thread_info *tsi2mdt_info(struct tgt_session_info *tsi)
 
 static int mdt_tgt_connect(struct tgt_session_info *tsi)
 {
-       struct ptlrpc_request   *req = tgt_ses_req(tsi);
-       int                      rc;
-
-       ENTRY;
-
        if (OBD_FAIL_CHECK(OBD_FAIL_TGT_DELAY_CONDITIONAL) &&
            cfs_fail_val ==
            tsi2mdt_info(tsi)->mti_mdt->mdt_seq_site.ss_node_id) {
@@ -2997,17 +3203,7 @@ static int mdt_tgt_connect(struct tgt_session_info *tsi)
                schedule_timeout(msecs_to_jiffies(3 * MSEC_PER_SEC));
        }
 
-       rc = tgt_connect(tsi);
-       if (rc != 0)
-               RETURN(rc);
-
-       rc = mdt_init_idmap(tsi);
-       if (rc != 0)
-               GOTO(err, rc);
-       RETURN(0);
-err:
-       obd_disconnect(class_export_get(req->rq_export));
-       return rc;
+       return tgt_connect(tsi);
 }
 
 enum mdt_it_code {
@@ -3139,14 +3335,14 @@ mdt_intent_lock_replace(struct mdt_thread_info *info,
                /* Lock is pinned by ldlm_handle_enqueue0() as it is
                 * a resend case, however, it could be already destroyed
                 * due to client eviction or a raced cancel RPC. */
-               LDLM_DEBUG_NOLOCK("Invalid lock handle "LPX64"\n",
+               LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx\n",
                                  lh->mlh_reg_lh.cookie);
                lh->mlh_reg_lh.cookie = 0;
                RETURN(-ESTALE);
        }
 
        LASSERTF(new_lock != NULL,
-                "lockh "LPX64" flags "LPX64" rc %d\n",
+                "lockh %#llx flags %#llx : rc = %d\n",
                 lh->mlh_reg_lh.cookie, flags, result);
 
         /*
@@ -3233,7 +3429,7 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
                lh->mlh_reg_mode = new_lock->l_granted_mode;
 
                LDLM_DEBUG(new_lock, "Restoring lock cookie");
-               DEBUG_REQ(D_DLMTRACE, req, "restoring lock cookie "LPX64,
+               DEBUG_REQ(D_DLMTRACE, req, "restoring lock cookie %#llx",
                          lh->mlh_reg_lh.cookie);
                return;
        }
@@ -3252,7 +3448,7 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
          */
         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
 
-       DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle "LPX64,
+       DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle %#llx",
                  dlmreq->lock_handle[0].cookie);
 }
 
@@ -3263,7 +3459,8 @@ static int mdt_intent_getxattr(enum mdt_it_code opcode,
 {
        struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
        struct ldlm_reply      *ldlm_rep = NULL;
-       int rc, grc;
+       int rc;
+       ENTRY;
 
        /*
         * Initialize lhc->mlh_reg_lh either from a previously granted lock
@@ -3279,18 +3476,30 @@ static int mdt_intent_getxattr(enum mdt_it_code opcode,
                        return rc;
        }
 
-       grc = mdt_getxattr(info);
-
-       rc = mdt_intent_lock_replace(info, lockp, lhc, flags, 0);
+       rc = mdt_getxattr(info);
 
        if (mdt_info_req(info)->rq_repmsg != NULL)
                ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
-       if (ldlm_rep == NULL)
+
+       if (ldlm_rep == NULL ||
+           OBD_FAIL_CHECK(OBD_FAIL_MDS_XATTR_REP)) {
+               mdt_object_unlock(info,  info->mti_object, lhc, 1);
                RETURN(err_serious(-EFAULT));
+       }
 
-       ldlm_rep->lock_policy_res2 = grc;
+       ldlm_rep->lock_policy_res2 = clear_serious(rc);
 
-       return rc;
+       /* This is left for interop instead of adding a new interop flag.
+        * LU-7433 */
+#if LUSTRE_VERSION_CODE > OBD_OCD_VERSION(3, 0, 0, 0)
+       if (ldlm_rep->lock_policy_res2) {
+               mdt_object_unlock(info, info->mti_object, lhc, 1);
+               RETURN(ELDLM_LOCK_ABORTED);
+       }
+#endif
+
+       rc = mdt_intent_lock_replace(info, lockp, lhc, flags, rc);
+       RETURN(rc);
 }
 
 static int mdt_intent_getattr(enum mdt_it_code opcode,
@@ -3371,6 +3580,7 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
        struct layout_intent *layout;
        struct lu_fid *fid;
        struct mdt_object *obj = NULL;
+       bool layout_change = false;
        int layout_size = 0;
        int rc = 0;
        ENTRY;
@@ -3385,11 +3595,29 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
        if (layout == NULL)
                RETURN(-EPROTO);
 
-       if (layout->li_opc != LAYOUT_INTENT_ACCESS) {
+       switch (layout->li_opc) {
+       case LAYOUT_INTENT_TRUNC:
+       case LAYOUT_INTENT_WRITE:
+               layout_change = true;
+               break;
+       case LAYOUT_INTENT_ACCESS:
+               break;
+       case LAYOUT_INTENT_READ:
+       case LAYOUT_INTENT_GLIMPSE:
+       case LAYOUT_INTENT_RELEASE:
+       case LAYOUT_INTENT_RESTORE:
                CERROR("%s: Unsupported layout intent opc %d\n",
                       mdt_obd_name(info->mti_mdt), layout->li_opc);
-               RETURN(-EINVAL);
+               rc = -ENOTSUPP;
+               break;
+       default:
+               CERROR("%s: Unknown layout intent opc %d\n",
+                      mdt_obd_name(info->mti_mdt), layout->li_opc);
+               rc = -EINVAL;
+               break;
        }
+       if (rc < 0)
+               RETURN(rc);
 
        fid = &info->mti_tmp_fid2;
        fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name);
@@ -3410,12 +3638,67 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
                        info->mti_mdt->mdt_max_mdsize = layout_size;
        }
 
+       /*
+        * set reply buffer size, so that ldlm_handle_enqueue0()->
+        * ldlm_lvbo_fill() will fill the reply buffer with lovea.
+        */
        (*lockp)->l_lvb_type = LVB_T_LAYOUT;
        req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER,
                             layout_size);
        rc = req_capsule_server_pack(info->mti_pill);
-       GOTO(out_obj, rc);
+       if (rc)
+               GOTO(out_obj, rc);
+
+
+       if (layout_change) {
+               struct lu_buf *buf = &info->mti_buf;
+
+               /**
+                * mdt_layout_change is a reint operation, when the request
+                * is resent, layout write shouldn't reprocess it again.
+                */
+               rc = mdt_check_resent(info, mdt_reconstruct_generic, lhc);
+               if (rc)
+                       GOTO(out_obj, rc = rc < 0 ? rc : 0);
+
+               /**
+                * There is another resent case: the client's job has been
+                * done by another client, referring lod_declare_layout_change
+                * -EALREADY case, and it became a operation w/o transaction,
+                * so we should not do the layout change, otherwise
+                * mdt_layout_change() will try to cancel the granted server
+                * CR lock whose remote counterpart is still in hold on the
+                * client, and a deadlock ensues.
+                */
+               rc = mdt_check_resent_lock(info, obj, lhc);
+               if (rc <= 0)
+                       GOTO(out_obj, rc);
+
+               buf->lb_buf = NULL;
+               buf->lb_len = 0;
+               if (unlikely(req_is_replay(mdt_info_req(info)))) {
+                       buf->lb_buf = req_capsule_client_get(info->mti_pill,
+                                       &RMF_EADATA);
+                       buf->lb_len = req_capsule_get_size(info->mti_pill,
+                                       &RMF_EADATA, RCL_CLIENT);
+                       /*
+                        * If it's a replay of layout write intent RPC, the
+                        * client has saved the extended lovea when
+                        * it get reply then.
+                        */
+                       if (buf->lb_len > 0)
+                               mdt_fix_lov_magic(info, buf->lb_buf);
+               }
 
+               /*
+                * Instantiate some layout components, if @buf contains
+                * lovea, then it's a replay of the layout intent write
+                * RPC.
+                */
+               rc = mdt_layout_change(info, obj, layout, buf);
+               if (rc)
+                       GOTO(out_obj, rc);
+       }
 out_obj:
        mdt_object_put(info->mti_env, obj);
 
@@ -3572,6 +3855,9 @@ static int mdt_intent_opc(enum ldlm_intent_flags itopc,
                if (qmt == NULL)
                        RETURN(-EOPNOTSUPP);
 
+               if (mdt_rdonly(req->rq_export))
+                       RETURN(-EROFS);
+
                (*lockp)->l_lvb_type = LVB_T_LQUOTA;
                /* pass the request to quota master */
                rc = qmt_hdls.qmth_intent_policy(info->mti_env, qmt,
@@ -3588,8 +3874,7 @@ static int mdt_intent_opc(enum ldlm_intent_flags itopc,
        if (rc < 0)
                RETURN(rc);
 
-       if (flv->it_flags & MUTABOR &&
-           exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY)
+       if (flv->it_flags & MUTABOR && mdt_rdonly(req->rq_export))
                RETURN(-EROFS);
 
        if (flv->it_act != NULL) {
@@ -4446,53 +4731,64 @@ static struct tgt_opc_slice mdt_common_slice[] = {
 
 static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
 {
-       struct md_device        *next = m->mdt_child;
-       struct lu_device        *d    = &m->mdt_lu_dev;
-       struct obd_device       *obd  = mdt2obd_dev(m);
-       struct lfsck_stop        stop;
-       ENTRY;
+       struct md_device *next = m->mdt_child;
+       struct lu_device *d = &m->mdt_lu_dev;
+       struct obd_device *obd = mdt2obd_dev(m);
+       struct lfsck_stop stop;
 
+       ENTRY;
        stop.ls_status = LS_PAUSED;
        stop.ls_flags = 0;
        next->md_ops->mdo_iocontrol(env, next, OBD_IOC_STOP_LFSCK, 0, &stop);
 
        mdt_stack_pre_fini(env, m, md2lu_dev(m->mdt_child));
-       target_recovery_fini(obd);
        ping_evictor_stop();
 
        if (m->mdt_opts.mo_coordinator)
                mdt_hsm_cdt_stop(m);
 
-       mdt_hsm_cdt_fini(m);
-
        mdt_llog_ctxt_unclone(env, m, LLOG_AGENT_ORIG_CTXT);
-        mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
+       mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
 
        if (m->mdt_namespace != NULL)
                ldlm_namespace_free_prior(m->mdt_namespace, NULL,
                                          d->ld_obd->obd_force);
 
-        obd_exports_barrier(obd);
-        obd_zombie_barrier();
+       obd_exports_barrier(obd);
+       obd_zombie_barrier();
+
+       mdt_quota_fini(env, m);
+
+       cfs_free_nidlist(&m->mdt_squash.rsi_nosquash_nids);
+
+       /* Calling the cleanup functions in the same order as in the mdt_init0
+        * error path
+        */
+       mdt_procfs_fini(m);
+
+       target_recovery_fini(obd);
+       upcall_cache_cleanup(m->mdt_identity_cache);
+       m->mdt_identity_cache = NULL;
+
+       mdt_fs_cleanup(env, m);
 
-        mdt_procfs_fini(m);
+       tgt_fini(env, &m->mdt_lut);
 
-        tgt_fini(env, &m->mdt_lut);
-        mdt_fs_cleanup(env, m);
-        upcall_cache_cleanup(m->mdt_identity_cache);
-        m->mdt_identity_cache = NULL;
+       mdt_hsm_cdt_fini(m);
 
        if (m->mdt_namespace != NULL) {
                ldlm_namespace_free_post(m->mdt_namespace);
                d->ld_obd->obd_namespace = m->mdt_namespace = NULL;
        }
 
-       mdt_quota_fini(env, m);
+       if (m->mdt_md_root != NULL) {
+               mdt_object_put(env, m->mdt_md_root);
+               m->mdt_md_root = NULL;
+       }
 
-       cfs_free_nidlist(&m->mdt_squash.rsi_nosquash_nids);
+       mdt_seq_fini(env, m);
 
-        mdt_seq_fini(env, m);
-        mdt_fld_fini(env, m);
+       mdt_fld_fini(env, m);
 
        /*
         * Finish the stack
@@ -4572,7 +4868,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        m->mdt_squash.rsi_gid = 0;
        INIT_LIST_HEAD(&m->mdt_squash.rsi_nosquash_nids);
        init_rwsem(&m->mdt_squash.rsi_sem);
-       spin_lock_init(&m->mdt_osfs_lock);
+       spin_lock_init(&m->mdt_lock);
        m->mdt_osfs_age = cfs_time_shift_64(-1000);
        m->mdt_enable_remote_dir = 0;
        m->mdt_enable_remote_dir_gid = 0;
@@ -4658,7 +4954,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        if (rc)
                GOTO(err_tgt, rc);
 
-       tgt_adapt_sptlrpc_conf(&m->mdt_lut, 1);
+       tgt_adapt_sptlrpc_conf(&m->mdt_lut);
 
         next = m->mdt_child;
         rc = next->md_ops->mdo_iocontrol(env, next, OBD_IOC_GET_MNTOPT, 0,
@@ -4855,6 +5151,7 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env,
                spin_lock_init(&mo->mot_write_lock);
                mutex_init(&mo->mot_lov_mutex);
                init_rwsem(&mo->mot_open_sem);
+               atomic_set(&mo->mot_open_count, 0);
                RETURN(o);
        }
        RETURN(NULL);
@@ -4908,8 +5205,10 @@ static int mdt_object_print(const struct lu_env *env, void *cookie,
        struct mdt_object *mdto = mdt_obj((struct lu_object *)o);
 
        return (*p)(env, cookie,
-                   LUSTRE_MDT_NAME"-object@%p(flags=%d, writecount=%d)",
-                   mdto, mdto->mot_flags, mdto->mot_write_count);
+                   LUSTRE_MDT_NAME"-object@%p(%s %s, writecount=%d)",
+                   mdto, mdto->mot_lov_created ? "lov_created" : "",
+                   mdto->mot_cache_attr ? "cache_attr" : "",
+                   mdto->mot_write_count);
 }
 
 static int mdt_prepare(const struct lu_env *env,
@@ -4987,7 +5286,7 @@ static int mdt_obd_set_info_async(const struct lu_env *env,
        ENTRY;
 
        if (KEY_IS(KEY_SPTLRPC_CONF)) {
-               rc = tgt_adapt_sptlrpc_conf(class_exp2tgt(exp), 0);
+               rc = tgt_adapt_sptlrpc_conf(class_exp2tgt(exp));
                RETURN(rc);
        }
 
@@ -5007,6 +5306,10 @@ static int mdt_obd_set_info_async(const struct lu_env *env,
  * connect flags from the obd_connect_data::ocd_connect_flags field of the
  * reply. \see mdt_connect().
  *
+ * Before 2.7.50 clients will send a struct obd_connect_data_v1 rather than a
+ * full struct obd_connect_data. So care must be taken when accessing fields
+ * that are not present in struct obd_connect_data_v1. See LU-16.
+ *
  * \param exp   the obd_export associated with this client/target pair
  * \param mdt   the target device for the connection
  * \param data  stores data for this connect request
@@ -5022,12 +5325,21 @@ static int mdt_connect_internal(struct obd_export *exp,
        LASSERT(data != NULL);
 
        data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
+
+       if (mdt->mdt_bottom->dd_rdonly &&
+           !(data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
+           !(data->ocd_connect_flags & OBD_CONNECT_RDONLY))
+               RETURN(-EACCES);
+
+       if (data->ocd_connect_flags & OBD_CONNECT_FLAGS2)
+               data->ocd_connect_flags2 &= MDT_CONNECT_SUPPORTED2;
+
        data->ocd_ibits_known &= MDS_INODELOCK_FULL;
 
        if (!(data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
            !(data->ocd_connect_flags & OBD_CONNECT_IBITS)) {
                CWARN("%s: client %s does not support ibits lock, either "
-                     "very old or an invalid client: flags "LPX64"\n",
+                     "very old or an invalid client: flags %#llx\n",
                      mdt_obd_name(mdt), exp->exp_client_uuid.uuid,
                      data->ocd_connect_flags);
                return -EBADE;
@@ -5039,16 +5351,13 @@ static int mdt_connect_internal(struct obd_export *exp,
        if (!mdt->mdt_opts.mo_user_xattr)
                data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
 
-       if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
-               data->ocd_brw_size = min(data->ocd_brw_size,
-                                        (__u32)MD_MAX_BRW_SIZE);
+       if (OCD_HAS_FLAG(data, BRW_SIZE)) {
+               data->ocd_brw_size = min(data->ocd_brw_size, MD_MAX_BRW_SIZE);
                if (data->ocd_brw_size == 0) {
-                       CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64
-                              " ocd_version: %x ocd_grant: %d "
-                              "ocd_index: %u ocd_brw_size is "
-                              "unexpectedly zero, network data "
-                              "corruption? Refusing connection of this"
-                              " client\n",
+                       CERROR("%s: cli %s/%p ocd_connect_flags: %#llx "
+                              "ocd_version: %x ocd_grant: %d ocd_index: %u "
+                              "ocd_brw_size unexpectedly zero, network data "
+                              "corruption? Refusing to connect this client\n",
                               mdt_obd_name(mdt),
                               exp->exp_client_uuid.uuid,
                               exp, data->ocd_connect_flags, data->ocd_version,
@@ -5193,7 +5502,7 @@ static int mdt_export_cleanup(struct obd_export *exp)
                                rc = mdt_ctxt_add_dirty_flag(&env, info, mfd);
 
                        /* Don't unlink orphan on failover umount, LU-184 */
-                       if (exp->exp_flags & OBD_OPT_FAILOVER) {
+                       if (exp->exp_obd->obd_fail) {
                                ma->ma_valid = MA_FLAGS;
                                ma->ma_attr_flags |= MDS_KEEP_ORPHAN;
                        }
@@ -5202,9 +5511,7 @@ static int mdt_export_cleanup(struct obd_export *exp)
         }
         info->mti_mdt = NULL;
         /* cleanup client slot early */
-        /* Do not erase record for recoverable client. */
-        if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed)
-               tgt_client_del(&env, exp);
+       tgt_client_del(&env, exp);
         lu_env_fini(&env);
 
         RETURN(rc);
@@ -5263,6 +5570,8 @@ static int mdt_obd_connect(const struct lu_env *env,
        ENTRY;
 
        LASSERT(env != NULL);
+       LASSERT(data != NULL);
+
        if (!exp || !obd || !cluuid)
                RETURN(-EINVAL);
 
@@ -5279,7 +5588,7 @@ static int mdt_obd_connect(const struct lu_env *env,
         * XXX: probably not very appropriate method is used now
         *      at some point we should find a better one
         */
-       if (!test_bit(MDT_FL_SYNCED, &mdt->mdt_state) && data != NULL &&
+       if (!test_bit(MDT_FL_SYNCED, &mdt->mdt_state) &&
            !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT) &&
            !(data->ocd_connect_flags & OBD_CONNECT_MDS_MDS)) {
                rc = obd_get_info(env, mdt->mdt_child_exp,
@@ -5365,8 +5674,6 @@ static int mdt_init_export(struct obd_export *exp)
 
        INIT_LIST_HEAD(&med->med_open_head);
        spin_lock_init(&med->med_open_lock);
-       mutex_init(&med->med_idmap_mutex);
-       med->med_idmap = NULL;
        spin_lock(&exp->exp_lock);
        exp->exp_connecting = 1;
        spin_unlock(&exp->exp_lock);
@@ -5398,9 +5705,6 @@ static int mdt_destroy_export(struct obd_export *exp)
 {
         ENTRY;
 
-        if (exp_connect_rmtclient(exp))
-                mdt_cleanup_idmap(&exp->exp_mdt_data);
-
         target_destroy_export(exp);
         /* destroy can be called from failed obd_setup, so
          * checking uuid is safer than obd_self_export */
@@ -5445,7 +5749,7 @@ int mdt_links_read(struct mdt_thread_info *info, struct mdt_object *mdt_obj,
        if (rc < 0)
                return rc;
 
-       return linkea_init(ldata);
+       return linkea_init_with_rec(ldata);
 }
 
 /**
@@ -5499,9 +5803,14 @@ static int mdt_path_current(struct mdt_thread_info *info,
                    lu_fid_eq(&mdt->mdt_md_root_fid, &fp->gf_fid))
                        GOTO(out, rc = -ENOENT);
 
-               mdt_obj = mdt_object_find(info->mti_env, mdt, tmpfid);
-               if (IS_ERR(mdt_obj))
-                       GOTO(out, rc = PTR_ERR(mdt_obj));
+               if (lu_fid_eq(mdt_object_fid(obj), tmpfid)) {
+                       mdt_obj = obj;
+                       mdt_object_get(info->mti_env, mdt_obj);
+               } else {
+                       mdt_obj = mdt_object_find(info->mti_env, mdt, tmpfid);
+                       if (IS_ERR(mdt_obj))
+                               GOTO(out, rc = PTR_ERR(mdt_obj));
+               }
 
                if (!mdt_object_exists(mdt_obj)) {
                        mdt_object_put(info->mti_env, mdt_obj);
@@ -5647,16 +5956,16 @@ static int mdt_fid2path(struct mdt_thread_info *info,
        int    rc;
        ENTRY;
 
-       CDEBUG(D_IOCTL, "path get "DFID" from "LPU64" #%d\n",
+       CDEBUG(D_IOCTL, "path get "DFID" from %llu #%d\n",
                PFID(&fp->gf_fid), fp->gf_recno, fp->gf_linkno);
 
        if (!fid_is_sane(&fp->gf_fid))
                RETURN(-EINVAL);
 
        if (!fid_is_namespace_visible(&fp->gf_fid)) {
-               CWARN("%s: "DFID" is invalid, sequence should be "
-                     ">= "LPX64"\n", mdt_obd_name(mdt),
-                     PFID(&fp->gf_fid), (__u64)FID_SEQ_NORMAL);
+               CDEBUG(D_INFO, "%s: "DFID" is invalid, f_seq should be >= %#llx"
+                      ", or f_oid != 0, or f_ver == 0\n", mdt_obd_name(mdt),
+                      PFID(&fp->gf_fid), (__u64)FID_SEQ_NORMAL);
                RETURN(-EINVAL);
        }
 
@@ -5684,7 +5993,7 @@ static int mdt_fid2path(struct mdt_thread_info *info,
 
        rc = mdt_path(info, obj, fp, root_fid);
 
-       CDEBUG(D_INFO, "fid "DFID", path %s recno "LPX64" linkno %u\n",
+       CDEBUG(D_INFO, "fid "DFID", path %s recno %#llx linkno %u\n",
               PFID(&fp->gf_fid), fp->gf_u.gf_path,
               fp->gf_recno, fp->gf_linkno);
 
@@ -5774,30 +6083,6 @@ int mdt_get_info(struct tgt_session_info *tsi)
        RETURN(rc);
 }
 
-/* Pass the ioc down */
-static int mdt_ioc_child(struct lu_env *env, struct mdt_device *mdt,
-                        unsigned int cmd, int len, void *data)
-{
-       struct lu_context ioctl_session;
-       struct md_device *next = mdt->mdt_child;
-       int rc;
-       ENTRY;
-
-        rc = lu_context_init(&ioctl_session, LCT_SERVER_SESSION);
-       if (rc)
-               RETURN(rc);
-       ioctl_session.lc_thread = (struct ptlrpc_thread *)current;
-       lu_context_enter(&ioctl_session);
-       env->le_ses = &ioctl_session;
-
-       LASSERT(next->md_ops->mdo_iocontrol);
-       rc = next->md_ops->mdo_iocontrol(env, next, cmd, len, data);
-
-       lu_context_exit(&ioctl_session);
-       lu_context_fini(&ioctl_session);
-       RETURN(rc);
-}
-
 static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
 {
        struct obd_ioctl_data *data = karg;
@@ -5861,13 +6146,15 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         if (rc)
                 RETURN(rc);
 
-        switch (cmd) {
-        case OBD_IOC_SYNC:
-                rc = mdt_device_sync(&env, mdt);
-                break;
-        case OBD_IOC_SET_READONLY:
-                rc = dt->dd_ops->dt_ro(&env, dt);
-                break;
+       switch (cmd) {
+       case OBD_IOC_SYNC:
+               rc = mdt_device_sync(&env, mdt);
+               break;
+       case OBD_IOC_SET_READONLY:
+               rc = dt_sync(&env, dt);
+               if (rc == 0)
+                       rc = dt_ro(&env, dt);
+               break;
        case OBD_IOC_ABORT_RECOVERY:
                CERROR("%s: Aborting recovery for device\n", mdt_obd_name(mdt));
                obd->obd_abort_recovery = 1;
@@ -5877,7 +6164,9 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         case OBD_IOC_CHANGELOG_REG:
         case OBD_IOC_CHANGELOG_DEREG:
         case OBD_IOC_CHANGELOG_CLEAR:
-                rc = mdt_ioc_child(&env, mdt, cmd, len, karg);
+               rc = mdt->mdt_child->md_ops->mdo_iocontrol(&env,
+                                                          mdt->mdt_child,
+                                                          cmd, len, karg);
                 break;
        case OBD_IOC_START_LFSCK: {
                struct md_device *next = mdt->mdt_child;
@@ -5954,7 +6243,7 @@ static int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt)
        int rc;
        ENTRY;
 
-       if (!mdt->mdt_skip_lfsck) {
+       if (!mdt->mdt_skip_lfsck && !mdt->mdt_bottom->dd_rdonly) {
                struct lfsck_start_param lsp;
 
                lsp.lsp_start = NULL;
@@ -6054,6 +6343,13 @@ static void mdt_key_fini(const struct lu_context *ctx,
                info->mti_big_lmm = NULL;
                info->mti_big_lmmsize = 0;
        }
+
+       if (info->mti_big_acl) {
+               OBD_FREE_LARGE(info->mti_big_acl, info->mti_big_aclsize);
+               info->mti_big_acl = NULL;
+               info->mti_big_aclsize = 0;
+       }
+
        OBD_FREE_PTR(info);
 }