Whamcloud - gitweb
LU-2275 mdt: Avoid setting positive dispositions too early
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index 20281b8..a614bf3 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
@@ -29,8 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011 Whamcloud, Inc.
- *
+ * Copyright (c) 2011, 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -43,9 +40,6 @@
  * Author: Huang Hua <huanghua@clusterfs.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include <lustre_acl.h>
@@ -57,6 +51,11 @@ static void mdt_mfd_get(void *mfdp)
 {
 }
 
+static struct portals_handle_ops mfd_handle_ops = {
+       .hop_addref = mdt_mfd_get,
+       .hop_free   = NULL,
+};
+
 /* Create a new mdt_file_data struct, initialize it,
  * and insert it to global hash table */
 struct mdt_file_data *mdt_mfd_new(void)
@@ -68,7 +67,7 @@ struct mdt_file_data *mdt_mfd_new(void)
         if (mfd != NULL) {
                 CFS_INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
                 CFS_INIT_LIST_HEAD(&mfd->mfd_list);
-                class_handle_hash(&mfd->mfd_handle, mdt_mfd_get);
+               class_handle_hash(&mfd->mfd_handle, &mfd_handle_ops);
         }
         RETURN(mfd);
 }
@@ -119,15 +118,18 @@ static int mdt_create_data(struct mdt_thread_info *info,
 
         ma->ma_need = MA_INODE | MA_LOV;
         ma->ma_valid = 0;
-        cfs_down(&o->mot_lov_sem);
+       mutex_lock(&o->mot_lov_mutex);
         if (!(o->mot_flags & MOF_LOV_CREATED)) {
                 rc = mdo_create_data(info->mti_env,
                                      p ? mdt_object_child(p) : NULL,
                                      mdt_object_child(o), spec, ma);
+               if (rc == 0)
+                       rc = mdt_attr_get_complex(info, o, ma);
+
                 if (rc == 0 && ma->ma_valid & MA_LOV)
                         o->mot_flags |= MOF_LOV_CREATED;
         }
-        cfs_up(&o->mot_lov_sem);
+       mutex_unlock(&o->mot_lov_mutex);
         RETURN(rc);
 }
 
