Whamcloud - gitweb
LU-8359 ldlm: Wrong evict during failover
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index d8ddcdc..e281117 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2010, 2015, Intel Corporation.
+ * Copyright (c) 2010, 2016, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -54,7 +50,7 @@
 #include <dt_object.h>
 #include <lustre_acl.h>
 #include <lustre_export.h>
-#include <lustre_ioctl.h>
+#include <uapi/linux/lustre_ioctl.h>
 #include <lustre_lfsck.h>
 #include <lustre_log.h>
 #include <lustre_net.h>
@@ -65,6 +61,7 @@
 #include <lustre_swab.h>
 #include <obd.h>
 #include <obd_support.h>
+#include <lustre_barrier.h>
 
 #include <llog_swab.h>
 
@@ -178,8 +175,8 @@ void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
        lh->mlh_type = MDT_PDO_LOCK;
 
        if (lu_name_is_valid(lname)) {
-               lh->mlh_pdo_hash = full_name_hash(lname->ln_name,
-                                                 lname->ln_namelen);
+               lh->mlh_pdo_hash = ll_full_name_hash(NULL, lname->ln_name,
+                                                    lname->ln_namelen);
                /* XXX Workaround for LU-2856
                 *
                 * Zero is a valid return value of full_name_hash, but
@@ -486,14 +483,18 @@ int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
        const struct lu_env     *env = info->mti_env;
        struct md_object        *next = mdt_object_child(o);
        struct lu_buf           *buf = &info->mti_buf;
+       struct mdt_device       *mdt = info->mti_mdt;
        int rc;
 
+       ENTRY;
+
        buf->lb_buf = req_capsule_server_get(info->mti_pill, &RMF_ACL);
        buf->lb_len = req_capsule_get_size(info->mti_pill, &RMF_ACL,
                                           RCL_SERVER);
        if (buf->lb_len == 0)
-               return 0;
+               RETURN(0);
 
+again:
        rc = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_ACCESS);
        if (rc < 0) {
                if (rc == -ENODATA) {
@@ -503,17 +504,49 @@ int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
                } else if (rc == -EOPNOTSUPP) {
                        rc = 0;
                } else {
+                       if (rc == -ERANGE &&
+                           exp_connect_large_acl(info->mti_exp) &&
+                           buf->lb_buf != info->mti_big_acl) {
+                               if (info->mti_big_acl == NULL) {
+                                       OBD_ALLOC_LARGE(info->mti_big_acl,
+                                                       mdt->mdt_max_ea_size);
+                                       if (info->mti_big_acl == NULL) {
+                                               CERROR("%s: unable to grow "
+                                                      DFID" ACL buffer\n",
+                                                      mdt_obd_name(mdt),
+                                                      PFID(mdt_object_fid(o)));
+                                               RETURN(-ENOMEM);
+                                       }
+
+                                       info->mti_big_aclsize =
+                                                       mdt->mdt_max_ea_size;
+                               }
+
+                               CDEBUG(D_INODE, "%s: grow the "DFID
+                                      " ACL buffer to size %d\n",
+                                      mdt_obd_name(mdt),
+                                      PFID(mdt_object_fid(o)),
+                                      mdt->mdt_max_ea_size);
+
+                               buf->lb_buf = info->mti_big_acl;
+                               buf->lb_len = info->mti_big_aclsize;
+
+                               goto again;
+                       }
+
                        CERROR("%s: unable to read "DFID" ACL: rc = %d\n",
-                              mdt_obd_name(info->mti_mdt),
-                              PFID(mdt_object_fid(o)), rc);
+                              mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc);
                }
        } else {
+               if (buf->lb_buf == info->mti_big_acl)
+                       info->mti_big_acl_used = 1;
+
                rc = nodemap_map_acl(nodemap, buf->lb_buf,
                                     rc, NODEMAP_FS_TO_CLIENT);
                /* if all ACLs mapped out, rc is still >= 0 */
                if (rc < 0) {
                        CERROR("%s: nodemap_map_acl unable to parse "DFID
-                              " ACL: rc = %d\n", mdt_obd_name(info->mti_mdt),
+                              " ACL: rc = %d\n", mdt_obd_name(mdt),
                               PFID(mdt_object_fid(o)), rc);
                } else {
                        repbody->mbo_aclsize = rc;
@@ -521,10 +554,35 @@ int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
                        rc = 0;
                }
        }
-       return rc;
+
+       RETURN(rc);
 }
 #endif
 
+/* XXX Look into layout in MDT layer. */
+static inline bool mdt_hsm_is_released(struct lov_mds_md *lmm)
+{
+       struct lov_comp_md_v1   *comp_v1;
+       struct lov_mds_md       *v1;
+       int                      i;
+
+       if (lmm->lmm_magic == LOV_MAGIC_COMP_V1) {
+               comp_v1 = (struct lov_comp_md_v1 *)lmm;
+
+               for (i = 0; i < comp_v1->lcm_entry_count; i++) {
+                       v1 = (struct lov_mds_md *)((char *)comp_v1 +
+                               comp_v1->lcm_entries[i].lcme_offset);
+                       /* We don't support partial release for now */
+                       if (!(v1->lmm_pattern & LOV_PATTERN_F_RELEASED))
+                               return false;
+               }
+               return true;
+       } else {
+               return (lmm->lmm_pattern & LOV_PATTERN_F_RELEASED) ?
+                       true : false;
+       }
+}
+
 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                         const struct lu_attr *attr, const struct lu_fid *fid)
 {
@@ -572,6 +630,12 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                b->mbo_valid |= OBD_MD_FLGID;
        }
 
