Whamcloud - gitweb
LU-8359 ldlm: Wrong evict during failover
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 1582399..e281117 100644 (file)
@@ -50,7 +50,7 @@
 #include <dt_object.h>
 #include <lustre_acl.h>
 #include <lustre_export.h>
-#include <lustre_ioctl.h>
+#include <uapi/linux/lustre_ioctl.h>
 #include <lustre_lfsck.h>
 #include <lustre_log.h>
 #include <lustre_net.h>
@@ -61,6 +61,7 @@
 #include <lustre_swab.h>
 #include <obd.h>
 #include <obd_support.h>
+#include <lustre_barrier.h>
 
 #include <llog_swab.h>
 
@@ -174,8 +175,8 @@ void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
        lh->mlh_type = MDT_PDO_LOCK;
 
        if (lu_name_is_valid(lname)) {
-               lh->mlh_pdo_hash = full_name_hash(lname->ln_name,
-                                                 lname->ln_namelen);
+               lh->mlh_pdo_hash = ll_full_name_hash(NULL, lname->ln_name,
+                                                    lname->ln_namelen);
                /* XXX Workaround for LU-2856
                 *
                 * Zero is a valid return value of full_name_hash, but
@@ -482,14 +483,18 @@ int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
        const struct lu_env     *env = info->mti_env;
        struct md_object        *next = mdt_object_child(o);
        struct lu_buf           *buf = &info->mti_buf;
+       struct mdt_device       *mdt = info->mti_mdt;
        int rc;
 
+       ENTRY;
+
        buf->lb_buf = req_capsule_server_get(info->mti_pill, &RMF_ACL);
        buf->lb_len = req_capsule_get_size(info->mti_pill, &RMF_ACL,
                                           RCL_SERVER);
        if (buf->lb_len == 0)
-               return 0;
+               RETURN(0);
 
+again:
        rc = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_ACCESS);
        if (rc < 0) {
                if (rc == -ENODATA) {
@@ -499,17 +504,49 @@ int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
                } else if (rc == -EOPNOTSUPP) {
                        rc = 0;
                } else {
+                       if (rc == -ERANGE &&
+                           exp_connect_large_acl(info->mti_exp) &&
+                           buf->lb_buf != info->mti_big_acl) {
+                               if (info->mti_big_acl == NULL) {
+                                       OBD_ALLOC_LARGE(info->mti_big_acl,
+                                                       mdt->mdt_max_ea_size);
+                                       if (info->mti_big_acl == NULL) {
+                                               CERROR("%s: unable to grow "
+                                                      DFID" ACL buffer\n",
+                                                      mdt_obd_name(mdt),
+                                                      PFID(mdt_object_fid(o)));
+                                               RETURN(-ENOMEM);
+                                       }
+
+                                       info->mti_big_aclsize =
+                                                       mdt->mdt_max_ea_size;
+                               }
+
+                               CDEBUG(D_INODE, "%s: grow the "DFID
+                                      " ACL buffer to size %d\n",
+                                      mdt_obd_name(mdt),
+                                      PFID(mdt_object_fid(o)),
+                                      mdt->mdt_max_ea_size);
+
+                               buf->lb_buf = info->mti_big_acl;
+                               buf->lb_len = info->mti_big_aclsize;
+
+                               goto again;
+                       }
+
                        CERROR("%s: unable to read "DFID" ACL: rc = %d\n",
-                              mdt_obd_name(info->mti_mdt),
-                              PFID(mdt_object_fid(o)), rc);
+                              mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc);
                }
        } else {
+               if (buf->lb_buf == info->mti_big_acl)
+                       info->mti_big_acl_used = 1;
+
                rc = nodemap_map_acl(nodemap, buf->lb_buf,
                                     rc, NODEMAP_FS_TO_CLIENT);
                /* if all ACLs mapped out, rc is still >= 0 */
                if (rc < 0) {
                        CERROR("%s: nodemap_map_acl unable to parse "DFID
-                              " ACL: rc = %d\n", mdt_obd_name(info->mti_mdt),
+                              " ACL: rc = %d\n", mdt_obd_name(mdt),
                               PFID(mdt_object_fid(o)), rc);
                } else {
                        repbody->mbo_aclsize = rc;
@@ -517,10 +554,35 @@ int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
                        rc = 0;
                }
        }