@@ -143,7 +145,7 @@ int mdt_object_is_som_enabled(struct mdt_object *mo)
 
 /**
  * Re-enable Size-on-MDS.
- * Call under ->mot_ioepoch_sem.
+ * Call under ->mot_ioepoch_mutex.
  */
 static void mdt_object_som_enable(struct mdt_object *mo, __u64 ioepoch)
 {
@@ -156,7 +158,7 @@ static void mdt_object_som_enable(struct mdt_object *mo, __u64 ioepoch)
 
 /**
  * Open the IOEpoch. It is allowed if @writecount is not negative.
- * The epoch and writecount handling is performed under the mot_ioepoch_sem.
+ * The epoch and writecount handling is performed under the mot_ioepoch_mutex.
  */
 int mdt_ioepoch_open(struct mdt_thread_info *info, struct mdt_object *o,
                      int created)
@@ -170,14 +172,14 @@ int mdt_ioepoch_open(struct mdt_thread_info *info, struct mdt_object *o,
             !S_ISREG(lu_object_attr(&o->mot_obj.mo_lu)))
                 RETURN(0);
 
-        cfs_down(&o->mot_ioepoch_sem);
-        if (mdt_ioepoch_opened(o)) {
-                /* Epoch continues even if there is no writers yet. */
-                CDEBUG(D_INODE, "continue epoch "LPU64" for "DFID"\n",
-                       o->mot_ioepoch, PFID(mdt_object_fid(o)));
-        } else {
-                /* XXX: ->mdt_ioepoch is not initialized at the mount */
-                cfs_spin_lock(&mdt->mdt_ioepoch_lock);
+       mutex_lock(&o->mot_ioepoch_mutex);
+       if (mdt_ioepoch_opened(o)) {
+               /* Epoch continues even if there is no writers yet. */
+               CDEBUG(D_INODE, "continue epoch "LPU64" for "DFID"\n",
+                      o->mot_ioepoch, PFID(mdt_object_fid(o)));
+       } else {
+               /* XXX: ->mdt_ioepoch is not initialized at the mount */
+               spin_lock(&mdt->mdt_ioepoch_lock);
                 if (mdt->mdt_ioepoch < info->mti_replayepoch)
                         mdt->mdt_ioepoch = info->mti_replayepoch;
 
@@ -188,16 +190,16 @@ int mdt_ioepoch_open(struct mdt_thread_info *info, struct mdt_object *o,
                 else
                         o->mot_ioepoch = mdt->mdt_ioepoch;
 
-                cfs_spin_unlock(&mdt->mdt_ioepoch_lock);
+               spin_unlock(&mdt->mdt_ioepoch_lock);
 
-                CDEBUG(D_INODE, "starting epoch "LPU64" for "DFID"\n",
-                       o->mot_ioepoch, PFID(mdt_object_fid(o)));
-                if (created)
-                        o->mot_flags |= MOF_SOM_CREATED;
-                cancel = 1;
-        }
-        o->mot_ioepoch_count++;
-        cfs_up(&o->mot_ioepoch_sem);
+               CDEBUG(D_INODE, "starting epoch "LPU64" for "DFID"\n",
+                      o->mot_ioepoch, PFID(mdt_object_fid(o)));
+               if (created)
+                       o->mot_flags |= MOF_SOM_CREATED;
+               cancel = 1;
+       }
+       o->mot_ioepoch_count++;
+       mutex_unlock(&o->mot_ioepoch_mutex);
 
         /* Cancel Size-on-MDS attributes cached on clients for the open case.
          * In the truncate case, see mdt_reint_setattr(). */
@@ -215,45 +217,49 @@ int mdt_ioepoch_open(struct mdt_thread_info *info, struct mdt_object *o,
 /**
  * Update SOM on-disk attributes.
  * If enabling, write update inodes and lustre-ea with the proper IOEpoch,
- * mountid and attributes. If disabling, zero IOEpoch id in lustre-ea.
- * Call under ->mot_ioepoch_sem.
+ * mountid and attributes. If disabling, clean SOM xattr.
+ * Call under ->mot_ioepoch_mutex.
  */
 static int mdt_som_attr_set(struct mdt_thread_info *info,
-                            struct mdt_object *obj, __u64 ioepoch, int enable)
+                           struct mdt_object *obj, __u64 ioepoch, bool enable)
 {
-        struct md_attr *ma = &info->mti_attr;
-        int rc;
+       struct md_object        *next = mdt_object_child(obj);
+       int                      rc;
         ENTRY;
 
         CDEBUG(D_INODE, "Size-on-MDS attribute %s for epoch "LPU64
                " on "DFID".\n", enable ? "update" : "disabling",
                ioepoch, PFID(mdt_object_fid(obj)));
 
-        ma->ma_valid |= MA_SOM;
-        ma->ma_som = &info->mti_u.som.data;
-        if (enable) {
-                struct mdt_device *mdt = info->mti_mdt;
-                struct lu_attr *la = &ma->ma_attr;
-
-                ma->ma_som->msd_ioepoch = ioepoch;
-                ma->ma_som->msd_size = la->la_valid & LA_SIZE ? la->la_size : 0;
-                ma->ma_som->msd_blocks = la->la_valid & LA_BLOCKS ?
-                                         la->la_blocks : 0;
-                ma->ma_som->msd_mountid = mdt->mdt_lut.lut_obd->u.obt.obt_mount_count;
-                ma->ma_attr.la_valid &= LA_ATIME | LA_MTIME | LA_CTIME;
-        } else {
-                ma->ma_som->msd_ioepoch = IOEPOCH_INVAL;
-                ma->ma_attr.la_valid &= LA_ATIME;
-        }
-
-        /* Since we have opened the file, it is unnecessary
-         * to check permission when close it. Between the "open"
-         * and "close", maybe someone has changed the file mode
-         * or flags, or the file created mode do not permit wirte,
-         * and so on. Just set MDS_PERM_BYPASS for all the cases. */
-        ma->ma_attr_flags |= MDS_PERM_BYPASS | MDS_SOM;
+       if (enable) {
+               struct lu_buf           *buf = &info->mti_buf;
+               struct som_attrs        *attrs;
+               struct md_attr          *ma = &info->mti_attr;
+               struct lu_attr          *la = &ma->ma_attr;
+               struct obd_device       *obd = info->mti_mdt->mdt_lut.lut_obd;
+
+               attrs = (struct som_attrs *)info->mti_xattr_buf;
+               CLASSERT(sizeof(info->mti_xattr_buf) >= sizeof(*attrs));
+
+               /* pack SOM attributes */
+               memset(attrs, 0, sizeof(*attrs));
+               attrs->som_ioepoch = ioepoch;
+               attrs->som_mountid = obd->u.obt.obt_mount_count;
+               if ((la->la_valid & LA_SIZE) != 0)
+                       attrs->som_size = la->la_size;
+               if ((la->la_valid & LA_BLOCKS) != 0)
+                       attrs->som_blocks = la->la_blocks;
+               lustre_som_swab(attrs);
+
+               /* update SOM attributes */
+               buf->lb_buf = attrs;
+               buf->lb_len = sizeof(*attrs);
+               rc = mo_xattr_set(info->mti_env, next, buf, XATTR_NAME_SOM, 0);
+       } else {
+               /* delete SOM attributes */
+               rc = mo_xattr_del(info->mti_env, next, XATTR_NAME_SOM);
+       }
 
-        rc = mdt_attr_set(info, obj, ma, 0);
         RETURN(rc);
 }
 
@@ -263,7 +269,7 @@ static inline int mdt_ioepoch_close_on_eviction(struct mdt_thread_info *info,
 {
         int rc = 0;
 
-        cfs_down(&o->mot_ioepoch_sem);
+       mutex_lock(&o->mot_ioepoch_mutex);
         CDEBUG(D_INODE, "Eviction. Closing IOepoch "LPU64" on "DFID". "
                "Count %d\n", o->mot_ioepoch, PFID(mdt_object_fid(o)),
                o->mot_ioepoch_count);
@@ -276,7 +282,7 @@ static inline int mdt_ioepoch_close_on_eviction(struct mdt_thread_info *info,
                 rc = mdt_som_attr_set(info, o, o->mot_ioepoch, MDT_SOM_DISABLE);
                 mdt_object_som_enable(o, o->mot_ioepoch);
         }
-        cfs_up(&o->mot_ioepoch_sem);
+       mutex_unlock(&o->mot_ioepoch_mutex);
         RETURN(rc);
 }
 
@@ -291,7 +297,7 @@ static inline int mdt_ioepoch_close_on_replay(struct mdt_thread_info *info,
         int rc = MDT_IOEPOCH_CLOSED;
         ENTRY;
 
-        cfs_down(&o->mot_ioepoch_sem);
+       mutex_lock(&o->mot_ioepoch_mutex);
         CDEBUG(D_INODE, "Replay. Closing epoch "LPU64" on "DFID". Count %d\n",
                o->mot_ioepoch, PFID(mdt_object_fid(o)), o->mot_ioepoch_count);
         o->mot_ioepoch_count--;
@@ -303,7 +309,7 @@ static inline int mdt_ioepoch_close_on_replay(struct mdt_thread_info *info,
 
         if (!mdt_ioepoch_opened(o))
                 mdt_object_som_enable(o, info->mti_ioepoch->ioepoch);
-        cfs_up(&o->mot_ioepoch_sem);
+       mutex_unlock(&o->mot_ioepoch_mutex);
 
         RETURN(rc);
 }
@@ -332,7 +338,7 @@ static inline int mdt_ioepoch_close_reg(struct mdt_thread_info *info,
         la = &info->mti_attr.ma_attr;
         achange = (info->mti_ioepoch->flags & MF_SOM_CHANGE);
 
-        cfs_down(&o->mot_ioepoch_sem);
+       mutex_lock(&o->mot_ioepoch_mutex);
         o->mot_ioepoch_count--;
 
         tmp_ma = &info->mti_u.som.attr;
@@ -341,7 +347,7 @@ static inline int mdt_ioepoch_close_reg(struct mdt_thread_info *info,
         tmp_ma->ma_som = &info->mti_u.som.data;
         tmp_ma->ma_need = MA_INODE | MA_LOV | MA_SOM;
         tmp_ma->ma_valid = 0;
-        rc = mo_attr_get(info->mti_env, mdt_object_child(o), tmp_ma);
+       rc = mdt_attr_get_complex(info, o, tmp_ma);
         if (rc)
                 GOTO(error_up, rc);
 
@@ -399,7 +405,7 @@ static inline int mdt_ioepoch_close_reg(struct mdt_thread_info *info,
                 mdt_object_som_enable(o, o->mot_ioepoch);
         }
 
-        cfs_up(&o->mot_ioepoch_sem);
+       mutex_unlock(&o->mot_ioepoch_mutex);
         /* If recovery is needed, tell the client to perform GETATTR under
          * the lock. */
         if (ret == MDT_IOEPOCH_GETATTR && recovery) {
@@ -411,7 +417,7 @@ static inline int mdt_ioepoch_close_reg(struct mdt_thread_info *info,
         RETURN(rc ? : ret);
 
 error_up:
-        cfs_up(&o->mot_ioepoch_sem);
+       mutex_unlock(&o->mot_ioepoch_mutex);
         return rc;
 }
 
@@ -480,7 +486,7 @@ int mdt_som_au_close(struct mdt_thread_info *info, struct mdt_object *o)
              !(info->mti_attr.ma_attr.la_valid & LA_SIZE)))
                 act = MDT_SOM_DISABLE;
 
-        cfs_down(&o->mot_ioepoch_sem);
+       mutex_lock(&o->mot_ioepoch_mutex);
         /* Mark the object it is the recovery state if we failed to obtain
          * SOM attributes. */
         if (act == MDT_SOM_DISABLE)
@@ -494,7 +500,7 @@ int mdt_som_au_close(struct mdt_thread_info *info, struct mdt_object *o)
                         rc = mdt_som_attr_set(info, o, ioepoch, act);
                 mdt_object_som_enable(o, ioepoch);
         }
-        cfs_up(&o->mot_ioepoch_sem);
+       mutex_unlock(&o->mot_ioepoch_mutex);
         RETURN(rc);
 }
 
@@ -502,9 +508,9 @@ int mdt_write_read(struct mdt_object *o)
 {
         int rc = 0;
         ENTRY;
-        cfs_down(&o->mot_ioepoch_sem);
+       mutex_lock(&o->mot_ioepoch_mutex);
         rc = o->mot_writecount;
-        cfs_up(&o->mot_ioepoch_sem);
+       mutex_unlock(&o->mot_ioepoch_mutex);
         RETURN(rc);
 }
 
@@ -512,21 +518,21 @@ int mdt_write_get(struct mdt_object *o)
 {
         int rc = 0;
         ENTRY;
-        cfs_down(&o->mot_ioepoch_sem);
+       mutex_lock(&o->mot_ioepoch_mutex);
         if (o->mot_writecount < 0)
                 rc = -ETXTBSY;
         else
                 o->mot_writecount++;
-        cfs_up(&o->mot_ioepoch_sem);
+       mutex_unlock(&o->mot_ioepoch_mutex);
         RETURN(rc);
 }
 
 void mdt_write_put(struct mdt_object *o)
 {
         ENTRY;
-        cfs_down(&o->mot_ioepoch_sem);
+       mutex_lock(&o->mot_ioepoch_mutex);
         o->mot_writecount--;
-        cfs_up(&o->mot_ioepoch_sem);
+       mutex_unlock(&o->mot_ioepoch_mutex);
         EXIT;
 }
 
@@ -534,44 +540,57 @@ static int mdt_write_deny(struct mdt_object *o)
 {
         int rc = 0;
         ENTRY;
-        cfs_down(&o->mot_ioepoch_sem);
+       mutex_lock(&o->mot_ioepoch_mutex);
         if (o->mot_writecount > 0)
                 rc = -ETXTBSY;
         else
                 o->mot_writecount--;
-        cfs_up(&o->mot_ioepoch_sem);
+       mutex_unlock(&o->mot_ioepoch_mutex);
         RETURN(rc);
 }
 
 static void mdt_write_allow(struct mdt_object *o)
 {
         ENTRY;
-        cfs_down(&o->mot_ioepoch_sem);
+       mutex_lock(&o->mot_ioepoch_mutex);
         o->mot_writecount++;
-        cfs_up(&o->mot_ioepoch_sem);
+       mutex_unlock(&o->mot_ioepoch_mutex);
         EXIT;
 }
 
 /* there can be no real transaction so prepare the fake one */
-static void mdt_empty_transno(struct mdt_thread_info* info)
+static void mdt_empty_transno(struct mdt_thread_info *info, int rc)
 {
-        struct mdt_device *mdt = info->mti_mdt;
-        struct ptlrpc_request *req = mdt_info_req(info);
+        struct mdt_device      *mdt = info->mti_mdt;
+        struct ptlrpc_request  *req = mdt_info_req(info);
+        struct tg_export_data  *ted;
+        struct lsd_client_data *lcd;
 
         ENTRY;
         /* transaction has occurred already */
         if (lustre_msg_get_transno(req->rq_repmsg) != 0)
                 RETURN_EXIT;
 
-        cfs_spin_lock(&mdt->mdt_lut.lut_translock);
-        if (info->mti_transno == 0) {
-                info->mti_transno = ++ mdt->mdt_lut.lut_last_transno;
-        } else {
-                /* should be replay */
-                if (info->mti_transno > mdt->mdt_lut.lut_last_transno)
-                        mdt->mdt_lut.lut_last_transno = info->mti_transno;
-        }
-        cfs_spin_unlock(&mdt->mdt_lut.lut_translock);
+       spin_lock(&mdt->mdt_lut.lut_translock);
+       if (rc != 0) {
+               if (info->mti_transno != 0) {
+                       struct obd_export *exp = req->rq_export;
+
+                       CERROR("%s: replay trans "LPU64" NID %s: rc = %d\n",
+                               mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name,
+                               info->mti_transno,
+                               libcfs_nid2str(exp->exp_connection->c_peer.nid),
+                               rc);
+                       RETURN_EXIT;
+               }
+       } else if (info->mti_transno == 0) {
+               info->mti_transno = ++mdt->mdt_lut.lut_last_transno;
+       } else {
+               /* should be replay */
+               if (info->mti_transno > mdt->mdt_lut.lut_last_transno)
+                       mdt->mdt_lut.lut_last_transno = info->mti_transno;
+       }
+       spin_unlock(&mdt->mdt_lut.lut_translock);
 
         CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
                         info->mti_transno,
@@ -579,6 +598,49 @@ static void mdt_empty_transno(struct mdt_thread_info* info)
 
         req->rq_transno = info->mti_transno;
         lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);
+
+        /* update lcd in memory only for resent cases */
+        ted = &req->rq_export->exp_target_data;
+        LASSERT(ted);
+       mutex_lock(&ted->ted_lcd_lock);
+        lcd = ted->ted_lcd;
+       if (info->mti_transno < lcd->lcd_last_transno &&
+           info->mti_transno != 0) {
+               /* This should happen during replay. Do not update
+                * last rcvd info if replay req transno < last transno,
+                * otherwise the following resend(after replay) can not
+                * be checked correctly by xid */
+               mutex_unlock(&ted->ted_lcd_lock);
+               CDEBUG(D_HA, "%s: transno = "LPU64" < last_transno = "LPU64"\n",
+                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name,
+                       info->mti_transno, lcd->lcd_last_transno);
+               RETURN_EXIT;
+       }
+
+        if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
+            lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) {
+               if (info->mti_transno != 0)
+                       lcd->lcd_last_close_transno = info->mti_transno;
+                lcd->lcd_last_close_xid = req->rq_xid;
+                lcd->lcd_last_close_result = rc;
+        } else {
+                /* VBR: save versions in last_rcvd for reconstruct. */
+                __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg);
+                if (pre_versions) {
+                        lcd->lcd_pre_versions[0] = pre_versions[0];
+                        lcd->lcd_pre_versions[1] = pre_versions[1];
+                        lcd->lcd_pre_versions[2] = pre_versions[2];
+                        lcd->lcd_pre_versions[3] = pre_versions[3];
+                }
+               if (info->mti_transno != 0)
+                       lcd->lcd_last_transno = info->mti_transno;
+
+               lcd->lcd_last_xid = req->rq_xid;
+                lcd->lcd_last_result = rc;
+                lcd->lcd_last_data = info->mti_opdata;
+        }
+       mutex_unlock(&ted->ted_lcd_lock);
+
         EXIT;
 }
 
