Whamcloud - gitweb
LU-3544 fid: do open-by-fid by default
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index 633edaa..d35526e 100644 (file)
@@ -123,8 +123,7 @@ static int mdt_create_data(struct mdt_thread_info *info,
        ma->ma_valid = 0;
        mutex_lock(&o->mot_lov_mutex);
        if (!(o->mot_flags & MOF_LOV_CREATED)) {
-               if (p != NULL && (fid_is_obf(mdt_object_fid(p)) ||
-                                 fid_is_dot_lustre(mdt_object_fid(p))))
+               if (p != NULL && !fid_is_md_operative(mdt_object_fid(p)))
                        GOTO(unlock, rc = -EPERM);
 
                rc = mdo_create_data(info->mti_env,
@@ -419,7 +418,7 @@ static inline int mdt_ioepoch_close_reg(struct mdt_thread_info *info,
         if (ret == MDT_IOEPOCH_GETATTR && recovery) {
                 struct mdt_body *rep;
                 rep = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-                rep->valid |= OBD_MD_FLGETATTRLOCK;
+               rep->mbo_valid |= OBD_MD_FLGETATTRLOCK;
         }
 
         RETURN(rc ? : ret);
@@ -663,8 +662,36 @@ void mdt_mfd_set_mode(struct mdt_file_data *mfd, __u64 mode)
        mfd->mfd_mode = mode;
 }
 
+/**
+ * prep ma_lmm/ma_lmv for md_attr from reply
+ */
+void mdt_prep_ma_buf_from_rep(struct mdt_thread_info *info,
+                             struct mdt_object *obj,
+                             struct md_attr *ma)
+{
+       LASSERT(ma->ma_lmv == NULL && ma->ma_lmm == NULL);
+       if (S_ISDIR(obj->mot_header.loh_attr)) {
+               ma->ma_lmv = req_capsule_server_get(info->mti_pill,
+                                                   &RMF_MDT_MD);
+               ma->ma_lmv_size = req_capsule_get_size(info->mti_pill,
+                                                      &RMF_MDT_MD,
+                                                      RCL_SERVER);
+               if (ma->ma_lmv_size > 0)
+                       ma->ma_need |= MA_LMV;
+       } else {
+               ma->ma_lmm = req_capsule_server_get(info->mti_pill,
+                                                   &RMF_MDT_MD);
+               ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
+                                                      &RMF_MDT_MD,
+                                                      RCL_SERVER);
+               if (ma->ma_lmm_size > 0)
+                       ma->ma_need |= MA_LOV;
+       }
+}
+
 static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
-                        struct mdt_object *o, __u64 flags, int created)
+                       struct mdt_object *o, __u64 flags, int created,
+                       struct ldlm_reply *rep)
 {
         struct ptlrpc_request   *req = mdt_info_req(info);
         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
@@ -692,6 +719,9 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                 rc = mdt_create_data(info, p, o);
                 if (rc)
                         RETURN(rc);
+
+               if (exp_connect_flags(req->rq_export) & OBD_CONNECT_DISP_STRIPE)
+                       mdt_set_disposition(info, rep, DISP_OPEN_STRIPE);
         }
 
         CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n",
@@ -699,27 +729,28 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
 
         if (ma->ma_valid & MA_LOV) {
                 LASSERT(ma->ma_lmm_size != 0);
-                repbody->eadatasize = ma->ma_lmm_size;
-                if (isdir)
-                        repbody->valid |= OBD_MD_FLDIREA;
-                else
-                        repbody->valid |= OBD_MD_FLEASIZE;
-        }
+               repbody->mbo_eadatasize = ma->ma_lmm_size;
+               if (isdir)
+                       repbody->mbo_valid |= OBD_MD_FLDIREA;
+               else
+                       repbody->mbo_valid |= OBD_MD_FLEASIZE;
+       }
+
+       if (ma->ma_valid & MA_LMV) {
+               LASSERT(ma->ma_lmv_size != 0);
+               repbody->mbo_eadatasize = ma->ma_lmv_size;
+               LASSERT(isdir);
+               repbody->mbo_valid |= OBD_MD_FLDIREA | OBD_MD_MEA;
+       }
 