-       return rc;
+
+       RETURN(rc);
 }
 #endif
 
+/* XXX Look into layout in MDT layer. */
+static inline bool mdt_hsm_is_released(struct lov_mds_md *lmm)
+{
+       struct lov_comp_md_v1   *comp_v1;
+       struct lov_mds_md       *v1;
+       int                      i;
+
+       if (lmm->lmm_magic == LOV_MAGIC_COMP_V1) {
+               comp_v1 = (struct lov_comp_md_v1 *)lmm;
+
+               for (i = 0; i < comp_v1->lcm_entry_count; i++) {
+                       v1 = (struct lov_mds_md *)((char *)comp_v1 +
+                               comp_v1->lcm_entries[i].lcme_offset);
+                       /* We don't support partial release for now */
+                       if (!(v1->lmm_pattern & LOV_PATTERN_F_RELEASED))
+                               return false;
+               }
+               return true;
+       } else {
+               return (lmm->lmm_pattern & LOV_PATTERN_F_RELEASED) ?
+                       true : false;
+       }
+}
+
 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                         const struct lu_attr *attr, const struct lu_fid *fid)
 {
@@ -568,6 +630,12 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                b->mbo_valid |= OBD_MD_FLGID;
        }
 
+       if (attr->la_valid & LA_PROJID) {
+               /* TODO, nodemap for project id */
+               b->mbo_projid = attr->la_projid;
+               b->mbo_valid |= OBD_MD_FLPROJID;
+       }
+
        b->mbo_mode = attr->la_mode;
        if (attr->la_valid & LA_MODE)
                b->mbo_valid |= OBD_MD_FLMODE;
@@ -599,7 +667,7 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                 * b=22272 */
                b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL &&
-                  ma->ma_lmm->lmm_pattern & LOV_PATTERN_F_RELEASED) {
+                  mdt_hsm_is_released(ma->ma_lmm)) {
                /* A released file stores its size on MDS. */
                /* But return 1 block for released file, unless tools like tar
                 * will consider it fully sparse. (LU-3864)
@@ -946,7 +1014,6 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
        struct lu_buf           *buffer = &info->mti_buf;
        struct obd_export       *exp = info->mti_exp;
        int                      rc;
-       int                      is_root;
        ENTRY;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
@@ -1028,32 +1095,6 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        repbody->mbo_t_state = MS_RESTORE;
        }
 
-       is_root = lu_fid_eq(mdt_object_fid(o), &info->mti_mdt->mdt_md_root_fid);
-
-       /* the Lustre protocol supposes to return default striping
-        * on the user-visible root if explicitly requested */
-       if ((ma->ma_valid & MA_LOV) == 0 && S_ISDIR(la->la_mode) &&
-           (ma->ma_need & MA_LOV_DEF && is_root) && ma->ma_need & MA_LOV) {
-               struct lu_fid      rootfid;
-               struct mdt_object *root;
-               struct mdt_device *mdt = info->mti_mdt;
-
-               rc = dt_root_get(env, mdt->mdt_bottom, &rootfid);
-               if (rc)
-                       RETURN(rc);
-               root = mdt_object_find(env, mdt, &rootfid);
-               if (IS_ERR(root))
-                       RETURN(PTR_ERR(root));
-               rc = mdt_stripe_get(info, root, ma, XATTR_NAME_LOV);
-               mdt_object_put(info->mti_env, root);
-               if (unlikely(rc)) {
-                       CERROR("%s: getattr error for "DFID": rc = %d\n",
-                              mdt_obd_name(info->mti_mdt),
-                              PFID(mdt_object_fid(o)), rc);
-                       RETURN(rc);
-               }
-       }
-
         if (likely(ma->ma_valid & MA_INODE))
                 mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
         else
@@ -1084,6 +1125,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        if (!mdt_is_striped_client(req->rq_export))
                                RETURN(-ENOTSUPP);
                        LASSERT(S_ISDIR(la->la_mode));
+                       mdt_dump_lmv(D_INFO, ma->ma_lmv);
                        repbody->mbo_eadatasize = ma->ma_lmv_size;
                        repbody->mbo_valid |= (OBD_MD_FLDIREA |
                                               OBD_MD_DEFAULT_MEA);
@@ -1200,6 +1242,12 @@ static int mdt_getattr(struct tgt_session_info *tsi)
 
        req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, rc);
 
