Whamcloud - gitweb
LU-8305 tests: add traces for sanity-sec
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 7f13b08..0b31287 100644 (file)
@@ -49,6 +49,7 @@
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include <linux/module.h>
+#include <linux/pagemap.h>
 
 #include <dt_object.h>
 #include <lustre_acl.h>
@@ -356,8 +357,10 @@ static int mdt_get_root(struct tgt_session_info *tsi)
        struct mdt_thread_info  *info = tsi2mdt_info(tsi);
        struct mdt_device       *mdt = info->mti_mdt;
        struct mdt_body         *repbody;
-       char                    *fileset;
+       char                    *fileset = NULL, *buffer = NULL;
        int                      rc;
+       struct obd_export       *exp = info->mti_exp;
+       char                    *nodemap_fileset;
 
        ENTRY;
 
@@ -373,7 +376,29 @@ static int mdt_get_root(struct tgt_session_info *tsi)
                fileset = req_capsule_client_get(info->mti_pill, &RMF_NAME);
                if (fileset == NULL)
                        GOTO(out, rc = err_serious(-EFAULT));
+       }
 
+       nodemap_fileset = nodemap_get_fileset(exp->exp_target_data.ted_nodemap);
+       if (nodemap_fileset && nodemap_fileset[0]) {
+               CDEBUG(D_INFO, "nodemap fileset is %s\n", nodemap_fileset);
+               if (fileset) {
+                       /* consider fileset from client as a sub-fileset
+                        * of the nodemap one */
+                       OBD_ALLOC(buffer, PATH_MAX + 1);
+                       if (buffer == NULL)
+                               GOTO(out, rc = err_serious(-ENOMEM));
+                       if (snprintf(buffer, PATH_MAX + 1, "%s/%s",
+                                    nodemap_fileset, fileset) >= PATH_MAX + 1)
+                               GOTO(out, rc = err_serious(-EINVAL));
+                       fileset = buffer;
+               } else {
+                       /* enforce fileset as specified in the nodemap */
+                       fileset = nodemap_fileset;
+               }
+       }
+
+       if (fileset) {
+               CDEBUG(D_INFO, "Getting fileset %s\n", fileset);
                rc = mdt_lookup_fileset(info, fileset, &repbody->mbo_fid1);
                if (rc < 0)
                        GOTO(out, rc = err_serious(rc));
@@ -385,6 +410,8 @@ static int mdt_get_root(struct tgt_session_info *tsi)
        EXIT;
 out:
        mdt_thread_info_fini(info);
+       if (buffer)
+               OBD_FREE(buffer, PATH_MAX+1);
        return rc;
 }
 
@@ -424,15 +451,15 @@ static int mdt_statfs(struct tgt_session_info *tsi)
                rc = next->md_ops->mdo_statfs(info->mti_env, next, osfs);
                if (rc)
                        GOTO(out, rc);
-               spin_lock(&info->mti_mdt->mdt_osfs_lock);
+               spin_lock(&info->mti_mdt->mdt_lock);
                info->mti_mdt->mdt_osfs = *osfs;
                info->mti_mdt->mdt_osfs_age = cfs_time_current_64();
-               spin_unlock(&info->mti_mdt->mdt_osfs_lock);
+               spin_unlock(&info->mti_mdt->mdt_lock);
        } else {
                /** use cached statfs data */
-               spin_lock(&info->mti_mdt->mdt_osfs_lock);
+               spin_lock(&info->mti_mdt->mdt_lock);
                *osfs = info->mti_mdt->mdt_osfs;
-               spin_unlock(&info->mti_mdt->mdt_osfs_lock);
+               spin_unlock(&info->mti_mdt->mdt_lock);
        }
 
        if (rc == 0)
@@ -501,9 +528,9 @@ int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                         const struct lu_attr *attr, const struct lu_fid *fid)
 {
-       struct md_attr          *ma = &info->mti_attr;
-       struct obd_export       *exp = info->mti_exp;
-       struct lu_nodemap       *nodemap = exp->exp_target_data.ted_nodemap;
+       struct md_attr *ma = &info->mti_attr;
+       struct obd_export *exp = info->mti_exp;
+       struct lu_nodemap *nodemap = NULL;
 
        LASSERT(ma->ma_valid & MA_INODE);
 
@@ -527,6 +554,11 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                b->mbo_nlink = attr->la_nlink;
                b->mbo_valid |= OBD_MD_FLNLINK;
        }