-        if (flags & FMODE_WRITE) {
-                rc = mdt_write_get(o);
-                if (rc == 0) {
-                        mdt_ioepoch_open(info, o, created);
-                        repbody->ioepoch = o->mot_ioepoch;
+       if (flags & FMODE_WRITE) {
+               rc = mdt_write_get(o);
+               if (rc == 0) {
+                       mdt_ioepoch_open(info, o, created);
+                       repbody->mbo_ioepoch = o->mot_ioepoch;
                 }
         } else if (flags & MDS_FMODE_EXEC) {
-               /* if file is released, we can't deny write because we must
-                * restore (write) it to access it.*/
-               if ((ma->ma_valid & MA_HSM) &&
-                   (ma->ma_hsm.mh_flags & HS_RELEASED))
-                       rc = 0;
-               else
-                       rc = mdt_write_deny(o);
+               rc = mdt_write_deny(o);
         }
         if (rc)
                 RETURN(rc);
@@ -794,7 +825,7 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                mfd->mfd_old_handle.cookie = info->mti_rr.rr_handle->cookie;
        }
 
-       repbody->handle.cookie = mfd->mfd_handle.h_cookie;
+       repbody->mbo_handle.cookie = mfd->mfd_handle.h_cookie;
 
        if (req->rq_export->exp_disconnected) {
                spin_lock(&med->med_open_lock);
@@ -813,13 +844,14 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
         RETURN(rc);
 
 err_out:
-        if (flags & FMODE_WRITE)
-                        /* XXX We also need to close io epoch here.
-                         * See LU-1220 - green */
-                mdt_write_put(o);
-        else if (flags & FMODE_EXEC)
-                mdt_write_allow(o);
-        return rc;
+       if (flags & FMODE_WRITE)
+               /* XXX We also need to close io epoch here.
+                * See LU-1220 - green */
+               mdt_write_put(o);
+       else if (flags & MDS_FMODE_EXEC)
+               mdt_write_allow(o);
+
+       return rc;
 }
 
 int mdt_finish_open(struct mdt_thread_info *info,
@@ -867,11 +899,11 @@ int mdt_finish_open(struct mdt_thread_info *info,
 
                 rc = mdt_pack_remote_perm(info, o, buf);
                 if (rc) {
-                        repbody->valid &= ~OBD_MD_FLRMTPERM;
-                        repbody->aclsize = 0;
-                } else {
-                        repbody->valid |= OBD_MD_FLRMTPERM;
-                        repbody->aclsize = sizeof(struct mdt_remote_perm);
+                       repbody->mbo_valid &= ~OBD_MD_FLRMTPERM;
+                       repbody->mbo_aclsize = 0;
+               } else {
+                       repbody->mbo_valid |= OBD_MD_FLRMTPERM;
+                       repbody->mbo_aclsize = sizeof(struct mdt_remote_perm);
                 }
         }
 #ifdef CONFIG_FS_POSIX_ACL
@@ -888,8 +920,8 @@ int mdt_finish_open(struct mdt_thread_info *info,
                                           XATTR_NAME_ACL_ACCESS);
                         if (rc < 0) {
                                 if (rc == -ENODATA) {
-                                        repbody->aclsize = 0;
-                                        repbody->valid |= OBD_MD_FLACL;
+                                       repbody->mbo_aclsize = 0;
+                                       repbody->mbo_valid |= OBD_MD_FLACL;
                                         rc = 0;
                                 } else if (rc == -EOPNOTSUPP) {
                                         rc = 0;
@@ -897,15 +929,15 @@ int mdt_finish_open(struct mdt_thread_info *info,
                                         CERROR("got acl size: %d\n", rc);
                                 }
                         } else {
-                                repbody->aclsize = rc;
-                                repbody->valid |= OBD_MD_FLACL;
+                               repbody->mbo_aclsize = rc;
+                               repbody->mbo_valid |= OBD_MD_FLACL;
                                 rc = 0;
                         }
                 }
         }
 #endif
 
-       if (info->mti_mdt->mdt_opts.mo_mds_capa &&
+       if (info->mti_mdt->mdt_lut.lut_mds_capa &&
            exp_connect_flags(exp) & OBD_CONNECT_MDS_CAPA) {
                 struct lustre_capa *capa;
 
@@ -915,10 +947,9 @@ int mdt_finish_open(struct mdt_thread_info *info,
                 rc = mo_capa_get(info->mti_env, mdt_object_child(o), capa, 0);
                 if (rc)
                         RETURN(rc);
-                repbody->valid |= OBD_MD_FLMDSCAPA;
+               repbody->mbo_valid |= OBD_MD_FLMDSCAPA;
         }
-
-       if (info->mti_mdt->mdt_opts.mo_oss_capa &&
+       if (info->mti_mdt->mdt_lut.lut_oss_capa &&
            exp_connect_flags(exp) & OBD_CONNECT_OSS_CAPA &&
            S_ISREG(lu_object_attr(&o->mot_obj))) {
                 struct lustre_capa *capa;
@@ -929,7 +960,7 @@ int mdt_finish_open(struct mdt_thread_info *info,
                 rc = mo_capa_get(info->mti_env, mdt_object_child(o), capa, 0);
                 if (rc)
                         RETURN(rc);
-                repbody->valid |= OBD_MD_FLOSSCAPA;
+               repbody->mbo_valid |= OBD_MD_FLOSSCAPA;
         }
 
         /*
@@ -975,26 +1006,26 @@ int mdt_finish_open(struct mdt_thread_info *info,
                spin_unlock(&med->med_open_lock);
 
                 if (mfd != NULL) {
-                        repbody->handle.cookie = mfd->mfd_handle.h_cookie;
-                        /*set repbody->ea_size for resent case*/
-                        if (ma->ma_valid & MA_LOV) {
-                                LASSERT(ma->ma_lmm_size != 0);
-                                repbody->eadatasize = ma->ma_lmm_size;
-                                if (isdir)
-                                        repbody->valid |= OBD_MD_FLDIREA;
-                                else
-                                        repbody->valid |= OBD_MD_FLEASIZE;
+                       repbody->mbo_handle.cookie = mfd->mfd_handle.h_cookie;
+                       /* set repbody->ea_size for resent case */
+                       if (ma->ma_valid & MA_LOV) {
+                               LASSERT(ma->ma_lmm_size != 0);
+                               repbody->mbo_eadatasize = ma->ma_lmm_size;
+                               if (isdir)
+                                       repbody->mbo_valid |= OBD_MD_FLDIREA;
+                               else
+                                       repbody->mbo_valid |= OBD_MD_FLEASIZE;
                         }
                        mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
-                        RETURN(0);
-                }
-        }
+                       RETURN(0);
+               }
+       }
 
-        rc = mdt_mfd_open(info, p, o, flags, created);
+       rc = mdt_mfd_open(info, p, o, flags, created, rep);
        if (!rc)
                mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 extern void mdt_req_from_lcd(struct ptlrpc_request *req,
@@ -1023,13 +1054,7 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
         ldlm_rep = req_capsule_server_get(pill, &RMF_DLM_REP);
         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
 
-        ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
-        ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD,
-                                               RCL_SERVER);
        ma->ma_need = MA_INODE | MA_HSM;
-        if (ma->ma_lmm_size > 0)
-                ma->ma_need |= MA_LOV;
-
         ma->ma_valid = 0;
 
         mdt_req_from_lcd(req, lcd);
@@ -1060,18 +1085,19 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
                         mdt_export_evict(exp);
                         RETURN_EXIT;
                 }
-                child = mdt_object_find(env, mdt, rr->rr_fid2);
-                if (IS_ERR(child)) {
-                        rc = PTR_ERR(child);
-                        LCONSOLE_WARN("Child "DFID" lookup error %d."
-                                      " Evicting client %s with export %s.\n",
-                                      PFID(mdt_object_fid(child)), rc,
-                                      obd_uuid2str(&exp->exp_client_uuid),
-                                      obd_export_nid2str(exp));
-                        mdt_object_put(env, parent);
-                        mdt_export_evict(exp);
-                        RETURN_EXIT;
-                }
+
+               child = mdt_object_find(env, mdt, rr->rr_fid2);
+               if (IS_ERR(child)) {
+                       rc = PTR_ERR(child);
+                       LCONSOLE_WARN("cannot lookup child "DFID": rc = %d; "
+                                     "evicting client %s with export %s\n",
+                                     PFID(rr->rr_fid2), rc,
+                                     obd_uuid2str(&exp->exp_client_uuid),
+                                     obd_export_nid2str(exp));
+                       mdt_object_put(env, parent);
+                       mdt_export_evict(exp);
+                       RETURN_EXIT;
+               }
 
                if (unlikely(mdt_object_remote(child))) {
                        /* the child object was created on remote server */
@@ -1081,13 +1107,14 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
                                mdt_object_put(env, child);
                                GOTO(out, rc = -EIO);
                        }
-                       repbody->fid1 = *rr->rr_fid2;
-                       repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
+                       repbody->mbo_fid1 = *rr->rr_fid2;
+                       repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
                        rc = 0;
                } else {
                        if (mdt_object_exists(child)) {
                                mdt_set_capainfo(info, 1, rr->rr_fid2,
                                                 BYPASS_CAPA);
+                               mdt_prep_ma_buf_from_rep(info, child, ma);
                                rc = mdt_attr_get_complex(info, child, ma);
                                if (rc == 0)
                                        rc = mdt_finish_open(info, parent,
@@ -1138,15 +1165,15 @@ int mdt_open_by_fid(struct mdt_thread_info *info, struct ldlm_reply *rep)
                                                DISP_LOOKUP_EXECD |
                                                DISP_LOOKUP_POS));
                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-                repbody->fid1 = *rr->rr_fid2;
-                repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
+               repbody->mbo_fid1 = *rr->rr_fid2;
+               repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
                 rc = 0;
        } else {
                if (mdt_object_exists(o)) {
                        mdt_set_disposition(info, rep, (DISP_IT_EXECD |
                                                        DISP_LOOKUP_EXECD |
                                                        DISP_LOOKUP_POS));
-
+                       mdt_prep_ma_buf_from_rep(info, o, ma);
                        rc = mdt_attr_get_complex(info, o, ma);
                        if (rc == 0)
                                rc = mdt_finish_open(info, NULL, o, flags, 0,
@@ -1219,34 +1246,29 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                /* normal open holds read mode of open sem */
                down_read(&obj->mot_open_sem);
 
-               if (open_flags & MDS_OPEN_LOCK) {
-                       if (open_flags & FMODE_WRITE)
-                               lm = LCK_CW;
-                       /* if file is released, we can't deny write because we must
-                        * restore (write) it to access it. */
-                       else if ((open_flags & MDS_FMODE_EXEC) &&
-                                !((ma->ma_valid & MA_HSM) &&
-                                  (ma->ma_hsm.mh_flags & HS_RELEASED)))
-                               lm = LCK_PR;
-                       else
-                               lm = LCK_CR;
+               if (open_flags & FMODE_WRITE)
+                       lm = LCK_CW;
+               else if (open_flags & MDS_FMODE_EXEC)
+                       lm = LCK_PR;
+               else
+                       lm = LCK_CR;
 
+               if (open_flags & MDS_OPEN_LOCK) {
                        *ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN;
-               } else if (atomic_read(&obj->mot_lease_count) > 0) {
-                       if (open_flags & FMODE_WRITE)
-                               lm = LCK_CW;
-                       else
-                               lm = LCK_CR;
-
-                       /* revoke lease */
-                       *ibits = MDS_INODELOCK_OPEN;
+               } else if (open_flags & (FMODE_WRITE | MDS_FMODE_EXEC) ||
+                          atomic_read(&obj->mot_lease_count) > 0) {
+                       /* We need to flush conflicting locks or revoke a lease.
+                        * In either case there is no need to acquire a layout
+                        * lock since it won't be returned to the client. */
                        try_layout = false;
-
+                       *ibits = MDS_INODELOCK_OPEN;
                        lhc = &info->mti_lh[MDT_LH_LOCAL];
                }
-               CDEBUG(D_INODE, "normal open:"DFID" lease count: %d, lm: %d\n",
-                       PFID(mdt_object_fid(obj)),
-                       atomic_read(&obj->mot_open_count), lm);
+
+               CDEBUG(D_INODE, "normal open FID = "DFID", open_count = %d, "
+                      "lm = %d\n",
+                      PFID(mdt_object_fid(obj)),
+                      atomic_read(&obj->mot_open_count), lm);
        }
 
        mdt_lock_reg_init(lhc, lm);
@@ -1283,6 +1305,15 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                        ", open_flags = "LPO64"\n",
                        PFID(mdt_object_fid(obj)), open_flags);
 
+               /* We cannot enqueue another lock for the same resource we
+                * already have a lock for, due to mechanics of waiting list
+                * iterating in ldlm, see LU-3601.
+                * As such we'll drop the open lock we just got above here,
+                * it's ok not to have this open lock as it's main purpose is to
+                * flush unused cached client open handles. */
+               if (lustre_handle_is_used(&lhc->mlh_reg_lh))
+                       mdt_object_unlock(info, obj, lhc, 1);
+
                LASSERT(!try_layout);
                mdt_lock_handle_init(ll);
                mdt_lock_reg_init(ll, LCK_EX);
@@ -1372,12 +1403,13 @@ static void mdt_object_open_unlock(struct mdt_thread_info *info,
                rc = 1;
        }
 
-       if (rc != 0) {
+       if (rc != 0 || !lustre_handle_is_used(&lhc->mlh_reg_lh)) {
                struct ldlm_reply       *ldlm_rep;
 
                ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
                mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
-               mdt_object_unlock(info, obj, lhc, 1);
+               if (lustre_handle_is_used(&lhc->mlh_reg_lh))
+                       mdt_object_unlock(info, obj, lhc, 1);
        }
        RETURN_EXIT;
 }
@@ -1447,6 +1479,7 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
 
        mdt_set_disposition(info, rep, (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
 
+       mdt_prep_ma_buf_from_rep(info, o, ma);
        if (flags & MDS_OPEN_RELEASE)
                ma->ma_need |= MA_HSM;
        rc = mdt_attr_get_complex(info, o, ma);
@@ -1458,9 +1491,14 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
        if (flags & MDS_OPEN_RELEASE && !mdt_hsm_release_allow(ma))
                GOTO(out, rc = -EPERM);
 
-       rc = mdt_object_open_lock(info, o, lhc, &ibits);
-        if (rc)
-                GOTO(out_unlock, rc);
+       rc = mdt_check_resent_lock(info, o, lhc);
+       if (rc < 0) {
+               GOTO(out, rc);
+       } else if (rc > 0) {
+               rc = mdt_object_open_lock(info, o, lhc, &ibits);
+               if (rc)
+                       GOTO(out_unlock, rc);
+       }
 
         if (ma->ma_valid & MA_PFID) {
                 parent = mdt_object_find(env, mdt, &ma->ma_pfid);
@@ -1492,12 +1530,6 @@ out:
        return rc;
 }
 
-int mdt_pin(struct mdt_thread_info* info)
-{
-        ENTRY;
-        RETURN(err_serious(-EOPNOTSUPP));
-}
-
 /* Cross-ref request. Currently it can only be a pure open (w/o create) */
 static int mdt_cross_open(struct mdt_thread_info *info,
                          const struct lu_fid *parent_fid,
@@ -1530,6 +1562,7 @@ static int mdt_cross_open(struct mdt_thread_info *info,
                        if (rc)
                                goto out;
 
+                       mdt_prep_ma_buf_from_rep(info, o, ma);
                        mdt_set_capainfo(info, 0, fid, BYPASS_CAPA);
                        rc = mdt_attr_get_complex(info, o, ma);
                        if (rc != 0)
@@ -1572,7 +1605,6 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         __u64                    create_flags = info->mti_spec.sp_cr_flags;
        __u64                    ibits = 0;
         struct mdt_reint_record *rr = &info->mti_rr;
-        struct lu_name          *lname;
         int                      result, rc;
         int                      created = 0;
         __u32                    msg_flags;
@@ -1584,13 +1616,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
        mdt_counter_incr(req, LPROC_MDT_OPEN);
         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
 
-        ma->ma_lmm = req_capsule_server_get(info->mti_pill, &RMF_MDT_MD);
-        ma->ma_lmm_size = req_capsule_get_size(info->mti_pill, &RMF_MDT_MD,
-                                               RCL_SERVER);
         ma->ma_need = MA_INODE;
-        if (ma->ma_lmm_size > 0)
-                ma->ma_need |= MA_LOV;
-
         ma->ma_valid = 0;
 
         LASSERT(info->mti_pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
@@ -1606,11 +1632,12 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
             info->mti_spec.u.sp_ea.eadata == NULL)
                 GOTO(out, result = err_serious(-EINVAL));
 
-        CDEBUG(D_INODE, "I am going to open "DFID"/(%s->"DFID") "
-               "cr_flag="LPO64" mode=0%06o msg_flag=0x%x\n",
-               PFID(rr->rr_fid1), rr->rr_name,
-               PFID(rr->rr_fid2), create_flags,
-               ma->ma_attr.la_mode, msg_flags);
+       CDEBUG(D_INODE, "I am going to open "DFID"/("DNAME"->"DFID") "
+              "cr_flag="LPO64" mode=0%06o msg_flag=0x%x\n",
+              PFID(rr->rr_fid1), PNAME(&rr->rr_name),
+              PFID(rr->rr_fid2), create_flags,
+              ma->ma_attr.la_mode, msg_flags);
+
        if (info->mti_cross_ref) {
                /* This is cross-ref open */
                mdt_set_disposition(info, ldlm_rep,
@@ -1638,25 +1665,16 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                        GOTO(out, result = -EFAULT);
                }
                CDEBUG(D_INFO, "No object(1), continue as regular open.\n");
-       } else if ((rr->rr_namelen == 0 && create_flags & MDS_OPEN_LOCK) ||
-                  (create_flags & MDS_OPEN_BY_FID)) {
+       } else if (create_flags & (MDS_OPEN_BY_FID | MDS_OPEN_LOCK)) {
+               /*
+                * MDS_OPEN_LOCK is checked for backward compatibility with 2.1
+                * client.
+                */
                result = mdt_open_by_fid_lock(info, ldlm_rep, lhc);
-               /* If result is 0 then open by FID has found the file
-                * and there is nothing left for us to do here.  More
-                * generally if it is anything other than -ENOENT or
-                * -EREMOTE then we return that now.  If -ENOENT and
-                * MDS_OPEN_CREAT is set then we must create the file
-                * below.  If -EREMOTE then we need to return a LOOKUP
-                * lock to the client, which we do below.  Hence this
-                * odd looking condition.  See LU-2523. */
-               if (!(result == -ENOENT && (create_flags & MDS_OPEN_CREAT)) &&
-                   result != -EREMOTE)
-                       GOTO(out, result);
-
-               if (unlikely(rr->rr_namelen == 0))
-                       GOTO(out, result = -EINVAL);
-
-               CDEBUG(D_INFO, "No object(2), continue as regular open.\n");
+               if (result < 0)
+                       CDEBUG(D_INFO, "no object for "DFID": %d\n",
+                              PFID(rr->rr_fid2), result);
+               GOTO(out, result);
        }
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK))
@@ -1665,9 +1683,13 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         mdt_set_disposition(info, ldlm_rep,
                             (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
 
+       if (!lu_name_is_valid(&rr->rr_name))
+               GOTO(out, result = -EPROTO);
+
         lh = &info->mti_lh[MDT_LH_PARENT];
-        mdt_lock_pdo_init(lh, (create_flags & MDS_OPEN_CREAT) ?
-                          LCK_PW : LCK_PR, rr->rr_name, rr->rr_namelen);
+       mdt_lock_pdo_init(lh,
+                         (create_flags & MDS_OPEN_CREAT) ? LCK_PW : LCK_PR,
+                         &rr->rr_name);
 
         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
                                       MDS_INODELOCK_UPDATE);
@@ -1681,12 +1703,13 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
 
         fid_zero(child_fid);
 
-        lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
-        result = mdo_lookup(info->mti_env, mdt_object_child(parent),
-                            lname, child_fid, &info->mti_spec);
-        LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
-                 "looking for "DFID"/%s, result fid="DFID"\n",
-                 PFID(mdt_object_fid(parent)), rr->rr_name, PFID(child_fid));
+       result = mdo_lookup(info->mti_env, mdt_object_child(parent),
+                           &rr->rr_name, child_fid, &info->mti_spec);
+
+       LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
+                "looking for "DFID"/"DNAME", found FID = "DFID"\n",
+                PFID(mdt_object_fid(parent)), PNAME(&rr->rr_name),
+                PFID(child_fid));
 
         if (result != 0 && result != -ENOENT && result != -ESTALE)
                 GOTO(out_parent, result);
@@ -1731,15 +1754,15 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
         if (result == -ENOENT) {
                /* Create under OBF and .lustre is not permitted */
-               if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1))
+               if (!fid_is_md_operative(rr->rr_fid1))
                        GOTO(out_child, result = -EPERM);
 
-                /* save versions in reply */
-                mdt_version_get_save(info, parent, 0);
-                mdt_version_get_save(info, child, 1);
+               /* save versions in reply */
+               mdt_version_get_save(info, parent, 0);
+               mdt_version_get_save(info, child, 1);
 
-                /* version of child will be changed */
-                info->mti_mos = child;
+               /* version of child will be changed */
+               tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
 
                 /* Not found and with MDS_OPEN_CREAT: let's create it. */
                 mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
@@ -1755,17 +1778,17 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                 info->mti_spec.sp_cr_lookup = 0;
                 info->mti_spec.sp_feat = &dt_directory_features;
 
-                result = mdo_create(info->mti_env,
-                                    mdt_object_child(parent),
-                                    lname,
-                                    mdt_object_child(child),
-                                    &info->mti_spec,
-                                    &info->mti_attr);
+               result = mdo_create(info->mti_env,
+                                   mdt_object_child(parent),
+                                   &rr->rr_name,
+                                   mdt_object_child(child),
+                                   &info->mti_spec,
+                                   &info->mti_attr);
                 if (result == -ERESTART) {
                         mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
                         GOTO(out_child, result);
                 } else {
-
+                       mdt_prep_ma_buf_from_rep(info, child, ma);
                        /* XXX: we should call this once, see few lines below */
                        if (result == 0)
                                result = mdt_attr_get_complex(info, child, ma);
@@ -1786,22 +1809,10 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                          */
                         LASSERT(lhc != NULL);
 
-                        if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
-                                struct ldlm_lock *lock;
-
-                                LASSERT(msg_flags & MSG_RESENT);
-
-                                lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
-                                if (!lock) {
-                                        CERROR("Invalid lock handle "LPX64"\n",
-                                               lhc->mlh_reg_lh.cookie);
-                                        LBUG();
-                                }
-                                LASSERT(fid_res_name_eq(mdt_object_fid(child),
-                                                        &lock->l_resource->lr_name));
-                                LDLM_LOCK_PUT(lock);
-                                rc = 0;
-                        } else {
+                       rc = mdt_check_resent_lock(info, child, lhc);
+                       if (rc < 0) {
+                               GOTO(out_child, result = rc);
+                       } else if (rc > 0) {
                                 mdt_lock_handle_init(lhc);
                                 mdt_lock_reg_init(lhc, LCK_PR);
 
@@ -1809,37 +1820,48 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                                                      MDS_INODELOCK_LOOKUP,
                                                      MDT_CROSS_LOCK);
                         }
-                        repbody->fid1 = *mdt_object_fid(child);
-                        repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
+                       repbody->mbo_fid1 = *mdt_object_fid(child);
+                       repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
                         if (rc != 0)
                                 result = rc;
                        else
                                result = -EREMOTE;
                         GOTO(out_child, result);
+               } else if (mdt_object_exists(child)) {
+                       /* We have to get attr & LOV EA & HSM for this
+                        * object. */
+                       mdt_prep_ma_buf_from_rep(info, child, ma);
+                       ma->ma_need |= MA_HSM;
+                       result = mdt_attr_get_complex(info, child, ma);
                } else {
-                       if (mdt_object_exists(child)) {
-                               /* We have to get attr & LOV EA & HSM for this
-                                * object */
-                               ma->ma_need |= MA_HSM;
-                               result = mdt_attr_get_complex(info, child, ma);
-                       } else {
-                               /*object non-exist!!!*/
-                               LBUG();
-                       }
+                       /* Object does not exist. Likely FS corruption. */
+                       CERROR("%s: name '"DNAME"' present, but FID "
+                              DFID" is invalid\n", mdt_obd_name(info->mti_mdt),
+                              PNAME(&rr->rr_name), PFID(child_fid));
+                       GOTO(out_child, result = -EIO);
                }
         }
 