+       /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
+        * by default. If the target object has more ACL entries, then
+        * enlarge the buffer when necessary. */
+       req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
+                            LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
+
        rc = req_capsule_server_pack(pill);
        if (unlikely(rc != 0))
                GOTO(out, rc = err_serious(rc));
@@ -1228,6 +1276,60 @@ out:
 }
 
 /**
+ * Handler of layout intent RPC requiring the layout modification
+ *
+ * \param[in] info     thread environment
+ * \param[in] obj      object
+ * \param[in] layout   layout intent
+ * \param[in] buf      buffer containing client's lovea, could be empty
+ *
+ * \retval 0   on success
+ * \retval < 0 error code
+ */
+static int mdt_layout_change(struct mdt_thread_info *info,
+                            struct mdt_object *obj,
+                            struct layout_intent *layout,
+                            const struct lu_buf *buf)
+{
+       struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
+       int rc;
+       ENTRY;
+
+       CDEBUG(D_INFO, "got layout change request from client: "
+              "opc:%u flags:%#x extent[%#llx,%#llx)\n",
+              layout->li_opc, layout->li_flags,
+              layout->li_start, layout->li_end);
+       if (layout->li_start >= layout->li_end) {
+               CERROR("Recieved an invalid layout change range [%llu, %llu) "
+                      "for "DFID"\n", layout->li_start, layout->li_end,
+                      PFID(mdt_object_fid(obj)));
+               RETURN(-EINVAL);
+       }
+
+       if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
+               GOTO(out, rc = -EINVAL);
+
+       rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL,
+                          MAY_WRITE);
+       if (rc)
+               GOTO(out, rc);
+
+       /* take layout lock to prepare layout change */
+       mdt_lock_reg_init(lh, LCK_EX);
+       rc = mdt_object_lock(info, obj, lh,
+                            MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
+       if (rc)
+               GOTO(out, rc);
+
+       rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout,
+                             buf);
+
+       mdt_object_unlock(info, obj, lh, 1);
+out:
+       RETURN(rc);
+}
+
+/**
  * Exchange MOF_LOV_CREATED flags between two objects after a
  * layout swap. No assumption is made on whether o1 or o2 have
  * created objects or not.
@@ -1860,6 +1962,13 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
         if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
                req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0);
 
+       /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
+        * by default. If the target object has more ACL entries, then
+        * enlarge the buffer when necessary. */
+       if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
+               req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
+                                    LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
+
         rc = req_capsule_server_pack(pill);
         if (rc != 0) {
                 CERROR("Can't pack response, rc %d\n", rc);
@@ -2088,20 +2197,37 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        }
 
        id = oqctl->qc_id;
-       if (oqctl->qc_type == USRQUOTA)
+       switch (oqctl->qc_type) {
+       case USRQUOTA:
                id = nodemap_map_id(nodemap, NODEMAP_UID,
                                    NODEMAP_CLIENT_TO_FS, id);
-       else if (oqctl->qc_type == GRPQUOTA)
-               id = nodemap_map_id(nodemap, NODEMAP_UID,
+               break;
+       case GRPQUOTA:
+               id = nodemap_map_id(nodemap, NODEMAP_GID,
                                    NODEMAP_CLIENT_TO_FS, id);
-
+               break;
+       case PRJQUOTA:
+               /* todo: check/map project id */
+               id = oqctl->qc_id;
+               break;
+       default:
+               GOTO(out_nodemap, rc = -EOPNOTSUPP);
+       }
        repoqc = req_capsule_server_get(pill, &RMF_OBD_QUOTACTL);
        if (repoqc == NULL)
                GOTO(out_nodemap, rc = err_serious(-EFAULT));
 
+       if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA)
+               barrier_exit(tsi->tsi_tgt->lut_bottom);
+
        if (oqctl->qc_id != id)
                swap(oqctl->qc_id, id);
 
+       if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA) {
+               if (unlikely(!barrier_entry(tsi->tsi_tgt->lut_bottom)))
+                       RETURN(-EINPROGRESS);
+       }
+
        switch (oqctl->qc_cmd) {
 
        case Q_GETINFO:
@@ -2311,49 +2437,66 @@ static inline int mdt_is_lock_sync(struct ldlm_lock *lock)
  * \see ldlm_blocking_ast_nocheck
  */
 int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                     void *data, int flag)