+       if (attr->la_valid & LA_PROJID) {
+               /* TODO, nodemap for project id */
+               b->mbo_projid = attr->la_projid;
+               b->mbo_valid |= OBD_MD_FLPROJID;
+       }
+
        b->mbo_mode = attr->la_mode;
        if (attr->la_valid & LA_MODE)
                b->mbo_valid |= OBD_MD_FLMODE;
@@ -581,7 +645,7 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
        if (fid != NULL) {
                b->mbo_fid1 = *fid;
                b->mbo_valid |= OBD_MD_FLID;
-               CDEBUG(D_INODE, DFID": nlink=%d, mode=%o, valid="LPX64"\n",
+               CDEBUG(D_INODE, DFID": nlink=%d, mode=%o, valid=%#llx\n",
                       PFID(fid), b->mbo_nlink, b->mbo_mode, b->mbo_valid);
        }
 
@@ -603,7 +667,7 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                 * b=22272 */
                b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL &&
-                  ma->ma_lmm->lmm_pattern & LOV_PATTERN_F_RELEASED) {
+                  mdt_hsm_is_released(ma->ma_lmm)) {
                /* A released file stores its size on MDS. */
                /* But return 1 block for released file, unless tools like tar
                 * will consider it fully sparse. (LU-3864)
@@ -931,7 +995,7 @@ int mdt_attr_get_complex(struct mdt_thread_info *info,
 #endif
 out:
        ma->ma_need = need;
-       CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
+       CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = %#llx ma_lmm=%p\n",
               rc, ma->ma_valid, ma->ma_lmm);
        RETURN(rc);
 }
@@ -950,7 +1014,6 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
        struct lu_buf           *buffer = &info->mti_buf;
        struct obd_export       *exp = info->mti_exp;
        int                      rc;
-       int                      is_root;
        ENTRY;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
@@ -1032,32 +1095,6 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        repbody->mbo_t_state = MS_RESTORE;
        }
 
-       is_root = lu_fid_eq(mdt_object_fid(o), &info->mti_mdt->mdt_md_root_fid);
-
-       /* the Lustre protocol supposes to return default striping
-        * on the user-visible root if explicitly requested */
-       if ((ma->ma_valid & MA_LOV) == 0 && S_ISDIR(la->la_mode) &&
-           (ma->ma_need & MA_LOV_DEF && is_root) && ma->ma_need & MA_LOV) {
-               struct lu_fid      rootfid;
-               struct mdt_object *root;
-               struct mdt_device *mdt = info->mti_mdt;
-
-               rc = dt_root_get(env, mdt->mdt_bottom, &rootfid);
-               if (rc)
-                       RETURN(rc);
-               root = mdt_object_find(env, mdt, &rootfid);
-               if (IS_ERR(root))
-                       RETURN(PTR_ERR(root));
-               rc = mdt_stripe_get(info, root, ma, XATTR_NAME_LOV);
-               mdt_object_put(info->mti_env, root);
-               if (unlikely(rc)) {
-                       CERROR("%s: getattr error for "DFID": rc = %d\n",
-                              mdt_obd_name(info->mti_mdt),
-                              PFID(mdt_object_fid(o)), rc);
-                       RETURN(rc);
-               }
-       }
-
         if (likely(ma->ma_valid & MA_INODE))
                 mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
         else
@@ -1088,6 +1125,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        if (!mdt_is_striped_client(req->rq_export))
                                RETURN(-ENOTSUPP);
                        LASSERT(S_ISDIR(la->la_mode));
+                       mdt_dump_lmv(D_INFO, ma->ma_lmv);
                        repbody->mbo_eadatasize = ma->ma_lmv_size;
                        repbody->mbo_valid |= (OBD_MD_FLDIREA |
                                               OBD_MD_DEFAULT_MEA);
@@ -1204,6 +1242,12 @@ static int mdt_getattr(struct tgt_session_info *tsi)
 
        req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, rc);
 
+       /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
+        * by default. If the target object has more ACL entries, then
+        * enlarge the buffer when necessary. */
+       req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
+                            LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
+
        rc = req_capsule_server_pack(pill);
        if (unlikely(rc != 0))
                GOTO(out, rc = err_serious(rc));
@@ -1232,6 +1276,60 @@ out:
 }
 
 /**
+ * Handler of layout intent RPC requiring the layout modification
+ *
+ * \param[in] info     thread environment
+ * \param[in] obj      object
+ * \param[in] layout   layout intent
+ * \param[in] buf      buffer containing client's lovea, could be empty
+ *
+ * \retval 0   on success
+ * \retval < 0 error code
+ */
+static int mdt_layout_change(struct mdt_thread_info *info,
+                            struct mdt_object *obj,
+                            struct layout_intent *layout,
+                            const struct lu_buf *buf)
+{
+       struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
+       int rc;
+       ENTRY;
+
+       CDEBUG(D_INFO, "got layout change request from client: "
+              "opc:%u flags:%#x extent[%#llx,%#llx)\n",
+              layout->li_opc, layout->li_flags,
+              layout->li_start, layout->li_end);
+       if (layout->li_start >= layout->li_end) {
+               CERROR("Recieved an invalid layout change range [%llu, %llu) "
+                      "for "DFID"\n", layout->li_start, layout->li_end,
+                      PFID(mdt_object_fid(obj)));
+               RETURN(-EINVAL);
+       }
+
+       if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
+               GOTO(out, rc = -EINVAL);
+
+       rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL,
+                          MAY_WRITE);
+       if (rc)
+               GOTO(out, rc);
+
+       /* take layout lock to prepare layout change */
+       mdt_lock_reg_init(lh, LCK_EX);
+       rc = mdt_object_lock(info, obj, lh,
+                            MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
+       if (rc)
+               GOTO(out, rc);
+
+       rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout,
+                             buf);
+
+       mdt_object_unlock(info, obj, lh, 1);
+out:
+       RETURN(rc);
+}
+
+/**
  * Exchange MOF_LOV_CREATED flags between two objects after a
  * layout swap. No assumption is made on whether o1 or o2 have
  * created objects or not.
@@ -1775,7 +1873,7 @@ static int mdt_readpage(struct tgt_session_info *tsi)
          */
        rdpg->rp_hash = reqbody->mbo_size;
        if (rdpg->rp_hash != reqbody->mbo_size) {
-               CERROR("Invalid hash: "LPX64" != "LPX64"\n",
+               CERROR("Invalid hash: %#llx != %#llx\n",
                       rdpg->rp_hash, reqbody->mbo_size);
                RETURN(-EFAULT);
        }
@@ -1864,6 +1962,13 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
         if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
                req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0);
 