-        LASSERT(!lustre_handle_is_used(&lhc->mlh_reg_lh));
-
-       /* get openlock if this is not replay and if a client requested it */
-       if (!req_is_replay(req)) {
-               rc = mdt_object_open_lock(info, child, lhc, &ibits);
-               if (rc != 0)
-                       GOTO(out_child_unlock, result = rc);
-               else if (create_flags & MDS_OPEN_LOCK)
+       rc = mdt_check_resent_lock(info, child, lhc);
+       if (rc < 0) {
+               GOTO(out_child, result = rc);
+       } else if (rc == 0) {
+               /* the open lock might already be gotten in
+                * ldlm_handle_enqueue() */
+               LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
+               if (create_flags & MDS_OPEN_LOCK)
                        mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
+       } else {
+               /* get openlock if this isn't replay and client requested it */
+               if (!req_is_replay(req)) {
+                       rc = mdt_object_open_lock(info, child, lhc, &ibits);
+                       if (rc != 0)
+                               GOTO(out_child_unlock, result = rc);
+                       else if (create_flags & MDS_OPEN_LOCK)
+                               mdt_set_disposition(info, ldlm_rep,
+                                                   DISP_OPEN_LOCK);
+               }
        }
-
        /* Try to open it now. */
        rc = mdt_finish_open(info, parent, child, create_flags,
                             created, ldlm_rep);
@@ -1859,11 +1881,10 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                if (created) {
                        ma->ma_need = 0;
                        ma->ma_valid = 0;
-                       ma->ma_cookie_size = 0;
                        rc = mdo_unlink(info->mti_env,
                                        mdt_object_child(parent),
                                        mdt_object_child(child),
-                                       lname,
+                                       &rr->rr_name,
                                        &info->mti_attr, 0);
                        if (rc != 0)
                                CERROR("%s: "DFID" cleanup of open: rc = %d\n",
@@ -1898,8 +1919,10 @@ static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
        struct lu_fid *local_root_fid = &info->mti_tmp_fid1;
        struct mdt_object *obj = NULL;
        struct mdt_object *local_root;
-       static const char name[] = "i_am_nobody";
-       struct lu_name *lname;
+       static const struct lu_name lname = {
+               .ln_name = "i_am_nobody",
+               .ln_namelen = sizeof("i_am_nobody") - 1,
+       };
        struct lu_ucred *uc;
        cfs_cap_t uc_cap_save;
        int rc;
@@ -1929,12 +1952,10 @@ static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
                spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE;
        }
 
-       lname = mdt_name(env, (char *)name, sizeof(name) - 1);
-
        uc = lu_ucred(env);
        uc_cap_save = uc->uc_cap;
-       uc->uc_cap |= 1 << CAP_DAC_OVERRIDE;
-       rc = mdo_create(env, mdt_object_child(local_root), lname,
+       uc->uc_cap |= 1 << CFS_CAP_DAC_OVERRIDE;
+       rc = mdo_create(env, mdt_object_child(local_root), &lname,
                        mdt_object_child(obj), spec, attr);
        uc->uc_cap = uc_cap_save;
        if (rc < 0) {
@@ -1975,6 +1996,9 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        int                     rc2;
        ENTRY;
 
+       if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_RDONLY)
+               RETURN(-EROFS);
+
        data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
        if (data == NULL)
                RETURN(-EPROTO);
@@ -2128,7 +2152,7 @@ out_unlock:
                struct mdt_body *repbody;
                repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
                LASSERT(repbody != NULL);
-               repbody->valid |= OBD_MD_FLRELEASED;
+               repbody->mbo_valid |= OBD_MD_FLRELEASED;
        }
 
 out_reprocess:
@@ -2240,36 +2264,39 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
        RETURN(rc ? rc : ret);
 }
 
-int mdt_close(struct mdt_thread_info *info)
+int mdt_close(struct tgt_session_info *tsi)
 {
+       struct mdt_thread_info  *info = tsi2mdt_info(tsi);
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
         struct mdt_export_data *med;
         struct mdt_file_data   *mfd;
         struct mdt_object      *o;
         struct md_attr         *ma = &info->mti_attr;
         struct mdt_body        *repbody = NULL;
-        struct ptlrpc_request  *req = mdt_info_req(info);
         int rc, ret = 0;
         ENTRY;
 
        mdt_counter_incr(req, LPROC_MDT_CLOSE);
-        /* Close may come with the Size-on-MDS update. Unpack it. */
-        rc = mdt_close_unpack(info);
-        if (rc)
-                RETURN(err_serious(rc));
+       /* Close may come with the Size-on-MDS update. Unpack it. */
+       rc = mdt_close_unpack(info);
+       if (rc)
+               GOTO(out, rc = err_serious(rc));
 
         LASSERT(info->mti_ioepoch);
 
+       /* These fields are no longer used and are left for compatibility.
+        * size is always zero */
         req_capsule_set_size(info->mti_pill, &RMF_MDT_MD, RCL_SERVER,
-                             info->mti_mdt->mdt_max_mdsize);
+                            0);
         req_capsule_set_size(info->mti_pill, &RMF_LOGCOOKIES, RCL_SERVER,
-                             info->mti_mdt->mdt_max_cookiesize);
+                            0);
         rc = req_capsule_server_pack(info->mti_pill);
         if (mdt_check_resent(info, mdt_reconstruct_generic, NULL)) {
                 mdt_client_compatibility(info);
                 if (rc == 0)
                         mdt_fix_reply(info);
                mdt_exit_ucred(info);
-                RETURN(lustre_msg_get_status(req->rq_repmsg));
+               GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg));
         }
 
         /* Continue to close handle even if we can not pack reply */