+                    void *data, int flag)
 {
-        struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
-        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
-        int rc;
-        ENTRY;
+       struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+       bool commit_async = false;
+       int rc;
+       ENTRY;
 
-        if (flag == LDLM_CB_CANCELING)
-                RETURN(0);
+       if (flag == LDLM_CB_CANCELING)
+               RETURN(0);
 
-        lock_res_and_lock(lock);
-        if (lock->l_blocking_ast != mdt_blocking_ast) {
-                unlock_res_and_lock(lock);
-                RETURN(0);
-        }
+       lock_res_and_lock(lock);
+       if (lock->l_blocking_ast != mdt_blocking_ast) {
+               unlock_res_and_lock(lock);
+               RETURN(0);
+       }
+       /* There is no lock conflict if l_blocking_lock == NULL,
+        * it indicates a blocking ast sent from ldlm_lock_decref_internal
+        * when the last reference to a local lock was released */
        if (lock->l_req_mode & (LCK_PW | LCK_EX) &&
            lock->l_blocking_lock != NULL) {
-               if (mdt_cos_is_enabled(mdt) &&
-                   lock->l_client_cookie !=
-                   lock->l_blocking_lock->l_client_cookie)
-                       mdt_set_lock_sync(lock);
-               else if (mdt_slc_is_enabled(mdt) &&
-                        ldlm_is_cos_incompat(lock->l_blocking_lock))
+               if (mdt_cos_is_enabled(mdt)) {
+                       if (lock->l_client_cookie !=
+                           lock->l_blocking_lock->l_client_cookie)
+                               mdt_set_lock_sync(lock);
+               else if (mdt_slc_is_enabled(mdt) &&
+                          ldlm_is_cos_incompat(lock->l_blocking_lock)) {
                        mdt_set_lock_sync(lock);
+                       /*
+                        * we may do extra commit here, but there is a small
+                        * window to miss a commit: lock was unlocked (saved),
+                        * then a conflict lock queued and we come here, but
+                        * REP-ACK not received, so lock was not converted to
+                        * COS mode yet.
+                        * Fortunately this window is quite small, so the
+                        * extra commit should be rare (not to say distributed
+                        * operation is rare too).
+                        */
+                       commit_async = true;
+               }
+       } else if (lock->l_req_mode == LCK_COS &&
+                  lock->l_blocking_lock != NULL) {
+               commit_async = true;
        }
-        rc = ldlm_blocking_ast_nocheck(lock);
 
-        /* There is no lock conflict if l_blocking_lock == NULL,
-         * it indicates a blocking ast sent from ldlm_lock_decref_internal
-         * when the last reference to a local lock was released */
-        if (lock->l_req_mode == LCK_COS && lock->l_blocking_lock != NULL) {
-                struct lu_env env;
+       rc = ldlm_blocking_ast_nocheck(lock);
+
+       if (commit_async) {
+               struct lu_env env;
 
                rc = lu_env_init(&env, LCT_LOCAL);
                if (unlikely(rc != 0))
                        CWARN("%s: lu_env initialization failed, cannot "
                              "start asynchronous commit: rc = %d\n",
                              obd->obd_name, rc);
-                else
-                        mdt_device_commit_async(&env, mdt);
-                lu_env_fini(&env);
-        }
-        RETURN(rc);
+               else
+                       mdt_device_commit_async(&env, mdt);
+               lu_env_fini(&env);
+       }
+       RETURN(rc);
 }
 
 /*
@@ -2735,10 +2878,8 @@ static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
                        struct mdt_device *mdt = info->mti_mdt;
                        struct ldlm_lock *lock = ldlm_handle2lock(h);
                        struct ptlrpc_request *req = mdt_info_req(info);
-                       int cos;
-
-                       cos = (mdt_cos_is_enabled(mdt) ||
-                              mdt_slc_is_enabled(mdt));
+                       bool cos = mdt_cos_is_enabled(mdt);
+                       bool convert_lock = !cos && mdt_slc_is_enabled(mdt);
 
                        LASSERTF(lock != NULL, "no lock for cookie %#llx\n",
                                 h->cookie);
@@ -2746,14 +2887,15 @@ static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
                        /* there is no request if mdt_object_unlock() is called
                         * from mdt_export_cleanup()->mdt_add_dirty_flag() */
                        if (likely(req != NULL)) {
-                               CDEBUG(D_HA, "request = %p reply state = %p"
-                                      " transno = %lld\n", req,
-                                      req->rq_reply_state, req->rq_transno);
+                               LDLM_DEBUG(lock, "save lock request %p reply "
+                                       "state %p transno %lld\n", req,
+                                       req->rq_reply_state, req->rq_transno);
                                if (cos) {
                                        ldlm_lock_downgrade(lock, LCK_COS);
                                        mode = LCK_COS;
                                }
-                               ptlrpc_save_lock(req, h, mode, cos);
+                               ptlrpc_save_lock(req, h, mode, cos,
+                                                convert_lock);
                        } else {
                                mdt_fid_unlock(h, mode);
                        }
@@ -2941,6 +3083,13 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
                        req_capsule_set_size(pill, &RMF_LOGCOOKIES,
                                             RCL_SERVER, 0);
 
+               /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
+                * by default. If the target object has more ACL entries, then
+                * enlarge the buffer when necessary. */
+               if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
+                       req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
+                                            LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
+
                 rc = req_capsule_server_pack(pill);
         }
         RETURN(rc);