+       if (attr->la_valid & (LA_UID|LA_GID)) {
+               nodemap = nodemap_get_from_exp(exp);
+               if (IS_ERR(nodemap))
+                       goto out;
+       }
        if (attr->la_valid & LA_UID) {
                b->mbo_uid = nodemap_map_id(nodemap, NODEMAP_UID,
                                            NODEMAP_FS_TO_CLIENT,
@@ -539,6 +571,7 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                                            attr->la_gid);
                b->mbo_valid |= OBD_MD_FLGID;
        }
+
        b->mbo_mode = attr->la_mode;
        if (attr->la_valid & LA_MODE)
                b->mbo_valid |= OBD_MD_FLMODE;
@@ -552,9 +585,6 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                       PFID(fid), b->mbo_nlink, b->mbo_mode, b->mbo_valid);
        }
 
-       if (info != NULL)
-               mdt_body_reverse_idmap(info, b);
-
        if (!(attr->la_valid & LA_TYPE))
                return;
 
@@ -588,6 +618,10 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
        if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE))
                CDEBUG(D_VFSTRACE, DFID": returning size %llu\n",
                       PFID(fid), (unsigned long long)b->mbo_size);
+
+out:
+       if (!IS_ERR_OR_NULL(nodemap))
+               nodemap_putref(nodemap);
 }
 
 static inline int mdt_body_has_lov(const struct lu_attr *la,
@@ -915,7 +949,6 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
        struct mdt_body         *repbody;
        struct lu_buf           *buffer = &info->mti_buf;
        struct obd_export       *exp = info->mti_exp;
-       struct lu_nodemap       *nodemap = exp->exp_target_data.ted_nodemap;
        int                      rc;
        int                      is_root;
        ENTRY;
@@ -984,7 +1017,8 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
 
        rc = mdt_attr_get_complex(info, o, ma);
        if (unlikely(rc)) {
-               CERROR("%s: getattr error for "DFID": rc = %d\n",
+               CDEBUG(rc == -ENOENT ? D_OTHER : D_ERROR,
+                      "%s: getattr error for "DFID": rc = %d\n",
                       mdt_obd_name(info->mti_mdt),
                       PFID(mdt_object_fid(o)), rc);
                RETURN(rc);
@@ -1105,25 +1139,16 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                       repbody->mbo_max_mdsize);
        }
 
-       if (exp_connect_rmtclient(info->mti_exp) &&
-           reqbody->mbo_valid & OBD_MD_FLRMTPERM) {
-               void *buf = req_capsule_server_get(pill, &RMF_ACL);
-
-               /* mdt_getattr_lock only */
-               rc = mdt_pack_remote_perm(info, o, buf);
-               if (rc) {
-                       repbody->mbo_valid &= ~OBD_MD_FLRMTPERM;
-                       repbody->mbo_aclsize = 0;
-                       RETURN(rc);
-               } else {
-                       repbody->mbo_valid |= OBD_MD_FLRMTPERM;
-                       repbody->mbo_aclsize = sizeof(struct mdt_remote_perm);
-               }
-       }
 #ifdef CONFIG_FS_POSIX_ACL
-       else if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) &&
-                (reqbody->mbo_valid & OBD_MD_FLACL))
+       if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) &&
+                (reqbody->mbo_valid & OBD_MD_FLACL)) {
+               struct lu_nodemap *nodemap = nodemap_get_from_exp(exp);
+               if (IS_ERR(nodemap))
+                       RETURN(PTR_ERR(nodemap));
+
                rc = mdt_pack_acl2body(info, repbody, o, nodemap);
+               nodemap_putref(nodemap);
+       }
 #endif
 
 out:
@@ -1188,18 +1213,13 @@ static int mdt_getattr(struct tgt_session_info *tsi)
        repbody->mbo_eadatasize = 0;
        repbody->mbo_aclsize = 0;
 
-       if (reqbody->mbo_valid & OBD_MD_FLRMTPERM)
-               rc = mdt_init_ucred(info, reqbody);
-       else
-               rc = mdt_check_ucred(info);
+       rc = mdt_check_ucred(info);
        if (unlikely(rc))
                GOTO(out_shrink, rc);
 
        info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
 
        rc = mdt_getattr_internal(info, obj, 0);