@@ -650,7 +712,7 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
         rc = mo_open(info->mti_env, mdt_object_child(o),
                      created ? flags | MDS_OPEN_CREATED : flags);
         if (rc)
-                RETURN(rc);
+                GOTO(err_out, rc);
 
         mfd = mdt_mfd_new();
         if (mfd != NULL) {
@@ -687,13 +749,17 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                                        "cookie=" LPX64"\n", mfd,
                                        PFID(mdt_object_fid(mfd->mfd_object)),
                                        info->mti_rr.rr_handle->cookie);
-                                cfs_spin_lock(&med->med_open_lock);
-                                class_handle_unhash(&old_mfd->mfd_handle);
-                                cfs_list_del_init(&old_mfd->mfd_list);
-                                cfs_spin_unlock(&med->med_open_lock);
+                               spin_lock(&med->med_open_lock);
+                               class_handle_unhash(&old_mfd->mfd_handle);
+                               cfs_list_del_init(&old_mfd->mfd_list);
+                               spin_unlock(&med->med_open_lock);
                                 /* no attr update for that close */
                                 la->la_valid = 0;
+                                ma->ma_valid |= MA_FLAGS;
+                                ma->ma_attr_flags |= MDS_RECOV_OPEN;
                                 mdt_mfd_close(info, old_mfd);
