Whamcloud - gitweb
LU-11952 mdt: fix reconstruct open
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index d59e92d..cc1afe1 100644 (file)
@@ -45,6 +45,9 @@
 
 static const char mfd_open_handle_owner[] = "mdt";
 
+static int mdt_open_by_fid(struct mdt_thread_info *info, struct ldlm_reply *rep,
+                          struct mdt_lock_handle *lhc);
+
 /* Create a new mdt_file_data struct, initialize it,
  * and insert it to global hash table */
 struct mdt_file_data *mdt_mfd_new(const struct mdt_export_data *med)
@@ -509,7 +512,7 @@ err_out:
 
 static int mdt_finish_open(struct mdt_thread_info *info,
                           struct mdt_object *p, struct mdt_object *o,
-                          u64 open_flags, int created,
+                          u64 open_flags,
                           struct ldlm_reply *rep)
 {
        struct ptlrpc_request   *req = mdt_info_req(info);
@@ -519,12 +522,14 @@ static int mdt_finish_open(struct mdt_thread_info *info,
        struct lu_attr          *la  = &ma->ma_attr;
        struct mdt_file_data    *mfd;
        struct mdt_body         *repbody;
+       int                      created;
        int                      rc = 0;
        int                      isreg, isdir, islnk;
        struct list_head        *t;
        ENTRY;
 
         LASSERT(ma->ma_valid & MA_INODE);
+       created = mdt_get_disposition(rep, DISP_OPEN_CREATE);
 
         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
 
@@ -613,19 +618,20 @@ static int mdt_finish_open(struct mdt_thread_info *info,
                RETURN(-EAGAIN);
 
        mfd = NULL;
-       if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
+       if (info->mti_rr.rr_flags & MRF_OPEN_RESEND) {
                spin_lock(&med->med_open_lock);
                list_for_each(t, &med->med_open_head) {
                        mfd = list_entry(t, struct mdt_file_data, mfd_list);
-                       if (mfd->mfd_xid == req->rq_xid)
+                       if (mfd->mfd_xid == req->rq_xid) {
+                               repbody->mbo_open_handle.cookie =
+                                               mfd->mfd_open_handle.h_cookie;
                                break;
+                       }
                        mfd = NULL;
                }
                spin_unlock(&med->med_open_lock);
 
                if (mfd != NULL) {
-                       repbody->mbo_open_handle.cookie =
-                               mfd->mfd_open_handle.h_cookie;
                        /* set repbody->ea_size for resent case */
                        if (ma->ma_valid & MA_LOV) {
                                LASSERT(ma->ma_lmm_size != 0);
@@ -638,6 +644,10 @@ static int mdt_finish_open(struct mdt_thread_info *info,
                        mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
                        RETURN(0);
                }
+               /* if we have a real resend (not a resend afrer failover), it
+                * means close is already happend, so lets return error
+                */
+               RETURN(-ESTALE);
        }
 
        rc = mdt_mfd_open(info, p, o, open_flags, created, rep);
@@ -650,100 +660,43 @@ static int mdt_finish_open(struct mdt_thread_info *info,
 void mdt_reconstruct_open(struct mdt_thread_info *info,
                          struct mdt_lock_handle *lhc)
 {
-       const struct lu_env *env = info->mti_env;
-       struct mdt_device *mdt = info->mti_mdt;
        struct req_capsule *pill = info->mti_pill;
        struct ptlrpc_request *req = mdt_info_req(info);
-       struct md_attr *ma = &info->mti_attr;
        struct mdt_reint_record *rr = &info->mti_rr;
-       u64 open_flags = info->mti_spec.sp_cr_flags;
+       struct md_attr          *ma   = &info->mti_attr;
        struct ldlm_reply *ldlm_rep;
-       struct mdt_object *parent;
-       struct mdt_object *child;
-       struct mdt_body *repbody;
        u64 opdata;
        int rc;
        ENTRY;
 
        LASSERT(pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
        ldlm_rep = req_capsule_server_get(pill, &RMF_DLM_REP);
-       repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
 
        ma->ma_need = MA_INODE | MA_HSM;
        ma->ma_valid = 0;
-
        opdata = mdt_req_from_lrd(req, info->mti_reply_data);
        mdt_set_disposition(info, ldlm_rep, opdata);
 
        CDEBUG(D_INODE, "This is reconstruct open: disp=%#llx, result=%d\n",
                ldlm_rep->lock_policy_res1, req->rq_status);
-
-        if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE) &&
-            req->rq_status != 0)
+       if (req->rq_status)
                 /* We did not create successfully, return error to client. */
                 GOTO(out, rc = req->rq_status);
 
-        if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE)) {
-                struct obd_export *exp = req->rq_export;
-                /*
-                 * We failed after creation, but we do not know in which step
-                 * we failed. So try to check the child object.
-                 */
-                parent = mdt_object_find(env, mdt, rr->rr_fid1);
-                if (IS_ERR(parent)) {
-                        rc = PTR_ERR(parent);
-                        LCONSOLE_WARN("Parent "DFID" lookup error %d."
-                                      " Evicting client %s with export %s.\n",
-                                      PFID(rr->rr_fid1), rc,
-                                      obd_uuid2str(&exp->exp_client_uuid),
-                                      obd_export_nid2str(exp));
-                        mdt_export_evict(exp);
-                        RETURN_EXIT;
-                }
-
-               child = mdt_object_find(env, mdt, rr->rr_fid2);
-               if (IS_ERR(child)) {
-                       rc = PTR_ERR(child);
-                       LCONSOLE_WARN("cannot lookup child "DFID": rc = %d; "
-                                     "evicting client %s with export %s\n",
-                                     PFID(rr->rr_fid2), rc,
-                                     obd_uuid2str(&exp->exp_client_uuid),
-                                     obd_export_nid2str(exp));
-                       mdt_object_put(env, parent);
-                       mdt_export_evict(exp);
-                       RETURN_EXIT;
-               }
-
-               if (unlikely(mdt_object_remote(child))) {
-                       mdt_object_put(env, parent);
-                       mdt_object_put(env, child);
-                       /* the child object was created on remote server */
-                       if (!mdt_is_dne_client(exp))
-                               /* Return -EIO for old client */
-                               GOTO(out, rc = -EIO);
-                       repbody->mbo_fid1 = *rr->rr_fid2;
-                       repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
-                       GOTO(out, rc = 0);
-               }
-               if (mdt_object_exists(child)) {
-                       mdt_prep_ma_buf_from_rep(info, child, ma);
-                       rc = mdt_attr_get_complex(info, child, ma);
-                       if (!rc)
-                               rc = mdt_finish_open(info, parent, child,
-                                                    open_flags, 1, ldlm_rep);
-                       mdt_object_put(env, parent);
-                       mdt_object_put(env, child);
-                       if (!rc)
-                               mdt_pack_size2body(info, rr->rr_fid2,
-                                                  &lhc->mlh_reg_lh);
-                       GOTO(out, rc);
-               }
-               /* the child does not exist, we should do regular open */
-               mdt_object_put(env, parent);
-               mdt_object_put(env, child);
+       /* tg_reply_data is just memory only  structure, so any non zero fid
+        * means a real resend not a resend after recovery which need to be
+        * handled as regular open
+        */
+       if (likely(!fid_is_zero(&info->mti_reply_data->trd_object))) {
+               rr->rr_fid2 = &info->mti_reply_data->trd_object;
+               rr->rr_flags |= MRF_OPEN_RESEND;
+               rc = mdt_open_by_fid(info, ldlm_rep, lhc);
+               if (rc)
+                       lustre_msg_set_transno(req->rq_repmsg, 0);
+       } else {
+               /* We did not try to create, so we are a pure open */
+               rc = mdt_reint_open(info, lhc);
        }
-       /* We did not try to create, so we are a pure open */
-       rc = mdt_reint_open(info, lhc);
        EXIT;
 out:
        req->rq_status = rc;
@@ -751,7 +704,8 @@ out:
        LASSERT(ergo(rc < 0, lustre_msg_get_transno(req->rq_repmsg) == 0));
 }
 
-static int mdt_open_by_fid(struct mdt_thread_info *info, struct ldlm_reply *rep)
+static int mdt_open_by_fid(struct mdt_thread_info *info, struct ldlm_reply *rep,
+                          struct mdt_lock_handle *lhc)
 {
        u64 open_flags = info->mti_spec.sp_cr_flags;
        struct mdt_reint_record *rr = &info->mti_rr;
@@ -777,6 +731,7 @@ static int mdt_open_by_fid(struct mdt_thread_info *info, struct ldlm_reply *rep)
                rc = 0;
        } else {
                if (mdt_object_exists(o)) {
+                       tgt_open_obj_set(info->mti_env, mdt_obj2dt(o));
                        mdt_set_disposition(info, rep, (DISP_IT_EXECD |
                                                        DISP_LOOKUP_EXECD |
                                                        DISP_LOOKUP_POS));
@@ -787,14 +742,18 @@ static int mdt_open_by_fid(struct mdt_thread_info *info, struct ldlm_reply *rep)
 
                        mdt_prep_ma_buf_from_rep(info, o, ma);
                        rc = mdt_attr_get_complex(info, o, ma);
-                       if (rc == 0)
-                               rc = mdt_finish_open(info, NULL, o, open_flags,
-                                                    0, rep);
+                       if (rc)
+                               GOTO(out, rc);
+                       rc = mdt_finish_open(info, NULL, o, open_flags, rep);
+                       if (rc)
+                               GOTO(out, rc);
+                       mdt_pack_size2body(info, rr->rr_fid2, &lhc->mlh_reg_lh);
                } else {
                        rc = -ENOENT;
                }
        }
 
+out:
        mdt_object_put(info->mti_env, o);
        RETURN(rc);
 }
@@ -1159,7 +1118,8 @@ static int mdt_open_by_fid_lock(struct mdt_thread_info *info,
                }
        }
 
-       rc = mdt_finish_open(info, parent, o, open_flags, 0, rep);
+       tgt_open_obj_set(info->mti_env, mdt_obj2dt(o));
+       rc = mdt_finish_open(info, parent, o, open_flags, rep);
        if (!rc) {
                mdt_set_disposition(info, rep, DISP_LOOKUP_POS);
                if (open_flags & MDS_OPEN_LOCK)
@@ -1232,7 +1192,7 @@ static int mdt_cross_open(struct mdt_thread_info *info,
                        if (unlikely(rc))
                                GOTO(out, rc);
 
-                       rc = mdt_finish_open(info, NULL, o, open_flags, 0, rep);
+                       rc = mdt_finish_open(info, NULL, o, open_flags, rep);
                } else {
                        /*
                         * Something is wrong here. lookup was positive but
@@ -1364,7 +1324,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                                        ldlm_rep, open_flags);
                GOTO(out, result);
        } else if (req_is_replay(req)) {
-               result = mdt_open_by_fid(info, ldlm_rep);
+               result = mdt_open_by_fid(info, ldlm_rep, lhc);
 
                if (result != -ENOENT)
                        GOTO(out, result);
@@ -1480,6 +1440,8 @@ again_pw:
        if (rc)
                GOTO(out_child, result = rc);
 
+       tgt_open_obj_set(info->mti_env, mdt_obj2dt(child));
+
        if (result == -ENOENT) {
                /* Create under OBF and .lustre is not permitted */
                if (!fid_is_md_operative(rr->rr_fid1) &&
@@ -1605,8 +1567,7 @@ again_pw:
                }
        }
        /* Try to open it now. */
-       rc = mdt_finish_open(info, parent, child, open_flags,
-                            created, ldlm_rep);
+       rc = mdt_finish_open(info, parent, child, open_flags, ldlm_rep);
        if (rc) {
                result = rc;
                /* openlock will be released if mdt_finish_open() failed */