Whamcloud - gitweb
LU-4179 mdt: skip open lock enqueue during resent
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index 76ed060..bc3f42d 100644 (file)
@@ -664,7 +664,8 @@ void mdt_mfd_set_mode(struct mdt_file_data *mfd, __u64 mode)
 }
 
 static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
-                        struct mdt_object *o, __u64 flags, int created)
+                       struct mdt_object *o, __u64 flags, int created,
+                       struct ldlm_reply *rep)
 {
         struct ptlrpc_request   *req = mdt_info_req(info);
         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
@@ -692,6 +693,9 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                 rc = mdt_create_data(info, p, o);
                 if (rc)
                         RETURN(rc);
+
+               if (exp_connect_flags(req->rq_export) & OBD_CONNECT_DISP_STRIPE)
+                       mdt_set_disposition(info, rep, DISP_OPEN_STRIPE);
         }
 
         CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n",
@@ -713,13 +717,7 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                         repbody->ioepoch = o->mot_ioepoch;
                 }
         } else if (flags & MDS_FMODE_EXEC) {
-               /* if file is released, we can't deny write because we must
-                * restore (write) it to access it.*/
-               if ((ma->ma_valid & MA_HSM) &&
-                   (ma->ma_hsm.mh_flags & HS_RELEASED))
-                       rc = 0;
-               else
-                       rc = mdt_write_deny(o);
+               rc = mdt_write_deny(o);
         }
         if (rc)
                 RETURN(rc);
@@ -905,7 +903,7 @@ int mdt_finish_open(struct mdt_thread_info *info,
         }
 #endif
 
-       if (info->mti_mdt->mdt_opts.mo_mds_capa &&
+       if (info->mti_mdt->mdt_lut.lut_mds_capa &&
            exp_connect_flags(exp) & OBD_CONNECT_MDS_CAPA) {
                 struct lustre_capa *capa;
 
@@ -917,8 +915,7 @@ int mdt_finish_open(struct mdt_thread_info *info,
                         RETURN(rc);
                 repbody->valid |= OBD_MD_FLMDSCAPA;
         }
-
-       if (info->mti_mdt->mdt_opts.mo_oss_capa &&
+       if (info->mti_mdt->mdt_lut.lut_oss_capa &&
            exp_connect_flags(exp) & OBD_CONNECT_OSS_CAPA &&
            S_ISREG(lu_object_attr(&o->mot_obj))) {
                 struct lustre_capa *capa;
@@ -986,15 +983,15 @@ int mdt_finish_open(struct mdt_thread_info *info,
                                         repbody->valid |= OBD_MD_FLEASIZE;
                         }
                        mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
-                        RETURN(0);
-                }
-        }
+                       RETURN(0);
+               }
+       }
 
-        rc = mdt_mfd_open(info, p, o, flags, created);
+       rc = mdt_mfd_open(info, p, o, flags, created, rep);
        if (!rc)
                mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 extern void mdt_req_from_lcd(struct ptlrpc_request *req,
@@ -1222,11 +1219,7 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                if (open_flags & MDS_OPEN_LOCK) {
                        if (open_flags & FMODE_WRITE)
                                lm = LCK_CW;
-                       /* if file is released, we can't deny write because we must
-                        * restore (write) it to access it. */
-                       else if ((open_flags & MDS_FMODE_EXEC) &&
-                                !((ma->ma_valid & MA_HSM) &&
-                                  (ma->ma_hsm.mh_flags & HS_RELEASED)))
+                       else if (open_flags & MDS_FMODE_EXEC)
                                lm = LCK_PR;
                        else
                                lm = LCK_CR;
@@ -1492,12 +1485,6 @@ out:
        return rc;
 }
 
-int mdt_pin(struct mdt_thread_info* info)
-{
-        ENTRY;
-        RETURN(err_serious(-EOPNOTSUPP));
-}
-
 /* Cross-ref request. Currently it can only be a pure open (w/o create) */
 static int mdt_cross_open(struct mdt_thread_info *info,
                          const struct lu_fid *parent_fid,
@@ -1823,23 +1810,32 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                                ma->ma_need |= MA_HSM;
                                result = mdt_attr_get_complex(info, child, ma);
                        } else {
-                               /*object non-exist!!!*/
-                               LBUG();
+                               /* object doesn't exist, likely fs corruption */
+                               CERROR("%s: name %s present, but fid " DFID
+                                      " invalid\n",mdt_obd_name(info->mti_mdt),
+                                      rr->rr_name, PFID(child_fid));
+                               GOTO(out_child, result = -EIO);
                        }
                }
         }
 
-        LASSERT(!lustre_handle_is_used(&lhc->mlh_reg_lh));
-
-       /* get openlock if this is not replay and if a client requested it */
-       if (!req_is_replay(req)) {
-               rc = mdt_object_open_lock(info, child, lhc, &ibits);
-               if (rc != 0)
-                       GOTO(out_child_unlock, result = rc);
-               else if (create_flags & MDS_OPEN_LOCK)
+       if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
+               /* the open lock may already have been taken in
+                * mdt_intent_fixup_resent */
+               LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
+               if (create_flags & MDS_OPEN_LOCK)
                        mdt_set_disposition(info, ldlm_rep, DISP_OPEN_LOCK);
+       } else {
+               /* get openlock if this isn't replay and client requested it */
+               if (!req_is_replay(req)) {
+                       rc = mdt_object_open_lock(info, child, lhc, &ibits);
+                       if (rc != 0)
+                               GOTO(out_child_unlock, result = rc);
+                       else if (create_flags & MDS_OPEN_LOCK)
+                               mdt_set_disposition(info, ldlm_rep,
+                                                   DISP_OPEN_LOCK);
+               }
        }