-       if (reqbody->mbo_valid & OBD_MD_FLRMTPERM)
-                mdt_exit_ucred(info);
         EXIT;
 out_shrink:
         mdt_client_compatibility(info);
@@ -1221,17 +1241,13 @@ out:
  */
 static void mdt_swap_lov_flag(struct mdt_object *o1, struct mdt_object *o2)
 {
-       __u64   o1_flags;
+       unsigned int o1_lov_created = o1->mot_lov_created;
 
        mutex_lock(&o1->mot_lov_mutex);
        mutex_lock(&o2->mot_lov_mutex);
 
-       o1_flags = o1->mot_flags;
-       o1->mot_flags = (o1->mot_flags & ~MOF_LOV_CREATED) |
-                       (o2->mot_flags & MOF_LOV_CREATED);
-
-       o2->mot_flags = (o2->mot_flags & ~MOF_LOV_CREATED) |
-                       (o1_flags & MOF_LOV_CREATED);
+       o1->mot_lov_created = o2->mot_lov_created;
+       o2->mot_lov_created = o1_lov_created;
 
        mutex_unlock(&o2->mot_lov_mutex);
        mutex_unlock(&o1->mot_lov_mutex);
@@ -1803,6 +1819,26 @@ free_rdpg:
        return rc;
 }
 
+static int mdt_fix_attr_ucred(struct mdt_thread_info *info, __u32 op)
+{
+       struct lu_ucred *uc = mdt_ucred_check(info);
+       struct lu_attr *attr = &info->mti_attr.ma_attr;
+
+       if (uc == NULL)
+               return -EINVAL;
+
+       if (op != REINT_SETATTR) {
+               if ((attr->la_valid & LA_UID) && (attr->la_uid != -1))
+                       attr->la_uid = uc->uc_fsuid;
+               /* for S_ISGID, inherit gid from his parent, such work will be
+                * done in cmm/mdd layer, here set all cases as uc->uc_fsgid. */
+               if ((attr->la_valid & LA_GID) && (attr->la_gid != -1))
+                       attr->la_gid = uc->uc_fsgid;
+       }
+
+       return 0;
+}
+
 static int mdt_reint_internal(struct mdt_thread_info *info,
                               struct mdt_lock_handle *lhc,
                               __u32 op)
@@ -2021,7 +2057,7 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        int                      id, rc;
        struct mdt_device       *mdt = mdt_exp2dev(exp);
        struct lu_device        *qmt = mdt->mdt_qmt_dev;
-       struct lu_nodemap       *nodemap = exp->exp_target_data.ted_nodemap;
+       struct lu_nodemap       *nodemap;
        ENTRY;
 
        oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
@@ -2032,51 +2068,30 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        if (rc)
                RETURN(err_serious(rc));
 
+       nodemap = nodemap_get_from_exp(exp);
+       if (IS_ERR(nodemap))
+               RETURN(PTR_ERR(nodemap));
+
        switch (oqctl->qc_cmd) {
                /* master quotactl */
        case Q_SETINFO:
        case Q_SETQUOTA:
                if (!nodemap_can_setquota(nodemap))
-                       RETURN(-EPERM);
+                       GOTO(out_nodemap, rc = -EPERM);
        case Q_GETINFO:
        case Q_GETQUOTA:
                if (qmt == NULL)
-                       RETURN(-EOPNOTSUPP);
+                       GOTO(out_nodemap, rc = -EOPNOTSUPP);
                /* slave quotactl */
        case Q_GETOINFO:
        case Q_GETOQUOTA:
                break;
        default:
                CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd);
-               RETURN(-EFAULT);
+               GOTO(out_nodemap, rc = -EFAULT);
        }
 
-       /* map uid/gid for remote client */
        id = oqctl->qc_id;
