Whamcloud - gitweb
LU-2739 mdt: Deny non-DNE client to access remote directory
[fs/lustre-release.git] / lustre / mdt / mdt_recovery.c
index b65f73d..9102eaa 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2011, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -41,9 +41,6 @@
  * Author: Pershin Mike <tappro@clusterfs.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include "mdt_internal.h"
@@ -109,7 +106,7 @@ static int mdt_clients_data_init(const struct lu_env *env,
                 off = lsd->lsd_client_start +
                         cl_idx * lsd->lsd_client_size;
 
-               rc = lut_client_data_read(env, &mdt->mdt_lut, lcd, &off, cl_idx);
+               rc = tgt_client_data_read(env, &mdt->mdt_lut, lcd, &off, cl_idx);
                 if (rc) {
                         CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
                                LAST_RCVD, cl_idx, off, rc);
@@ -149,25 +146,25 @@ static int mdt_clients_data_init(const struct lu_env *env,
                 mti->mti_exp = exp;
                 /* copy on-disk lcd to the export */
                 *exp->exp_target_data.ted_lcd = *lcd;
-               rc = lut_client_add(env, exp, cl_idx);
+               rc = tgt_client_add(env, exp, cl_idx);
                 /* can't fail existing */
                 LASSERTF(rc == 0, "rc = %d\n", rc);
                 /* VBR: set export last committed version */
                 exp->exp_last_committed = last_transno;
-                cfs_spin_lock(&exp->exp_lock);
-                exp->exp_connecting = 0;
-                exp->exp_in_recovery = 0;
-                cfs_spin_unlock(&exp->exp_lock);
-                obd->obd_max_recoverable_clients++;
-                class_export_put(exp);
-
-                CDEBUG(D_OTHER, "client at idx %d has last_transno="LPU64"\n",
-                       cl_idx, last_transno);
-                /* protect __u64 value update */
-                cfs_spin_lock(&mdt->mdt_lut.lut_translock);
-                mdt->mdt_lut.lut_last_transno = max(last_transno,
-                                                mdt->mdt_lut.lut_last_transno);
-                cfs_spin_unlock(&mdt->mdt_lut.lut_translock);
+               spin_lock(&exp->exp_lock);
+               exp->exp_connecting = 0;
+               exp->exp_in_recovery = 0;
+               spin_unlock(&exp->exp_lock);
+               obd->obd_max_recoverable_clients++;
+               class_export_put(exp);
+
+               CDEBUG(D_OTHER, "client at idx %d has last_transno ="LPU64"\n",
+                      cl_idx, last_transno);
+               /* protect __u64 value update */
+               spin_lock(&mdt->mdt_lut.lut_translock);
+               mdt->mdt_lut.lut_last_transno = max(last_transno,
+                                               mdt->mdt_lut.lut_last_transno);
+               spin_unlock(&mdt->mdt_lut.lut_translock);
         }
 
 err_client:
@@ -186,6 +183,7 @@ static int mdt_server_data_init(const struct lu_env *env,
         struct dt_object       *obj;
         struct lu_attr         *la;
         unsigned long last_rcvd_size;
+       __u32                   index;
         __u64 mount_count;
         int rc;
         ENTRY;
@@ -196,6 +194,13 @@ static int mdt_server_data_init(const struct lu_env *env,
         CLASSERT(offsetof(struct lsd_client_data, lcd_padding) +
                 sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
 
+       rc = server_name2index(obd->obd_name, &index, NULL);
+       if (rc < 0) {
+               CERROR("%s: Can not get index from obd_name: rc = %d\n",
+                      obd->obd_name, rc);
+               RETURN(rc);
+       }
+
         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
         LASSERT(mti != NULL);
         la = &mti->mti_attr.ma_attr;
@@ -219,12 +224,13 @@ static int mdt_server_data_init(const struct lu_env *env,
                 lsd->lsd_client_size = LR_CLIENT_SIZE;
                 lsd->lsd_feature_compat = OBD_COMPAT_MDT;
                 lsd->lsd_feature_rocompat = OBD_ROCOMPAT_LOVOBJID;
-                lsd->lsd_feature_incompat = OBD_INCOMPAT_MDT |
-                                            OBD_INCOMPAT_COMMON_LR |
-                                            OBD_INCOMPAT_MULTI_OI;
-        } else {
-                LCONSOLE_WARN("%s: used disk, loading\n", obd->obd_name);
-               rc = lut_server_data_read(env, &mdt->mdt_lut);
+               lsd->lsd_feature_incompat = OBD_INCOMPAT_MDT |
+                                           OBD_INCOMPAT_COMMON_LR |
+                                           OBD_INCOMPAT_MULTI_OI;
+               lsd->lsd_osd_index = index;
+       } else {
+               LCONSOLE_WARN("%s: used disk, loading\n", obd->obd_name);
+               rc = tgt_server_data_read(env, &mdt->mdt_lut);
                 if (rc) {
                         CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
                         GOTO(out, rc);
@@ -239,7 +245,15 @@ static int mdt_server_data_init(const struct lu_env *env,
                 lsd->lsd_feature_compat |= OBD_COMPAT_MDT;
                 lsd->lsd_feature_incompat |= OBD_INCOMPAT_MDT |
                                              OBD_INCOMPAT_COMMON_LR;
-        }
+               if (lsd->lsd_osd_index != index) {
+                       LCONSOLE_ERROR_MSG(0x157, "%s: index %d in last rcvd is"
+                                          "different with the index %d in"
+                                          "config log, It might be disk"
+                                          "corruption!\n", obd->obd_name,
+                                          lsd->lsd_osd_index, index);
+                       GOTO(out, rc = -EINVAL);
+               }
+       }
         mount_count = lsd->lsd_mount_count;
 
         if (lsd->lsd_feature_incompat & ~MDT_INCOMPAT_SUPP) {
@@ -261,7 +275,7 @@ static int mdt_server_data_init(const struct lu_env *env,
                         LCONSOLE_WARN("Mounting %s at first time on 1.8 FS, "
                                       "remove all clients for interop needs\n",
                                       obd->obd_name);
-                       rc = lut_truncate_last_rcvd(env, &mdt->mdt_lut,
+                       rc = tgt_truncate_last_rcvd(env, &mdt->mdt_lut,
                                                    lsd->lsd_client_start);
                        if (rc)
                                GOTO(out, rc);
@@ -271,14 +285,11 @@ static int mdt_server_data_init(const struct lu_env *env,
                 lsd->lsd_feature_compat |= OBD_COMPAT_20;
         }
 
-       if (lsi->lsi_ldd->ldd_flags & LDD_F_IAM_DIR)
-                lsd->lsd_feature_incompat |= OBD_INCOMPAT_IAM_DIR;
-
         lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
 
-        cfs_spin_lock(&mdt->mdt_lut.lut_translock);
-        mdt->mdt_lut.lut_last_transno = lsd->lsd_last_transno;
-        cfs_spin_unlock(&mdt->mdt_lut.lut_translock);
+       spin_lock(&mdt->mdt_lut.lut_translock);
+       mdt->mdt_lut.lut_last_transno = lsd->lsd_last_transno;
+       spin_unlock(&mdt->mdt_lut.lut_translock);
 
         CDEBUG(D_INODE, "========BEGIN DUMPING LAST_RCVD========\n");
         CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
@@ -309,18 +320,18 @@ static int mdt_server_data_init(const struct lu_env *env,
         if (rc)
                 GOTO(err_client, rc);
 
-        cfs_spin_lock(&mdt->mdt_lut.lut_translock);
-        /* obd_last_committed is used for compatibility
-         * with other lustre recovery code */
-        obd->obd_last_committed = mdt->mdt_lut.lut_last_transno;
-        cfs_spin_unlock(&mdt->mdt_lut.lut_translock);
+       spin_lock(&mdt->mdt_lut.lut_translock);
+       /* obd_last_committed is used for compatibility
+        * with other lustre recovery code */
+       obd->obd_last_committed = mdt->mdt_lut.lut_last_transno;
+       spin_unlock(&mdt->mdt_lut.lut_translock);
 
         obd->u.obt.obt_mount_count = mount_count + 1;
         obd->u.obt.obt_instance = (__u32)obd->u.obt.obt_mount_count;
         lsd->lsd_mount_count = obd->u.obt.obt_mount_count;
 
         /* save it, so mount count and last_transno is current */
-       rc = lut_server_data_update(env, &mdt->mdt_lut, 0);
+       rc = tgt_server_data_update(env, &mdt->mdt_lut, 0);
         if (rc)
                 GOTO(err_client, rc);
 
@@ -353,15 +364,15 @@ static int mdt_last_rcvd_update(struct mdt_thread_info *mti,
         ted = &req->rq_export->exp_target_data;
         LASSERT(ted);
 
-        cfs_mutex_lock(&ted->ted_lcd_lock);
-        lcd = ted->ted_lcd;
-        /* if the export has already been disconnected, we have no last_rcvd slot,
-         * update server data with latest transno then */
-        if (lcd == NULL) {
-                cfs_mutex_unlock(&ted->ted_lcd_lock);
+       mutex_lock(&ted->ted_lcd_lock);
+       lcd = ted->ted_lcd;
+       /* if the export has already been disconnected, we have no last_rcvd
+        * slot, update server data with latest transno then */
+       if (lcd == NULL) {
+               mutex_unlock(&ted->ted_lcd_lock);
                 CWARN("commit transaction for disconnected client %s: rc %d\n",
                       req->rq_export->exp_client_uuid.uuid, rc);
-               err = lut_server_data_write(mti->mti_env, &mdt->mdt_lut, th);
+               err = tgt_server_data_write(mti->mti_env, &mdt->mdt_lut, th);
                 RETURN(err);
         }
 
@@ -377,11 +388,11 @@ static int mdt_last_rcvd_update(struct mdt_thread_info *mti,
                                        lcd->lcd_last_close_transno,
                                        mti->mti_transno, req_is_replay(req));
                                 if (req_is_replay(req)) {
-                                        cfs_spin_lock(&req->rq_export->exp_lock);
-                                        req->rq_export->exp_vbr_failed = 1;
-                                        cfs_spin_unlock(&req->rq_export->exp_lock);
-                                }
-                                cfs_mutex_unlock(&ted->ted_lcd_lock);
+                                       spin_lock(&req->rq_export->exp_lock);
+                                       req->rq_export->exp_vbr_failed = 1;
+                                       spin_unlock(&req->rq_export->exp_lock);
+                               }
+                               mutex_unlock(&ted->ted_lcd_lock);
                                 RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
                         }
                         lcd->lcd_last_close_transno = mti->mti_transno;
@@ -405,11 +416,11 @@ static int mdt_last_rcvd_update(struct mdt_thread_info *mti,
                                        lcd->lcd_last_transno,
                                        mti->mti_transno, req_is_replay(req));
                                 if (req_is_replay(req)) {
-                                        cfs_spin_lock(&req->rq_export->exp_lock);
-                                        req->rq_export->exp_vbr_failed = 1;
-                                        cfs_spin_unlock(&req->rq_export->exp_lock);
-                                }
-                                cfs_mutex_unlock(&ted->ted_lcd_lock);
+                                       spin_lock(&req->rq_export->exp_lock);
+                                       req->rq_export->exp_vbr_failed = 1;
+                                       spin_unlock(&req->rq_export->exp_lock);
+                               }
+                               mutex_unlock(&ted->ted_lcd_lock);
                                 RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
                         }
                         lcd->lcd_last_transno = mti->mti_transno;
@@ -422,15 +433,39 @@ static int mdt_last_rcvd_update(struct mdt_thread_info *mti,
                 lcd->lcd_last_data = mti->mti_opdata;
         }
 
-        if (off <= 0) {
-                CERROR("client idx %d has offset %lld\n", ted->ted_lr_idx, off);
-                err = -EINVAL;
-        } else {
-               err = lut_client_data_write(mti->mti_env, &mdt->mdt_lut, lcd,
+       if (exp_connect_flags(mti->mti_exp) & OBD_CONNECT_LIGHTWEIGHT) {
+               /* Although lightweight (LW) connections have no slot in
+                * last_rcvd, we still want to maintain the in-memory
+                * lsd_client_data structure in order to properly handle reply
+                * reconstruction. */
+               struct lu_target        *tg = &mdt->mdt_lut;
+               bool                     update = false;
+
+               mutex_unlock(&ted->ted_lcd_lock);
+               err = 0;
+
+               /* All operations performed by LW clients are synchronous and
+                * we store the committed transno in the last_rcvd header */
+               spin_lock(&tg->lut_translock);
+               if (mti->mti_transno > tg->lut_lsd.lsd_last_transno) {
+                       tg->lut_lsd.lsd_last_transno = mti->mti_transno;
+                       update = true;
+               }
+               spin_unlock(&tg->lut_translock);
+
+               if (update)
+                       err = tgt_server_data_write(mti->mti_env, tg, th);
+       } else if (off <= 0) {
+               CERROR("%s: client idx %d has offset %lld\n",
+                      mdt2obd_dev(mdt)->obd_name, ted->ted_lr_idx, off);
+               mutex_unlock(&ted->ted_lcd_lock);
+               err = -EINVAL;
+       } else {
+               err = tgt_client_data_write(mti->mti_env, &mdt->mdt_lut, lcd,
                                            &off, th);
-        }
-        cfs_mutex_unlock(&ted->ted_lcd_lock);
-        RETURN(err);
+               mutex_unlock(&ted->ted_lcd_lock);
+       }
+       RETURN(err);
 }
 
 extern struct lu_context_key mdt_thread_key;
@@ -462,7 +497,9 @@ static int mdt_txn_start_cb(const struct lu_env *env,
        if (rc)
                return rc;
 
-       if (mti->mti_mos != NULL)
+       /* we probably should not set local transno to the remote object
+        * on another storage, What about VBR on remote object? XXX */
+       if (mti->mti_mos != NULL && !mdt_object_remote(mti->mti_mos))
                rc = dt_declare_version_set(env, mdt_obj2dt(mti->mti_mos), th);
 
        return rc;
@@ -479,10 +516,8 @@ static int mdt_txn_stop_cb(const struct lu_env *env,
         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
         req = mdt_info_req(mti);
 
-        if (mti->mti_mdt == NULL || req == NULL || mti->mti_no_need_trans) {
-                mti->mti_no_need_trans = 0;
-                return 0;
-        }
+       if (mti->mti_mdt == NULL || req == NULL)
+               return 0;
 
         if (mti->mti_has_trans) {
                 /* XXX: currently there are allowed cases, but the wrong cases
@@ -493,11 +528,12 @@ static int mdt_txn_stop_cb(const struct lu_env *env,
         }
 
         mti->mti_has_trans = 1;
-        cfs_spin_lock(&mdt->mdt_lut.lut_translock);
+       spin_lock(&mdt->mdt_lut.lut_translock);
         if (txn->th_result != 0) {
                 if (mti->mti_transno != 0) {
-                        CERROR("Replay transno "LPU64" failed: rc %d\n",
-                               mti->mti_transno, txn->th_result);
+                       CERROR("Replay transno "LPU64" failed: rc %d\n",
+                               mti->mti_transno, txn->th_result);
+                       return 0;
                 }
         } else if (mti->mti_transno == 0) {
                 mti->mti_transno = ++ mdt->mdt_lut.lut_last_transno;
@@ -506,12 +542,16 @@ static int mdt_txn_stop_cb(const struct lu_env *env,
                 if (mti->mti_transno > mdt->mdt_lut.lut_last_transno)
                         mdt->mdt_lut.lut_last_transno = mti->mti_transno;
         }
-        cfs_spin_unlock(&mdt->mdt_lut.lut_translock);
+       spin_unlock(&mdt->mdt_lut.lut_translock);
         /* sometimes the reply message has not been successfully packed */
         LASSERT(req != NULL && req->rq_repmsg != NULL);
 
         /** VBR: set new versions */
-        if (txn->th_result == 0 && mti->mti_mos != NULL) {
+       /* we probably should not set local transno to the remote object
+        * on another storage, What about VBR on remote object? XXX */
+       if (txn->th_result == 0 && mti->mti_mos != NULL &&
+           !mdt_object_remote(mti->mti_mos)) {
+
                 dt_version_set(env, mdt_obj2dt(mti->mti_mos),
                                mti->mti_transno, txn);
                 mti->mti_mos = NULL;
@@ -524,7 +564,7 @@ static int mdt_txn_stop_cb(const struct lu_env *env,
         req->rq_transno = mti->mti_transno;
         lustre_msg_set_transno(req->rq_repmsg, mti->mti_transno);
         /* if can't add callback, do sync write */
-        txn->th_sync |= !!lut_last_commit_cb_add(txn, &mdt->mdt_lut,
+        txn->th_sync |= !!tgt_last_commit_cb_add(txn, &mdt->mdt_lut,
                                                  mti->mti_exp,
                                                  mti->mti_transno);
         return mdt_last_rcvd_update(mti, txn);
@@ -534,8 +574,6 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt,
                  struct obd_device *obd,
                  struct lustre_sb_info *lsi)
 {
-        struct lu_fid fid;
-        struct dt_object *o;
         int rc = 0;
         ENTRY;
 
@@ -553,28 +591,8 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt,
         dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb);
 
         rc = mdt_server_data_init(env, mdt, lsi);
-        if (rc)
-                RETURN(rc);
 
-        o = dt_store_open(env, mdt->mdt_bottom, "", CAPA_KEYS, &fid);
-        if (!IS_ERR(o)) {
-                mdt->mdt_ck_obj = o;
-                rc = mdt_capa_keys_init(env, mdt);
-                if (rc)
-                        GOTO(put_ck_object, rc);
-        } else {
-                rc = PTR_ERR(o);
-                CERROR("cannot open %s: rc = %d\n", CAPA_KEYS, rc);
-                GOTO(disconnect_exports, rc);
-        }
-        RETURN(0);
-
-put_ck_object:
-        lu_object_put(env, &o->do_lu);
-        mdt->mdt_ck_obj = NULL;
-disconnect_exports:
-        class_disconnect_exports(obd);
-        return rc;
+       RETURN(rc);
 }
 
 void mdt_fs_cleanup(const struct lu_env *env, struct mdt_device *mdt)
@@ -592,14 +610,14 @@ void mdt_fs_cleanup(const struct lu_env *env, struct mdt_device *mdt)
 /* reconstruction code */
 static void mdt_steal_ack_locks(struct ptlrpc_request *req)
 {
+       struct ptlrpc_service_part *svcpt;
         struct obd_export         *exp = req->rq_export;
         cfs_list_t                *tmp;
         struct ptlrpc_reply_state *oldrep;
-        struct ptlrpc_service     *svc;
         int                        i;
 
         /* CAVEAT EMPTOR: spinlock order */
-        cfs_spin_lock(&exp->exp_lock);
+       spin_lock(&exp->exp_lock);
         cfs_list_for_each (tmp, &exp->exp_outstanding_replies) {
                 oldrep = cfs_list_entry(tmp, struct ptlrpc_reply_state,
                                         rs_exp_list);
@@ -613,16 +631,16 @@ static void mdt_steal_ack_locks(struct ptlrpc_request *req)
                                 lustre_msg_get_opc(req->rq_reqmsg),
                                 oldrep->rs_opc);
 
-                svc = oldrep->rs_service;
-                cfs_spin_lock (&svc->srv_rs_lock);
+               svcpt = oldrep->rs_svcpt;
+               spin_lock(&svcpt->scp_rep_lock);
 
                 cfs_list_del_init (&oldrep->rs_exp_list);
 
-                CWARN("Stealing %d locks from rs %p x"LPD64".t"LPD64
-                      " o%d NID %s\n",
-                      oldrep->rs_nlocks, oldrep,
-                      oldrep->rs_xid, oldrep->rs_transno, oldrep->rs_opc,
-                      libcfs_nid2str(exp->exp_connection->c_peer.nid));
+               CDEBUG(D_HA, "Stealing %d locks from rs %p x"LPD64".t"LPD64
+                      " o%d NID %s\n",
+                      oldrep->rs_nlocks, oldrep,
+                      oldrep->rs_xid, oldrep->rs_transno, oldrep->rs_opc,
+                      libcfs_nid2str(exp->exp_connection->c_peer.nid));
 
                 for (i = 0; i < oldrep->rs_nlocks; i++)
                         ptlrpc_save_lock(req, &oldrep->rs_locks[i],
@@ -630,14 +648,14 @@ static void mdt_steal_ack_locks(struct ptlrpc_request *req)
                 oldrep->rs_nlocks = 0;
 
                 DEBUG_REQ(D_HA, req, "stole locks for");
-                cfs_spin_lock(&oldrep->rs_lock);
-                ptlrpc_schedule_difficult_reply (oldrep);
-                cfs_spin_unlock(&oldrep->rs_lock);
-
-                cfs_spin_unlock (&svc->srv_rs_lock);
-                break;
-        }
-        cfs_spin_unlock(&exp->exp_lock);
+               spin_lock(&oldrep->rs_lock);
+               ptlrpc_schedule_difficult_reply(oldrep);
+               spin_unlock(&oldrep->rs_lock);
+
+               spin_unlock(&svcpt->scp_rep_lock);
+               break;
+       }
+       spin_unlock(&exp->exp_lock);
 }
 
 /**
@@ -720,15 +738,19 @@ static void mdt_reconstruct_create(struct mdt_thread_info *mti,
         body = req_capsule_server_get(mti->mti_pill, &RMF_MDT_BODY);
         mti->mti_attr.ma_need = MA_INODE;
         mti->mti_attr.ma_valid = 0;
-        rc = mo_attr_get(mti->mti_env, mdt_object_child(child), &mti->mti_attr);
-        if (rc == -EREMOTE) {
-                /* object was created on remote server */
-                req->rq_status = rc;
-                body->valid |= OBD_MD_MDS;
-        }
-        mdt_pack_attr2body(mti, body, &mti->mti_attr.ma_attr,
-                           mdt_object_fid(child));
-        mdt_object_put(mti->mti_env, child);
+       rc = mdt_attr_get_complex(mti, child, &mti->mti_attr);
+       if (rc == -EREMOTE) {
+               /* object was created on remote server */
+               if (!mdt_is_dne_client(exp))
+                       /* Return -EIO for old client */
+                       rc = -EIO;
+
+               req->rq_status = rc;
+               body->valid |= OBD_MD_MDS;
+       }
+       mdt_pack_attr2body(mti, body, &mti->mti_attr.ma_attr,
+                          mdt_object_fid(child));
+       mdt_object_put(mti->mti_env, child);
 }
 
 static void mdt_reconstruct_setattr(struct mdt_thread_info *mti,
@@ -760,7 +782,7 @@ static void mdt_reconstruct_setattr(struct mdt_thread_info *mti,
         }
         mti->mti_attr.ma_need = MA_INODE;
         mti->mti_attr.ma_valid = 0;
-        mo_attr_get(mti->mti_env, mdt_object_child(obj), &mti->mti_attr);
+       mdt_attr_get_complex(mti, obj, &mti->mti_attr);
         mdt_pack_attr2body(mti, body, &mti->mti_attr.ma_attr,
                            mdt_object_fid(obj));
         if (mti->mti_ioepoch && (mti->mti_ioepoch->flags & MF_EPOCH_OPEN)) {
@@ -769,17 +791,17 @@ static void mdt_reconstruct_setattr(struct mdt_thread_info *mti,
 
                 repbody = req_capsule_server_get(mti->mti_pill, &RMF_MDT_BODY);
                 repbody->ioepoch = obj->mot_ioepoch;
-                cfs_spin_lock(&med->med_open_lock);
-                cfs_list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
-                        if (mfd->mfd_xid == req->rq_xid)
-                                break;
-                }
-                LASSERT(&mfd->mfd_list != &med->med_open_head);
-                cfs_spin_unlock(&med->med_open_lock);
-                repbody->handle.cookie = mfd->mfd_handle.h_cookie;
-        }
-
-        mdt_object_put(mti->mti_env, obj);
+               spin_lock(&med->med_open_lock);
+               cfs_list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
+                       if (mfd->mfd_xid == req->rq_xid)
+                               break;
+               }
+               LASSERT(&mfd->mfd_list != &med->med_open_head);
+               spin_unlock(&med->med_open_lock);
+               repbody->handle.cookie = mfd->mfd_handle.h_cookie;
+       }
+
+       mdt_object_put(mti->mti_env, obj);
 }
 
 typedef void (*mdt_reconstructor)(struct mdt_thread_info *mti,