-
        /* Try to open it now. */
        rc = mdt_finish_open(info, parent, child, create_flags,
                             created, ldlm_rep);
@@ -2243,22 +2239,23 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
        RETURN(rc ? rc : ret);
 }
 
-int mdt_close(struct mdt_thread_info *info)
+int mdt_close(struct tgt_session_info *tsi)
 {
+       struct mdt_thread_info  *info = tsi2mdt_info(tsi);
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
         struct mdt_export_data *med;
         struct mdt_file_data   *mfd;
         struct mdt_object      *o;
         struct md_attr         *ma = &info->mti_attr;
         struct mdt_body        *repbody = NULL;
-        struct ptlrpc_request  *req = mdt_info_req(info);
         int rc, ret = 0;
         ENTRY;
 
        mdt_counter_incr(req, LPROC_MDT_CLOSE);
-        /* Close may come with the Size-on-MDS update. Unpack it. */
-        rc = mdt_close_unpack(info);
-        if (rc)
-                RETURN(err_serious(rc));
+       /* Close may come with the Size-on-MDS update. Unpack it. */
+       rc = mdt_close_unpack(info);
+       if (rc)
+               GOTO(out, rc = err_serious(rc));
 
         LASSERT(info->mti_ioepoch);
 
@@ -2272,7 +2269,7 @@ int mdt_close(struct mdt_thread_info *info)
                 if (rc == 0)
                         mdt_fix_reply(info);
                mdt_exit_ucred(info);
-                RETURN(lustre_msg_get_status(req->rq_repmsg));
+               GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg));
         }
 
         /* Continue to close handle even if we can not pack reply */
@@ -2327,13 +2324,15 @@ int mdt_close(struct mdt_thread_info *info)
         }
 
        mdt_exit_ucred(info);
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK))
-                RETURN(err_serious(-ENOMEM));
+       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK))
+               GOTO(out, rc = err_serious(-ENOMEM));
 
-        if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_CLOSE_NET_REP,
-                                 OBD_FAIL_MDS_CLOSE_NET_REP))
-                info->mti_fail_id = OBD_FAIL_MDS_CLOSE_NET_REP;
-        RETURN(rc ? rc : ret);
+       if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_CLOSE_NET_REP,
+                                OBD_FAIL_MDS_CLOSE_NET_REP))
+               tsi->tsi_reply_fail_id = OBD_FAIL_MDS_CLOSE_NET_REP;
+out:
+       mdt_thread_info_fini(info);
+       RETURN(rc ? rc : ret);
 }
 
 /**
@@ -2344,35 +2343,35 @@ int mdt_close(struct mdt_thread_info *info)
  * and got a trasid. Waiting for such DONE_WRITING is not reliable, so just
  * skip attributes and reconstruct the reply here.
  */
-int mdt_done_writing(struct mdt_thread_info *info)
+int mdt_done_writing(struct tgt_session_info *tsi)
 {
-        struct ptlrpc_request   *req = mdt_info_req(info);
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
+       struct mdt_thread_info  *info = tsi2mdt_info(tsi);
         struct mdt_body         *repbody = NULL;
         struct mdt_export_data  *med;
         struct mdt_file_data    *mfd;
         int rc;
         ENTRY;
 
-        rc = req_capsule_server_pack(info->mti_pill);
-        if (rc)
-                RETURN(err_serious(rc));
+       rc = req_capsule_server_pack(tsi->tsi_pill);
+       if (rc)
+               GOTO(out, rc = err_serious(rc));
 
-        repbody = req_capsule_server_get(info->mti_pill,
-                                         &RMF_MDT_BODY);
-        repbody->eadatasize = 0;
-        repbody->aclsize = 0;
+       repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_MDT_BODY);
+       repbody->eadatasize = 0;
+       repbody->aclsize = 0;
 
-        /* Done Writing may come with the Size-on-MDS update. Unpack it. */
-        rc = mdt_close_unpack(info);
-        if (rc)
-                RETURN(err_serious(rc));
+       /* Done Writing may come with the Size-on-MDS update. Unpack it. */
+       rc = mdt_close_unpack(info);
+       if (rc)
+               GOTO(out, rc = err_serious(rc));
 
        if (mdt_check_resent(info, mdt_reconstruct_generic, NULL)) {
                mdt_exit_ucred(info);
-               RETURN(lustre_msg_get_status(req->rq_repmsg));
+               GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg));
        }
 
-        med = &info->mti_exp->exp_mdt_data;
+       med = &info->mti_exp->exp_mdt_data;
        spin_lock(&med->med_open_lock);
        mfd = mdt_handle2mfd(med, &info->mti_ioepoch->handle,
                             req_is_replay(req));
@@ -2414,5 +2413,7 @@ int mdt_done_writing(struct mdt_thread_info *info)
         mdt_empty_transno(info, rc);
 error_ucred:
        mdt_exit_ucred(info);
+out:
+       mdt_thread_info_fini(info);
        RETURN(rc);
 }