-       if (exp_connect_rmtclient(exp)) {
-               struct lustre_idmap_table *idmap;
-
-               idmap = exp->exp_mdt_data.med_idmap;
-
-               if (unlikely(oqctl->qc_cmd != Q_GETQUOTA &&
-                            oqctl->qc_cmd != Q_GETINFO))
-                       RETURN(-EPERM);
-
-               if (oqctl->qc_type == USRQUOTA)
-                       id = lustre_idmap_lookup_uid(NULL, idmap, 0,
-                                                    oqctl->qc_id);
-               else if (oqctl->qc_type == GRPQUOTA)
-                       id = lustre_idmap_lookup_gid(NULL, idmap, 0,
-                                                    oqctl->qc_id);
-               else
-                       RETURN(-EINVAL);
-
-               if (id == CFS_IDMAP_NOTFOUND) {
-                       CDEBUG(D_QUOTA, "no mapping for id %u\n", oqctl->qc_id);
-                       RETURN(-EACCES);
-               }
-       }
-
        if (oqctl->qc_type == USRQUOTA)
                id = nodemap_map_id(nodemap, NODEMAP_UID,
                                    NODEMAP_CLIENT_TO_FS, id);
@@ -2086,7 +2101,7 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
 
        repoqc = req_capsule_server_get(pill, &RMF_OBD_QUOTACTL);
        if (repoqc == NULL)
-               RETURN(err_serious(-EFAULT));
+               GOTO(out_nodemap, rc = err_serious(-EFAULT));
 
        if (oqctl->qc_id != id)
                swap(oqctl->qc_id, id);
@@ -2110,14 +2125,20 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
 
        default:
                CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd);
-               RETURN(-EFAULT);
+               GOTO(out_nodemap, rc = -EFAULT);
        }
 
        if (oqctl->qc_id != id)
                swap(oqctl->qc_id, id);
 
        *repoqc = *oqctl;
-       RETURN(rc);
+
+       EXIT;
+
+out_nodemap:
+       nodemap_putref(nodemap);
+
+       return rc;
 }
 
 /** clone llog ctxt from child (mdd)
@@ -2169,21 +2190,9 @@ static int mdt_llog_ctxt_unclone(const struct lu_env *env,
  */
 static int mdt_sec_ctx_handle(struct tgt_session_info *tsi)
 {
-       int rc;
-
-       rc = mdt_handle_idmap(tsi);
-       if (unlikely(rc)) {
-               struct ptlrpc_request   *req = tgt_ses_req(tsi);
-               __u32                    opc;
-
-               opc = lustre_msg_get_opc(req->rq_reqmsg);
-               if (opc == SEC_CTX_INIT || opc == SEC_CTX_INIT_CONT)
-                       sptlrpc_svc_ctx_invalidate(req);
-       }
-
        CFS_FAIL_TIMEOUT(OBD_FAIL_SEC_CTX_HDL_PAUSE, cfs_fail_val);
 
-       return rc;
+       return 0;
 }
 
 /*
@@ -2366,12 +2375,13 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
 {
-       struct lustre_handle lockh;
-       int               rc;
+       int rc = 0;
        ENTRY;
 
        switch (flag) {
-       case LDLM_CB_BLOCKING:
+       case LDLM_CB_BLOCKING: {
+               struct lustre_handle lockh;
+
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh,
                        ldlm_is_atomic_cb(lock) ? 0 : LCF_ASYNC);
@@ -2380,17 +2390,46 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                        RETURN(rc);
                }
                break;
-       case LDLM_CB_CANCELING:
-               LDLM_DEBUG(lock, "Revoke remote lock");
+       }
+       case LDLM_CB_CANCELING: {
+               LDLM_DEBUG(lock, "Revoke remote lock\n");
+
                /* discard slc lock here so that it can be cleaned anytime,
                 * especially for cleanup_resource() */
                tgt_discard_slc_lock(lock);
+
+               /* once we cache lock, l_ast_data is set to mdt_object */
+               if (lock->l_ast_data != NULL) {
+                       struct mdt_object *mo = lock->l_ast_data;
+                       struct lu_env env;
+
+                       rc = lu_env_init(&env, LCT_MD_THREAD);
+                       if (unlikely(rc != 0)) {
+                               struct obd_device *obd;
+
+                               obd = ldlm_lock_to_ns(lock)->ns_obd;
+                               CWARN("%s: lu_env initialization failed, object"
+                                     "%p "DFID" is leaked!\n",
+                                     obd->obd_name, mo,
+                                     PFID(mdt_object_fid(mo)));
+                               RETURN(rc);
+                       }
+
+                       if (lock->l_policy_data.l_inodebits.bits &
+                           (MDS_INODELOCK_XATTR | MDS_INODELOCK_UPDATE)) {
+                               rc = mo_invalidate(&env, mdt_object_child(mo));
+                               mo->mot_cache_attr = 0;
+                       }
+                       mdt_object_put(&env, mo);
+                       lu_env_fini(&env);
+               }
                break;