@@ -2997,6 +3146,7 @@ void mdt_thread_info_init(struct ptlrpc_request *req,
         info->mti_cross_ref = 0;
         info->mti_opdata = 0;
        info->mti_big_lmm_used = 0;
+       info->mti_big_acl_used = 0;
 
         info->mti_spec.no_create = 0;
        info->mti_spec.sp_rm_entry = 0;
@@ -3430,6 +3580,7 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
        struct layout_intent *layout;
        struct lu_fid *fid;
        struct mdt_object *obj = NULL;
+       bool layout_change = false;
        int layout_size = 0;
        int rc = 0;
        ENTRY;
@@ -3444,11 +3595,29 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
        if (layout == NULL)
                RETURN(-EPROTO);
 
-       if (layout->li_opc != LAYOUT_INTENT_ACCESS) {
+       switch (layout->li_opc) {
+       case LAYOUT_INTENT_TRUNC:
+       case LAYOUT_INTENT_WRITE:
+               layout_change = true;
+               break;
+       case LAYOUT_INTENT_ACCESS:
+               break;
+       case LAYOUT_INTENT_READ:
+       case LAYOUT_INTENT_GLIMPSE:
+       case LAYOUT_INTENT_RELEASE:
+       case LAYOUT_INTENT_RESTORE:
                CERROR("%s: Unsupported layout intent opc %d\n",
                       mdt_obd_name(info->mti_mdt), layout->li_opc);
-               RETURN(-EINVAL);
+               rc = -ENOTSUPP;
+               break;
+       default:
+               CERROR("%s: Unknown layout intent opc %d\n",
+                      mdt_obd_name(info->mti_mdt), layout->li_opc);
+               rc = -EINVAL;
+               break;
        }
+       if (rc < 0)
+               RETURN(rc);
 
        fid = &info->mti_tmp_fid2;
        fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name);
@@ -3469,12 +3638,67 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
                        info->mti_mdt->mdt_max_mdsize = layout_size;
        }
 
+       /*
+        * set reply buffer size, so that ldlm_handle_enqueue0()->
+        * ldlm_lvbo_fill() will fill the reply buffer with lovea.
+        */
        (*lockp)->l_lvb_type = LVB_T_LAYOUT;
        req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER,
                             layout_size);
        rc = req_capsule_server_pack(info->mti_pill);