+                                ma->ma_attr_flags &= ~MDS_RECOV_OPEN;
+                                ma->ma_valid &= ~MA_FLAGS;
                         }
                         CDEBUG(D_HA, "Store old cookie "LPX64" in new mfd\n",
                                info->mti_rr.rr_handle->cookie);
@@ -703,28 +769,37 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
 
                 if (req->rq_export->exp_disconnected) {
-                        cfs_spin_lock(&med->med_open_lock);
-                        class_handle_unhash(&mfd->mfd_handle);
-                        cfs_list_del_init(&mfd->mfd_list);
-                        cfs_spin_unlock(&med->med_open_lock);
-                        mdt_mfd_close(info, mfd);
-                } else {
-                        cfs_spin_lock(&med->med_open_lock);
-                        cfs_list_add(&mfd->mfd_list, &med->med_open_head);
-                        cfs_spin_unlock(&med->med_open_lock);
+                       spin_lock(&med->med_open_lock);
+                       class_handle_unhash(&mfd->mfd_handle);
+                       cfs_list_del_init(&mfd->mfd_list);
+                       spin_unlock(&med->med_open_lock);
+                       mdt_mfd_close(info, mfd);
+               } else {
+                       spin_lock(&med->med_open_lock);
+                       cfs_list_add(&mfd->mfd_list, &med->med_open_head);
+                       spin_unlock(&med->med_open_lock);
                 }
 