+       }
        default:
                LBUG();
        }
 
-       RETURN(0);
+       RETURN(rc);
 }
 
 int mdt_check_resent_lock(struct mdt_thread_info *info,
@@ -2430,7 +2469,8 @@ int mdt_check_resent_lock(struct mdt_thread_info *info,
 
 int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o,
                           const struct lu_fid *fid, struct lustre_handle *lh,
-                          enum ldlm_mode mode, __u64 ibits, bool nonblock)
+                          enum ldlm_mode mode, __u64 ibits, bool nonblock,
+                          bool cache)
 {
        struct ldlm_enqueue_info *einfo = &mti->mti_einfo;
        union ldlm_policy_data *policy = &mti->mti_policy;
@@ -2451,12 +2491,24 @@ int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o,
        einfo->ei_res_id = res_id;
        if (nonblock)
                einfo->ei_nonblock = 1;
+       if (cache) {
+               /*
+                * if we cache lock, couple lock with mdt_object, so that object
+                * can be easily found in lock ASTs.
+                */
+               mdt_object_get(mti->mti_env, o);
+               einfo->ei_cbdata = o;
+       }
 
        memset(policy, 0, sizeof(*policy));
        policy->l_inodebits.bits = ibits;
 
        rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo,
                            policy);
+       if (rc < 0 && cache) {
+               mdt_object_put(mti->mti_env, o);
+               einfo->ei_cbdata = NULL;
+       }
        RETURN(rc);
 }
 
