Whamcloud - gitweb
LU-354 test: Change dev_set_rdonly() check to warning
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index e1cdd5c..55ab803 100644 (file)
@@ -28,9 +28,8 @@
 /*
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
- */
-/*
- * Copyright (c) 2011 Whamcloud, Inc.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -213,7 +212,7 @@ void mdt_lock_pdo_init(struct mdt_lock_handle *lh, ldlm_mode_t lm,
         lh->mlh_reg_mode = lm;
         lh->mlh_type = MDT_PDO_LOCK;
 
-        if (name != NULL) {
+        if (name != NULL && (name[0] != '\0')) {
                 LASSERT(namelen > 0);
                 lh->mlh_pdo_hash = full_name_hash(name, namelen);
         } else {
@@ -469,6 +468,32 @@ static inline int mdt_body_has_lov(const struct lu_attr *la,
                 (S_ISDIR(la->la_mode) && (body->valid & OBD_MD_FLDIREA )) );
 }
 
+void mdt_client_compatibility(struct mdt_thread_info *info)
+{
+        struct mdt_body       *body;
+        struct ptlrpc_request *req = mdt_info_req(info);
+        struct obd_export     *exp = req->rq_export;
+        struct md_attr        *ma = &info->mti_attr;
+        struct lu_attr        *la = &ma->ma_attr;
+        ENTRY;
+
+        if (exp->exp_connect_flags & OBD_CONNECT_LAYOUTLOCK)
+                /* the client can deal with 16-bit lmm_stripe_count */
+                RETURN_EXIT;
+
+        body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+
+        if (!mdt_body_has_lov(la, body))
+                RETURN_EXIT;
+
+        /* now we have a reply with a lov for a client not compatible with the
+         * layout lock so we have to clean the layout generation number */
+        if (S_ISREG(la->la_mode))
+                ma->ma_lmm->lmm_layout_gen = 0;
+        EXIT;
+}
+
+
 static int mdt_getattr_internal(struct mdt_thread_info *info,
                                 struct mdt_object *o, int ma_need)
 {
@@ -740,6 +765,7 @@ out_shrink:
         if (rc == 0)
                 mdt_counter_incr(req->rq_export, LPROC_MDT_GETATTR);
 
+        mdt_client_compatibility(info);
         mdt_shrink_reply(info);
         return rc;
 }
@@ -789,6 +815,7 @@ static int mdt_raw_lookup(struct mdt_thread_info *info,
         LASSERT(!info->mti_cross_ref);
 
         /* Only got the fid of this obj by name */
+        fid_zero(child_fid);
         rc = mdo_lookup(info->mti_env, next, lname, child_fid,
                         &info->mti_spec);
 #if 0
@@ -869,8 +896,8 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                         name = NULL;
                         CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
                                "ldlm_rep = %p\n",
-                               PFID(mdt_object_fid(parent)), PFID(&reqbody->fid2),
-                               ldlm_rep);
+                               PFID(mdt_object_fid(parent)),
+                               PFID(&reqbody->fid2), ldlm_rep);
                 } else {
                         lname = mdt_name(info->mti_env, (char *)name, namelen);
                         CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, "
@@ -941,21 +968,26 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
         }
 
         if (lname) {
-                /* step 1: lock parent */
-                lhp = &info->mti_lh[MDT_LH_PARENT];
-                mdt_lock_pdo_init(lhp, LCK_PR, name, namelen);
-                rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
-                                     MDT_LOCAL_LOCK);
-                if (unlikely(rc != 0))
-                        RETURN(rc);
+                /* step 1: lock parent only if parent is a directory */
+                if (S_ISDIR(lu_object_attr(&parent->mot_obj.mo_lu))) {
+                        lhp = &info->mti_lh[MDT_LH_PARENT];
+                        mdt_lock_pdo_init(lhp, LCK_PR, name, namelen);
+                        rc = mdt_object_lock(info, parent, lhp,
+                                             MDS_INODELOCK_UPDATE,
+                                             MDT_LOCAL_LOCK);
+                        if (unlikely(rc != 0))
+                                RETURN(rc);
+                }
 
                 /* step 2: lookup child's fid by name */