-                mdt_empty_transno(info);
-        } else
-                rc = -ENOMEM;
+                mdt_empty_transno(info, rc);
+        } else {
+                GOTO(err_out, rc = -ENOMEM);
+        }
 
         RETURN(rc);
-}
 
+err_out:
+        if (flags & FMODE_WRITE)
+                        /* XXX We also need to close io epoch here.
+                         * See LU-1220 - green */
+                mdt_write_put(o);
+        else if (flags & FMODE_EXEC)
+                mdt_write_allow(o);
+        return rc;
+}
 
-static int mdt_finish_open(struct mdt_thread_info *info,
-                           struct mdt_object *p, struct mdt_object *o,
-                           __u64 flags, int created, struct ldlm_reply *rep)
+int mdt_finish_open(struct mdt_thread_info *info,
+                    struct mdt_object *p, struct mdt_object *o,
+                    __u64 flags, int created, struct ldlm_reply *rep)
 {
         struct ptlrpc_request   *req = mdt_info_req(info);
         struct obd_export       *exp = req->rq_export;
@@ -747,6 +822,17 @@ static int mdt_finish_open(struct mdt_thread_info *info,
         islnk = S_ISLNK(la->la_mode);
         mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
 
+       /* LU-2275, simulate broken behaviour (esp. prevalent in
+        * pre-2.4 servers where a very strange reply is sent on error
+        * that looks like it was actually almost succesful and a failure at the
+        * same time */
+       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_NEGATIVE_POSITIVE)) {
+               mdt_set_disposition(info, rep, DISP_OPEN_LOCK | \
+                                   DISP_OPEN_OPEN | DISP_LOOKUP_NEG | \
+                                   DISP_LOOKUP_POS);
+               RETURN(-ENOENT);
+       }
+
         if (exp_connect_rmtclient(exp)) {
                 void *buf = req_capsule_server_get(info->mti_pill, &RMF_ACL);
 
@@ -826,8 +912,6 @@ static int mdt_finish_open(struct mdt_thread_info *info,
                 RETURN(0);
         }
 
-        mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
-
         /*
          * We need to return the existing object's fid back, so it is done here,
          * after preparing the reply.
@@ -851,15 +935,14 @@ static int mdt_finish_open(struct mdt_thread_info *info,
 
         mfd = NULL;
         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
-                cfs_spin_lock(&med->med_open_lock);
-                cfs_list_for_each(t, &med->med_open_head) {
-                        mfd = cfs_list_entry(t, struct mdt_file_data, mfd_list);
-                        if (mfd->mfd_xid == req->rq_xid) {
-                                break;
-                        }
-                        mfd = NULL;
-                }
-                cfs_spin_unlock(&med->med_open_lock);
+               spin_lock(&med->med_open_lock);
+               cfs_list_for_each(t, &med->med_open_head) {
+                       mfd = cfs_list_entry(t, struct mdt_file_data, mfd_list);
+                       if (mfd->mfd_xid == req->rq_xid)
+                               break;
+                       mfd = NULL;
+               }
+               spin_unlock(&med->med_open_lock);
 
                 if (mfd != NULL) {
                         repbody->handle.cookie = mfd->mfd_handle.h_cookie;
@@ -872,11 +955,15 @@ static int mdt_finish_open(struct mdt_thread_info *info,
                                 else
                                         repbody->valid |= OBD_MD_FLEASIZE;
                         }
+                       mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
                         RETURN(0);
                 }
         }
 
         rc = mdt_mfd_open(info, p, o, flags, created);
+       if (!rc)
+               mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
+
         RETURN(rc);
 }
 
@@ -957,11 +1044,9 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
                 }
                 rc = mdt_object_exists(child);
                 if (rc > 0) {
-                        struct md_object *next;
 
                         mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA);
-                        next = mdt_object_child(child);
-                        rc = mo_attr_get(env, next, ma);
+                       rc = mdt_attr_get_complex(info, child, ma);
                         if (rc == 0)
                               rc = mdt_finish_open(info, parent, child,
                                                    flags, 1, ldlm_rep);
@@ -992,10 +1077,9 @@ out:
         LASSERT(ergo(rc < 0, lustre_msg_get_transno(req->rq_repmsg) == 0));
 }
 
-static int mdt_open_by_fid(struct mdt_thread_info* info,
-                           struct ldlm_reply *rep)
+int mdt_open_by_fid(struct mdt_thread_info* info,
+                    struct ldlm_reply *rep)
 {
-        const struct lu_env     *env = info->mti_env;
         __u32                    flags = info->mti_spec.sp_cr_flags;
         struct mdt_reint_record *rr = &info->mti_rr;
         struct md_attr          *ma = &info->mti_attr;
@@ -1013,7 +1097,7 @@ static int mdt_open_by_fid(struct mdt_thread_info* info,
                                                 DISP_LOOKUP_EXECD |
                                                 DISP_LOOKUP_POS));
 
-                rc = mo_attr_get(env, mdt_object_child(o), ma);
+               rc = mdt_attr_get_complex(info, o, ma);
                 if (rc == 0)
                         rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
         } else if (rc == 0) {
@@ -1031,9 +1115,8 @@ static int mdt_open_by_fid(struct mdt_thread_info* info,
         RETURN(rc);
 }
 
-static int mdt_open_anon_by_fid(struct mdt_thread_info *info,
-                                struct ldlm_reply *rep, 
-                                struct mdt_lock_handle *lhc)
+int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
+                        struct mdt_lock_handle *lhc)
 {
         const struct lu_env     *env   = info->mti_env;
         struct mdt_device       *mdt   = info->mti_mdt;
@@ -1046,7 +1129,7 @@ static int mdt_open_anon_by_fid(struct mdt_thread_info *info,
         ldlm_mode_t              lm;
         ENTRY;
 
-        if (md_should_create(flags)) {
+       if (md_should_create(flags) && !(flags & MDS_OPEN_HAS_EA)) {
                 if (!lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
                         parent = mdt_object_find(env, mdt, rr->rr_fid1);
                         if (IS_ERR(parent)) {
@@ -1075,8 +1158,7 @@ static int mdt_open_anon_by_fid(struct mdt_thread_info *info,
                 GOTO(out, rc);
         }
         mdt_set_disposition(info, rep, (DISP_IT_EXECD |
-                                        DISP_LOOKUP_EXECD |
-                                        DISP_LOOKUP_POS));
+                                       DISP_LOOKUP_EXECD));
 
         if (flags & FMODE_WRITE)
                 lm = LCK_CW;
@@ -1093,7 +1175,7 @@ static int mdt_open_anon_by_fid(struct mdt_thread_info *info,
         if (rc)
                 GOTO(out, rc);
 
-        rc = mo_attr_get(env, mdt_object_child(o), ma);
+       rc = mdt_attr_get_complex(info, o, ma);
         if (rc)
                 GOTO(out, rc);
 
@@ -1108,13 +1190,17 @@ static int mdt_open_anon_by_fid(struct mdt_thread_info *info,
                 }
         }
 
-        if (flags & MDS_OPEN_LOCK)
-                mdt_set_disposition(info, rep, DISP_OPEN_LOCK);
         rc = mdt_finish_open(info, parent, o, flags, 0, rep);
 
         if (!(flags & MDS_OPEN_LOCK) || rc)
                 mdt_object_unlock(info, o, lhc, 1);
 
+       if (!rc) {
+               mdt_set_disposition(info, rep, DISP_LOOKUP_POS);
+               if (flags & MDS_OPEN_LOCK)
+                       mdt_set_disposition(info, rep, DISP_OPEN_LOCK);
+       }
+
         GOTO(out, rc);
 out:
         mdt_object_put(env, o);
@@ -1130,9 +1216,9 @@ int mdt_pin(struct mdt_thread_info* info)
 }
 
 /* Cross-ref request. Currently it can only be a pure open (w/o create) */
-static int mdt_cross_open(struct mdt_thread_info* info,
-                          const struct lu_fid *fid,
-                          struct ldlm_reply *rep, __u32 flags)
+int mdt_cross_open(struct mdt_thread_info* info,
+                   const struct lu_fid *fid,
+                   struct ldlm_reply *rep, __u32 flags)
 {
         struct md_attr    *ma = &info->mti_attr;
         struct mdt_object *o;
@@ -1152,7 +1238,7 @@ static int mdt_cross_open(struct mdt_thread_info* info,
                         goto out;
 
                 mdt_set_capainfo(info, 0, fid, BYPASS_CAPA);
-                rc = mo_attr_get(info->mti_env, mdt_object_child(o), ma);
+               rc = mdt_attr_get_complex(info, o, ma);
                 if (rc == 0)
                         rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
         } else if (rc == 0) {
@@ -1199,7 +1285,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         OBD_FAIL_TIMEOUT_ORSET(OBD_FAIL_MDS_PAUSE_OPEN, OBD_FAIL_ONCE,
                                (obd_timeout + 1) / 4);
 
-        mdt_counter_incr(req->rq_export, LPROC_MDT_OPEN);
+       mdt_counter_incr(req, LPROC_MDT_OPEN);
         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
 
         ma->ma_lmm = req_capsule_server_get(info->mti_pill, &RMF_MDT_MD);
@@ -1230,33 +1316,35 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                PFID(rr->rr_fid2), create_flags,
                ma->ma_attr.la_mode, msg_flags);
 
-        if (req_is_replay(req) ||
-            (req->rq_export->exp_libclient && create_flags&MDS_OPEN_HAS_EA)) {
-                /* This is a replay request or from liblustre with ea. */
-                result = mdt_open_by_fid(info, ldlm_rep);
-
-                if (result != -ENOENT) {
-                        if (req->rq_export->exp_libclient &&
-                            create_flags&MDS_OPEN_HAS_EA)
-                                GOTO(out, result = 0);
-                        GOTO(out, result);
-                }
-                /*
-                 * We didn't find the correct object, so we need to re-create it
-                 * via a regular replay.
-                 */
-                if (!(create_flags & MDS_OPEN_CREAT)) {
-                        DEBUG_REQ(D_ERROR, req,
-                                  "OPEN & CREAT not in open replay.");
-                        GOTO(out, result = -EFAULT);
-                }
-                CDEBUG(D_INFO, "Open replay did find object, continue as "
-                       "regular open\n");
-        } else if (rr->rr_namelen == 0 && !info->mti_cross_ref &&
-                   create_flags & MDS_OPEN_LOCK) {
-                result = mdt_open_anon_by_fid(info, ldlm_rep, lhc);
-                GOTO(out, result);
-        }
+       if (req_is_replay(req) ||
+           (req->rq_export->exp_libclient && create_flags & MDS_OPEN_HAS_EA)) {
+               /* This is a replay request or from liblustre with ea. */
+               result = mdt_open_by_fid(info, ldlm_rep);
+
+               if (result != -ENOENT) {
+                       if (req->rq_export->exp_libclient &&
+                           create_flags & MDS_OPEN_HAS_EA)
+                               GOTO(out, result = 0);
+                       GOTO(out, result);
+               }
+               /* We didn't find the correct object, so we need to re-create it
+                * via a regular replay. */
+               if (!(create_flags & MDS_OPEN_CREAT)) {
+                       DEBUG_REQ(D_ERROR, req,
+                                 "OPEN & CREAT not in open replay/by_fid.");
+                       GOTO(out, result = -EFAULT);
+               }
+               CDEBUG(D_INFO, "No object(1), continue as regular open.\n");
+       } else if ((rr->rr_namelen == 0 && !info->mti_cross_ref &&
+                   create_flags & MDS_OPEN_LOCK) ||
+                  (create_flags & MDS_OPEN_BY_FID)) {
+               result = mdt_open_by_fid_lock(info, ldlm_rep, lhc);
+               if (result != -ENOENT && !(create_flags & MDS_OPEN_CREAT))
+                       GOTO(out, result);
+               if (unlikely(rr->rr_namelen == 0))
+                       GOTO(out, result = -EINVAL);
+               CDEBUG(D_INFO, "No object(2), continue as regular open.\n");
+       }
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK))
                 GOTO(out, result = err_serious(-ENOMEM));