@@ -2605,7 +2657,8 @@ mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o,
                rc = mdt_remote_object_lock(info, o, mdt_object_fid(o),
                                            &lh->mlh_rreg_lh,
                                            lh->mlh_rreg_mode,
-                                           MDS_INODELOCK_UPDATE, nonblock);
+                                           MDS_INODELOCK_UPDATE, nonblock,
+                                           false);
                if (rc != ELDLM_OK) {
                        if (local_lh != NULL)
                                mdt_object_unlock(info, o, local_lh, rc);
@@ -2733,17 +2786,24 @@ static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
  * \param decref force immediate lock releasing
  */
 static void mdt_save_remote_lock(struct mdt_thread_info *info,
-                                struct lustre_handle *h, enum ldlm_mode mode,
-                                int decref)
+                                struct mdt_object *o, struct lustre_handle *h,
+                                enum ldlm_mode mode, int decref)
 {
        ENTRY;
 
        if (lustre_handle_is_used(h)) {
+               struct ldlm_lock *lock = ldlm_handle2lock(h);
+
+               if (o != NULL &&
+                   (lock->l_policy_data.l_inodebits.bits &
+                    (MDS_INODELOCK_XATTR | MDS_INODELOCK_UPDATE)))
+                       mo_invalidate(info->mti_env, mdt_object_child(o));
+
                if (decref || !info->mti_has_trans ||
                    !(mode & (LCK_PW | LCK_EX))) {
                        ldlm_lock_decref_and_cancel(h, mode);
+                       LDLM_LOCK_PUT(lock);
                } else {
-                       struct ldlm_lock *lock = ldlm_handle2lock(h);
                        struct ptlrpc_request *req = mdt_info_req(info);
 
                        LASSERT(req != NULL);
@@ -2775,7 +2835,8 @@ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
 
        mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
        mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
-       mdt_save_remote_lock(info, &lh->mlh_rreg_lh, lh->mlh_rreg_mode, decref);
+       mdt_save_remote_lock(info, o, &lh->mlh_rreg_lh, lh->mlh_rreg_mode,
+                            decref);
 
        EXIT;
 }
@@ -2985,11 +3046,6 @@ struct mdt_thread_info *tsi2mdt_info(struct tgt_session_info *tsi)
 
 static int mdt_tgt_connect(struct tgt_session_info *tsi)
 {
-       struct ptlrpc_request   *req = tgt_ses_req(tsi);
-       int                      rc;
-
-       ENTRY;
-
        if (OBD_FAIL_CHECK(OBD_FAIL_TGT_DELAY_CONDITIONAL) &&
            cfs_fail_val ==
            tsi2mdt_info(tsi)->mti_mdt->mdt_seq_site.ss_node_id) {
@@ -2997,17 +3053,7 @@ static int mdt_tgt_connect(struct tgt_session_info *tsi)
                schedule_timeout(msecs_to_jiffies(3 * MSEC_PER_SEC));
        }
 
-       rc = tgt_connect(tsi);
-       if (rc != 0)
-               RETURN(rc);
-
-       rc = mdt_init_idmap(tsi);
-       if (rc != 0)
-               GOTO(err, rc);
-       RETURN(0);
-err:
-       obd_disconnect(class_export_get(req->rq_export));
-       return rc;
+       return tgt_connect(tsi);
 }
 
 enum mdt_it_code {
@@ -3121,7 +3167,7 @@ static int
 mdt_intent_lock_replace(struct mdt_thread_info *info,
                        struct ldlm_lock **lockp,
                        struct mdt_lock_handle *lh,
-                       __u64 flags)
+                       __u64 flags, int result)
 {
         struct ptlrpc_request  *req = mdt_info_req(info);
         struct ldlm_lock       *lock = *lockp;
@@ -3135,8 +3181,19 @@ mdt_intent_lock_replace(struct mdt_thread_info *info,
                 RETURN(0);
         }
 
-        LASSERTF(new_lock != NULL,
-                 "lockh "LPX64"\n", lh->mlh_reg_lh.cookie);
+       if (new_lock == NULL && (flags & LDLM_FL_RESENT)) {
+               /* Lock is pinned by ldlm_handle_enqueue0() as it is
+                * a resend case, however, it could be already destroyed
+                * due to client eviction or a raced cancel RPC. */
+               LDLM_DEBUG_NOLOCK("Invalid lock handle "LPX64"\n",
+                                 lh->mlh_reg_lh.cookie);
+               lh->mlh_reg_lh.cookie = 0;
+               RETURN(-ESTALE);
+       }
+
+       LASSERTF(new_lock != NULL,
+                "lockh "LPX64" flags "LPX64" rc %d\n",
+                lh->mlh_reg_lh.cookie, flags, result);
 
         /*
          * If we've already given this lock to a client once, then we should
@@ -3270,7 +3327,7 @@ static int mdt_intent_getxattr(enum mdt_it_code opcode,
 
        grc = mdt_getxattr(info);
 
-       rc = mdt_intent_lock_replace(info, lockp, lhc, flags);
+       rc = mdt_intent_lock_replace(info, lockp, lhc, flags, 0);
 
        if (mdt_info_req(info)->rq_repmsg != NULL)
                ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
@@ -3339,7 +3396,7 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
                 GOTO(out_ucred, rc = ELDLM_LOCK_ABORTED);
         }
 
-       rc = mdt_intent_lock_replace(info, lockp, lhc, flags);
+       rc = mdt_intent_lock_replace(info, lockp, lhc, flags, rc);
         EXIT;
 out_ucred:
         mdt_exit_ucred(info);
@@ -3409,7 +3466,7 @@ out_obj:
        mdt_object_put(info->mti_env, obj);
 
        if (rc == 0 && lustre_handle_is_used(&lhc->mlh_reg_lh))
-               rc = mdt_intent_lock_replace(info, lockp, lhc, flags);
+               rc = mdt_intent_lock_replace(info, lockp, lhc, flags, rc);
 
 out:
        lhc->mlh_reg_lh.cookie = 0;
@@ -3464,7 +3521,7 @@ static int mdt_intent_reint(enum mdt_it_code opcode,
        if (lustre_handle_is_used(&lhc->mlh_reg_lh) &&
            (rc == 0 || rc == -MDT_EREMOTE_OPEN)) {
                rep->lock_policy_res2 = 0;
-               rc = mdt_intent_lock_replace(info, lockp, lhc, flags);
+               rc = mdt_intent_lock_replace(info, lockp, lhc, flags, rc);
                RETURN(rc);
        }
 
@@ -4441,6 +4498,11 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
        struct lfsck_stop        stop;
        ENTRY;
 
+       if (m->mdt_md_root != NULL) {
+               mdt_object_put(env, m->mdt_md_root);
+               m->mdt_md_root = NULL;
+       }
+
        stop.ls_status = LS_PAUSED;
        stop.ls_flags = 0;
        next->md_ops->mdo_iocontrol(env, next, OBD_IOC_STOP_LFSCK, 0, &stop);
@@ -4561,7 +4623,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        m->mdt_squash.rsi_gid = 0;
        INIT_LIST_HEAD(&m->mdt_squash.rsi_nosquash_nids);
        init_rwsem(&m->mdt_squash.rsi_sem);
-       spin_lock_init(&m->mdt_osfs_lock);
+       spin_lock_init(&m->mdt_lock);
        m->mdt_osfs_age = cfs_time_shift_64(-1000);
        m->mdt_enable_remote_dir = 0;
        m->mdt_enable_remote_dir_gid = 0;
@@ -4897,8 +4959,10 @@ static int mdt_object_print(const struct lu_env *env, void *cookie,
        struct mdt_object *mdto = mdt_obj((struct lu_object *)o);
 
        return (*p)(env, cookie,
-                   LUSTRE_MDT_NAME"-object@%p(flags=%d, writecount=%d)",
-                   mdto, mdto->mot_flags, mdto->mot_write_count);
+                   LUSTRE_MDT_NAME"-object@%p(%s %s, writecount=%d)",
+                   mdto, mdto->mot_lov_created ? "lov_created" : "",
+                   mdto->mot_cache_attr ? "cache_attr" : "",
+                   mdto->mot_write_count);
 }
 
 static int mdt_prepare(const struct lu_env *env,
@@ -5011,6 +5075,7 @@ static int mdt_connect_internal(struct obd_export *exp,
        LASSERT(data != NULL);
 
        data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
+       data->ocd_connect_flags2 &= MDT_CONNECT_SUPPORTED2;
        data->ocd_ibits_known &= MDS_INODELOCK_FULL;
 
        if (!(data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
@@ -5252,6 +5317,8 @@ static int mdt_obd_connect(const struct lu_env *env,
        ENTRY;
 
        LASSERT(env != NULL);
+       LASSERT(data != NULL);
+
        if (!exp || !obd || !cluuid)
                RETURN(-EINVAL);
 
@@ -5268,7 +5335,7 @@ static int mdt_obd_connect(const struct lu_env *env,
         * XXX: probably not very appropriate method is used now
         *      at some point we should find a better one
         */
-       if (!test_bit(MDT_FL_SYNCED, &mdt->mdt_state) && data != NULL &&
+       if (!test_bit(MDT_FL_SYNCED, &mdt->mdt_state) &&
            !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT) &&
            !(data->ocd_connect_flags & OBD_CONNECT_MDS_MDS)) {
                rc = obd_get_info(env, mdt->mdt_child_exp,
@@ -5354,8 +5421,6 @@ static int mdt_init_export(struct obd_export *exp)
 
        INIT_LIST_HEAD(&med->med_open_head);
        spin_lock_init(&med->med_open_lock);
-       mutex_init(&med->med_idmap_mutex);
-       med->med_idmap = NULL;
        spin_lock(&exp->exp_lock);
        exp->exp_connecting = 1;
        spin_unlock(&exp->exp_lock);
@@ -5387,9 +5452,6 @@ static int mdt_destroy_export(struct obd_export *exp)
 {
         ENTRY;
 
-        if (exp_connect_rmtclient(exp))
-                mdt_cleanup_idmap(&exp->exp_mdt_data);
-
         target_destroy_export(exp);
         /* destroy can be called from failed obd_setup, so
          * checking uuid is safer than obd_self_export */
@@ -5850,13 +5912,15 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         if (rc)
                 RETURN(rc);
 
-        switch (cmd) {
-        case OBD_IOC_SYNC:
-                rc = mdt_device_sync(&env, mdt);
-                break;
-        case OBD_IOC_SET_READONLY:
-                rc = dt->dd_ops->dt_ro(&env, dt);
-                break;
+       switch (cmd) {
+       case OBD_IOC_SYNC:
+               rc = mdt_device_sync(&env, mdt);
+               break;
+       case OBD_IOC_SET_READONLY:
+               rc = dt_sync(&env, dt);
+               if (rc == 0)
+                       rc = dt_ro(&env, dt);
+               break;
        case OBD_IOC_ABORT_RECOVERY:
                CERROR("%s: Aborting recovery for device\n", mdt_obd_name(mdt));
                obd->obd_abort_recovery = 1;