+                fid_zero(child_fid);
                 rc = mdo_lookup(info->mti_env, next, lname, child_fid,
                                 &info->mti_spec);
 
                 if (rc != 0) {
                         if (rc == -ENOENT)
-                                mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
+                                mdt_set_disposition(info, ldlm_rep,
+                                                    DISP_LOOKUP_NEG);
                         GOTO(out_parent, rc);
                 } else
                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
@@ -1002,10 +1034,13 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
 relock:
                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2);
                 mdt_lock_handle_init(lhc);
-                mdt_lock_reg_init(lhc, LCK_PR);
+                if (child_bits == MDS_INODELOCK_LAYOUT)
+                        mdt_lock_reg_init(lhc, LCK_CR);
+                else
+                        mdt_lock_reg_init(lhc, LCK_PR);
 
                 if (mdt_object_exists(child) == 0) {
-                        LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
+                        LU_OBJECT_DEBUG(D_INODE, info->mti_env,
                                         &child->mot_obj.mo_lu,
                                         "Object doesn't exist!\n");
                         GOTO(out_child, rc = -ENOENT);
@@ -1021,6 +1056,12 @@ relock:
                         if (unlikely(rc != 0))
                                 GOTO(out_child, rc);
 
+                        /* layout lock is used only on regular files */
+                        if ((ma->ma_valid & MA_INODE) &&
+                            (ma->ma_attr.la_valid & LA_MODE) &&
+                            !S_ISREG(ma->ma_attr.la_mode))
+                                child_bits &= ~MDS_INODELOCK_LAYOUT;
+
                         /* If the file has not been changed for some time, we
                          * return not only a LOOKUP lock, but also an UPDATE
                          * lock and this might save us RPC on later STAT. For
@@ -1108,6 +1149,7 @@ static int mdt_getattr_name(struct mdt_thread_info *info)
         mdt_exit_ucred(info);
         EXIT;
 out_shrink:
+        mdt_client_compatibility(info);
         mdt_shrink_reply(info);
         return rc;
 }
@@ -1562,6 +1604,7 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
 out_ucred:
         mdt_exit_ucred(info);
 out_shrink:
+        mdt_client_compatibility(info);
         mdt_shrink_reply(info);
         return rc;
 }
@@ -2239,7 +2282,7 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
         else if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)) &&
                  lh->mlh_pdo_hash != 0 &&
                  (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX)) {
-                OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK, 10);
+                OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK, 15);
         }
 
         RETURN(rc);
@@ -3048,6 +3091,7 @@ enum mdt_it_code {
         MDT_IT_UNLINK,
         MDT_IT_TRUNC,
         MDT_IT_GETXATTR,
+        MDT_IT_LAYOUT,
         MDT_IT_NR
 };
 
@@ -3118,6 +3162,11 @@ static struct mdt_it_flavor {
                 .it_fmt   = NULL,
                 .it_flags = 0,
                 .it_act   = NULL
+        },
+        [MDT_IT_LAYOUT] = {
+                .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
+                .it_flags = HABEO_REFERO,
+                .it_act   = mdt_intent_getattr
         }
 };
 
@@ -3293,8 +3342,18 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
         case MDT_IT_GETATTR:
                 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
                 break;
+        case MDT_IT_LAYOUT: {
+                static int printed = 0;
+
+                if (!printed) {
+                        CERROR("layout lock not supported by this version\n");
+                        printed = 1;
+                }
+                GOTO(out_shrink, rc = -EINVAL);
+                break;
+        }
         default:
-                CERROR("Unhandled till now");
+                CERROR("Unsupported intent (%d)\n", opcode);
                 GOTO(out_shrink, rc = -EINVAL);
         }
 
@@ -3325,6 +3384,7 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
 out_ucred:
         mdt_exit_ucred(info);
 out_shrink:
+        mdt_client_compatibility(info);
         mdt_shrink_reply(info);
         return rc;
 }
@@ -3448,6 +3508,9 @@ static int mdt_intent_code(long itcode)
         case IT_GETXATTR:
                 rc = MDT_IT_GETXATTR;
                 break;
+        case IT_LAYOUT:
+                rc = MDT_IT_LAYOUT;
+                break;
         default:
                 CERROR("Unknown intent opcode: %ld\n", itcode);
                 rc = -EINVAL;
@@ -3912,6 +3975,10 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
 
         /*
          * setattr service configuration.
+         *
+         * XXX To keep the compatibility with old client(< 2.2), we need to
+         * preserve this portal for a certain time, it should be removed
+         * eventually. LU-617.
          */
         conf = (typeof(conf)) {
                 .psc_nbufs           = MDS_NBUFS,
@@ -4181,8 +4248,11 @@ static struct lu_device *mdt_layer_setup(struct lu_env *env,
         lu_device_get(d);
         lu_ref_add(&d->ld_reference, "lu-stack", &lu_site_init);
 
-        RETURN(d);
+        cfs_spin_lock(&d->ld_site->ls_ld_lock);
+        cfs_list_add_tail(&d->ld_linkage, &d->ld_site->ls_ld_linkage);
+        cfs_spin_unlock(&d->ld_site->ls_ld_lock);
 
+        RETURN(d);
 out_alloc:
         ldt->ldt_ops->ldto_device_free(env, d);
         type->typ_refcnt--;
@@ -4319,6 +4389,9 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
         mdt_obd_llog_cleanup(obd);
         obd_exports_barrier(obd);
         obd_zombie_barrier();
+
+        mdt_procfs_fini(m);
+
 #ifdef HAVE_QUOTA_SUPPORT
         next->md_ops->mdo_quota.mqo_cleanup(env, next);
 #endif
@@ -4354,10 +4427,6 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
          */
         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
 
-        lprocfs_free_per_client_stats(obd);
-        lprocfs_free_obd_stats(obd);
-        mdt_procfs_fini(m);
-
         if (ls) {
                 struct md_site *mite;
 
@@ -4395,52 +4464,6 @@ static int mdt_adapt_sptlrpc_conf(struct obd_device *obd, int initial)
         return 0;
 }
 
-static void fsoptions_to_mdt_flags(struct mdt_device *m, char *options)
-{
-        char *p = options;
-
-        m->mdt_opts.mo_mds_capa = 1;
-        m->mdt_opts.mo_oss_capa = 1;
-#ifdef CONFIG_FS_POSIX_ACL
-        /* ACLs should be enabled by default (b=13829) */
-        m->mdt_opts.mo_acl = 1;
-        LCONSOLE_INFO("Enabling ACL\n");
-#else
-        m->mdt_opts.mo_acl = 0;
-        LCONSOLE_INFO("Disabling ACL\n");
-#endif
-
-        if (!options)
-                return;
-
-        while (*options) {
-                int len;
-
-                while (*p && *p != ',')
-                        p++;
-
-                len = p - options;
-                if ((len == sizeof("user_xattr") - 1) &&
-                    (memcmp(options, "user_xattr", len) == 0)) {
-                        m->mdt_opts.mo_user_xattr = 1;
-                        LCONSOLE_INFO("Enabling user_xattr\n");
-                } else if ((len == sizeof("nouser_xattr") - 1) &&
-                           (memcmp(options, "nouser_xattr", len) == 0)) {
-                        m->mdt_opts.mo_user_xattr = 0;
-                        LCONSOLE_INFO("Disabling user_xattr\n");
-                } else if ((len == sizeof("noacl") - 1) &&
-                           (memcmp(options, "noacl", len) == 0)) {
-                        m->mdt_opts.mo_acl = 0;
-                        LCONSOLE_INFO("Disabling ACL\n");
-                }
-
-                if (!*p)
-                        break;
-
-                options = ++p;
-        }
-}
-
 int mdt_postrecov(const struct lu_env *, struct mdt_device *);
 
 static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
@@ -4456,11 +4479,10 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         struct lu_site            *s;
         struct md_site            *mite;
         const char                *identity_upcall = "NONE";
-#ifdef HAVE_QUOTA_SUPPORT
         struct md_device          *next;
-#endif
         int                        rc;
         int                        node_id;
+        mntopt_t                   mntopts;
         ENTRY;
 
         md_device_init(&m->mdt_md_dev, ldt);
@@ -4486,8 +4508,6 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         m->mdt_max_cookiesize = sizeof(struct llog_cookie);
         m->mdt_som_conf = 0;
 
-        m->mdt_opts.mo_user_xattr = 0;
-        m->mdt_opts.mo_acl = 0;
         m->mdt_opts.mo_cos = MDT_COS_DEFAULT;
         lmi = server_get_mount_2(dev);
         if (lmi == NULL) {
@@ -4495,7 +4515,6 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                 RETURN(-EFAULT);
         } else {
                 lsi = s2lsi(lmi->lmi_sb);
-                fsoptions_to_mdt_flags(m, lsi->lsi_lmd->lmd_opts);
                 /* CMD is supported only in IAM mode */
                 ldd = lsi->lsi_ldd;
                 LASSERT(num);
@@ -4513,6 +4532,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
 
         cfs_spin_lock_init(&m->mdt_ioepoch_lock);
         m->mdt_opts.mo_compat_resname = 0;
+        m->mdt_opts.mo_mds_capa = 1;
+        m->mdt_opts.mo_oss_capa = 1;
         m->mdt_capa_timeout = CAPA_TIMEOUT;
         m->mdt_capa_alg = CAPA_HMAC_ALG_SHA1;
         m->mdt_ck_timeout = CAPA_KEY_TIMEOUT;
@@ -4540,12 +4561,6 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                 GOTO(err_free_site, rc);
         }
 
-        rc = mdt_procfs_init(m, dev);
-        if (rc) {
-                CERROR("Can't init MDT lprocfs, rc %d\n", rc);
-                GOTO(err_fini_proc, rc);
-        }
-
         /* set server index */
         lu_site2md(s)->ms_node_id = node_id;
 
@@ -4569,7 +4584,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         rc = mdt_stack_init((struct lu_env *)env, m, cfg, lmi);
         if (rc) {
                 CERROR("Can't init device stack, rc %d\n", rc);
-                GOTO(err_fini_proc, rc);
+                GOTO(err_lu_site, rc);
         }
 
         rc = lut_init(env, &m->mdt_lut, obd, m->mdt_bottom);
@@ -4597,19 +4612,6 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         /* set obd_namespace for compatibility with old code */
         obd->obd_namespace = m->mdt_namespace;
 
-        /* XXX: to support suppgid for ACL, we enable identity_upcall
-         * by default, otherwise, maybe got unexpected -EACCESS. */
-        if (m->mdt_opts.mo_acl)
-                identity_upcall = MDT_IDENTITY_UPCALL_PATH;
-
-        m->mdt_identity_cache = upcall_cache_init(obd->obd_name, identity_upcall,
-                                                  &mdt_identity_upcall_cache_ops);
-        if (IS_ERR(m->mdt_identity_cache)) {
-                rc = PTR_ERR(m->mdt_identity_cache);
-                m->mdt_identity_cache = NULL;
-                GOTO(err_free_ns, rc);
-        }
-
         cfs_timer_init(&m->mdt_ck_timer, mdt_ck_timer_callback, m);
 
         rc = mdt_ck_thread_start(m);
@@ -4630,8 +4632,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
 
         mdt_adapt_sptlrpc_conf(obd, 1);
 
-#ifdef HAVE_QUOTA_SUPPORT
         next = m->mdt_child;
+#ifdef HAVE_QUOTA_SUPPORT
         rc = next->md_ops->mdo_quota.mqo_setup(env, next, lmi->lmi_mnt);
         if (rc)
                 GOTO(err_llog_cleanup, rc);
@@ -4640,11 +4642,45 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         server_put_mount_2(dev, lmi->lmi_mnt);
         lmi = NULL;
 
+        rc = next->md_ops->mdo_iocontrol(env, next, OBD_IOC_GET_MNTOPT, 0,
+                                         &mntopts);
+        if (rc)
+                GOTO(err_quota, rc);
+
+        if (mntopts & MNTOPT_USERXATTR)
+                m->mdt_opts.mo_user_xattr = 1;
+        else
+                m->mdt_opts.mo_user_xattr = 0;
+
+        if (mntopts & MNTOPT_ACL)
+                m->mdt_opts.mo_acl = 1;
+        else
+                m->mdt_opts.mo_acl = 0;
+
+        /* XXX: to support suppgid for ACL, we enable identity_upcall
+         * by default, otherwise, maybe got unexpected -EACCESS. */
+        if (m->mdt_opts.mo_acl)
+                identity_upcall = MDT_IDENTITY_UPCALL_PATH;
+
+        m->mdt_identity_cache = upcall_cache_init(obd->obd_name,identity_upcall,
+                                                &mdt_identity_upcall_cache_ops);
+        if (IS_ERR(m->mdt_identity_cache)) {
+                rc = PTR_ERR(m->mdt_identity_cache);
+                m->mdt_identity_cache = NULL;
+                GOTO(err_quota, rc);
+        }
+
         target_recovery_init(&m->mdt_lut, mdt_recovery_handle);
 
+        rc = mdt_procfs_init(m, dev);
+        if (rc) {
+                CERROR("Can't init MDT lprocfs, rc %d\n", rc);
+                GOTO(err_recovery, rc);
+        }
+
         rc = mdt_start_ptlrpc_service(m);
         if (rc)
-                GOTO(err_recovery, rc);
+                GOTO(err_procfs, rc);
 
         ping_evictor_start();
 
@@ -4668,8 +4704,13 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
 err_stop_service:
         ping_evictor_stop();
         mdt_stop_ptlrpc_service(m);
+err_procfs:
+        mdt_procfs_fini(m);
 err_recovery:
         target_recovery_fini(obd);
+        upcall_cache_cleanup(m->mdt_identity_cache);
+        m->mdt_identity_cache = NULL;
+err_quota:
 #ifdef HAVE_QUOTA_SUPPORT
         next->md_ops->mdo_quota.mqo_cleanup(env, next);
 #endif
@@ -4682,8 +4723,6 @@ err_capa:
         cfs_timer_disarm(&m->mdt_ck_timer);
         mdt_ck_thread_stop(m);
 err_free_ns:
-        upcall_cache_cleanup(m->mdt_identity_cache);
-        m->mdt_identity_cache = NULL;
         ldlm_namespace_free(m->mdt_namespace, NULL, 0);
         obd->obd_namespace = m->mdt_namespace = NULL;
 err_fini_seq:
@@ -4694,8 +4733,7 @@ err_lut:
         lut_fini(env, &m->mdt_lut);
 err_fini_stack:
         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
-err_fini_proc:
-        mdt_procfs_fini(m);
+err_lu_site:
         lu_site_fini(s);
 err_free_site:
         OBD_FREE_PTR(mite);
@@ -4785,8 +4823,8 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env,
                 lu_object_init(o, h, d);
                 lu_object_add_top(h, o);
                 o->lo_ops = &mdt_obj_ops;
-                cfs_sema_init(&mo->mot_ioepoch_sem, 1);
-                cfs_sema_init(&mo->mot_lov_sem, 1);
+                cfs_mutex_init(&mo->mot_ioepoch_mutex);
+                cfs_mutex_init(&mo->mot_lov_mutex);
                 RETURN(o);
         } else
                 RETURN(NULL);
@@ -5186,19 +5224,20 @@ static int mdt_obd_disconnect(struct obd_export *exp)
 static int mdt_init_export(struct obd_export *exp)
 {
         struct mdt_export_data *med = &exp->exp_mdt_data;
-        int                     rc = 0;
+        int                     rc;
         ENTRY;
 
         CFS_INIT_LIST_HEAD(&med->med_open_head);
         cfs_spin_lock_init(&med->med_open_lock);
-        cfs_sema_init(&med->med_idmap_sem, 1);
+        cfs_mutex_init(&med->med_idmap_mutex);
         med->med_idmap = NULL;
         cfs_spin_lock(&exp->exp_lock);
         exp->exp_connecting = 1;
         cfs_spin_unlock(&exp->exp_lock);
 
         /* self-export doesn't need client data and ldlm initialization */
-        if (unlikely(exp == exp->exp_obd->obd_self_export))
+        if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
+                                     &exp->exp_client_uuid)))
                 RETURN(0);
 
         rc = lut_client_alloc(exp);