+       /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
+        * by default. If the target object has more ACL entries, then
+        * enlarge the buffer when necessary. */
+       if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
+               req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
+                                    LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
+
         rc = req_capsule_server_pack(pill);
         if (rc != 0) {
                 CERROR("Can't pack response, rc %d\n", rc);
@@ -2092,20 +2197,37 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        }
 
        id = oqctl->qc_id;
-       if (oqctl->qc_type == USRQUOTA)
+       switch (oqctl->qc_type) {
+       case USRQUOTA:
                id = nodemap_map_id(nodemap, NODEMAP_UID,
                                    NODEMAP_CLIENT_TO_FS, id);
-       else if (oqctl->qc_type == GRPQUOTA)
-               id = nodemap_map_id(nodemap, NODEMAP_UID,
+               break;
+       case GRPQUOTA:
+               id = nodemap_map_id(nodemap, NODEMAP_GID,
                                    NODEMAP_CLIENT_TO_FS, id);
-
+               break;
+       case PRJQUOTA:
+               /* todo: check/map project id */
+               id = oqctl->qc_id;
+               break;
+       default:
+               GOTO(out_nodemap, rc = -EOPNOTSUPP);
+       }
        repoqc = req_capsule_server_get(pill, &RMF_OBD_QUOTACTL);
        if (repoqc == NULL)
                GOTO(out_nodemap, rc = err_serious(-EFAULT));
 
+       if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA)
+               barrier_exit(tsi->tsi_tgt->lut_bottom);
+
        if (oqctl->qc_id != id)
                swap(oqctl->qc_id, id);
 
+       if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA) {
+               if (unlikely(!barrier_entry(tsi->tsi_tgt->lut_bottom)))
+                       RETURN(-EINPROGRESS);
+       }
+
        switch (oqctl->qc_cmd) {
 
        case Q_GETINFO:
@@ -2315,49 +2437,66 @@ static inline int mdt_is_lock_sync(struct ldlm_lock *lock)
  * \see ldlm_blocking_ast_nocheck
  */
 int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                     void *data, int flag)