@@ -1313,15 +1401,18 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                 *child_fid = *info->mti_rr.rr_fid2;
                 LASSERTF(fid_is_sane(child_fid), "fid="DFID"\n",
                          PFID(child_fid));
-        } else {
-                /*
-                 * Check for O_EXCL is moved to the mdt_finish_open(), we need to
-                 * return FID back in that case.
-                 */
-                mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
-        }
-
-        child = mdt_object_find(info->mti_env, mdt, child_fid);
+               /* In the function below, .hs_keycmp resolves to
+                * lu_obj_hop_keycmp() */
+               /* coverity[overrun-buffer-val] */
+               child = mdt_object_new(info->mti_env, mdt, child_fid);
+       } else {
+               /*
+                * Check for O_EXCL is moved to the mdt_finish_open(), we need to
+                * return FID back in that case.
+                */
+               mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
+               child = mdt_object_find(info->mti_env, mdt, child_fid);
+       }
         if (IS_ERR(child))
                 GOTO(out_parent, result = PTR_ERR(child));
 
@@ -1332,6 +1423,9 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
 
         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
         if (result == -ENOENT) {
+                if (mdt_object_obf(parent))
+                        GOTO(out_child, result = -EPERM);
+
                 /* save versions in reply */
                 mdt_version_get_save(info, parent, 0);
                 mdt_version_get_save(info, child, 1);
@@ -1363,14 +1457,18 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                         mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
                         GOTO(out_child, result);
                 } else {
+
+                       /* XXX: we should call this once, see few lines below */
+                       if (result == 0)
+                               result = mdt_attr_get_complex(info, child, ma);
+
                         if (result != 0)
                                 GOTO(out_child, result);
                 }
                 created = 1;
         } else {
                 /* We have to get attr & lov ea for this object */
-                result = mo_attr_get(info->mti_env, mdt_object_child(child),
-                                     ma);
+               result = mdt_attr_get_complex(info, child, ma);
                 /*
                  * The object is on remote node, return its FID for remote open.
                  */
@@ -1444,15 +1542,16 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                              created, ldlm_rep);
         if (rc) {
                 result = rc;
-                if (lustre_handle_is_used(&lhc->mlh_reg_lh))
+               if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
                         /* openlock was acquired and mdt_finish_open failed -
                            drop the openlock */
                         mdt_object_unlock(info, child, lhc, 1);
+                       mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
+               }
                 if (created) {
                         ma->ma_need = 0;
                         ma->ma_valid = 0;
                         ma->ma_cookie_size = 0;
-                        info->mti_no_need_trans = 1;
                         rc = mdo_unlink(info->mti_env,
                                         mdt_object_child(parent),
                                         mdt_object_child(child),
@@ -1460,6 +1559,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                                         &info->mti_attr);
                         if (rc != 0)
                                 CERROR("Error in cleanup of open\n");
+                       mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
                 }
         }
         EXIT;