@@ -2281,14 +2308,9 @@ int mdt_close(struct mdt_thread_info *info)
                 ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
                                                        &RMF_MDT_MD,
                                                        RCL_SERVER);
-                ma->ma_cookie = req_capsule_server_get(info->mti_pill,
-                                                       &RMF_LOGCOOKIES);
-                ma->ma_cookie_size = req_capsule_get_size(info->mti_pill,
-                                                          &RMF_LOGCOOKIES,
-                                                          RCL_SERVER);
                 ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
-                repbody->eadatasize = 0;
-                repbody->aclsize = 0;
+               repbody->mbo_eadatasize = 0;
+               repbody->mbo_aclsize = 0;
         } else {
                 rc = err_serious(rc);
         }
@@ -2324,13 +2346,15 @@ int mdt_close(struct mdt_thread_info *info)
         }
 
        mdt_exit_ucred(info);
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK))
-                RETURN(err_serious(-ENOMEM));
+       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK))
+               GOTO(out, rc = err_serious(-ENOMEM));
 
-        if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_CLOSE_NET_REP,
-                                 OBD_FAIL_MDS_CLOSE_NET_REP))
-                info->mti_fail_id = OBD_FAIL_MDS_CLOSE_NET_REP;
-        RETURN(rc ? rc : ret);
+       if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_CLOSE_NET_REP,
+                                OBD_FAIL_MDS_CLOSE_NET_REP))
+               tsi->tsi_reply_fail_id = OBD_FAIL_MDS_CLOSE_NET_REP;
+out:
+       mdt_thread_info_fini(info);
+       RETURN(rc ? rc : ret);
 }
 
 /**
@@ -2341,35 +2365,35 @@ int mdt_close(struct mdt_thread_info *info)
  * and got a trasid. Waiting for such DONE_WRITING is not reliable, so just
  * skip attributes and reconstruct the reply here.
  */