+                    void *data, int flag)
 {
-        struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
-        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
-        int rc;
-        ENTRY;
+       struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+       bool commit_async = false;
+       int rc;
+       ENTRY;
 
-        if (flag == LDLM_CB_CANCELING)
-                RETURN(0);
+       if (flag == LDLM_CB_CANCELING)
+               RETURN(0);
 
-        lock_res_and_lock(lock);
-        if (lock->l_blocking_ast != mdt_blocking_ast) {
-                unlock_res_and_lock(lock);
-                RETURN(0);
-        }
+       lock_res_and_lock(lock);
+       if (lock->l_blocking_ast != mdt_blocking_ast) {
+               unlock_res_and_lock(lock);
+               RETURN(0);
+       }
+       /* There is no lock conflict if l_blocking_lock == NULL,
+        * it indicates a blocking ast sent from ldlm_lock_decref_internal
+        * when the last reference to a local lock was released */
        if (lock->l_req_mode & (LCK_PW | LCK_EX) &&
            lock->l_blocking_lock != NULL) {
-               if (mdt_cos_is_enabled(mdt) &&
-                   lock->l_client_cookie !=
-                   lock->l_blocking_lock->l_client_cookie)
-                       mdt_set_lock_sync(lock);
-               else if (mdt_slc_is_enabled(mdt) &&
-                        ldlm_is_cos_incompat(lock->l_blocking_lock))
+               if (mdt_cos_is_enabled(mdt)) {
+                       if (lock->l_client_cookie !=
+                           lock->l_blocking_lock->l_client_cookie)
+                               mdt_set_lock_sync(lock);
+               else if (mdt_slc_is_enabled(mdt) &&
+                          ldlm_is_cos_incompat(lock->l_blocking_lock)) {
                        mdt_set_lock_sync(lock);
+                       /*
+                        * we may do extra commit here, but there is a small
+                        * window to miss a commit: lock was unlocked (saved),
+                        * then a conflict lock queued and we come here, but
+                        * REP-ACK not received, so lock was not converted to
+                        * COS mode yet.
+                        * Fortunately this window is quite small, so the
+                        * extra commit should be rare (not to say distributed
+                        * operation is rare too).
+                        */
+                       commit_async = true;
+               }
+       } else if (lock->l_req_mode == LCK_COS &&
+                  lock->l_blocking_lock != NULL) {
+               commit_async = true;
        }
-        rc = ldlm_blocking_ast_nocheck(lock);
 
-        /* There is no lock conflict if l_blocking_lock == NULL,
-         * it indicates a blocking ast sent from ldlm_lock_decref_internal
-         * when the last reference to a local lock was released */
-        if (lock->l_req_mode == LCK_COS && lock->l_blocking_lock != NULL) {
-                struct lu_env env;
+       rc = ldlm_blocking_ast_nocheck(lock);
+
+       if (commit_async) {
+               struct lu_env env;
 
                rc = lu_env_init(&env, LCT_LOCAL);
                if (unlikely(rc != 0))
                        CWARN("%s: lu_env initialization failed, cannot "
                              "start asynchronous commit: rc = %d\n",
                              obd->obd_name, rc);
-                else
-                        mdt_device_commit_async(&env, mdt);
-                lu_env_fini(&env);
-        }
-        RETURN(rc);
+               else
+                       mdt_device_commit_async(&env, mdt);
+               lu_env_fini(&env);
+       }
+       RETURN(rc);
 }
 
 /*
@@ -2392,11 +2531,15 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                break;
        }
        case LDLM_CB_CANCELING: {
+               struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
+               struct mdt_device *mdt =
+                               mdt_dev(obd->obd_lu_dev->ld_site->ls_top_dev);
+
                LDLM_DEBUG(lock, "Revoke remote lock\n");
 
                /* discard slc lock here so that it can be cleaned anytime,
                 * especially for cleanup_resource() */
-               tgt_discard_slc_lock(lock);
+               tgt_discard_slc_lock(&mdt->mdt_lut, lock);
 
                /* once we cache lock, l_ast_data is set to mdt_object */
                if (lock->l_ast_data != NULL) {
@@ -2405,9 +2548,6 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 
                        rc = lu_env_init(&env, LCT_MD_THREAD);
                        if (unlikely(rc != 0)) {
-                               struct obd_device *obd;
-
-                               obd = ldlm_lock_to_ns(lock)->ns_obd;
                                CWARN("%s: lu_env initialization failed, object"
                                      "%p "DFID" is leaked!\n",
                                      obd->obd_name, mo,
@@ -2447,7 +2587,7 @@ int mdt_check_resent_lock(struct mdt_thread_info *info,
                        /* Lock is pinned by ldlm_handle_enqueue0() as it is
                         * a resend case, however, it could be already destroyed
                         * due to client eviction or a raced cancel RPC. */
-                       LDLM_DEBUG_NOLOCK("Invalid lock handle "LPX64,
+                       LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx",
                                          lhc->mlh_reg_lh.cookie);
                        RETURN(-ESTALE);
                }
@@ -2738,25 +2878,24 @@ static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
                        struct mdt_device *mdt = info->mti_mdt;
                        struct ldlm_lock *lock = ldlm_handle2lock(h);
                        struct ptlrpc_request *req = mdt_info_req(info);
-                       int cos;
-
-                       cos = (mdt_cos_is_enabled(mdt) ||
-                              mdt_slc_is_enabled(mdt));
+                       bool cos = mdt_cos_is_enabled(mdt);
+                       bool convert_lock = !cos && mdt_slc_is_enabled(mdt);
 
-                       LASSERTF(lock != NULL, "no lock for cookie "LPX64"\n",
+                       LASSERTF(lock != NULL, "no lock for cookie %#llx\n",
                                 h->cookie);
 
                        /* there is no request if mdt_object_unlock() is called
                         * from mdt_export_cleanup()->mdt_add_dirty_flag() */
                        if (likely(req != NULL)) {
-                               CDEBUG(D_HA, "request = %p reply state = %p"
-                                      " transno = "LPD64"\n", req,
-                                      req->rq_reply_state, req->rq_transno);
+                               LDLM_DEBUG(lock, "save lock request %p reply "
+                                       "state %p transno %lld\n", req,
+                                       req->rq_reply_state, req->rq_transno);
                                if (cos) {
                                        ldlm_lock_downgrade(lock, LCK_COS);
                                        mode = LCK_COS;
                                }
-                               ptlrpc_save_lock(req, h, mode, cos);
+                               ptlrpc_save_lock(req, h, mode, cos,
+                                                convert_lock);
                        } else {
                                mdt_fid_unlock(h, mode);
                        }
@@ -2807,7 +2946,8 @@ static void mdt_save_remote_lock(struct mdt_thread_info *info,
                        struct ptlrpc_request *req = mdt_info_req(info);
 
                        LASSERT(req != NULL);
-                       tgt_save_slc_lock(lock, req->rq_transno);
+                       tgt_save_slc_lock(&info->mti_mdt->mdt_lut, lock,
+                                         req->rq_transno);
                        ldlm_lock_decref(h, mode);
                }
                h->cookie = 0ull;
@@ -2827,6 +2967,8 @@ static void mdt_save_remote_lock(struct mdt_thread_info *info,
  * \param o mdt object
  * \param lh mdt lock handle referencing regular and PDO locks
  * \param decref force immediate lock releasing
+ *
+ * XXX o is not used and may be NULL, see hsm_cdt_request_completed().
  */
 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
                       struct mdt_lock_handle *lh, int decref)
@@ -2941,6 +3083,13 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
                        req_capsule_set_size(pill, &RMF_LOGCOOKIES,
                                             RCL_SERVER, 0);
 
+               /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
+                * by default. If the target object has more ACL entries, then
+                * enlarge the buffer when necessary. */
+               if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
+                       req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
+                                            LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
+
                 rc = req_capsule_server_pack(pill);
         }
         RETURN(rc);
@@ -2997,6 +3146,7 @@ void mdt_thread_info_init(struct ptlrpc_request *req,
         info->mti_cross_ref = 0;
         info->mti_opdata = 0;
        info->mti_big_lmm_used = 0;
+       info->mti_big_acl_used = 0;
 
         info->mti_spec.no_create = 0;
        info->mti_spec.sp_rm_entry = 0;
@@ -3185,14 +3335,14 @@ mdt_intent_lock_replace(struct mdt_thread_info *info,
                /* Lock is pinned by ldlm_handle_enqueue0() as it is
                 * a resend case, however, it could be already destroyed
                 * due to client eviction or a raced cancel RPC. */
-               LDLM_DEBUG_NOLOCK("Invalid lock handle "LPX64"\n",
+               LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx\n",
                                  lh->mlh_reg_lh.cookie);
                lh->mlh_reg_lh.cookie = 0;
                RETURN(-ESTALE);
        }
 
        LASSERTF(new_lock != NULL,
-                "lockh "LPX64" flags "LPX64" rc %d\n",
+                "lockh %#llx flags %#llx : rc = %d\n",
                 lh->mlh_reg_lh.cookie, flags, result);
 
         /*
@@ -3279,7 +3429,7 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
                lh->mlh_reg_mode = new_lock->l_granted_mode;
 
                LDLM_DEBUG(new_lock, "Restoring lock cookie");
-               DEBUG_REQ(D_DLMTRACE, req, "restoring lock cookie "LPX64,
+               DEBUG_REQ(D_DLMTRACE, req, "restoring lock cookie %#llx",
                          lh->mlh_reg_lh.cookie);
                return;
        }
@@ -3298,7 +3448,7 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
          */
         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
 
-       DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle "LPX64,
+       DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle %#llx",
                  dlmreq->lock_handle[0].cookie);
 }
 
@@ -3309,7 +3459,8 @@ static int mdt_intent_getxattr(enum mdt_it_code opcode,
 {
        struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
        struct ldlm_reply      *ldlm_rep = NULL;
-       int rc, grc;
+       int rc;
+       ENTRY;
 
        /*
         * Initialize lhc->mlh_reg_lh either from a previously granted lock
@@ -3325,18 +3476,30 @@ static int mdt_intent_getxattr(enum mdt_it_code opcode,
                        return rc;
        }
 
-       grc = mdt_getxattr(info);
-
-       rc = mdt_intent_lock_replace(info, lockp, lhc, flags, 0);
+       rc = mdt_getxattr(info);
 
        if (mdt_info_req(info)->rq_repmsg != NULL)
                ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
-       if (ldlm_rep == NULL)
+
+       if (ldlm_rep == NULL ||
+           OBD_FAIL_CHECK(OBD_FAIL_MDS_XATTR_REP)) {
+               mdt_object_unlock(info,  info->mti_object, lhc, 1);
                RETURN(err_serious(-EFAULT));
+       }
 
-       ldlm_rep->lock_policy_res2 = grc;
+       ldlm_rep->lock_policy_res2 = clear_serious(rc);
 
-       return rc;
+       /* This is left for interop instead of adding a new interop flag.
+        * LU-7433 */
+#if LUSTRE_VERSION_CODE > OBD_OCD_VERSION(3, 0, 0, 0)
+       if (ldlm_rep->lock_policy_res2) {
+               mdt_object_unlock(info, info->mti_object, lhc, 1);
+               RETURN(ELDLM_LOCK_ABORTED);
+       }
+#endif
+
+       rc = mdt_intent_lock_replace(info, lockp, lhc, flags, rc);
+       RETURN(rc);
 }
 
 static int mdt_intent_getattr(enum mdt_it_code opcode,
@@ -3417,6 +3580,7 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
        struct layout_intent *layout;
        struct lu_fid *fid;
        struct mdt_object *obj = NULL;
+       bool layout_change = false;
        int layout_size = 0;
        int rc = 0;
        ENTRY;
@@ -3431,11 +3595,29 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
        if (layout == NULL)
                RETURN(-EPROTO);
 
-       if (layout->li_opc != LAYOUT_INTENT_ACCESS) {
+       switch (layout->li_opc) {
+       case LAYOUT_INTENT_TRUNC:
+       case LAYOUT_INTENT_WRITE:
+               layout_change = true;
+               break;
+       case LAYOUT_INTENT_ACCESS:
+               break;
+       case LAYOUT_INTENT_READ:
+       case LAYOUT_INTENT_GLIMPSE:
+       case LAYOUT_INTENT_RELEASE:
+       case LAYOUT_INTENT_RESTORE:
                CERROR("%s: Unsupported layout intent opc %d\n",
                       mdt_obd_name(info->mti_mdt), layout->li_opc);
-               RETURN(-EINVAL);
+               rc = -ENOTSUPP;
+               break;
+       default:
+               CERROR("%s: Unknown layout intent opc %d\n",
+                      mdt_obd_name(info->mti_mdt), layout->li_opc);
+               rc = -EINVAL;
+               break;
        }
+       if (rc < 0)
+               RETURN(rc);
 
        fid = &info->mti_tmp_fid2;
        fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name);
@@ -3456,12 +3638,67 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
                        info->mti_mdt->mdt_max_mdsize = layout_size;
        }
 
+       /*
+        * set reply buffer size, so that ldlm_handle_enqueue0()->
+        * ldlm_lvbo_fill() will fill the reply buffer with lovea.
+        */
        (*lockp)->l_lvb_type = LVB_T_LAYOUT;
        req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER,
                             layout_size);
        rc = req_capsule_server_pack(info->mti_pill);
-       GOTO(out_obj, rc);
+       if (rc)
+               GOTO(out_obj, rc);
+
+
+       if (layout_change) {
+               struct lu_buf *buf = &info->mti_buf;
+
+               /**
+                * mdt_layout_change is a reint operation, when the request
+                * is resent, layout write shouldn't reprocess it again.
+                */
+               rc = mdt_check_resent(info, mdt_reconstruct_generic, lhc);
+               if (rc)
+                       GOTO(out_obj, rc = rc < 0 ? rc : 0);
+
+               /**
+                * There is another resent case: the client's job has been
+                * done by another client, referring lod_declare_layout_change
+                * -EALREADY case, and it became a operation w/o transaction,
+                * so we should not do the layout change, otherwise
+                * mdt_layout_change() will try to cancel the granted server
+                * CR lock whose remote counterpart is still in hold on the
+                * client, and a deadlock ensues.
+                */
+               rc = mdt_check_resent_lock(info, obj, lhc);
+               if (rc <= 0)
+                       GOTO(out_obj, rc);
+
+               buf->lb_buf = NULL;
+               buf->lb_len = 0;
+               if (unlikely(req_is_replay(mdt_info_req(info)))) {
+                       buf->lb_buf = req_capsule_client_get(info->mti_pill,
+                                       &RMF_EADATA);
+                       buf->lb_len = req_capsule_get_size(info->mti_pill,
+                                       &RMF_EADATA, RCL_CLIENT);
+                       /*
+                        * If it's a replay of layout write intent RPC, the
+                        * client has saved the extended lovea when
+                        * it get reply then.
+                        */
+                       if (buf->lb_len > 0)
+                               mdt_fix_lov_magic(info, buf->lb_buf);
+               }
 
+               /*
+                * Instantiate some layout components, if @buf contains
+                * lovea, then it's a replay of the layout intent write
+                * RPC.
+                */
+               rc = mdt_layout_change(info, obj, layout, buf);
+               if (rc)
+                       GOTO(out_obj, rc);
+       }
 out_obj:
        mdt_object_put(info->mti_env, obj);
 
@@ -3618,6 +3855,9 @@ static int mdt_intent_opc(enum ldlm_intent_flags itopc,
                if (qmt == NULL)
                        RETURN(-EOPNOTSUPP);
 
+               if (mdt_rdonly(req->rq_export))
+                       RETURN(-EROFS);
+
                (*lockp)->l_lvb_type = LVB_T_LQUOTA;
                /* pass the request to quota master */
                rc = qmt_hdls.qmth_intent_policy(info->mti_env, qmt,
@@ -3634,8 +3874,7 @@ static int mdt_intent_opc(enum ldlm_intent_flags itopc,
        if (rc < 0)
                RETURN(rc);
 
-       if (flv->it_flags & MUTABOR &&
-           exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY)
+       if (flv->it_flags & MUTABOR && mdt_rdonly(req->rq_export))
                RETURN(-EROFS);
 
        if (flv->it_act != NULL) {
@@ -4492,58 +4731,64 @@ static struct tgt_opc_slice mdt_common_slice[] = {
 
 static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
 {
-       struct md_device        *next = m->mdt_child;
-       struct lu_device        *d    = &m->mdt_lu_dev;
-       struct obd_device       *obd  = mdt2obd_dev(m);
-       struct lfsck_stop        stop;
-       ENTRY;
-
-       if (m->mdt_md_root != NULL) {
-               mdt_object_put(env, m->mdt_md_root);
-               m->mdt_md_root = NULL;
-       }
+       struct md_device *next = m->mdt_child;
+       struct lu_device *d = &m->mdt_lu_dev;
+       struct obd_device *obd = mdt2obd_dev(m);
+       struct lfsck_stop stop;
 
+       ENTRY;
        stop.ls_status = LS_PAUSED;
        stop.ls_flags = 0;
        next->md_ops->mdo_iocontrol(env, next, OBD_IOC_STOP_LFSCK, 0, &stop);
 
        mdt_stack_pre_fini(env, m, md2lu_dev(m->mdt_child));
-       target_recovery_fini(obd);
        ping_evictor_stop();
 
        if (m->mdt_opts.mo_coordinator)
                mdt_hsm_cdt_stop(m);
 
-       mdt_hsm_cdt_fini(m);
-
        mdt_llog_ctxt_unclone(env, m, LLOG_AGENT_ORIG_CTXT);
-        mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
+       mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
 
        if (m->mdt_namespace != NULL)
                ldlm_namespace_free_prior(m->mdt_namespace, NULL,
                                          d->ld_obd->obd_force);
 
-        obd_exports_barrier(obd);
-        obd_zombie_barrier();
+       obd_exports_barrier(obd);
+       obd_zombie_barrier();
 
-        mdt_procfs_fini(m);
+       mdt_quota_fini(env, m);
 
-        tgt_fini(env, &m->mdt_lut);
-        mdt_fs_cleanup(env, m);
-        upcall_cache_cleanup(m->mdt_identity_cache);
-        m->mdt_identity_cache = NULL;
+       cfs_free_nidlist(&m->mdt_squash.rsi_nosquash_nids);
+
+       /* Calling the cleanup functions in the same order as in the mdt_init0
+        * error path
+        */
+       mdt_procfs_fini(m);
+
+       target_recovery_fini(obd);
+       upcall_cache_cleanup(m->mdt_identity_cache);
+       m->mdt_identity_cache = NULL;
+
+       mdt_fs_cleanup(env, m);
+
+       tgt_fini(env, &m->mdt_lut);
+
+       mdt_hsm_cdt_fini(m);
 
        if (m->mdt_namespace != NULL) {
                ldlm_namespace_free_post(m->mdt_namespace);
                d->ld_obd->obd_namespace = m->mdt_namespace = NULL;
        }
 
-       mdt_quota_fini(env, m);
+       if (m->mdt_md_root != NULL) {
+               mdt_object_put(env, m->mdt_md_root);
+               m->mdt_md_root = NULL;
+       }
 
-       cfs_free_nidlist(&m->mdt_squash.rsi_nosquash_nids);
+       mdt_seq_fini(env, m);
 
-        mdt_seq_fini(env, m);
-        mdt_fld_fini(env, m);
+       mdt_fld_fini(env, m);
 
        /*
         * Finish the stack
@@ -4709,7 +4954,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        if (rc)
                GOTO(err_tgt, rc);
 
-       tgt_adapt_sptlrpc_conf(&m->mdt_lut, 1);
+       tgt_adapt_sptlrpc_conf(&m->mdt_lut);
 
         next = m->mdt_child;
         rc = next->md_ops->mdo_iocontrol(env, next, OBD_IOC_GET_MNTOPT, 0,
@@ -4906,6 +5151,7 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env,
                spin_lock_init(&mo->mot_write_lock);
                mutex_init(&mo->mot_lov_mutex);
                init_rwsem(&mo->mot_open_sem);
+               atomic_set(&mo->mot_open_count, 0);
                RETURN(o);
        }
        RETURN(NULL);
@@ -5040,7 +5286,7 @@ static int mdt_obd_set_info_async(const struct lu_env *env,
        ENTRY;
 
        if (KEY_IS(KEY_SPTLRPC_CONF)) {
-               rc = tgt_adapt_sptlrpc_conf(class_exp2tgt(exp), 0);
+               rc = tgt_adapt_sptlrpc_conf(class_exp2tgt(exp));
                RETURN(rc);
        }
 
@@ -5060,6 +5306,10 @@ static int mdt_obd_set_info_async(const struct lu_env *env,
  * connect flags from the obd_connect_data::ocd_connect_flags field of the
  * reply. \see mdt_connect().
  *
+ * Before 2.7.50 clients will send a struct obd_connect_data_v1 rather than a
+ * full struct obd_connect_data. So care must be taken when accessing fields
+ * that are not present in struct obd_connect_data_v1. See LU-16.
+ *
  * \param exp   the obd_export associated with this client/target pair
  * \param mdt   the target device for the connection
  * \param data  stores data for this connect request
@@ -5075,13 +5325,21 @@ static int mdt_connect_internal(struct obd_export *exp,
        LASSERT(data != NULL);
 
        data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
-       data->ocd_connect_flags2 &= MDT_CONNECT_SUPPORTED2;
+
+       if (mdt->mdt_bottom->dd_rdonly &&
+           !(data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
+           !(data->ocd_connect_flags & OBD_CONNECT_RDONLY))
+               RETURN(-EACCES);
+
+       if (data->ocd_connect_flags & OBD_CONNECT_FLAGS2)
+               data->ocd_connect_flags2 &= MDT_CONNECT_SUPPORTED2;
+
        data->ocd_ibits_known &= MDS_INODELOCK_FULL;
 
        if (!(data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
            !(data->ocd_connect_flags & OBD_CONNECT_IBITS)) {
                CWARN("%s: client %s does not support ibits lock, either "
-                     "very old or an invalid client: flags "LPX64"\n",
+                     "very old or an invalid client: flags %#llx\n",
                      mdt_obd_name(mdt), exp->exp_client_uuid.uuid,
                      data->ocd_connect_flags);
                return -EBADE;
@@ -5093,16 +5351,13 @@ static int mdt_connect_internal(struct obd_export *exp,
        if (!mdt->mdt_opts.mo_user_xattr)
                data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
 
-       if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
-               data->ocd_brw_size = min(data->ocd_brw_size,
-                                        (__u32)MD_MAX_BRW_SIZE);
+       if (OCD_HAS_FLAG(data, BRW_SIZE)) {
+               data->ocd_brw_size = min(data->ocd_brw_size, MD_MAX_BRW_SIZE);
                if (data->ocd_brw_size == 0) {
-                       CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64
-                              " ocd_version: %x ocd_grant: %d "
-                              "ocd_index: %u ocd_brw_size is "
-                              "unexpectedly zero, network data "
-                              "corruption? Refusing connection of this"
-                              " client\n",
+                       CERROR("%s: cli %s/%p ocd_connect_flags: %#llx "
+                              "ocd_version: %x ocd_grant: %d ocd_index: %u "
+                              "ocd_brw_size unexpectedly zero, network data "
+                              "corruption? Refusing to connect this client\n",
                               mdt_obd_name(mdt),
                               exp->exp_client_uuid.uuid,
                               exp, data->ocd_connect_flags, data->ocd_version,
@@ -5247,7 +5502,7 @@ static int mdt_export_cleanup(struct obd_export *exp)
                                rc = mdt_ctxt_add_dirty_flag(&env, info, mfd);
 
                        /* Don't unlink orphan on failover umount, LU-184 */
-                       if (exp->exp_flags & OBD_OPT_FAILOVER) {
+                       if (exp->exp_obd->obd_fail) {
                                ma->ma_valid = MA_FLAGS;
                                ma->ma_attr_flags |= MDS_KEEP_ORPHAN;
                        }
@@ -5256,9 +5511,7 @@ static int mdt_export_cleanup(struct obd_export *exp)
         }
         info->mti_mdt = NULL;
         /* cleanup client slot early */
-        /* Do not erase record for recoverable client. */
-        if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed)
-               tgt_client_del(&env, exp);
+       tgt_client_del(&env, exp);
         lu_env_fini(&env);
 
         RETURN(rc);
@@ -5496,7 +5749,7 @@ int mdt_links_read(struct mdt_thread_info *info, struct mdt_object *mdt_obj,
        if (rc < 0)
                return rc;
 
-       return linkea_init(ldata);
+       return linkea_init_with_rec(ldata);
 }
 
 /**
@@ -5550,9 +5803,14 @@ static int mdt_path_current(struct mdt_thread_info *info,
                    lu_fid_eq(&mdt->mdt_md_root_fid, &fp->gf_fid))
                        GOTO(out, rc = -ENOENT);
 
-               mdt_obj = mdt_object_find(info->mti_env, mdt, tmpfid);
-               if (IS_ERR(mdt_obj))
-                       GOTO(out, rc = PTR_ERR(mdt_obj));
+               if (lu_fid_eq(mdt_object_fid(obj), tmpfid)) {
+                       mdt_obj = obj;
+                       mdt_object_get(info->mti_env, mdt_obj);
+               } else {
+                       mdt_obj = mdt_object_find(info->mti_env, mdt, tmpfid);
+                       if (IS_ERR(mdt_obj))
+                               GOTO(out, rc = PTR_ERR(mdt_obj));
+               }
 
                if (!mdt_object_exists(mdt_obj)) {
                        mdt_object_put(info->mti_env, mdt_obj);
@@ -5698,16 +5956,16 @@ static int mdt_fid2path(struct mdt_thread_info *info,
        int    rc;
        ENTRY;
 
-       CDEBUG(D_IOCTL, "path get "DFID" from "LPU64" #%d\n",
+       CDEBUG(D_IOCTL, "path get "DFID" from %llu #%d\n",
                PFID(&fp->gf_fid), fp->gf_recno, fp->gf_linkno);
 
        if (!fid_is_sane(&fp->gf_fid))
                RETURN(-EINVAL);
 
        if (!fid_is_namespace_visible(&fp->gf_fid)) {
-               CWARN("%s: "DFID" is invalid, sequence should be "
-                     ">= "LPX64"\n", mdt_obd_name(mdt),
-                     PFID(&fp->gf_fid), (__u64)FID_SEQ_NORMAL);
+               CDEBUG(D_INFO, "%s: "DFID" is invalid, f_seq should be >= %#llx"
+                      ", or f_oid != 0, or f_ver == 0\n", mdt_obd_name(mdt),
+                      PFID(&fp->gf_fid), (__u64)FID_SEQ_NORMAL);
                RETURN(-EINVAL);
        }
 
@@ -5735,7 +5993,7 @@ static int mdt_fid2path(struct mdt_thread_info *info,
 
        rc = mdt_path(info, obj, fp, root_fid);
 
-       CDEBUG(D_INFO, "fid "DFID", path %s recno "LPX64" linkno %u\n",
+       CDEBUG(D_INFO, "fid "DFID", path %s recno %#llx linkno %u\n",
               PFID(&fp->gf_fid), fp->gf_u.gf_path,
               fp->gf_recno, fp->gf_linkno);
 
@@ -5825,30 +6083,6 @@ int mdt_get_info(struct tgt_session_info *tsi)
        RETURN(rc);
 }
 
-/* Pass the ioc down */
-static int mdt_ioc_child(struct lu_env *env, struct mdt_device *mdt,
-                        unsigned int cmd, int len, void *data)
-{
-       struct lu_context ioctl_session;
-       struct md_device *next = mdt->mdt_child;
-       int rc;
-       ENTRY;
-
-        rc = lu_context_init(&ioctl_session, LCT_SERVER_SESSION);
-       if (rc)
-               RETURN(rc);
-       ioctl_session.lc_thread = (struct ptlrpc_thread *)current;
-       lu_context_enter(&ioctl_session);
-       env->le_ses = &ioctl_session;
-
-       LASSERT(next->md_ops->mdo_iocontrol);
-       rc = next->md_ops->mdo_iocontrol(env, next, cmd, len, data);
-
-       lu_context_exit(&ioctl_session);
-       lu_context_fini(&ioctl_session);
-       RETURN(rc);
-}
-
 static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
 {
        struct obd_ioctl_data *data = karg;
@@ -5930,7 +6164,9 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         case OBD_IOC_CHANGELOG_REG:
         case OBD_IOC_CHANGELOG_DEREG:
         case OBD_IOC_CHANGELOG_CLEAR:
-                rc = mdt_ioc_child(&env, mdt, cmd, len, karg);
+               rc = mdt->mdt_child->md_ops->mdo_iocontrol(&env,
+                                                          mdt->mdt_child,
+                                                          cmd, len, karg);
                 break;
        case OBD_IOC_START_LFSCK: {
                struct md_device *next = mdt->mdt_child;
@@ -6007,7 +6243,7 @@ static int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt)
        int rc;
        ENTRY;
 
-       if (!mdt->mdt_skip_lfsck) {
+       if (!mdt->mdt_skip_lfsck && !mdt->mdt_bottom->dd_rdonly) {
                struct lfsck_start_param lsp;
 
                lsp.lsp_start = NULL;
@@ -6107,6 +6343,13 @@ static void mdt_key_fini(const struct lu_context *ctx,
                info->mti_big_lmm = NULL;
                info->mti_big_lmmsize = 0;
        }
+
+       if (info->mti_big_acl) {
+               OBD_FREE_LARGE(info->mti_big_acl, info->mti_big_aclsize);
+               info->mti_big_acl = NULL;
+               info->mti_big_aclsize = 0;
+       }
+
        OBD_FREE_PTR(info);
 }