@@ -1517,7 +1617,7 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
         ma->ma_valid &= ~MA_INODE;
 
         if (!MFD_CLOSED(mode))
-                rc = mo_close(info->mti_env, next, ma);
+                rc = mo_close(info->mti_env, next, ma, mode);
 
         if (ret == MDT_IOEPOCH_GETATTR || ret == MDT_IOEPOCH_OPENED) {
                 struct mdt_export_data *med;
@@ -1530,10 +1630,10 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
 
                 LASSERT(mdt_info_req(info));
                 med = &mdt_info_req(info)->rq_export->exp_mdt_data;
-                cfs_spin_lock(&med->med_open_lock);
-                cfs_list_add(&mfd->mfd_list, &med->med_open_head);
-                class_handle_hash_back(&mfd->mfd_handle);
-                cfs_spin_unlock(&med->med_open_lock);
+               spin_lock(&med->med_open_lock);
+               cfs_list_add(&mfd->mfd_list, &med->med_open_head);
+               class_handle_hash_back(&mfd->mfd_handle);
+               spin_unlock(&med->med_open_lock);
 
                 if (ret == MDT_IOEPOCH_OPENED) {
                         ret = 0;
@@ -1561,7 +1661,7 @@ int mdt_close(struct mdt_thread_info *info)
         int rc, ret = 0;
         ENTRY;
 
-        mdt_counter_incr(req->rq_export, LPROC_MDT_CLOSE);
+       mdt_counter_incr(req, LPROC_MDT_CLOSE);
         /* Close may come with the Size-on-MDS update. Unpack it. */
         rc = mdt_close_unpack(info);
         if (rc)
@@ -1575,8 +1675,10 @@ int mdt_close(struct mdt_thread_info *info)
                              info->mti_mdt->mdt_max_cookiesize);
         rc = req_capsule_server_pack(info->mti_pill);
         if (mdt_check_resent(info, mdt_reconstruct_generic, NULL)) {
+                mdt_client_compatibility(info);
                 if (rc == 0)
-                        mdt_shrink_reply(info);
+                        mdt_fix_reply(info);
+               mdt_exit_ucred(info);
                 RETURN(lustre_msg_get_status(req->rq_repmsg));
         }
 
@@ -1602,19 +1704,19 @@ int mdt_close(struct mdt_thread_info *info)
         }
 
         med = &req->rq_export->exp_mdt_data;