-       GOTO(out_obj, rc);
+       if (rc)
+               GOTO(out_obj, rc);
+
+
+       if (layout_change) {
+               struct lu_buf *buf = &info->mti_buf;
+
+               /**
+                * mdt_layout_change is a reint operation, when the request
+                * is resent, layout write shouldn't reprocess it again.
+                */
+               rc = mdt_check_resent(info, mdt_reconstruct_generic, lhc);
+               if (rc)
+                       GOTO(out_obj, rc = rc < 0 ? rc : 0);
 
+               /**
+                * There is another resent case: the client's job has been
+                * done by another client, referring lod_declare_layout_change
+                * -EALREADY case, and it became a operation w/o transaction,
+                * so we should not do the layout change, otherwise
+                * mdt_layout_change() will try to cancel the granted server
+                * CR lock whose remote counterpart is still in hold on the
+                * client, and a deadlock ensues.
+                */
+               rc = mdt_check_resent_lock(info, obj, lhc);
+               if (rc <= 0)
+                       GOTO(out_obj, rc);
+
+               buf->lb_buf = NULL;
+               buf->lb_len = 0;
+               if (unlikely(req_is_replay(mdt_info_req(info)))) {
+                       buf->lb_buf = req_capsule_client_get(info->mti_pill,
+                                       &RMF_EADATA);
+                       buf->lb_len = req_capsule_get_size(info->mti_pill,
+                                       &RMF_EADATA, RCL_CLIENT);
+                       /*
+                        * If it's a replay of layout write intent RPC, the
+                        * client has saved the extended lovea when
+                        * it get reply then.
+                        */
+                       if (buf->lb_len > 0)
+                               mdt_fix_lov_magic(info, buf->lb_buf);
+               }
+
+               /*
+                * Instantiate some layout components, if @buf contains
+                * lovea, then it's a replay of the layout intent write
+                * RPC.
+                */
+               rc = mdt_layout_change(info, obj, layout, buf);
+               if (rc)
+                       GOTO(out_obj, rc);
+       }
 out_obj:
        mdt_object_put(info->mti_env, obj);
 