-int mdt_done_writing(struct mdt_thread_info *info)
+int mdt_done_writing(struct tgt_session_info *tsi)
 {
-        struct ptlrpc_request   *req = mdt_info_req(info);
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
+       struct mdt_thread_info  *info = tsi2mdt_info(tsi);
         struct mdt_body         *repbody = NULL;
         struct mdt_export_data  *med;
         struct mdt_file_data    *mfd;
         int rc;
         ENTRY;
 
-        rc = req_capsule_server_pack(info->mti_pill);
-        if (rc)
-                RETURN(err_serious(rc));
+       rc = req_capsule_server_pack(tsi->tsi_pill);
+       if (rc)
+               GOTO(out, rc = err_serious(rc));
 
-        repbody = req_capsule_server_get(info->mti_pill,
-                                         &RMF_MDT_BODY);
-        repbody->eadatasize = 0;
-        repbody->aclsize = 0;
+       repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_MDT_BODY);
+       repbody->mbo_eadatasize = 0;
+       repbody->mbo_aclsize = 0;
 
-        /* Done Writing may come with the Size-on-MDS update. Unpack it. */
-        rc = mdt_close_unpack(info);
-        if (rc)
-                RETURN(err_serious(rc));
+       /* Done Writing may come with the Size-on-MDS update. Unpack it. */
+       rc = mdt_close_unpack(info);
+       if (rc)
+               GOTO(out, rc = err_serious(rc));
 
        if (mdt_check_resent(info, mdt_reconstruct_generic, NULL)) {
                mdt_exit_ucred(info);
-               RETURN(lustre_msg_get_status(req->rq_repmsg));
+               GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg));
        }
 
-        med = &info->mti_exp->exp_mdt_data;
+       med = &info->mti_exp->exp_mdt_data;
        spin_lock(&med->med_open_lock);
        mfd = mdt_handle2mfd(med, &info->mti_ioepoch->handle,
                             req_is_replay(req));
@@ -2411,5 +2435,7 @@ int mdt_done_writing(struct mdt_thread_info *info)
         mdt_empty_transno(info, rc);
 error_ucred:
        mdt_exit_ucred(info);
+out:
+       mdt_thread_info_fini(info);
        RETURN(rc);
 }