@@ -5218,7 +5257,10 @@ static int mdt_destroy_export(struct obd_export *exp)
                 mdt_cleanup_idmap(&exp->exp_mdt_data);
 
         target_destroy_export(exp);
-        if (unlikely(exp == exp->exp_obd->obd_self_export))
+        /* destroy can be called from failed obd_setup, so
+         * checking uuid is safer than obd_self_export */
+        if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
+                                     &exp->exp_client_uuid)))
                 RETURN(0);
 
         ldlm_destroy_export(exp);
@@ -5297,11 +5339,11 @@ static int mdt_upcall(const struct lu_env *env, struct md_device *md,
         RETURN(rc);
 }
 
-static int mdt_obd_notify(struct obd_device *host,
+static int mdt_obd_notify(struct obd_device *obd,
                           struct obd_device *watched,
                           enum obd_notify_event ev, void *data)
 {
-        struct mdt_device *mdt = mdt_dev(host->obd_lu_dev);
+        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
 #ifdef HAVE_QUOTA_SUPPORT
         struct md_device *next = mdt->mdt_child;
 #endif
@@ -5359,7 +5401,7 @@ static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt,
 
         obj = mdt_object_find(env, mdt, &fp->gf_fid);
         if (obj == NULL || IS_ERR(obj)) {
-                CDEBUG(D_IOCTL, "no object "DFID": %ld\n",PFID(&fp->gf_fid),
+                CDEBUG(D_IOCTL, "no object "DFID": %ld\n", PFID(&fp->gf_fid),
                        PTR_ERR(obj));
                 RETURN(-EINVAL);
         }
@@ -5459,6 +5501,7 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
         struct mdt_lock_handle  *lh;
         int rc;
         ENTRY;
+
         CDEBUG(D_IOCTL, "getting version for "DFID"\n", PFID(fid));
         if (!fid_is_sane(fid))
                 RETURN(-EINVAL);
@@ -5478,8 +5521,11 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
                  * fid, this is error to find remote object here
                  */
                 CERROR("nonlocal object "DFID"\n", PFID(fid));
+        } else if (rc == 0) {
+                *(__u64 *)data->ioc_inlbuf2 = ENOENT_VERSION;
+                rc = -ENOENT;
         } else {
-                version = mo_version_get(mti->mti_env, mdt_object_child(obj));
+                version = dt_version_get(mti->mti_env, mdt_obj2dt(obj));
                *(__u64 *)data->ioc_inlbuf2 = version;
                 rc = 0;
         }
@@ -5508,7 +5554,7 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 rc = mdt_device_sync(&env, mdt);
                 break;
         case OBD_IOC_SET_READONLY:
-                dt->dd_ops->dt_ro(&env, dt);
+                rc = dt->dd_ops->dt_ro(&env, dt);
                 break;
         case OBD_IOC_ABORT_RECOVERY:
                 CERROR("Aborting recovery for device %s\n", obd->obd_name);
@@ -5693,15 +5739,6 @@ LU_KEY_INIT_FINI(mdt, struct mdt_thread_info);
 /* context key: mdt_thread_key */
 LU_CONTEXT_KEY_DEFINE(mdt, LCT_MD_THREAD);
 
-/* context key constructor/destructor: mdt_txn_key_init, mdt_txn_key_fini */
-LU_KEY_INIT_FINI(mdt_txn, struct mdt_txn_info);
-
-struct lu_context_key mdt_txn_key = {
-        .lct_tags = LCT_TX_HANDLE,
-        .lct_init = mdt_txn_key_init,
-        .lct_fini = mdt_txn_key_fini
-};
-
 struct md_ucred *mdt_ucred(const struct mdt_thread_info *info)
 {
         return md_ucred(info->mti_env);
@@ -5744,7 +5781,7 @@ int mdt_cos_is_enabled(struct mdt_device *mdt)
 }
 
 /* type constructor/destructor: mdt_type_init, mdt_type_fini */
-LU_TYPE_INIT_FINI(mdt, &mdt_thread_key, &mdt_txn_key);
+LU_TYPE_INIT_FINI(mdt, &mdt_thread_key);
 
 static struct lu_device_type_operations mdt_device_type_ops = {
         .ldto_init = mdt_type_init,