@@ -3631,6 +3855,9 @@ static int mdt_intent_opc(enum ldlm_intent_flags itopc,
                if (qmt == NULL)
                        RETURN(-EOPNOTSUPP);
 
+               if (mdt_rdonly(req->rq_export))
+                       RETURN(-EROFS);
+
                (*lockp)->l_lvb_type = LVB_T_LQUOTA;
                /* pass the request to quota master */
                rc = qmt_hdls.qmth_intent_policy(info->mti_env, qmt,
@@ -3647,8 +3874,7 @@ static int mdt_intent_opc(enum ldlm_intent_flags itopc,
        if (rc < 0)
                RETURN(rc);
 
-       if (flv->it_flags & MUTABOR &&
-           exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY)
+       if (flv->it_flags & MUTABOR && mdt_rdonly(req->rq_export))
                RETURN(-EROFS);
 
        if (flv->it_act != NULL) {
@@ -4505,12 +4731,12 @@ static struct tgt_opc_slice mdt_common_slice[] = {
 
 static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
 {
-       struct md_device        *next = m->mdt_child;
-       struct lu_device        *d    = &m->mdt_lu_dev;
-       struct obd_device       *obd  = mdt2obd_dev(m);
-       struct lfsck_stop        stop;
-       ENTRY;
+       struct md_device *next = m->mdt_child;
+       struct lu_device *d = &m->mdt_lu_dev;
+       struct obd_device *obd = mdt2obd_dev(m);
+       struct lfsck_stop stop;
 
+       ENTRY;
        stop.ls_status = LS_PAUSED;
        stop.ls_flags = 0;
        next->md_ops->mdo_iocontrol(env, next, OBD_IOC_STOP_LFSCK, 0, &stop);
@@ -4521,24 +4747,34 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
        if (m->mdt_opts.mo_coordinator)
                mdt_hsm_cdt_stop(m);
 
-       mdt_hsm_cdt_fini(m);
-
        mdt_llog_ctxt_unclone(env, m, LLOG_AGENT_ORIG_CTXT);
-        mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
+       mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
 
        if (m->mdt_namespace != NULL)
                ldlm_namespace_free_prior(m->mdt_namespace, NULL,
                                          d->ld_obd->obd_force);
 
-        obd_exports_barrier(obd);
-        obd_zombie_barrier();
+       obd_exports_barrier(obd);
+       obd_zombie_barrier();
 
-        mdt_procfs_fini(m);
+       mdt_quota_fini(env, m);
 
-        tgt_fini(env, &m->mdt_lut);
-        mdt_fs_cleanup(env, m);
-        upcall_cache_cleanup(m->mdt_identity_cache);
-        m->mdt_identity_cache = NULL;
+       cfs_free_nidlist(&m->mdt_squash.rsi_nosquash_nids);
+
+       /* Calling the cleanup functions in the same order as in the mdt_init0
+        * error path
+        */
+       mdt_procfs_fini(m);
+
+       target_recovery_fini(obd);
+       upcall_cache_cleanup(m->mdt_identity_cache);
+       m->mdt_identity_cache = NULL;
+
+       mdt_fs_cleanup(env, m);
+
+       tgt_fini(env, &m->mdt_lut);
+
+       mdt_hsm_cdt_fini(m);
 
        if (m->mdt_namespace != NULL) {
                ldlm_namespace_free_post(m->mdt_namespace);
@@ -4550,12 +4786,9 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
                m->mdt_md_root = NULL;
        }
 
-       mdt_quota_fini(env, m);
-
-       cfs_free_nidlist(&m->mdt_squash.rsi_nosquash_nids);
+       mdt_seq_fini(env, m);
 
-        mdt_seq_fini(env, m);
-        mdt_fld_fini(env, m);
+       mdt_fld_fini(env, m);
 
        /*
         * Finish the stack
@@ -4721,7 +4954,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        if (rc)
                GOTO(err_tgt, rc);
 
-       tgt_adapt_sptlrpc_conf(&m->mdt_lut, 1);
+       tgt_adapt_sptlrpc_conf(&m->mdt_lut);
 
         next = m->mdt_child;
         rc = next->md_ops->mdo_iocontrol(env, next, OBD_IOC_GET_MNTOPT, 0,
@@ -4918,6 +5151,7 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env,
                spin_lock_init(&mo->mot_write_lock);
                mutex_init(&mo->mot_lov_mutex);
                init_rwsem(&mo->mot_open_sem);
+               atomic_set(&mo->mot_open_count, 0);
                RETURN(o);
        }
        RETURN(NULL);
@@ -5052,7 +5286,7 @@ static int mdt_obd_set_info_async(const struct lu_env *env,
        ENTRY;
 
        if (KEY_IS(KEY_SPTLRPC_CONF)) {
-               rc = tgt_adapt_sptlrpc_conf(class_exp2tgt(exp), 0);
+               rc = tgt_adapt_sptlrpc_conf(class_exp2tgt(exp));
                RETURN(rc);
        }
 
@@ -5092,6 +5326,11 @@ static int mdt_connect_internal(struct obd_export *exp,
 
        data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
 
+       if (mdt->mdt_bottom->dd_rdonly &&
+           !(data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
+           !(data->ocd_connect_flags & OBD_CONNECT_RDONLY))
+               RETURN(-EACCES);
+
        if (data->ocd_connect_flags & OBD_CONNECT_FLAGS2)
                data->ocd_connect_flags2 &= MDT_CONNECT_SUPPORTED2;
 
@@ -5112,16 +5351,13 @@ static int mdt_connect_internal(struct obd_export *exp,
        if (!mdt->mdt_opts.mo_user_xattr)
                data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
 
-       if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
-               data->ocd_brw_size = min(data->ocd_brw_size,
-                                        (__u32)MD_MAX_BRW_SIZE);
+       if (OCD_HAS_FLAG(data, BRW_SIZE)) {
+               data->ocd_brw_size = min(data->ocd_brw_size, MD_MAX_BRW_SIZE);
                if (data->ocd_brw_size == 0) {
-                       CERROR("%s: cli %s/%p ocd_connect_flags: %#llx"
-                              " ocd_version: %x ocd_grant: %d "
-                              "ocd_index: %u ocd_brw_size is "
-                              "unexpectedly zero, network data "
-                              "corruption? Refusing connection of this"
-                              " client\n",
+                       CERROR("%s: cli %s/%p ocd_connect_flags: %#llx "
+                              "ocd_version: %x ocd_grant: %d ocd_index: %u "
+                              "ocd_brw_size unexpectedly zero, network data "
+                              "corruption? Refusing to connect this client\n",
                               mdt_obd_name(mdt),
                               exp->exp_client_uuid.uuid,
                               exp, data->ocd_connect_flags, data->ocd_version,
@@ -5266,7 +5502,7 @@ static int mdt_export_cleanup(struct obd_export *exp)
                                rc = mdt_ctxt_add_dirty_flag(&env, info, mfd);
 
                        /* Don't unlink orphan on failover umount, LU-184 */
-                       if (exp->exp_flags & OBD_OPT_FAILOVER) {
+                       if (exp->exp_obd->obd_fail) {
                                ma->ma_valid = MA_FLAGS;
                                ma->ma_attr_flags |= MDS_KEEP_ORPHAN;
                        }
@@ -5275,9 +5511,7 @@ static int mdt_export_cleanup(struct obd_export *exp)
         }
         info->mti_mdt = NULL;
         /* cleanup client slot early */
-        /* Do not erase record for recoverable client. */
-        if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed)
-               tgt_client_del(&env, exp);
+       tgt_client_del(&env, exp);
         lu_env_fini(&env);
 
         RETURN(rc);
@@ -5515,7 +5749,7 @@ int mdt_links_read(struct mdt_thread_info *info, struct mdt_object *mdt_obj,
        if (rc < 0)
                return rc;
 
-       return linkea_init(ldata);
+       return linkea_init_with_rec(ldata);
 }
 
 /**
@@ -5569,9 +5803,14 @@ static int mdt_path_current(struct mdt_thread_info *info,
                    lu_fid_eq(&mdt->mdt_md_root_fid, &fp->gf_fid))
                        GOTO(out, rc = -ENOENT);
 
-               mdt_obj = mdt_object_find(info->mti_env, mdt, tmpfid);
-               if (IS_ERR(mdt_obj))
-                       GOTO(out, rc = PTR_ERR(mdt_obj));
+               if (lu_fid_eq(mdt_object_fid(obj), tmpfid)) {
+                       mdt_obj = obj;
+                       mdt_object_get(info->mti_env, mdt_obj);
+               } else {
+                       mdt_obj = mdt_object_find(info->mti_env, mdt, tmpfid);
+                       if (IS_ERR(mdt_obj))
+                               GOTO(out, rc = PTR_ERR(mdt_obj));
+               }
 
                if (!mdt_object_exists(mdt_obj)) {
                        mdt_object_put(info->mti_env, mdt_obj);
@@ -5844,30 +6083,6 @@ int mdt_get_info(struct tgt_session_info *tsi)
        RETURN(rc);
 }
 
-/* Pass the ioc down */
-static int mdt_ioc_child(struct lu_env *env, struct mdt_device *mdt,
-                        unsigned int cmd, int len, void *data)
-{
-       struct lu_context ioctl_session;
-       struct md_device *next = mdt->mdt_child;
-       int rc;
-       ENTRY;
-
-        rc = lu_context_init(&ioctl_session, LCT_SERVER_SESSION);
-       if (rc)
-               RETURN(rc);
-       ioctl_session.lc_thread = (struct ptlrpc_thread *)current;
-       lu_context_enter(&ioctl_session);
-       env->le_ses = &ioctl_session;
-
-       LASSERT(next->md_ops->mdo_iocontrol);
-       rc = next->md_ops->mdo_iocontrol(env, next, cmd, len, data);
-
-       lu_context_exit(&ioctl_session);
-       lu_context_fini(&ioctl_session);
-       RETURN(rc);
-}
-
 static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
 {
        struct obd_ioctl_data *data = karg;
@@ -5949,7 +6164,9 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         case OBD_IOC_CHANGELOG_REG:
         case OBD_IOC_CHANGELOG_DEREG:
         case OBD_IOC_CHANGELOG_CLEAR:
-                rc = mdt_ioc_child(&env, mdt, cmd, len, karg);
+               rc = mdt->mdt_child->md_ops->mdo_iocontrol(&env,
+                                                          mdt->mdt_child,
+                                                          cmd, len, karg);
                 break;
        case OBD_IOC_START_LFSCK: {
                struct md_device *next = mdt->mdt_child;
@@ -6026,7 +6243,7 @@ static int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt)
        int rc;
        ENTRY;
 
-       if (!mdt->mdt_skip_lfsck) {
+       if (!mdt->mdt_skip_lfsck && !mdt->mdt_bottom->dd_rdonly) {
                struct lfsck_start_param lsp;
 
                lsp.lsp_start = NULL;
@@ -6126,6 +6343,13 @@ static void mdt_key_fini(const struct lu_context *ctx,
                info->mti_big_lmm = NULL;
                info->mti_big_lmmsize = 0;
        }
+
+       if (info->mti_big_acl) {
+               OBD_FREE_LARGE(info->mti_big_acl, info->mti_big_aclsize);
+               info->mti_big_acl = NULL;
+               info->mti_big_aclsize = 0;
+       }
+
        OBD_FREE_PTR(info);
 }