-        cfs_spin_lock(&med->med_open_lock);
-        mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
-        if (mdt_mfd_closed(mfd)) {
-                cfs_spin_unlock(&med->med_open_lock);
-                CDEBUG(D_INODE, "no handle for file close: fid = "DFID
-                       ": cookie = "LPX64"\n", PFID(info->mti_rr.rr_fid1),
-                       info->mti_ioepoch->handle.cookie);
-                /** not serious error since bug 3633 */
-                rc = -ESTALE;
-        } else {
-                class_handle_unhash(&mfd->mfd_handle);
-                cfs_list_del_init(&mfd->mfd_list);
-                cfs_spin_unlock(&med->med_open_lock);
+       spin_lock(&med->med_open_lock);
+       mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
+       if (mdt_mfd_closed(mfd)) {
+               spin_unlock(&med->med_open_lock);
+               CDEBUG(D_INODE, "no handle for file close: fid = "DFID
+                      ": cookie = "LPX64"\n", PFID(info->mti_rr.rr_fid1),
+                      info->mti_ioepoch->handle.cookie);
+               /** not serious error since bug 3633 */
+               rc = -ESTALE;
+       } else {
+               class_handle_unhash(&mfd->mfd_handle);
+               cfs_list_del_init(&mfd->mfd_list);
+               spin_unlock(&med->med_open_lock);
 
                 /* Do not lose object before last unlink. */
                 o = mfd->mfd_object;
@@ -1622,12 +1724,15 @@ int mdt_close(struct mdt_thread_info *info)
                 ret = mdt_mfd_close(info, mfd);
                 if (repbody != NULL)
                         rc = mdt_handle_last_unlink(info, o, ma);
-                mdt_empty_transno(info);
+                mdt_empty_transno(info, rc);
                 mdt_object_put(info->mti_env, o);
         }
-        if (repbody != NULL)
-                mdt_shrink_reply(info);
+        if (repbody != NULL) {
+                mdt_client_compatibility(info);
+                rc = mdt_fix_reply(info);
+        }
 
+       mdt_exit_ucred(info);
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK))
                 RETURN(err_serious(-ENOMEM));
 
@@ -1668,14 +1773,16 @@ int mdt_done_writing(struct mdt_thread_info *info)
         if (rc)
                 RETURN(err_serious(rc));
 
-        if (mdt_check_resent(info, mdt_reconstruct_generic, NULL))
-                RETURN(lustre_msg_get_status(req->rq_repmsg));
+       if (mdt_check_resent(info, mdt_reconstruct_generic, NULL)) {
+               mdt_exit_ucred(info);
+               RETURN(lustre_msg_get_status(req->rq_repmsg));
+       }
 
         med = &info->mti_exp->exp_mdt_data;
-        cfs_spin_lock(&med->med_open_lock);
-        mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
-        if (mfd == NULL) {
-                cfs_spin_unlock(&med->med_open_lock);
+       spin_lock(&med->med_open_lock);
+       mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
+       if (mfd == NULL) {
+               spin_unlock(&med->med_open_lock);
                 CDEBUG(D_INODE, "no handle for done write: fid = "DFID
                        ": cookie = "LPX64" ioepoch = "LPU64"\n",
                        PFID(info->mti_rr.rr_fid1),
@@ -1683,18 +1790,19 @@ int mdt_done_writing(struct mdt_thread_info *info)
                        info->mti_ioepoch->ioepoch);
                 /* If this is a replay, reconstruct the transno. */
                 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
-                        mdt_empty_transno(info);
-                        RETURN(info->mti_ioepoch->flags & MF_SOM_AU ?
-                               -EAGAIN : 0);
-                }
-                RETURN(-ESTALE);
+                        rc = info->mti_ioepoch->flags & MF_SOM_AU ?
+                             -EAGAIN : 0;
+                        mdt_empty_transno(info, rc);
+               } else
+                       rc = -ESTALE;
+               GOTO(error_ucred, rc);
         }
 
         LASSERT(mfd->mfd_mode == MDS_FMODE_EPOCH ||
                 mfd->mfd_mode == MDS_FMODE_TRUNC);
         class_handle_unhash(&mfd->mfd_handle);
         cfs_list_del_init(&mfd->mfd_list);
-        cfs_spin_unlock(&med->med_open_lock);
+       spin_unlock(&med->med_open_lock);
 
         /* Set EPOCH CLOSE flag if not set by client. */
         info->mti_ioepoch->flags |= MF_EPOCH_CLOSE;
@@ -1702,12 +1810,14 @@ int mdt_done_writing(struct mdt_thread_info *info)
 
         info->mti_attr.ma_lmm_size = info->mti_mdt->mdt_max_mdsize;
         OBD_ALLOC_LARGE(info->mti_attr.ma_lmm, info->mti_mdt->mdt_max_mdsize);
-        if (info->mti_attr.ma_lmm == NULL)
-                RETURN(-ENOMEM);
+       if (info->mti_attr.ma_lmm == NULL)
+               GOTO(error_ucred, rc = -ENOMEM);
 
         rc = mdt_mfd_close(info, mfd);
 
         OBD_FREE_LARGE(info->mti_attr.ma_lmm, info->mti_mdt->mdt_max_mdsize);
-        mdt_empty_transno(info);
-        RETURN(rc);
+        mdt_empty_transno(info, rc);
+error_ucred:
+       mdt_exit_ucred(info);
+       RETURN(rc);
 }